diff --git a/.github/workflows/install_deps.py b/.github/workflows/install_deps.py
index 187d634f..46a3fffe 100644
--- a/.github/workflows/install_deps.py
+++ b/.github/workflows/install_deps.py
@@ -18,7 +18,10 @@ def install_requirements(what):
finally:
sys.path = old_path
requirements = ['mock>=2.0.0', 'flake8', 'pytest', 'pytest-cov'] if what == 'all' else ['behave']
- requirements += ['psycopg2-binary', 'coverage']
+ requirements += ['coverage']
+ # try to split tests between psycopg2 and psycopg3
+ requirements += ['psycopg[binary]'] if sys.version_info >= (3, 6, 0) and\
+ (sys.platform != 'darwin' or what == 'etcd3') else ['psycopg2-binary']
for r in read('requirements.txt').split('\n'):
r = r.strip()
if r != '':
diff --git a/README.rst b/README.rst
index 3591d82f..a6ef374e 100644
--- a/README.rst
+++ b/README.rst
@@ -61,7 +61,7 @@ To install requirements on a Mac, run the following:
brew install postgresql etcd haproxy libyaml python
-**Psycopg2**
+**Psycopg**
Starting from `psycopg2-2.8 `__ the binary version of psycopg2 will no longer be installed by default. Installing it from the source code requires C compiler and postgres+python dev packages.
Since in the python world it is not possible to specify dependency as ``psycopg2 OR psycopg2-binary`` you will have to decide how to install it.
@@ -88,6 +88,12 @@ There are a few options available:
pip install psycopg2>=2.5.4
+4. Use psycopg 3.0 instead of psycopg2
+
+::
+
+ pip install psycopg[binary]
+
**General installation for pip**
Patroni can be installed with pip:
diff --git a/docs/README.rst b/docs/README.rst
index ee8f0c20..94045a7a 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -35,7 +35,7 @@ To install requirements on a Mac, run the following:
.. _psycopg2_install_options:
-**Psycopg2**
+**Psycopg**
Starting from `psycopg2-2.8 `__ the binary version of psycopg2 will no longer be installed by default. Installing it from the source code requires C compiler and postgres+python dev packages.
Since in the python world it is not possible to specify dependency as ``psycopg2 OR psycopg2-binary`` you will have to decide how to install it.
@@ -62,6 +62,12 @@ There are a few options available:
pip install psycopg2>=2.5.4
+4. Use psycopg 3.0 instead of psycopg2
+
+::
+
+ pip install psycopg[binary]>=3.0.0
+
**General installation for pip**
Patroni can be installed with pip:
diff --git a/features/environment.py b/features/environment.py
index cfa16f04..ff4b032f 100644
--- a/features/environment.py
+++ b/features/environment.py
@@ -1,7 +1,6 @@
import abc
import datetime
import os
-import psycopg2
import json
import shutil
import signal
@@ -13,6 +12,8 @@ import threading
import time
import yaml
+import patroni.psycopg as psycopg
+
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
@@ -205,16 +206,16 @@ class PatroniController(AbstractController):
user = config['postgresql'].get('authentication', config['postgresql']).get('superuser', {})
self._connkwargs = {k: user[n] for n, k in [('username', 'user'), ('password', 'password')] if n in user}
- self._connkwargs.update({'host': host, 'port': self.__PORT, 'database': 'postgres'})
+ self._connkwargs.update({'host': host, 'port': self.__PORT, 'dbname': 'postgres'})
self._replication = config['postgresql'].get('authentication', config['postgresql']).get('replication', {})
- self._replication.update({'host': host, 'port': self.__PORT, 'database': 'postgres'})
+ self._replication.update({'host': host, 'port': self.__PORT, 'dbname': 'postgres'})
return patroni_config_path
def _connection(self):
if not self._conn or self._conn.closed != 0:
- self._conn = psycopg2.connect(**self._connkwargs)
+ self._conn = psycopg.connect(**self._connkwargs)
self._conn.autocommit = True
return self._conn
@@ -228,7 +229,7 @@ class PatroniController(AbstractController):
cursor = self._cursor()
cursor.execute(query)
return cursor
- except psycopg2.Error:
+ except psycopg.Error:
if not fail_ok:
raise
@@ -268,7 +269,7 @@ class PatroniController(AbstractController):
@property
def backup_source(self):
- return 'postgres://{username}:{password}@{host}:{port}/{database}'.format(**self._replication)
+ return 'postgres://{username}:{password}@{host}:{port}/{dbname}'.format(**self._replication)
def backup(self, dest=os.path.join('data', 'basebackup')):
subprocess.call(PatroniPoolController.BACKUP_SCRIPT + ['--walmethod=none',
diff --git a/features/steps/basic_replication.py b/features/steps/basic_replication.py
index 728dbe5a..90b148b1 100644
--- a/features/steps/basic_replication.py
+++ b/features/steps/basic_replication.py
@@ -1,4 +1,4 @@
-import psycopg2 as pg
+import patroni.psycopg as pg
from behave import step, then
from time import sleep, time
diff --git a/features/steps/slots.py b/features/steps/slots.py
index f761cceb..8a742b1c 100644
--- a/features/steps/slots.py
+++ b/features/steps/slots.py
@@ -1,7 +1,7 @@
import time
-import psycopg2
from behave import step, then
+import patroni.psycopg as pg
@step('I create a logical replication slot {slot_name} on {pg_name:w} with the {plugin:w} plugin')
@@ -10,7 +10,7 @@ def create_logical_replication_slot(context, slot_name, pg_name, plugin):
output = context.pctl.query(pg_name, ("SELECT pg_create_logical_replication_slot('{0}', '{1}'),"
" current_database()").format(slot_name, plugin))
print(output.fetchone())
- except psycopg2.Error as e:
+ except pg.Error as e:
print(e)
assert False, "Error creating slot {0} on {1} with plugin {2}".format(slot_name, pg_name, plugin)
@@ -24,7 +24,7 @@ def has_logical_replication_slot(context, pg_name, slot_name, plugin):
assert row[0] == "logical", "Found replication slot named {0} but wasn't a logical slot".format(slot_name)
assert row[1] == plugin, ("Found replication slot named {0} but was using plugin "
"{1} rather than {2}").format(slot_name, row[1], plugin)
- except psycopg2.Error:
+ except pg.Error:
assert False, "Error looking for slot {0} on {1} with plugin {2}".format(slot_name, pg_name, plugin)
@@ -34,7 +34,7 @@ def does_not_have_logical_replication_slot(context, pg_name, slot_name):
row = context.pctl.query(pg_name, ("SELECT 1 FROM pg_replication_slots"
" WHERE slot_name = '{0}'").format(slot_name)).fetchone()
assert not row, "Found unexpected replication slot named {0}".format(slot_name)
- except psycopg2.Error:
+ except pg.Error:
assert False, "Error looking for slot {0} on {1}".format(slot_name, pg_name)
diff --git a/patroni/__init__.py b/patroni/__init__.py
index 04e7e595..46aac101 100644
--- a/patroni/__init__.py
+++ b/patroni/__init__.py
@@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
PATRONI_ENV_PREFIX = 'PATRONI_'
KUBERNETES_ENV_PREFIX = 'KUBERNETES_'
+MIN_PSYCOPG2 = (2, 5, 4)
class Patroni(AbstractPatroniDaemon):
@@ -144,30 +145,40 @@ def fatal(string, *args):
sys.exit(1)
-def check_psycopg2():
- min_psycopg2 = (2, 5, 4)
- min_psycopg2_str = '.'.join(map(str, min_psycopg2))
-
- def parse_version(version):
+def parse_version(version):
+ def _parse_version(version):
for e in version.split('.'):
try:
yield int(e)
except ValueError:
break
+ return tuple(_parse_version(version.split(' ')[0]))
+
+
+# We pass MIN_PSYCOPG2 and parse_version as arguments to simplify usage of check_psycopg from the setup.py
+def check_psycopg(_min_psycopg2=MIN_PSYCOPG2, _parse_version=parse_version):
+ min_psycopg2_str = '.'.join(map(str, _min_psycopg2))
try:
- import psycopg2
- version_str = psycopg2.__version__.split(' ')[0]
- version = tuple(parse_version(version_str))
- if version < min_psycopg2:
- fatal('Patroni requires psycopg2>={0}, but only {1} is available', min_psycopg2_str, version_str)
+ from psycopg2 import __version__
+ if _parse_version(__version__) >= _min_psycopg2:
+ return
+ version_str = __version__.split(' ')[0]
except ImportError:
- fatal('Patroni requires psycopg2>={0} or psycopg2-binary', min_psycopg2_str)
+ version_str = None
+
+ try:
+ from psycopg import __version__
+ except ImportError:
+ error = 'Patroni requires psycopg2>={0}, psycopg2-binary, or psycopg>=3.0'.format(min_psycopg2_str)
+ if version_str:
+ error += ', but only psycopg2=={0} is available'.format(version_str)
+ fatal(error)
def main():
if os.getpid() != 1:
- check_psycopg2()
+ check_psycopg()
return patroni_main()
# Patroni started with PID=1, it looks like we are in the container
diff --git a/patroni/api.py b/patroni/api.py
index 82fe1042..505852a9 100644
--- a/patroni/api.py
+++ b/patroni/api.py
@@ -2,7 +2,6 @@ import base64
import hmac
import json
import logging
-import psycopg2
import time
import traceback
import dateutil.parser
@@ -18,6 +17,7 @@ from six.moves.socketserver import ThreadingMixIn
from six.moves.urllib_parse import urlparse, parse_qs
from threading import Thread
+from . import psycopg
from .exceptions import PostgresConnectionException, PostgresException
from .postgresql.misc import postgres_version_to_int
from .utils import deep_compare, enable_keepalive, parse_bool, patch_config, Retry, \
@@ -625,7 +625,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
if row[7]:
result['replication'] = row[7]
- except (psycopg2.Error, RetryFailedError, PostgresConnectionException):
+ except (psycopg.Error, RetryFailedError, PostgresConnectionException):
state = postgresql.state
if state == 'running':
logger.exception('get_postgresql_status')
@@ -665,7 +665,7 @@ class RestApiServer(ThreadingMixIn, HTTPServer, Thread):
with self.patroni.postgresql.connection().cursor() as cursor:
cursor.execute(sql, params)
return [r for r in cursor]
- except psycopg2.Error as e:
+ except psycopg.Error as e:
if cursor and cursor.connection.closed == 0:
raise e
raise PostgresConnectionException('connection problems')
diff --git a/patroni/ctl.py b/patroni/ctl.py
index 10b07a69..83197bfa 100644
--- a/patroni/ctl.py
+++ b/patroni/ctl.py
@@ -264,13 +264,13 @@ def get_cursor(cluster, connect_parameters, role='master', member=None):
params = member.conn_kwargs(connect_parameters)
params.update({'fallback_application_name': 'Patroni ctl', 'connect_timeout': '5'})
- if 'database' in connect_parameters:
- params['database'] = connect_parameters['database']
+ if 'dbname' in connect_parameters:
+ params['dbname'] = connect_parameters['dbname']
else:
- params.pop('database')
+ params.pop('dbname')
- import psycopg2
- conn = psycopg2.connect(**params)
+ from . import psycopg
+ conn = psycopg.connect(**params)
conn.autocommit = True
cursor = conn.cursor()
if role == 'any':
@@ -401,7 +401,7 @@ def query(
if password:
connect_parameters['password'] = click.prompt('Password', hide_input=True, type=str)
if dbname:
- connect_parameters['database'] = dbname
+ connect_parameters['dbname'] = dbname
if p_file is not None:
command = p_file.read()
@@ -418,7 +418,7 @@ def query(
def query_member(cluster, cursor, member, role, command, connect_parameters):
- import psycopg2
+ from . import psycopg
try:
if cursor is None:
cursor = get_cursor(cluster, connect_parameters, role=role, member=member)
@@ -433,11 +433,11 @@ def query_member(cluster, cursor, member, role, command, connect_parameters):
cursor.execute(command)
return cursor.fetchall(), [d.name for d in cursor.description]
- except (psycopg2.OperationalError, psycopg2.DatabaseError) as oe:
- logging.debug(oe)
+ except psycopg.DatabaseError as de:
+ logging.debug(de)
if cursor is not None and not cursor.connection.closed:
cursor.connection.close()
- message = oe.pgcode or oe.pgerror or str(oe)
+ message = de.diag.sqlstate or str(de)
message = message.replace('\n', ' ')
return [[timestamp(0), 'ERROR, SQLSTATE: {0}'.format(message)]], None
diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py
index b3e68d1d..dc2c57ea 100644
--- a/patroni/dcs/__init__.py
+++ b/patroni/dcs/__init__.py
@@ -160,7 +160,7 @@ class Member(namedtuple('Member', 'index,name,session,data')):
defaults = {
"host": None,
"port": None,
- "database": None
+ "dbname": None
}
ret = self.data.get('conn_kwargs')
if ret:
@@ -174,7 +174,7 @@ class Member(namedtuple('Member', 'index,name,session,data')):
ret = {
'host': r.hostname,
'port': r.port or 5432,
- 'database': r.path[1:]
+ 'dbname': r.path[1:]
}
self.data['conn_kwargs'] = ret.copy()
diff --git a/patroni/ha.py b/patroni/ha.py
index 218e8621..664349a5 100644
--- a/patroni/ha.py
+++ b/patroni/ha.py
@@ -2,7 +2,6 @@ import datetime
import functools
import json
import logging
-import psycopg2
import six
import sys
import time
@@ -10,15 +9,17 @@ import uuid
from collections import namedtuple
from multiprocessing.pool import ThreadPool
-from patroni.async_executor import AsyncExecutor, CriticalTask
-from patroni.exceptions import DCSError, PostgresConnectionException, PatroniFatalException
-from patroni.postgresql import ACTION_ON_START, ACTION_ON_ROLE_CHANGE
-from patroni.postgresql.misc import postgres_version_to_int
-from patroni.postgresql.rewind import Rewind
-from patroni.utils import polling_loop, tzutc, is_standby_cluster as _is_standby_cluster, parse_int
-from patroni.dcs import RemoteMember
from threading import RLock
+from . import psycopg
+from .async_executor import AsyncExecutor, CriticalTask
+from .exceptions import DCSError, PostgresConnectionException, PatroniFatalException
+from .postgresql import ACTION_ON_START, ACTION_ON_ROLE_CHANGE
+from .postgresql.misc import postgres_version_to_int
+from .postgresql.rewind import Rewind
+from .utils import polling_loop, tzutc, is_standby_cluster as _is_standby_cluster, parse_int
+from .dcs import RemoteMember
+
logger = logging.getLogger(__name__)
@@ -1491,7 +1492,7 @@ class Ha(object):
self.demote('offline')
return 'demoted self because DCS is not accessible and i was a leader'
return 'DCS is not accessible'
- except (psycopg2.Error, PostgresConnectionException):
+ except (psycopg.Error, PostgresConnectionException):
return 'Error communicating with PostgreSQL. Will try again later'
finally:
if not dcs_failed:
diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py
index ad48501d..57d41823 100644
--- a/patroni/postgresql/__init__.py
+++ b/patroni/postgresql/__init__.py
@@ -1,6 +1,5 @@
import logging
import os
-import psycopg2
import re
import shlex
import shutil
@@ -23,6 +22,7 @@ from .connection import Connection, get_connection_cursor
from .misc import parse_history, parse_lsn, postgres_major_version_to_int
from .postmaster import PostmasterProcess
from .slots import SlotsHandler
+from .. import psycopg
from ..exceptions import PostgresConnectionException
from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int
@@ -266,13 +266,13 @@ class Postgresql(object):
cursor = self._connection.cursor()
cursor.execute(sql, params)
return cursor
- except psycopg2.Error as e:
+ except psycopg.Error as e:
if cursor and cursor.connection.closed == 0:
# When connected via unix socket, psycopg2 can't recoginze 'connection lost'
# and leaves `_cursor_holder.connection.closed == 0`, but psycopg2.OperationalError
# is still raised (what is correct). It doesn't make sense to continiue with existing
# connection and we will close it, to avoid its reuse by the `cursor` method.
- if isinstance(e, psycopg2.OperationalError):
+ if isinstance(e, psycopg.OperationalError):
self._connection.close()
else:
raise e
@@ -598,7 +598,7 @@ class Postgresql(object):
if cur.fetchone()[0]:
return 'is_in_recovery=true'
return cur.execute('CHECKPOINT')
- except psycopg2.Error:
+ except psycopg.Error:
logger.exception('Exception during CHECKPOINT')
return 'not accessible or not healty'
@@ -706,7 +706,7 @@ class Postgresql(object):
while postmaster.is_running(): # Need a timeout here?
cur.execute("SELECT 1")
time.sleep(STOP_POLLING_INTERVAL)
- except psycopg2.Error:
+ except psycopg.Error:
pass
def reload(self, block_callbacks=False):
@@ -979,7 +979,7 @@ class Postgresql(object):
with self.connection().cursor() as cursor:
cursor.execute(query)
return cursor.fetchone()[0].isoformat(sep=' ')
- except psycopg2.Error:
+ except psycopg.Error:
return None
def last_operation(self):
diff --git a/patroni/postgresql/bootstrap.py b/patroni/postgresql/bootstrap.py
index c52be97d..dc693aea 100644
--- a/patroni/postgresql/bootstrap.py
+++ b/patroni/postgresql/bootstrap.py
@@ -4,10 +4,12 @@ import shlex
import tempfile
import time
-from patroni.dcs import RemoteMember
-from patroni.utils import deep_compare
from six import string_types
+from ..dcs import RemoteMember
+from ..psycopg import quote_ident, quote_literal
+from ..utils import deep_compare
+
logger = logging.getLogger(__name__)
@@ -297,26 +299,24 @@ class Bootstrap(object):
if 'NOLOGIN' not in options and 'LOGIN' not in options:
options.append('LOGIN')
- params = [name]
if password:
- options.extend(['PASSWORD', '%s'])
- params.extend([password, password])
+ options.extend(['PASSWORD', quote_literal(password)])
sql = """DO $$
BEGIN
SET local synchronous_commit = 'local';
- PERFORM * FROM pg_authid WHERE rolname = %s;
+ PERFORM * FROM pg_catalog.pg_authid WHERE rolname = {0};
IF FOUND THEN
- ALTER ROLE "{0}" WITH {1};
+ ALTER ROLE {1} WITH {2};
ELSE
- CREATE ROLE "{0}" WITH {1};
+ CREATE ROLE {1} WITH {2};
END IF;
-END;$$""".format(name, ' '.join(options))
+END;$$""".format(quote_literal(name), quote_ident(name, self._postgresql.connection()), ' '.join(options))
self._postgresql.query('SET log_statement TO none')
self._postgresql.query('SET log_min_duration_statement TO -1')
self._postgresql.query("SET log_min_error_statement TO 'log'")
try:
- self._postgresql.query(sql, *params)
+ self._postgresql.query(sql)
finally:
self._postgresql.query('RESET log_min_error_statement')
self._postgresql.query('RESET log_min_duration_statement')
@@ -342,8 +342,8 @@ END;$$""".format(name, ' '.join(options))
sql = """DO $$
BEGIN
SET local synchronous_commit = 'local';
- GRANT EXECUTE ON function pg_catalog.{0} TO "{1}";
-END;$$""".format(f, rewind['username'])
+ GRANT EXECUTE ON function pg_catalog.{0} TO {1};
+END;$$""".format(f, quote_ident(rewind['username'], self._postgresql.connection()))
postgresql.query(sql)
for name, value in (config.get('users') or {}).items():
diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py
index 543004ed..eb981cd8 100644
--- a/patroni/postgresql/config.py
+++ b/patroni/postgresql/config.py
@@ -12,6 +12,7 @@ from .validator import CaseInsensitiveDict, recovery_parameters,\
transform_postgresql_parameter_value, transform_recovery_parameter_value
from ..dcs import slot_name_from_member_name, RemoteMember
from ..exceptions import PatroniFatalException
+from ..psycopg import quote_ident as _quote_ident
from ..utils import compare_values, parse_bool, parse_int, split_host_port, uri, \
validate_directory, is_subpath
@@ -23,7 +24,7 @@ PARAMETER_RE = re.compile(r'([a-z_]+)\s*=\s*')
def quote_ident(value):
"""Very simplified version of quote_ident"""
- return value if SYNC_STANDBY_NAME_RE.match(value) else '"' + value + '"'
+ return value if SYNC_STANDBY_NAME_RE.match(value) else _quote_ident(value)
def conninfo_uri_parse(dsn):
@@ -477,8 +478,8 @@ class ConfigHandler(object):
ret.setdefault('channel_binding', 'prefer')
if self._krbsrvname:
ret['krbsrvname'] = self._krbsrvname
- if 'database' in ret:
- del ret['database']
+ if 'dbname' in ret:
+ del ret['dbname']
return ret
def format_dsn(self, params, include_dbname=False):
@@ -488,7 +489,8 @@ class ConfigHandler(object):
'sslcrldir', 'application_name', 'krbsrvname', 'gssencmode', 'channel_binding')
if include_dbname:
params = params.copy()
- params['dbname'] = params.get('database') or self._postgresql.database
+ if 'dbname' not in params:
+ params['dbname'] = self._postgresql.database
# we are abusing information about the necessity of dbname
# dsn should contain passfile or password only if there is no dbname in it (it is used in recovery.conf)
skip = {'passfile', 'password'}
@@ -870,7 +872,7 @@ class ConfigHandler(object):
ret['user'] = self._superuser['username']
del ret['username']
# ensure certain Patroni configurations are available
- ret.update({'database': self._postgresql.database,
+ ret.update({'dbname': self._postgresql.database,
'fallback_application_name': 'Patroni',
'connect_timeout': 3,
'options': '-c statement_timeout=2000'})
diff --git a/patroni/postgresql/connection.py b/patroni/postgresql/connection.py
index 434bda59..c6457235 100644
--- a/patroni/postgresql/connection.py
+++ b/patroni/postgresql/connection.py
@@ -1,9 +1,10 @@
import logging
-import psycopg2
from contextlib import contextmanager
from threading import Lock
+from .. import psycopg
+
logger = logging.getLogger(__name__)
@@ -20,7 +21,7 @@ class Connection(object):
def get(self):
with self._lock:
if not self._connection or self._connection.closed != 0:
- self._connection = psycopg2.connect(**self._conn_kwargs)
+ self._connection = psycopg.connect(**self._conn_kwargs)
self._connection.autocommit = True
self.server_version = self._connection.server_version
return self._connection
@@ -40,7 +41,7 @@ class Connection(object):
@contextmanager
def get_connection_cursor(**kwargs):
- conn = psycopg2.connect(**kwargs)
+ conn = psycopg.connect(**kwargs)
conn.autocommit = True
with conn.cursor() as cur:
yield cur
diff --git a/patroni/postgresql/rewind.py b/patroni/postgresql/rewind.py
index 506e822f..fadc41a7 100644
--- a/patroni/postgresql/rewind.py
+++ b/patroni/postgresql/rewind.py
@@ -152,8 +152,8 @@ class Rewind(object):
def _conn_kwargs(self, member, auth):
ret = member.conn_kwargs(auth)
- if not ret.get('database'):
- ret['database'] = self._postgresql.database
+ if not ret.get('dbname'):
+ ret['dbname'] = self._postgresql.database
return ret
def _check_timeline_and_lsn(self, leader):
@@ -179,7 +179,7 @@ class Rewind(object):
elif local_timeline == master_timeline:
need_rewind = False
elif master_timeline > 1:
- cur.execute('TIMELINE_HISTORY %s', (master_timeline,))
+ cur.execute('TIMELINE_HISTORY {0}'.format(master_timeline))
history = cur.fetchone()[1]
if not isinstance(history, six.string_types):
history = bytes(history).decode('utf-8')
diff --git a/patroni/postgresql/slots.py b/patroni/postgresql/slots.py
index 015108ac..09d62463 100644
--- a/patroni/postgresql/slots.py
+++ b/patroni/postgresql/slots.py
@@ -5,10 +5,10 @@ import shutil
from collections import defaultdict
from contextlib import contextmanager
-from psycopg2.errors import UndefinedFile
from .connection import get_connection_cursor
from .misc import format_lsn
+from ..psycopg import UndefinedFile
logger = logging.getLogger(__name__)
@@ -162,7 +162,7 @@ class SlotsHandler(object):
# Create new logical slots
for database, values in logical_slots.items():
- with self._get_local_connection_cursor(database=database) as cur:
+ with self._get_local_connection_cursor(dbname=database) as cur:
for name, value in values.items():
try:
cur.execute("SELECT pg_catalog.pg_create_logical_replication_slot(%s, %s)" +
@@ -194,7 +194,7 @@ class SlotsHandler(object):
# Advance logical slots
for database, values in advance_slots.items():
- with self._get_local_connection_cursor(database=database, options='-c statement_timeout=0') as cur:
+ with self._get_local_connection_cursor(dbname=database, options='-c statement_timeout=0') as cur:
for name, value in values.items():
try:
cur.execute("SELECT pg_catalog.pg_replication_slot_advance(%s, %s)",
@@ -236,7 +236,7 @@ class SlotsHandler(object):
@contextmanager
def _get_leader_connection_cursor(self, leader):
conn_kwargs = leader.conn_kwargs(self._postgresql.config.rewind_credentials)
- conn_kwargs['database'] = self._postgresql.database
+ conn_kwargs['dbname'] = self._postgresql.database
with get_connection_cursor(connect_timeout=3, options="-c statement_timeout=2000", **conn_kwargs) as cur:
yield cur
diff --git a/patroni/psycopg.py b/patroni/psycopg.py
new file mode 100644
index 00000000..487c2161
--- /dev/null
+++ b/patroni/psycopg.py
@@ -0,0 +1,43 @@
+__all__ = ['connect', 'quote_ident', 'quote_literal', 'DatabaseError',
+ 'Error', 'OperationalError', 'ProgrammingError', 'UndefinedFile']
+
+_legacy = False
+try:
+ from psycopg2 import __version__
+ from . import MIN_PSYCOPG2, parse_version
+ if parse_version(__version__) < MIN_PSYCOPG2:
+ raise ImportError
+ from psycopg2 import connect, Error, DatabaseError, OperationalError, ProgrammingError
+ from psycopg2.errors import UndefinedFile
+ from psycopg2.extensions import adapt
+
+ try:
+ from psycopg2.extensions import quote_ident as _quote_ident
+ except ImportError:
+ _legacy = True
+
+ def quote_literal(value, conn=None):
+ value = adapt(value)
+ if conn:
+ value.prepare(conn)
+ return value.getquoted().decode('utf-8')
+except ImportError:
+ from psycopg import connect as _connect, sql, Error, DatabaseError, OperationalError, ProgrammingError
+ from psycopg.errors import UndefinedFile
+
+ def connect(*args, **kwargs):
+ ret = _connect(*args, **kwargs)
+ ret.server_version = ret.pgconn.server_version # compatibility with psycopg2
+ return ret
+
+ def _quote_ident(value, conn):
+ return sql.Identifier(value).as_string(conn)
+
+ def quote_literal(value, conn=None):
+ return sql.Literal(value).as_string(conn)
+
+
+def quote_ident(value, conn=None):
+ if _legacy or conn is None:
+ return '"{0}"'.format(value.replace('"', '""'))
+ return _quote_ident(value, conn)
diff --git a/patroni/scripts/wale_restore.py b/patroni/scripts/wale_restore.py
index 055fc204..670851c9 100755
--- a/patroni/scripts/wale_restore.py
+++ b/patroni/scripts/wale_restore.py
@@ -27,13 +27,14 @@ import argparse
import csv
import logging
import os
-import psycopg2
import subprocess
import sys
import time
from collections import namedtuple
+from .. import psycopg
+
logger = logging.getLogger(__name__)
RETRY_SLEEP_INTERVAL = 1
@@ -215,7 +216,7 @@ class WALERestore(object):
if self.master_connection:
try:
# get the difference in bytes between the current WAL location and the backup start offset
- with psycopg2.connect(self.master_connection) as con:
+ with psycopg.connect(self.master_connection) as con:
if con.server_version >= 100000:
wal_name = 'wal'
lsn_name = 'lsn'
@@ -233,7 +234,7 @@ class WALERestore(object):
(backup_start_lsn, backup_start_lsn, backup_start_lsn))
diff_in_bytes = int(cur.fetchone()[0])
- except psycopg2.Error:
+ except psycopg.Error:
logger.exception('could not determine difference with the master location')
if attempts_no < self.retries: # retry in case of a temporarily connection issue
attempts_no = attempts_no + 1
diff --git a/setup.py b/setup.py
index 8cc5940a..13bee08e 100644
--- a/setup.py
+++ b/setup.py
@@ -216,13 +216,13 @@ def setup_package(version):
if __name__ == '__main__':
old_modules = sys.modules.copy()
try:
- from patroni import check_psycopg2, fatal, __version__
+ from patroni import check_psycopg, fatal, __version__
finally:
sys.modules.clear()
sys.modules.update(old_modules)
if sys.version_info < (2, 7, 0):
fatal('Patroni needs to be run with Python 2.7+')
- check_psycopg2()
+ check_psycopg()
setup_package(__version__)
diff --git a/tests/__init__.py b/tests/__init__.py
index 4c433862..b4d6002d 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -5,9 +5,10 @@ import unittest
from mock import Mock, patch
-import psycopg2
import urllib3
+import patroni.psycopg as psycopg
+
from patroni.dcs import Leader, Member
from patroni.postgresql import Postgresql
from patroni.postgresql.config import ConfigHandler
@@ -85,9 +86,9 @@ class MockCursor(object):
def execute(self, sql, *params):
if sql.startswith('blabla'):
- raise psycopg2.ProgrammingError()
+ raise psycopg.ProgrammingError()
elif sql == 'CHECKPOINT' or sql.startswith('SELECT pg_catalog.pg_create_'):
- raise psycopg2.OperationalError()
+ raise psycopg.OperationalError()
elif sql.startswith('RetryFailedError'):
raise RetryFailedError('retry')
elif sql.startswith('SELECT catalog_xmin'):
@@ -162,7 +163,7 @@ class MockConnect(object):
pass
-def psycopg2_connect(*args, **kwargs):
+def psycopg_connect(*args, **kwargs):
return MockConnect()
@@ -176,7 +177,7 @@ class PostgresInit(unittest.TestCase):
'force_parallel_mode': '1', 'constraint_exclusion': '',
'max_stack_depth': 'Z', 'vacuum_cost_limit': -1, 'vacuum_cost_delay': 200}
- @patch('psycopg2.connect', psycopg2_connect)
+ @patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.postgresql.CallbackExecutor', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(ConfigHandler, 'replace_pg_hba', Mock())
diff --git a/tests/test_api.py b/tests/test_api.py
index 60c3dd03..1e87140d 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -1,9 +1,10 @@
import datetime
import json
-import psycopg2
import unittest
import socket
+import patroni.psycopg as psycopg
+
from mock import Mock, PropertyMock, patch
from patroni.api import RestApiHandler, RestApiServer
from patroni.dcs import ClusterConfig, Member
@@ -11,7 +12,7 @@ from patroni.ha import _MemberStatus
from patroni.utils import tzutc
from six import BytesIO as IO
from six.moves import BaseHTTPServer
-from . import psycopg2_connect, MockCursor
+from . import psycopg_connect, MockCursor
from .test_ha import get_cluster_initialized_without_leader
@@ -35,7 +36,7 @@ class MockPostgresql(object):
@staticmethod
def connection():
- return psycopg2_connect()
+ return psycopg_connect()
@staticmethod
def postmaster_start_time():
@@ -436,9 +437,9 @@ class TestRestApiHandler(unittest.TestCase):
@patch('time.sleep', Mock())
def test_RestApiServer_query(self):
- with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError)):
+ with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
- with patch.object(MockPostgresql, 'connection', Mock(side_effect=psycopg2.OperationalError)):
+ with patch.object(MockPostgresql, 'connection', Mock(side_effect=psycopg.OperationalError)):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
@patch('time.sleep', Mock())
diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py
index 29c38127..9bbc8112 100644
--- a/tests/test_bootstrap.py
+++ b/tests/test_bootstrap.py
@@ -8,11 +8,11 @@ from patroni.postgresql.bootstrap import Bootstrap
from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.config import ConfigHandler
-from . import psycopg2_connect, BaseTestPostgresql
+from . import psycopg_connect, BaseTestPostgresql
@patch('subprocess.call', Mock(return_value=0))
-@patch('psycopg2.connect', psycopg2_connect)
+@patch('patroni.psycopg.connect', psycopg_connect)
@patch('os.rename', Mock())
class TestBootstrap(BaseTestPostgresql):
@@ -164,6 +164,7 @@ class TestBootstrap(BaseTestPostgresql):
@patch('os.unlink', Mock())
@patch('shutil.copy', Mock())
@patch('os.path.isfile', Mock(return_value=True))
+ @patch('patroni.postgresql.bootstrap.quote_ident', Mock())
@patch.object(Bootstrap, 'call_post_bootstrap', Mock(return_value=True))
@patch.object(Bootstrap, '_custom_bootstrap', Mock(return_value=True))
@patch.object(Postgresql, 'start', Mock(return_value=True))
diff --git a/tests/test_ctl.py b/tests/test_ctl.py
index c03f3a85..8df03dfd 100644
--- a/tests/test_ctl.py
+++ b/tests/test_ctl.py
@@ -9,11 +9,11 @@ from patroni.ctl import ctl, store_config, load_config, output_members, get_dcs,
get_all_members, get_any_member, get_cursor, query_member, configure, PatroniCtlException, apply_config_changes, \
format_config_for_editing, show_diff, invoke_editor, format_pg_version, CONFIG_FILE_PATH
from patroni.dcs.etcd import AbstractEtcdClientWithFailover, Failover
+from patroni.psycopg import OperationalError
from patroni.utils import tzutc
-from psycopg2 import OperationalError
from urllib3 import PoolManager
-from . import MockConnect, MockCursor, MockResponse, psycopg2_connect
+from . import MockConnect, MockCursor, MockResponse, psycopg_connect
from .test_etcd import etcd_read, socket_getaddrinfo
from .test_ha import get_cluster_initialized_without_leader, get_cluster_initialized_with_leader, \
get_cluster_initialized_with_only_leader, get_cluster_not_initialized_without_leader, get_cluster, Member
@@ -48,7 +48,7 @@ class TestCtl(unittest.TestCase):
self.assertRaises(PatroniCtlException, load_config, './non-existing-config-file', None)
self.assertRaises(PatroniCtlException, load_config, './non-existing-config-file', None)
- @patch('psycopg2.connect', psycopg2_connect)
+ @patch('patroni.psycopg.connect', psycopg_connect)
def test_get_cursor(self):
self.assertIsNone(get_cursor(get_cluster_initialized_without_leader(), {}, role='master'))
@@ -57,7 +57,7 @@ class TestCtl(unittest.TestCase):
# MockCursor returns pg_is_in_recovery as false
self.assertIsNone(get_cursor(get_cluster_initialized_with_leader(), {}, role='replica'))
- self.assertIsNotNone(get_cursor(get_cluster_initialized_with_leader(), {'database': 'foo'}, role='any'))
+ self.assertIsNotNone(get_cursor(get_cluster_initialized_with_leader(), {'dbname': 'foo'}, role='any'))
def test_parse_dcs(self):
assert parse_dcs(None) is None
@@ -165,7 +165,7 @@ class TestCtl(unittest.TestCase):
def test_get_dcs(self):
self.assertRaises(PatroniCtlException, get_dcs, {'dummy': {}}, 'dummy')
- @patch('psycopg2.connect', psycopg2_connect)
+ @patch('patroni.psycopg.connect', psycopg_connect)
@patch('patroni.ctl.query_member', Mock(return_value=([['mock column']], None)))
@patch('patroni.ctl.get_dcs')
@patch.object(etcd.Client, 'read', etcd_read)
diff --git a/tests/test_ha.py b/tests/test_ha.py
index fdb348db..90467b20 100644
--- a/tests/test_ha.py
+++ b/tests/test_ha.py
@@ -19,7 +19,7 @@ from patroni.utils import tzutc
from patroni.watchdog import Watchdog
from six.moves import builtins
-from . import PostgresInit, MockPostmaster, psycopg2_connect, requests_get
+from . import PostgresInit, MockPostmaster, psycopg_connect, requests_get
from .test_etcd import socket_getaddrinfo, etcd_read, etcd_write
SYSID = '12345678901'
@@ -323,7 +323,7 @@ class TestHa(PostgresInit):
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader because I had the session lock')
- @patch('psycopg2.connect', psycopg2_connect)
+ @patch('patroni.psycopg.connect', psycopg_connect)
def test_acquire_lock_as_master(self):
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
@@ -460,6 +460,8 @@ class TestHa(PostgresInit):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.assertEqual(self.ha.bootstrap(), 'failed to acquire initialize lock')
+ @patch('patroni.psycopg.connect', psycopg_connect)
+ @patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_initialized_new_cluster(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
@@ -477,6 +479,8 @@ class TestHa(PostgresInit):
self.p.is_running = false
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
+ @patch('patroni.psycopg.connect', psycopg_connect)
+ @patch.object(Postgresql, 'connection', Mock(return_value=None))
def test_bootstrap_release_initialize_key_on_watchdog_failure(self):
self.ha.cluster = get_cluster_not_initialized_without_leader()
self.e.initialize = true
@@ -487,7 +491,7 @@ class TestHa(PostgresInit):
self.assertEqual(self.ha.post_bootstrap(), 'running post_bootstrap')
self.assertRaises(PatroniFatalException, self.ha.post_bootstrap)
- @patch('psycopg2.connect', psycopg2_connect)
+ @patch('patroni.psycopg.connect', psycopg_connect)
def test_reinitialize(self):
self.assertIsNotNone(self.ha.reinitialize())
@@ -1153,7 +1157,7 @@ class TestHa(PostgresInit):
self.ha.is_paused = false
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
- @patch('psycopg2.connect', psycopg2_connect)
+ @patch('patroni.psycopg.connect', psycopg_connect)
def test_permanent_logical_slots_after_promote(self):
config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1)
self.p.name = 'other'
@@ -1185,7 +1189,7 @@ class TestHa(PostgresInit):
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'PAUSE: released leader key voluntarily due to the system ID mismatch')
- @patch('psycopg2.connect', psycopg2_connect)
+ @patch('patroni.psycopg.connect', psycopg_connect)
@patch('os.path.exists', Mock(return_value=True))
@patch('shutil.rmtree', Mock())
@patch('os.makedirs', Mock())
diff --git a/tests/test_patroni.py b/tests/test_patroni.py
index 225cf32a..5801085e 100644
--- a/tests/test_patroni.py
+++ b/tests/test_patroni.py
@@ -13,15 +13,23 @@ from patroni.dcs.etcd import AbstractEtcdClientWithFailover
from patroni.exceptions import DCSError
from patroni.postgresql import Postgresql
from patroni.postgresql.config import ConfigHandler
-from patroni import Patroni, main as _main, patroni_main, check_psycopg2
+from patroni import Patroni, main as _main, patroni_main, check_psycopg
from six.moves import BaseHTTPServer, builtins
from threading import Thread
-from . import psycopg2_connect, SleepException
+from . import psycopg_connect, SleepException
from .test_etcd import etcd_read, etcd_write
from .test_postgresql import MockPostmaster
+def mock_import(*args, **kwargs):
+ if args[0] == 'psycopg':
+ raise ImportError
+ ret = Mock()
+ ret.__version__ = '2.5.3.dev1 a b c'
+ return ret
+
+
class MockFrozenImporter(object):
toc = set(['patroni.dcs.etcd'])
@@ -29,7 +37,7 @@ class MockFrozenImporter(object):
@patch('time.sleep', Mock())
@patch('subprocess.call', Mock(return_value=0))
-@patch('psycopg2.connect', psycopg2_connect)
+@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(ConfigHandler, 'append_pg_hba', Mock())
@patch.object(ConfigHandler, 'write_postgresql_conf', Mock())
@patch.object(ConfigHandler, 'write_recovery_conf', Mock())
@@ -181,8 +189,8 @@ class TestPatroni(unittest.TestCase):
self.p.ha.shutdown = Mock(side_effect=Exception)
self.p.shutdown()
- def test_check_psycopg2(self):
+ def test_check_psycopg(self):
with patch.object(builtins, '__import__', Mock(side_effect=ImportError)):
- self.assertRaises(SystemExit, check_psycopg2)
- with patch('psycopg2.__version__', '2.5.3.dev1 a b c'):
- self.assertRaises(SystemExit, check_psycopg2)
+ self.assertRaises(SystemExit, check_psycopg)
+ with patch.object(builtins, '__import__', mock_import):
+ self.assertRaises(SystemExit, check_psycopg)
diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py
index 937db9c1..f037fd4d 100644
--- a/tests/test_postgresql.py
+++ b/tests/test_postgresql.py
@@ -1,12 +1,14 @@
import datetime
import os
import psutil
-import psycopg2
import re
import subprocess
import time
from mock import Mock, MagicMock, PropertyMock, patch, mock_open
+
+import patroni.psycopg as psycopg
+
from patroni.async_executor import CriticalTask
from patroni.dcs import Cluster, RemoteMember, SyncState
from patroni.exceptions import PostgresConnectionException, PatroniException
@@ -17,7 +19,7 @@ from patroni.utils import RetryFailedError
from six.moves import builtins
from threading import Thread, current_thread
-from . import BaseTestPostgresql, MockCursor, MockPostmaster, psycopg2_connect
+from . import BaseTestPostgresql, MockCursor, MockPostmaster, psycopg_connect
mtime_ret = {}
@@ -87,7 +89,7 @@ Data page checksum version: 0
@patch('subprocess.call', Mock(return_value=0))
-@patch('psycopg2.connect', psycopg2_connect)
+@patch('patroni.psycopg.connect', psycopg_connect)
class TestPostgresql(BaseTestPostgresql):
@patch('subprocess.call', Mock(return_value=0))
@@ -319,7 +321,7 @@ class TestPostgresql(BaseTestPostgresql):
m = RemoteMember('1', {'restore_command': '2', 'primary_slot_name': 'foo', 'conn_kwargs': {'host': 'bar'}})
self.p.follow(m)
- @patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError))
+ @patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError))
def test__query(self):
self.assertRaises(PostgresConnectionException, self.p._query, 'blabla')
self.p._state = 'restarting'
@@ -328,7 +330,7 @@ class TestPostgresql(BaseTestPostgresql):
def test_query(self):
self.p.query('select 1')
self.assertRaises(PostgresConnectionException, self.p.query, 'RetryFailedError')
- self.assertRaises(psycopg2.ProgrammingError, self.p.query, 'blabla')
+ self.assertRaises(psycopg.ProgrammingError, self.p.query, 'blabla')
@patch.object(Postgresql, 'pg_isready', Mock(return_value=STATE_REJECT))
def test_is_leader(self):
@@ -430,7 +432,7 @@ class TestPostgresql(BaseTestPostgresql):
@patch.object(Postgresql, 'is_running', Mock(return_value=MockPostmaster()))
def test_is_leader_exception(self):
self.p.start()
- self.p.query = Mock(side_effect=psycopg2.OperationalError("not supported"))
+ self.p.query = Mock(side_effect=psycopg.OperationalError("not supported"))
self.assertTrue(self.p.stop())
@patch('os.rename', Mock())
@@ -559,7 +561,7 @@ class TestPostgresql(BaseTestPostgresql):
t.start()
t.join()
- with patch.object(MockCursor, "execute", side_effect=psycopg2.Error):
+ with patch.object(MockCursor, "execute", side_effect=psycopg.Error):
self.assertIsNone(self.p.postmaster_start_time())
def test_check_for_startup(self):
@@ -722,7 +724,7 @@ class TestPostgresql(BaseTestPostgresql):
self.p.stop(on_safepoint=mock_callback)
mock_postmaster.is_running.side_effect = [True, False, False]
- with patch.object(MockCursor, "execute", Mock(side_effect=psycopg2.Error)):
+ with patch.object(MockCursor, "execute", Mock(side_effect=psycopg.Error)):
self.p.stop(on_safepoint=mock_callback)
def test_terminate_starting_postmaster(self):
diff --git a/tests/test_rewind.py b/tests/test_rewind.py
index c2607e3f..ba890b1b 100644
--- a/tests/test_rewind.py
+++ b/tests/test_rewind.py
@@ -5,7 +5,7 @@ from patroni.postgresql.cancellable import CancellableSubprocess
from patroni.postgresql.rewind import Rewind
from six.moves import builtins
-from . import BaseTestPostgresql, MockCursor, psycopg2_connect
+from . import BaseTestPostgresql, MockCursor, psycopg_connect
class MockThread(object):
@@ -47,7 +47,7 @@ def mock_single_user_mode(self, communicate, options):
@patch('subprocess.call', Mock(return_value=0))
-@patch('psycopg2.connect', psycopg2_connect)
+@patch('patroni.psycopg.connect', psycopg_connect)
class TestRewind(BaseTestPostgresql):
def setUp(self):
@@ -143,7 +143,7 @@ class TestRewind(BaseTestPostgresql):
mock_check_leader_is_not_in_recovery.return_value = True
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.trigger_check_diverged_lsn()
- with patch('psycopg2.connect', Mock(side_effect=Exception)):
+ with patch('patroni.psycopg.connect', Mock(side_effect=Exception)):
self.assertFalse(self.r.rewind_or_reinitialize_needed_and_possible(self.leader))
self.r.trigger_check_diverged_lsn()
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[('', 3, '0/0'), ('', b'3\t0/40159C0\tn\n')])):
diff --git a/tests/test_slots.py b/tests/test_slots.py
index 8c281ce7..9eb1cdec 100644
--- a/tests/test_slots.py
+++ b/tests/test_slots.py
@@ -1,20 +1,20 @@
import mock
import os
-import psycopg2
import unittest
from mock import Mock, PropertyMock, patch
+from patroni import psycopg
from patroni.dcs import Cluster, ClusterConfig, Member
from patroni.postgresql import Postgresql
from patroni.postgresql.slots import SlotsHandler, fsync_dir
-from . import BaseTestPostgresql, psycopg2_connect, MockCursor
+from . import BaseTestPostgresql, psycopg_connect, MockCursor
@patch('subprocess.call', Mock(return_value=0))
-@patch('psycopg2.connect', psycopg2_connect)
+@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
class TestSlotsHandler(BaseTestPostgresql):
@@ -34,7 +34,7 @@ class TestSlotsHandler(BaseTestPostgresql):
'ignore_slots': [{'name': 'blabla'}]}, 1)
cluster = Cluster(True, config, self.leader, 0,
[self.me, self.other, self.leadermem], None, None, None, {'test_3': 10})
- with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg2.OperationalError)):
+ with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
self.s.sync_replication_slots(cluster, False)
self.p.set_role('standby_leader')
self.s.sync_replication_slots(cluster, False)
@@ -86,14 +86,14 @@ class TestSlotsHandler(BaseTestPostgresql):
[self.me, self.other, self.leadermem], None, None, None, {'ls': 12346})
self.assertEqual(self.s.sync_replication_slots(cluster, False), [])
self.s._schedule_load_slots = False
- with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.errors.UndefinedFile)):
+ with patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.UndefinedFile)):
self.assertEqual(self.s.sync_replication_slots(cluster, False), ['ls'])
cluster.slots['ls'] = 'a'
self.assertEqual(self.s.sync_replication_slots(cluster, False), [])
with patch.object(MockCursor, 'rowcount', PropertyMock(return_value=1), create=True):
self.assertEqual(self.s.sync_replication_slots(cluster, False), ['ls'])
- @patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError))
+ @patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError))
def test_copy_logical_slots(self):
self.s.copy_logical_slots(self.leader, ['foo'])
diff --git a/tests/test_wale_restore.py b/tests/test_wale_restore.py
index 0b450b33..fc5b4e08 100644
--- a/tests/test_wale_restore.py
+++ b/tests/test_wale_restore.py
@@ -1,14 +1,15 @@
-import psycopg2
import subprocess
import unittest
+import patroni.psycopg as psycopg
+
from mock import Mock, PropertyMock, patch, mock_open
from patroni.scripts import wale_restore
from patroni.scripts.wale_restore import WALERestore, main as _main, get_major_version
from six.moves import builtins
from threading import current_thread
-from . import MockConnect, psycopg2_connect
+from . import MockConnect, psycopg_connect
wale_output_header = (
b'name\tlast_modified\t'
@@ -34,7 +35,7 @@ WALE_TEST_RETRIES = 2
@patch('os.makedirs', Mock(return_value=True))
@patch('os.path.exists', Mock(return_value=True))
@patch('os.path.isdir', Mock(return_value=True))
-@patch('psycopg2.connect', psycopg2_connect)
+@patch('patroni.psycopg.connect', psycopg_connect)
@patch('subprocess.check_output', Mock(return_value=wale_output))
class TestWALERestore(unittest.TestCase):
@@ -57,7 +58,7 @@ class TestWALERestore(unittest.TestCase):
with patch('subprocess.check_output', Mock(return_value=wale_output.replace(b'167772160', b'1'))):
self.assertFalse(self.wale_restore.should_use_s3_to_create_replica())
- with patch('psycopg2.connect', Mock(side_effect=psycopg2.Error("foo"))):
+ with patch('patroni.psycopg.connect', Mock(side_effect=psycopg.Error("foo"))):
save_no_master = self.wale_restore.no_master
save_master_connection = self.wale_restore.master_connection