Compatibility with psycopg 3.0 (#2088)

By default `psycopg2` is preferred. The `psycopg>=3.0` will be used only if `psycopg2` is not available or its version is too old.
This commit is contained in:
Alexander Kukushkin
2021-11-19 14:32:54 +01:00
committed by GitHub
parent edfe2a84e9
commit fce889cd04
30 changed files with 231 additions and 138 deletions

View File

@@ -18,7 +18,10 @@ def install_requirements(what):
finally:
sys.path = old_path
requirements = ['mock>=2.0.0', 'flake8', 'pytest', 'pytest-cov'] if what == 'all' else ['behave']
requirements += ['psycopg2-binary', 'coverage']
requirements += ['coverage']
# try to split tests between psycopg2 and psycopg3
requirements += ['psycopg[binary]'] if sys.version_info >= (3, 6, 0) and\
(sys.platform != 'darwin' or what == 'etcd3') else ['psycopg2-binary']
for r in read('requirements.txt').split('\n'):
r = r.strip()
if r != '':

View File

@@ -61,7 +61,7 @@ To install requirements on a Mac, run the following:
brew install postgresql etcd haproxy libyaml python
**Psycopg2**
**Psycopg**
Starting from `psycopg2-2.8 <http://initd.org/psycopg/articles/2019/04/04/psycopg-28-released/>`__ the binary version of psycopg2 will no longer be installed by default. Installing it from the source code requires C compiler and postgres+python dev packages.
Since in the python world it is not possible to specify dependency as ``psycopg2 OR psycopg2-binary`` you will have to decide how to install it.
@@ -88,6 +88,12 @@ There are a few options available:
pip install psycopg2>=2.5.4
4. Use psycopg 3.0 instead of psycopg2
::
pip install psycopg[binary]
**General installation for pip**
Patroni can be installed with pip:

View File

@@ -35,7 +35,7 @@ To install requirements on a Mac, run the following:
.. _psycopg2_install_options:
**Psycopg2**
**Psycopg**
Starting from `psycopg2-2.8 <http://initd.org/psycopg/articles/2019/04/04/psycopg-28-released/>`__ the binary version of psycopg2 will no longer be installed by default. Installing it from the source code requires C compiler and postgres+python dev packages.
Since in the python world it is not possible to specify dependency as ``psycopg2 OR psycopg2-binary`` you will have to decide how to install it.
@@ -62,6 +62,12 @@ There are a few options available:
pip install psycopg2>=2.5.4
4. Use psycopg 3.0 instead of psycopg2
::
pip install psycopg[binary]>=3.0.0
**General installation for pip**
Patroni can be installed with pip:

View File

@@ -1,7 +1,6 @@
import abc
import datetime
import os
import psycopg2
import json
import shutil
import signal
@@ -13,6 +12,8 @@ import threading
import time
import yaml
import patroni.psycopg as psycopg
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
@@ -205,16 +206,16 @@ class PatroniController(AbstractController):
user = config['postgresql'].get('authentication', config['postgresql']).get('superuser', {})
self._connkwargs = {k: user[n] for n, k in [('username', 'user'), ('password', 'password')] if n in user}
self._connkwargs.update({'host': host, 'port': self.__PORT, 'database': 'postgres'})
self._connkwargs.update({'host': host, 'port': self.__PORT, 'dbname': 'postgres'})
self._replication = config['postgresql'].get('authentication', config['postgresql']).get('replication', {})
self._replication.update({'host': host, 'port': self.__PORT, 'database': 'postgres'})
self._replication.update({'host': host, 'port': self.__PORT, 'dbname': 'postgres'})
return patroni_config_path
def _connection(self):
if not self._conn or self._conn.closed != 0:
self._conn = psycopg2.connect(**self._connkwargs)
self._conn = psycopg.connect(**self._connkwargs)
self._conn.autocommit = True
return self._conn
@@ -228,7 +229,7 @@ class PatroniController(AbstractController):
cursor = self._cursor()
cursor.execute(query)
return cursor
except psycopg2.Error:
except psycopg.Error:
if not fail_ok:
raise
@@ -268,7 +269,7 @@ class PatroniController(AbstractController):
@property
def backup_source(self):
return 'postgres://{username}:{password}@{host}:{port}/{database}'.format(**self._replication)
return 'postgres://{username}:{password}@{host}:{port}/{dbname}'.format(**self._replication)
def backup(self, dest=os.path.join('data', 'basebackup')):
subprocess.call(PatroniPoolController.BACKUP_SCRIPT + ['--walmethod=none',

View File

@@ -1,4 +1,4 @@
import psycopg2 as pg
import patroni.psycopg as pg
from behave import step, then
from time import sleep, time

View File

@@ -1,7 +1,7 @@
import time
import psycopg2
from behave import step, then
import patroni.psycopg as pg
@step('I create a logical replication slot {slot_name} on {pg_name:w} with the {plugin:w} plugin')
@@ -10,7 +10,7 @@ def create_logical_replication_slot(context, slot_name, pg_name, plugin):
output = context.pctl.query(pg_name, ("SELECT pg_create_logical_replication_slot('{0}', '{1}'),"
" current_database()").format(slot_name, plugin))
print(output.fetchone())
except psycopg2.Error as e:
except pg.Error as e:
print(e)
assert False, "Error creating slot {0} on {1} with plugin {2}".format(slot_name, pg_name, plugin)
@@ -24,7 +24,7 @@ def has_logical_replication_slot(context, pg_name, slot_name, plugin):
assert row[0] == "logical", "Found replication slot named {0} but wasn't a logical slot".format(slot_name)
assert row[1] == plugin, ("Found replication slot named {0} but was using plugin "
"{1} rather than {2}").format(slot_name, row[1], plugin)
except psycopg2.Error:
except pg.Error:
assert False, "Error looking for slot {0} on {1} with plugin {2}".format(slot_name, pg_name, plugin)
@@ -34,7 +34,7 @@ def does_not_have_logical_replication_slot(context, pg_name, slot_name):
row = context.pctl.query(pg_name, ("SELECT 1 FROM pg_replication_slots"
" WHERE slot_name = '{0}'").format(slot_name)).fetchone()
assert not row, "Found unexpected replication slot named {0}".format(slot_name)
except psycopg2.Error:
except pg.Error:
assert False, "Error looking for slot {0} on {1}".format(slot_name, pg_name)

View File

@@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
PATRONI_ENV_PREFIX = 'PATRONI_'
KUBERNETES_ENV_PREFIX = 'KUBERNETES_'
MIN_PSYCOPG2 = (2, 5, 4)
class Patroni(AbstractPatroniDaemon):
@@ -144,30 +145,40 @@ def fatal(string, *args):
sys.exit(1)
def check_psycopg2():
min_psycopg2 = (2, 5, 4)
min_psycopg2_str = '.'.join(map(str, min_psycopg2))
def parse_version(version):
def parse_version(version):
def _parse_version(version):
for e in version.split('.'):
try:
yield int(e)
except ValueError:
break
return tuple(_parse_version(version.split(' ')[0]))
# We pass MIN_PSYCOPG2 and parse_version as arguments to simplify usage of check_psycopg from the setup.py
def check_psycopg(_min_psycopg2=MIN_PSYCOPG2, _parse_version=parse_version):
min_psycopg2_str = '.'.join(map(str, _min_psycopg2))
try:
import psycopg2
version_str = psycopg2.__version__.split(' ')[0]
version = tuple(parse_version(version_str))
if version < min_psycopg2:
fatal('Patroni requires psycopg2>={0}, but only {1} is available', min_psycopg2_str, version_str)
from psycopg2 import __version__
if _parse_version(__version__) >= _min_psycopg2:
return
version_str = __version__.split(' ')[0]
except ImportError:
fatal('Patroni requires psycopg2>={0} or psycopg2-binary', min_psycopg2_str)
version_str = None
try:
from psycopg import __version__
except ImportError:
error = 'Patroni requires psycopg2>={0}, psycopg2-binary, or psycopg>=3.0'.format(min_psycopg2_str)
if version_str:
error += ', but only psycopg2=={0} is available'.format(version_str)
fatal(error)
def main():
if os.getpid() != 1:
check_psycopg2()
check_psycopg()
return patroni_main()
# Patroni started with PID=1, it looks like we are in the container

View File

@@ -2,7 +2,6 @@ import base64
import hmac
import json
import logging
import psycopg2
import time
import traceback
import dateutil.parser
@@ -18,6 +17,7 @@ from six.moves.socketserver import ThreadingMixIn
from six.moves.urllib_parse import urlparse, parse_qs
from threading import Thread
from . import psycopg
from .exceptions import PostgresConnectionException, PostgresException
from .postgresql.misc import postgres_version_to_int
from .utils import deep_compare, enable_keepalive, parse_bool, patch_config, Retry, \
@@ -625,7 +625,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
if row[7]:
result['replication'] = row[7]
except (psycopg2.Error, RetryFailedError, PostgresConnectionException):
except (psycopg.Error, RetryFailedError, PostgresConnectionException):
state = postgresql.state
if state == 'running':
logger.exception('get_postgresql_status')
@@ -665,7 +665,7 @@ class RestApiServer(ThreadingMixIn, HTTPServer, Thread):
with self.patroni.postgresql.connection().cursor() as cursor:
cursor.execute(sql, params)
return [r for r in cursor]
except psycopg2.Error as e:
except psycopg.Error as e:
if cursor and cursor.connection.closed == 0:
raise e
raise PostgresConnectionException('connection problems')

View File

@@ -264,13 +264,13 @@ def get_cursor(cluster, connect_parameters, role='master', member=None):
params = member.conn_kwargs(connect_parameters)
params.update({'fallback_application_name': 'Patroni ctl', 'connect_timeout': '5'})
if 'database' in connect_parameters:
params['database'] = connect_parameters['database']
if 'dbname' in connect_parameters:
params['dbname'] = connect_parameters['dbname']
else:
params.pop('database')
params.pop('dbname')
import psycopg2
conn = psycopg2.connect(**params)
from . import psycopg
conn = psycopg.connect(**params)
conn.autocommit = True
cursor = conn.cursor()
if role == 'any':
@@ -401,7 +401,7 @@ def query(
if password:
connect_parameters['password'] = click.prompt('Password', hide_input=True, type=str)
if dbname:
connect_parameters['database'] = dbname
connect_parameters['dbname'] = dbname
if p_file is not None:
command = p_file.read()
@@ -418,7 +418,7 @@ def query(
def query_member(cluster, cursor, member, role, command, connect_parameters):
import psycopg2
from . import psycopg
try:
if cursor is None:
cursor = get_cursor(cluster, connect_parameters, role=role, member=member)
@@ -433,11 +433,11 @@ def query_member(cluster, cursor, member, role, command, connect_parameters):
cursor.execute(command)
return cursor.fetchall(), [d.name for d in cursor.description]
except (psycopg2.OperationalError, psycopg2.DatabaseError) as oe:
logging.debug(oe)
except psycopg.DatabaseError as de:
logging.debug(de)
if cursor is not None and not cursor.connection.closed:
cursor.connection.close()
message = oe.pgcode or oe.pgerror or str(oe)
message = de.diag.sqlstate or str(de)
message = message.replace('\n', ' ')
return [[timestamp(0), 'ERROR, SQLSTATE: {0}'.format(message)]], None

View File

@@ -160,7 +160,7 @@ class Member(namedtuple('Member', 'index,name,session,data')):
defaults = {
"host": None,
"port": None,
"database": None
"dbname": None
}
ret = self.data.get('conn_kwargs')
if ret:
@@ -174,7 +174,7 @@ class Member(namedtuple('Member', 'index,name,session,data')):
ret = {
'host': r.hostname,
'port': r.port or 5432,
'database': r.path[1:]
'dbname': r.path[1:]
}
self.data['conn_kwargs'] = ret.copy()

View File

@@ -2,7 +2,6 @@ import datetime
import functools
import json
import logging
import psycopg2
import six
import sys
import time
@@ -10,15 +9,17 @@ import uuid
from collections import namedtuple
from multiprocessing.pool import ThreadPool
from patroni.async_executor import AsyncExecutor, CriticalTask
from patroni.exceptions import DCSError, PostgresConnectionException, PatroniFatalException
from patroni.postgresql import ACTION_ON_START, ACTION_ON_ROLE_CHANGE
from patroni.postgresql.misc import postgres_version_to_int
from patroni.postgresql.rewind import Rewind
from patroni.utils import polling_loop, tzutc, is_standby_cluster as _is_standby_cluster, parse_int
from patroni.dcs import RemoteMember
from threading import RLock
from . import psycopg
from .async_executor import AsyncExecutor, CriticalTask
from .exceptions import DCSError, PostgresConnectionException, PatroniFatalException
from .postgresql import ACTION_ON_START, ACTION_ON_ROLE_CHANGE
from .postgresql.misc import postgres_version_to_int
from .postgresql.rewind import Rewind
from .utils import polling_loop, tzutc, is_standby_cluster as _is_standby_cluster, parse_int
from .dcs import RemoteMember
logger = logging.getLogger(__name__)
@@ -1491,7 +1492,7 @@ class Ha(object):
self.demote('offline')
return 'demoted self because DCS is not accessible and i was a leader'
return 'DCS is not accessible'
except (psycopg2.Error, PostgresConnectionException):
except (psycopg.Error, PostgresConnectionException):
return 'Error communicating with PostgreSQL. Will try again later'
finally:
if not dcs_failed:

View File

@@ -1,6 +1,5 @@
import logging
import os
import psycopg2
import re
import shlex
import shutil
@@ -23,6 +22,7 @@ from .connection import Connection, get_connection_cursor
from .misc import parse_history, parse_lsn, postgres_major_version_to_int
from .postmaster import PostmasterProcess
from .slots import SlotsHandler
from .. import psycopg
from ..exceptions import PostgresConnectionException
from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int
@@ -266,13 +266,13 @@ class Postgresql(object):
cursor = self._connection.cursor()
cursor.execute(sql, params)
return cursor
except psycopg2.Error as e:
except psycopg.Error as e:
if cursor and cursor.connection.closed == 0:
# When connected via unix socket, psycopg2 can't recoginze 'connection lost'
# and leaves `_cursor_holder.connection.closed == 0`, but psycopg2.OperationalError
# is still raised (what is correct). It doesn't make sense to continiue with existing
# connection and we will close it, to avoid its reuse by the `cursor` method.
if isinstance(e, psycopg2.OperationalError):
if isinstance(e, psycopg.OperationalError):
self._connection.close()
else:
raise e
@@ -598,7 +598,7 @@ class Postgresql(object):
if cur.fetchone()[0]:
return 'is_in_recovery=true'
return cur.execute('CHECKPOINT')
except psycopg2.Error:
except psycopg.Error:
logger.exception('Exception during CHECKPOINT')
return 'not accessible or not healty'
@@ -706,7 +706,7 @@ class Postgresql(object):
while postmaster.is_running(): # Need a timeout here?
cur.execute("SELECT 1")
time.sleep(STOP_POLLING_INTERVAL)
except psycopg2.Error:
except psycopg.Error:
pass
def reload(self, block_callbacks=False):
@@ -979,7 +979,7 @@ class Postgresql(object):
with self.connection().cursor() as cursor:
cursor.execute(query)
return cursor.fetchone()[0].isoformat(sep=' ')
except psycopg2.Error:
except psycopg.Error:
return None
def last_operation(self):

View File

@@ -4,10 +4,12 @@ import shlex
import tempfile
import time
from patroni.dcs import RemoteMember
from patroni.utils import deep_compare
from six import string_types
from ..dcs import RemoteMember
from ..psycopg import quote_ident, quote_literal
from ..utils import deep_compare
logger = logging.getLogger(__name__)
@@ -297,26 +299,24 @@ class Bootstrap(object):
if 'NOLOGIN' not in options and 'LOGIN' not in options:
options.append('LOGIN')
params = [name]
if password:
options.extend(['PASSWORD', '%s'])
params.extend([password, password])
options.extend(['PASSWORD', quote_literal(password)])
sql = """DO $$
BEGIN
SET local synchronous_commit = 'local';
PERFORM * FROM pg_authid WHERE rolname = %s;
PERFORM * FROM pg_catalog.pg_authid WHERE rolname = {0};
IF FOUND THEN
ALTER ROLE "{0}" WITH {1};
ALTER ROLE {1} WITH {2};
ELSE
CREATE ROLE "{0}" WITH {1};
CREATE ROLE {1} WITH {2};
END IF;
END;$$""".format(name, ' '.join(options))
END;$$""".format(quote_literal(name), quote_ident(name, self._postgresql.connection()), ' '.join(options))
self._postgresql.query('SET log_statement TO none')
self._postgresql.query('SET log_min_duration_statement TO -1')
self._postgresql.query("SET log_min_error_statement TO 'log'")
try:
self._postgresql.query(sql, *params)
self._postgresql.query(sql)
finally:
self._postgresql.query('RESET log_min_error_statement')
self._postgresql.query('RESET log_min_duration_statement')
@@ -342,8 +342,8 @@ END;$$""".format(name, ' '.join(options))
sql = """DO $$
BEGIN
SET local synchronous_commit = 'local';
GRANT EXECUTE ON function pg_catalog.{0} TO "{1}";
END;$$""".format(f, rewind['username'])
GRANT EXECUTE ON function pg_catalog.{0} TO {1};
END;$$""".format(f, quote_ident(rewind['username'], self._postgresql.connection()))
postgresql.query(sql)
for name, value in (config.get('users') or {}).items():

View File

@@ -12,6 +12,7 @@ from .validator import CaseInsensitiveDict, recovery_parameters,\
transform_postgresql_parameter_value, transform_recovery_parameter_value
from ..dcs import slot_name_from_member_name, RemoteMember
from ..exceptions import PatroniFatalException
from ..psycopg import quote_ident as _quote_ident
from ..utils import compare_values, parse_bool, parse_int, split_host_port, uri, \
validate_directory, is_subpath
@@ -23,7 +24,7 @@ PARAMETER_RE = re.compile(r'([a-z_]+)\s*=\s*')
def quote_ident(value):
"""Very simplified version of quote_ident"""
return value if SYNC_STANDBY_NAME_RE.match(value) else '"' + value + '"'
return value if SYNC_STANDBY_NAME_RE.match(value) else _quote_ident(value)
def conninfo_uri_parse(dsn):
@@ -477,8 +478,8 @@ class ConfigHandler(object):
ret.setdefault('channel_binding', 'prefer')
if self._krbsrvname:
ret['krbsrvname'] = self._krbsrvname
if 'database' in ret:
del ret['database']
if 'dbname' in ret:
del ret['dbname']
return ret
def format_dsn(self, params, include_dbname=False):
@@ -488,7 +489,8 @@ class ConfigHandler(object):
'sslcrldir', 'application_name', 'krbsrvname', 'gssencmode', 'channel_binding')
if include_dbname:
params = params.copy()
params['dbname'] = params.get('database') or self._postgresql.database
if 'dbname' not in params:
params['dbname'] = self._postgresql.database
# we are abusing information about the necessity of dbname
# dsn should contain passfile or password only if there is no dbname in it (it is used in recovery.conf)
skip = {'passfile', 'password'}
@@ -870,7 +872,7 @@ class ConfigHandler(object):
ret['user'] = self._superuser['username']
del ret['username']
# ensure certain Patroni configurations are available
ret.update({'database': self._postgresql.database,
ret.update({'dbname': self._postgresql.database,
'fallback_application_name': 'Patroni',
'connect_timeout': 3,
'options': '-c statement_timeout=2000'})

View File

@@ -1,9 +1,10 @@
import logging
import psycopg2
from contextlib import contextmanager
from threading import Lock
from .. import psycopg
logger = logging.getLogger(__name__)
@@ -20,7 +21,7 @@ class Connection(object):
def get(self):
with self._lock:
if not self._connection or self._connection.closed != 0:
self._connection = psycopg2.connect(**self._conn_kwargs)
self._connection = psycopg.connect(**self._conn_kwargs)
self._connection.autocommit = True
self.server_version = self._connection.server_version
return self._connection
@@ -40,7 +41,7 @@ class Connection(object):
@contextmanager
def get_connection_cursor(**kwargs):
conn = psycopg2.connect(**kwargs)
conn = psycopg.connect(**kwargs)
conn.autocommit = True
with conn.cursor() as cur:
yield cur

View File

@@ -152,8 +152,8 @@ class Rewind(object):
def _conn_kwargs(self, member, auth):
ret = member.conn_kwargs(auth)
if not ret.get('database'):
ret['database'] = self._postgresql.database
if not ret.get('dbname'):
ret['dbname'] = self._postgresql.database
return ret
def _check_timeline_and_lsn(self, leader):
@@ -179,7 +179,7 @@ class Rewind(object):
elif local_timeline == master_timeline:
need_rewind = False
elif master_timeline > 1:
cur.execute('TIMELINE_HISTORY %s', (master_timeline,))
cur.execute('TIMELINE_HISTORY {0}'.format(master_timeline))
history = cur.fetchone()[1]
if not isinstance(history, six.string_types):
history = bytes(history).decode('utf-8')

View File

@@ -5,10 +5,10 @@ import shutil
from collections import defaultdict
from contextlib import contextmanager
from psycopg2.errors import UndefinedFile
from .connection import get_connection_cursor
from .misc import format_lsn
from ..psycopg import UndefinedFile
logger = logging.getLogger(__name__)
@@ -162,7 +162,7 @@ class SlotsHandler(object):
# Create new logical slots
for database, values in logical_slots.items():
with self._get_local_connection_cursor(database=database) as cur:
with self._get_local_connection_cursor(dbname=database) as cur:
for name, value in values.items():
try:
cur.execute("SELECT pg_catalog.pg_create_logical_replication_slot(%s, %s)" +
@@ -194,7 +194,7 @@ class SlotsHandler(object):
# Advance logical slots
for database, values in advance_slots.items():
with self._get_local_connection_cursor(database=database, options='-c statement_timeout=0') as cur:
with self._get_local_connection_cursor(dbname=database, options='-c statement_timeout=0') as cur:
for name, value in values.items():
try:
cur.execute("SELECT pg_catalog.pg_replication_slot_advance(%s, %s)",
@@ -236,7 +236,7 @@ class SlotsHandler(object):
@contextmanager
def _get_leader_connection_cursor(self, leader):
conn_kwargs = leader.conn_kwargs(self._postgresql.config.rewind_credentials)
conn_kwargs['database'] = self._postgresql.database
conn_kwargs['dbname'] = self._postgresql.database
with get_connection_cursor(connect_timeout=3, options="-c statement_timeout=2000", **conn_kwargs) as cur:
yield cur

43
patroni/psycopg.py Normal file
View File

@@ -0,0 +1,43 @@
__all__ = ['connect', 'quote_ident', 'quote_literal', 'DatabaseError',
'Error', 'OperationalError', 'ProgrammingError', 'UndefinedFile']
_legacy = False
try:
from psycopg2 import __version__
from . import MIN_PSYCOPG2, parse_version
if parse_version(__version__) < MIN_PSYCOPG2:
raise ImportError
from psycopg2 import connect, Error, DatabaseError, OperationalError, ProgrammingError
from psycopg2.errors import UndefinedFile
from psycopg2.extensions import adapt
try:
from psycopg2.extensions import quote_ident as _quote_ident
except ImportError:
_legacy = True
def quote_literal(value, conn=None):
value = adapt(value)
if conn:
value.prepare(conn)
return value.getquoted().decode('utf-8')
except ImportError:
from psycopg import connect as _connect, sql, Error, DatabaseError, OperationalError, ProgrammingError
from psycopg.errors import UndefinedFile
def connect(*args, **kwargs):
ret = _connect(*args, **kwargs)
ret.server_version = ret.pgconn.server_version # compatibility with psycopg2
return ret
def _quote_ident(value, conn):
return sql.Identifier(value).as_string(conn)
def quote_literal(value, conn=None):
return sql.Literal(value).as_string(conn)
def quote_ident(value, conn=None):
if _legacy or conn is None:
return '"{0}"'.format(value.replace('"', '""'))
return _quote_ident(value, conn)

View File

@@ -27,13 +27,14 @@ import argparse
import csv
import logging
import os
import psycopg2
import subprocess
import sys
import time
from collections import namedtuple
from .. import psycopg
logger = logging.getLogger(__name__)
RETRY_SLEEP_INTERVAL = 1
@@ -215,7 +216,7 @@ class WALERestore(object):
if self.master_connection:
try:
# get the difference in bytes between the current WAL location and the backup start offset
with psycopg2.connect(self.master_connection) as con:
with psycopg.connect(self.master_connection) as con:
if con.server_version >= 100000:
wal_name = 'wal'
lsn_name = 'lsn'
@@ -233,7 +234,7 @@ class WALERestore(object):
(backup_start_lsn, backup_start_lsn, backup_start_lsn))
diff_in_bytes = int(cur.fetchone()[0])
except psycopg2.Error:
except psycopg.Error:
logger.exception('could not determine difference with the master location')
if attempts_no < self.retries: # retry in case of a temporarily connection issue
attempts_no = attempts_no + 1

View File

@@ -216,13 +216,13 @@ def setup_package(version):
if __name__ == '__main__':
old_modules = sys.modules.copy()
try:
from patroni import check_psycopg2, fatal, __version__
from patroni import check_psycopg, fatal, __version__
finally:
sys.modules.clear()
sys.modules.update(old_modules)
if sys.version_info < (2, 7, 0):
fatal('Patroni needs to be run with Python 2.7+')
check_psycopg2()
check_psycopg()
setup_package(__version__)

View File

@@ -5,9 +5,10 @@ import unittest
from mock import Mock, patch
import psycopg2
import urllib3
import patroni.psycopg as psycopg
from patroni.dcs import Leader, Member
from patroni.postgresql import Postgresql
from patroni.postgresql.config import ConfigHandler
@@ -85,9 +86,9 @@ class MockCursor(object):
def execute(self, sql, *params):
if sql.startswith('blabla'):
raise psycopg2.ProgrammingError()
raise psycopg.ProgrammingError()
elif sql == 'CHECKPOINT' or sql.startswith('SELECT pg_catalog.pg_create_'):
raise psycopg2.OperationalError()
raise psycopg.OperationalError()
elif sql.startswith('RetryFailedError'):
raise RetryFailedError('retry')
elif sql.startswith('SELECT catalog_xmin'):
@@ -162,7 +163,7 @@ class MockConnect(object):
pass
def psycopg2_connect(*args, **kwargs):
def psycopg_connect(*args, **kwargs):
return MockConnect()
@@ -176,7 +177,7 @@ class PostgresInit(unittest.TestCase):
'force_parallel_mode': '1', 'constraint_exclusion': '',
'max_stack_depth': 'Z', 'vacuum_cost_limit': -1, 'vacuum_cost_delay': 200}
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.postgresql.CallbackExecutor', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(ConfigHandler, 'replace_pg_hba', Mock())

View File

@@ -1,9 +1,10 @@
import datetime
import json
import psycopg2
import unittest
import socket
import patroni.psycopg as psycopg
from mock import Mock, PropertyMock, patch
from patroni.api import RestApiHandler, RestApiServer
from patroni.dcs import ClusterConfig, Member
@@ -11,7 +12,7 @@ from patroni.ha import _MemberStatus
from patroni.utils import tzutc
from six import BytesIO as IO
from six.moves import BaseHTTPServer
from . import psycopg2_connect, MockCursor
from . import psycopg_connect, MockCursor
from .test_ha import get_cluster_initialized_without_leader
@@ -35,7 +36,7 @@ class MockPostgresql(object):
@staticmethod
def connection():
return psycopg2_connect()
return psycopg_connect()
@staticmethod
def postmaster_start_time():
@@ -436,9 +437,9 @@ class TestRestApiHandler(unittest.TestCase):
@patch('time.sleep', Mock())
def test_RestApiServer_query(self):
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError)):
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
with patch.object(MockPostgresql, 'connection', Mock(side_effect=psycopg2.OperationalError)):
with patch.object(MockPostgresql, 'connection', Mock(side_effect=psycopg.OperationalError)):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
@patch('time.sleep', Mock())

View File

@@ -8,11 +8,11 @@ from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.config import ConfigHandler
from . import psycopg2_connect, BaseTestPostgresql
from . import psycopg_connect, BaseTestPostgresql
@patch('subprocess.call', Mock(return_value=0))
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('os.rename', Mock())
class TestBootstrap(BaseTestPostgresql):
@@ -164,6 +164,7 @@ class TestBootstrap(BaseTestPostgresql):
@patch('os.unlink', Mock())
@patch('shutil.copy', Mock())
@patch('os.path.isfile', Mock(return_value=True))
@patch('patroni.postgresql.bootstrap.quote_ident', Mock())
@patch.object(Bootstrap, 'call_post_bootstrap', Mock(return_value=True))
@patch.object(Bootstrap, '_custom_bootstrap', Mock(return_value=True))
@patch.object(Postgresql, 'start', Mock(return_value=True))

View File

@@ -9,11 +9,11 @@ from patroni.ctl import ctl, store_config, load_config, output_members, get_dcs,
get_all_members, get_any_member, get_cursor, query_member, configure, PatroniCtlException, apply_config_changes, \
format_config_for_editing, show_diff, invoke_editor, format_pg_version, CONFIG_FILE_PATH
from patroni.dcs.etcd import AbstractEtcdClientWithFailover, Failover
from patroni.psycopg import OperationalError
from patroni.utils import tzutc
from psycopg2 import OperationalError
from urllib3 import PoolManager
from . import MockConnect, MockCursor, MockResponse, psycopg2_connect
from . import MockConnect, MockCursor, MockResponse, psycopg_connect
from .test_etcd import etcd_read, socket_getaddrinfo
from .test_ha import get_cluster_initialized_without_leader, get_cluster_initialized_with_leader, \
get_cluster_initialized_with_only_leader, get_cluster_not_initialized_without_leader, get_cluster, Member
@@ -48,7 +48,7 @@ class TestCtl(unittest.TestCase):
self.assertRaises(PatroniCtlException, load_config, './non-existing-config-file', None)
self.assertRaises(PatroniCtlException, load_config, './non-existing-config-file', None)
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
def test_get_cursor(self):
self.assertIsNone(get_cursor(get_cluster_initialized_without_leader(), {}, role='master'))
@@ -57,7 +57,7 @@ class TestCtl(unittest.TestCase):
# MockCursor returns pg_is_in_recovery as false
self.assertIsNone(get_cursor(get_cluster_initialized_with_leader(), {}, role='replica'))
self.assertIsNotNone(get_cursor(get_cluster_initialized_with_leader(), {'database': 'foo'}, role='any'))
self.assertIsNotNone(get_cursor(get_cluster_initialized_with_leader(), {'dbname': 'foo'}, role='any'))
def test_parse_dcs(self):
assert parse_dcs(None) is None
@@ -165,7 +165,7 @@ class TestCtl(unittest.TestCase):
def test_get_dcs(self):
self.assertRaises(PatroniCtlException, get_dcs, {'dummy': {}}, 'dummy')
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.ctl.query_member', Mock(return_value=([['mock column']], None)))
@patch('patroni.ctl.get_dcs')
@patch.object(etcd.Client, 'read', etcd_read)

View File

@@ -19,7 +19,7 @@ from patroni.utils import tzutc
from patroni.watchdog import Watchdog
from six.moves import builtins
from . import PostgresInit, MockPostmaster, psycopg2_connect, requests_get
from . import PostgresInit, MockPostmaster, psycopg_connect, requests_get
from .test_etcd import socket_getaddrinfo, etcd_read, etcd_write
SYSID = '12345678901'
@@ -323,7 +323,7 @@ class TestHa(PostgresInit):
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
def test_acquire_lock_as_master(self):
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
@@ -460,6 +460,8 @@ class TestHa(PostgresInit):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'failed to acquire initialize lock')
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_initialized_new_cluster(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
@@ -477,6 +479,8 @@ class TestHa(PostgresInit):
self.p.is_running = false
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_release_initialize_key_on_watchdog_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
@@ -487,7 +491,7 @@ class TestHa(PostgresInit):
self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap')
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
def test_reinitialize(self):
self.assertIsNotNone(self.ha.reinitialize())
@@ -1153,7 +1157,7 @@ class TestHa(PostgresInit):
self.ha.is_paused = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
def test_permanent_logical_slots_after_promote(self):
config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1)
self.p.name = 'other'
@@ -1185,7 +1189,7 @@ class TestHa(PostgresInit):
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: released leader key voluntarily due to the system ID mismatch')
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('os.path.exists', Mock(return_value=True))
@patch('shutil.rmtree', Mock())
@patch('os.makedirs', Mock())

View File

@@ -13,15 +13,23 @@ from patroni.dcs.etcd import AbstractEtcdClientWithFailover
from patroni.exceptions import DCSError
from patroni.postgresql import Postgresql
from patroni.postgresql.config import ConfigHandler
from patroni import Patroni, main as _main, patroni_main, check_psycopg2
from patroni import Patroni, main as _main, patroni_main, check_psycopg
from six.moves import BaseHTTPServer, builtins
from threading import Thread
from . import psycopg2_connect, SleepException
from . import psycopg_connect, SleepException
from .test_etcd import etcd_read, etcd_write
from .test_postgresql import MockPostmaster
def mock_import(*args, **kwargs):
if args[0] == 'psycopg':
raise ImportError
ret = Mock()
ret.__version__ = '2.5.3.dev1 a b c'
return ret
class MockFrozenImporter(object):
toc = set(['patroni.dcs.etcd'])
@@ -29,7 +37,7 @@ class MockFrozenImporter(object):
@patch('time.sleep', Mock())
@patch('subprocess.call', Mock(return_value=0))
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(ConfigHandler, 'append_pg_hba', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(ConfigHandler, 'write_recovery_conf', Mock())
@@ -181,8 +189,8 @@ class TestPatroni(unittest.TestCase):
self.p.ha.shutdown = Mock(side_effect=Exception)
self.p.shutdown()
def test_check_psycopg2(self):
def test_check_psycopg(self):
with patch.object(builtins, '__import__', Mock(side_effect=ImportError)):
self.assertRaises(SystemExit, check_psycopg2)
with patch('psycopg2.__version__', '2.5.3.dev1 a b c'):
self.assertRaises(SystemExit, check_psycopg2)
self.assertRaises(SystemExit, check_psycopg)
with patch.object(builtins, '__import__', mock_import):
self.assertRaises(SystemExit, check_psycopg)

View File

@@ -1,12 +1,14 @@
import datetime
import os
import psutil
import psycopg2
import re
import subprocess
import time
from mock import Mock, MagicMock, PropertyMock, patch, mock_open
import patroni.psycopg as psycopg
from patroni.async_executor import CriticalTask
from patroni.dcs import Cluster, RemoteMember, SyncState
from patroni.exceptions import PostgresConnectionException, PatroniException
@@ -17,7 +19,7 @@ from patroni.utils import RetryFailedError
from six.moves import builtins
from threading import Thread, current_thread
from . import BaseTestPostgresql, MockCursor, MockPostmaster, psycopg2_connect
from . import BaseTestPostgresql, MockCursor, MockPostmaster, psycopg_connect
mtime_ret = {}
@@ -87,7 +89,7 @@ Data page checksum version: 0
@patch('subprocess.call', Mock(return_value=0))
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
class TestPostgresql(BaseTestPostgresql):
@patch('subprocess.call', Mock(return_value=0))
@@ -319,7 +321,7 @@ class TestPostgresql(BaseTestPostgresql):
m = RemoteMember('1', {'restore_command': '2', 'primary_slot_name': 'foo', 'conn_kwargs': {'host': 'bar'}})
self.p.follow(m)
@patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError))
@patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError))
def test__query(self):
self.assertRaises(PostgresConnectionException, self.p._query, 'blabla')
self.p._state = 'restarting'
@@ -328,7 +330,7 @@ class TestPostgresql(BaseTestPostgresql):
def test_query(self):
self.p.query('select 1')
self.assertRaises(PostgresConnectionException, self.p.query, 'RetryFailedError')
self.assertRaises(psycopg2.ProgrammingError, self.p.query, 'blabla')
self.assertRaises(psycopg.ProgrammingError, self.p.query, 'blabla')
@patch.object(Postgresql, 'pg_isready', Mock(return_value=STATE_REJECT))
def test_is_leader(self):
@@ -430,7 +432,7 @@ class TestPostgresql(BaseTestPostgresql):
@patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
def test_is_leader_exception(self):
self.p.start()
self.p.query = Mock(side_effect=psycopg2.OperationalError("not supported"))
self.p.query = Mock(side_effect=psycopg.OperationalError("not supported"))
self.assertTrue(self.p.stop())
@patch('os.rename', Mock())
@@ -559,7 +561,7 @@ class TestPostgresql(BaseTestPostgresql):
t.start()
t.join()
with patch.object(MockCursor, "execute", side_effect=psycopg2.Error):
with patch.object(MockCursor, "execute", side_effect=psycopg.Error):
self.assertIsNone(self.p.postmaster_start_time())
def test_check_for_startup(self):
@@ -722,7 +724,7 @@ class TestPostgresql(BaseTestPostgresql):
self.p.stop(on_safepoint=mock_callback)
mock_postmaster.is_running.side_effect = [True, False, False]
with patch.object(MockCursor, "execute", Mock(side_effect=psycopg2.Error)):
with patch.object(MockCursor, "execute", Mock(side_effect=psycopg.Error)):
self.p.stop(on_safepoint=mock_callback)
def test_terminate_starting_postmaster(self):

View File

@@ -5,7 +5,7 @@ from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.rewind import Rewind
from six.moves import builtins
from . import BaseTestPostgresql, MockCursor, psycopg2_connect
from . import BaseTestPostgresql, MockCursor, psycopg_connect
class MockThread(object):
@@ -47,7 +47,7 @@ def mock_single_user_mode(self, communicate, options):
@patch('subprocess.call', Mock(return_value=0))
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
class TestRewind(BaseTestPostgresql):
def setUp(self):
@@ -143,7 +143,7 @@ class TestRewind(BaseTestPostgresql):
mock_check_leader_is_not_in_recovery.return_value = True
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.trigger_check_diverged_lsn()
with patch('psycopg2.connect', Mock(side_effect=Exception)):
with patch('patroni.psycopg.connect', Mock(side_effect=Exception)):
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.trigger_check_diverged_lsn()
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[('', 3, '0/0'), ('', b'3\t0/40159C0\tn\n')])):

View File

@@ -1,20 +1,20 @@
import mock
import os
import psycopg2
import unittest
from mock import Mock, PropertyMock, patch
from patroni import psycopg
from patroni.dcs import Cluster, ClusterConfig, Member
from patroni.postgresql import Postgresql
from patroni.postgresql.slots import SlotsHandler, fsync_dir
from . import BaseTestPostgresql, psycopg2_connect, MockCursor
from . import BaseTestPostgresql, psycopg_connect, MockCursor
@patch('subprocess.call', Mock(return_value=0))
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
class TestSlotsHandler(BaseTestPostgresql):
@@ -34,7 +34,7 @@ class TestSlotsHandler(BaseTestPostgresql):
'ignore_slots': [{'name': 'blabla'}]}, 1)
cluster = Cluster(True, config, self.leader, 0,
[self.me, self.other, self.leadermem], None, None, None, {'test_3': 10})
with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg2.OperationalError)):
with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
self.s.sync_replication_slots(cluster, False)
self.p.set_role('standby_leader')
self.s.sync_replication_slots(cluster, False)
@@ -86,14 +86,14 @@ class TestSlotsHandler(BaseTestPostgresql):
[self.me, self.other, self.leadermem], None, None, None, {'ls': 12346})
self.assertEqual(self.s.sync_replication_slots(cluster, False), [])
self.s._schedule_load_slots = False
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.errors.UndefinedFile)):
with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.UndefinedFile)):
self.assertEqual(self.s.sync_replication_slots(cluster, False), ['ls'])
cluster.slots['ls'] = 'a'
self.assertEqual(self.s.sync_replication_slots(cluster, False), [])
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
self.assertEqual(self.s.sync_replication_slots(cluster, False), ['ls'])
@patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError))
@patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError))
def test_copy_logical_slots(self):
self.s.copy_logical_slots(self.leader, ['foo'])

View File

@@ -1,14 +1,15 @@
import psycopg2
import subprocess
import unittest
import patroni.psycopg as psycopg
from mock import Mock, PropertyMock, patch, mock_open
from patroni.scripts import wale_restore
from patroni.scripts.wale_restore import WALERestore, main as _main, get_major_version
from six.moves import builtins
from threading import current_thread
from . import MockConnect, psycopg2_connect
from . import MockConnect, psycopg_connect
wale_output_header = (
b'name\tlast_modified\t'
@@ -34,7 +35,7 @@ WALE_TEST_RETRIES = 2
@patch('os.makedirs', Mock(return_value=True))
@patch('os.path.exists', Mock(return_value=True))
@patch('os.path.isdir', Mock(return_value=True))
@patch('psycopg2.connect', psycopg2_connect)
@patch('patroni.psycopg.connect', psycopg_connect)
@patch('subprocess.check_output', Mock(return_value=wale_output))
class TestWALERestore(unittest.TestCase):
@@ -57,7 +58,7 @@ class TestWALERestore(unittest.TestCase):
with patch('subprocess.check_output', Mock(return_value=wale_output.replace(b'167772160', b'1'))):
self.assertFalse(self.wale_restore.should_use_s3_to_create_replica())
with patch('psycopg2.connect', Mock(side_effect=psycopg2.Error("foo"))):
with patch('patroni.psycopg.connect', Mock(side_effect=psycopg.Error("foo"))):
save_no_master = self.wale_restore.no_master
save_master_connection = self.wale_restore.master_connection