Permanent replication slots (#819)

Permanent replication slots are preserved on failover/switchover; that is, Patroni on the new primary will create the configured replication slots right after promoting.

Slots can be configured with `patronictl edit-config`.
The initial configuration can also be done in `bootstrap.dcs`:

```yaml
slots:
  permanent_physical_1:
    type: physical
  permanent_logical_1:
    type: logical
    database: foo
    plugin: pgoutput
```
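
Because `slots` is part of the dynamic configuration, the same settings can also be applied to a running cluster through the Patroni REST API, which is what the acceptance test in this commit does. A minimal sketch, assuming the REST API listens on `localhost:8008` and the `requests` library is installed:

```python
import requests

# Permanent slot definitions, mirroring the YAML example above.
slots = {
    "slots": {
        "permanent_physical_1": {"type": "physical"},
        "permanent_logical_1": {
            "type": "logical",
            "database": "foo",
            "plugin": "pgoutput",
        },
    }
}

# PATCH merges the given keys into the existing dynamic configuration,
# much like `patronictl edit-config` does.
response = requests.patch("http://localhost:8008/config", json=slots)
print(response.status_code)  # expect 200 on success
```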

It is the operator's responsibility to make sure that the names of permanent replication slots do not clash with the replication slots that Patroni automatically creates for cluster members.
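
To make the clash risk concrete, the sketch below reuses the `slot_name_from_member_name` helper added by this commit in `patroni/dcs`; the member name in the example is hypothetical:

```python
import re

def slot_name_from_member_name(member_name):
    """Lowercase the name, replace dashes and periods with underscores,
    encode other disallowed characters, and truncate to 63 characters."""
    def replace_char(match):
        c = match.group(0)
        return '_' if c in '-.' else "u{:04d}".format(ord(c))
    return re.sub('[^a-z0-9_]', replace_char, member_name.lower())[0:63]

# A member named "pg-node.1" gets the physical slot "pg_node_1", so a permanent
# slot configured with that name would conflict and be skipped with an error.
print(slot_name_from_member_name("pg-node.1"))  # -> pg_node_1
```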

Closes https://github.com/zalando/patroni/issues/656
Alexander Kukushkin
2018-10-31 11:37:42 +01:00
committed by GitHub
parent c65c4f1ffe
commit 2efd97baab
12 changed files with 267 additions and 109 deletions


@@ -33,6 +33,11 @@ Bootstrap configuration
- **restore\_command**: command to restore WAL records from the remote master to standby leader, can be different from the list defined in :ref:`postgresql_settings`
- **archive\_cleanup\_command**: cleanup command for standby leader
- **recovery\_min\_apply\_delay**: how long to wait before actually apply WAL records on a standby leader
- **slots**: define permanent replication slots. These slots will be preserved during switchover/failover. Patroni will try to create slots before opening connections to the cluster.

  - **my_slot_name**: the name of replication slot. It is the responsibility of the operator to make sure that there are no clashes in names between replication slots automatically created by Patroni for members and permanent replication slots.

    - **type**: slot type. Could be ``physical`` or ``logical``. If the slot is logical, you have to additionally define ``database`` and ``plugin``.
    - **database**: the database name where logical slots should be created.
    - **plugin**: the plugin name for the logical slot.
- **method**: custom script to use for bootstrapping this cluster.
See :ref:`custom bootstrap methods documentation <custom_bootstrap>` for details.
When ``initdb`` is specified revert to the default ``initdb`` command. ``initdb`` is also triggered when no ``method``

features/callback.sh Executable file

@@ -0,0 +1,5 @@
#!/bin/bash
[[ "$3" == "master" ]] || exit
PGPASSWORD=zalando psql -h localhost -U postgres -p $1 -w -tAc "SELECT slot_name FROM pg_replication_slots WHERE slot_type = 'logical'" >> data/postgres0/label


@@ -182,6 +182,9 @@ class PatroniController(AbstractController):
dst[k] = v
recursive_update(config, custom_config)
if config['postgresql'].get('callbacks', {}).get('on_role_change'):
config['postgresql']['callbacks']['on_role_change'] += ' ' + str(self.__PORT)
with open(patroni_config_path, 'w') as f:
yaml.safe_dump(config, f, default_flow_style=False)


@@ -1,11 +1,25 @@
Feature: standby cluster
Scenario: check permanent logical slots are preserved on failover/switchover
Given I start postgres1
Then postgres1 is a leader after 10 seconds
When I issue a PATCH request to http://127.0.0.1:8009/config with {"slots": {"test_logical": {"type": "logical", "database": "postgres", "plugin": "test_decoding"}}}
Then I receive a response code 200
When I issue a PATCH request to http://127.0.0.1:8009/config with {"slots": {"pm_1": {"type": "physical"}}, "postgresql": {"parameters": {"wal_level": "logical"}}}
Then I receive a response code 200
When I start postgres0 with callback configured
Then "members/postgres0" key in DCS has state=running after 10 seconds
When I shut down postgres1
Then postgres0 is a leader after 10 seconds
And I sleep for 2 seconds
When I issue a GET request to http://127.0.0.1:8008/
Then I receive a response code 200
And there is a label with "test_logical" in postgres0 data directory
Scenario: check replication of a single table in a standby cluster
Given I start postgres0 without slots sync
And I create a replication slot postgres1 on postgres0
And I start postgres1 in a standby cluster batman1 as a clone of postgres0
Given I start postgres1 in a standby cluster batman1 as a clone of postgres0
Then postgres1 is a leader of batman1 after 10 seconds
When I add the table foo to postgres0
When I issue a PATCH request to http://127.0.0.1:8009/config with {"ttl": 20, "loop_wait": 2}
And I add the table foo to postgres0
Then table foo is present on postgres1 after 20 seconds
When I start postgres2 in a cluster batman1
Then postgres2 role is the replica after 24 seconds
@@ -13,4 +27,5 @@ Feature: standby cluster
Scenario: check failover
When I kill postgres1
Then postgres2 is replicating from postgres0 after 20 seconds
And I kill postmaster on postgres1
Then postgres2 is replicating from postgres0 after 20 seconds


@@ -1,3 +1,4 @@
import os
import time
from behave import step
@@ -8,19 +9,13 @@ SELECT * FROM pg_catalog.pg_stat_replication
WHERE application_name = '{0}'
"""
create_replication_slot_query = """
SELECT pg_create_physical_replication_slot('{0}')
"""
@step('I start {name:w} without slots sync')
def start_patroni_without_slots_sync(context, name):
@step('I start {name:w} with callback configured')
def start_patroni_with_callbacks(context, name):
return context.pctl.start(name, custom_config={
"bootstrap": {
"dcs": {
"postgresql": {
"use_slots": False
}
"postgresql": {
"callbacks": {
"on_role_change": "features/callback.sh"
}
}
})
@@ -35,6 +30,10 @@ def start_patroni(context, name, cluster_name):
@step('I start {name:w} in a standby cluster {cluster_name:w} as a clone of {name2:w}')
def start_patroni_stanby_cluster(context, name, cluster_name, name2):
ctl = context.pctl._processes.pop(name, None)
# we need to remove patroni.dynamic.json in order to "bootstrap" standby cluster with existing PGDATA
if ctl:
os.unlink(os.path.join(ctl._data_dir, 'patroni.dynamic.json'))
port = context.pctl._processes[name2]._connkwargs.get('port')
return context.pctl.start(name, custom_config={
"scope": cluster_name,
@@ -43,7 +42,7 @@ def start_patroni_stanby_cluster(context, name, cluster_name, name2):
"standby_cluster": {
"host": "localhost",
"port": port,
"primary_slot_name": "postgres1",
"primary_slot_name": "pm_1",
}
}
}
@@ -67,12 +66,3 @@ def check_replication_status(context, pg_name1, pg_name2, timeout):
time.sleep(1)
return False
@step('I create a replication slot {slot_name:w} on {pg_name:w}')
def create_replication_slot(context, slot_name, pg_name):
return context.pctl.query(
pg_name,
create_replication_slot_query.format(slot_name),
fail_ok=True
)


@@ -10,16 +10,34 @@ import re
import six
import sys
from collections import namedtuple
from collections import defaultdict, namedtuple
from copy import deepcopy
from patroni.exceptions import PatroniException
from patroni.utils import parse_bool
from random import randint
from six.moves.urllib_parse import urlparse, urlunparse, parse_qsl
from threading import Event, Lock
slot_name_re = re.compile('^[a-z0-9_]{1,63}$')
logger = logging.getLogger(__name__)
def slot_name_from_member_name(member_name):
"""Translate member name to valid PostgreSQL slot name.
PostgreSQL replication slot names must be valid PostgreSQL names. This function maps the wider space of
member names to valid PostgreSQL names. Names are lowercased, dashes and periods common in hostnames
are replaced with underscores, other characters are encoded as their unicode codepoint. Name is truncated
to 64 characters. Multiple different member names may map to a single slot name."""
def replace_char(match):
c = match.group(0)
return '_' if c in '-.' else "u{:04d}".format(ord(c))
slot_name = re.sub('[^a-z0-9_]', replace_char, member_name.lower())
return slot_name[0:63]
def parse_connection_string(value):
"""Original Governor stores connection strings for each cluster members if a following format:
postgres://{username}:{password}@{connect_address}/postgres
@@ -274,14 +292,24 @@ class ClusterConfig(namedtuple('ClusterConfig', 'index,data,modify_index')):
def from_node(index, data, modify_index=None):
"""
>>> ClusterConfig.from_node(1, '{') is None
True
False
"""
try:
data = json.loads(data)
except (TypeError, ValueError):
return None
return ClusterConfig(index, data, modify_index or index)
data = None
modify_index = 0
if not isinstance(data, dict):
data = {}
return ClusterConfig(index, data, index if modify_index is None else modify_index)
@property
def permanent_slots(self):
return isinstance(self.data, dict) and (
self.data.get('permanent_replication_slots') or
self.data.get('permanent_slots') or self.data.get('slots')
) or {}
class SyncState(namedtuple('SyncState', 'index,leader,sync_standby')):
@@ -397,6 +425,65 @@ class Cluster(namedtuple('Cluster', 'initialize,config,leader,last_leader_operat
def is_synchronous_mode(self):
return self.check_mode('synchronous_mode')
def get_replication_slots(self, name, role):
# if the replicatefrom tag is set on the member - we should not create the replication slot for it on
# the current master, because that member would replicate from elsewhere. We still create the slot if
# the replicatefrom destination member is currently not a member of the cluster (fallback to the
# master), or if replicatefrom destination member happens to be the current master
if role in ('master', 'standby_leader'):
slot_members = [m.name for m in self.members if m.name != name and
(m.replicatefrom is None or m.replicatefrom == name or
not self.has_member(m.replicatefrom))]
permanent_slots = (self.config and self.config.permanent_slots or {}).copy()
else:
# only manage slots for replicas that replicate from this one, except for the leader among them
slot_members = [m.name for m in self.members if m.replicatefrom == name and m.name != self.leader.name]
permanent_slots = {}
slots = {slot_name_from_member_name(name): {'type': 'physical'} for name in slot_members}
if len(slots) < len(slot_members):
# Find which names are conflicting for a nicer error message
slot_conflicts = defaultdict(list)
for name in slot_members:
slot_conflicts[slot_name_from_member_name(name)].append(name)
logger.error("Following cluster members share a replication slot name: %s",
"; ".join("{} map to {}".format(", ".join(v), k)
for k, v in slot_conflicts.items() if len(v) > 1))
# "merge" replication slots for members with permanent_replication_slots
for name, value in permanent_slots.items():
if not slot_name_re.match(name):
logger.error("Invalid permanent replication slot name '%s'", name)
logger.error("Slot name may only contain lower case letters, numbers, and the underscore chars")
continue
if name in slots:
logger.error("Permanent replication slot {'%s': %s} is conflicting with" +
" physical replication slot for cluster member", name, value)
continue
value = deepcopy(value)
if not value:
value = {'type': 'physical'}
if isinstance(value, dict):
if 'type' not in value:
value['type'] = 'logical' if value.get('database') and value.get('plugin') else 'physical'
if value['type'] == 'physical' or value['type'] == 'logical' \
and value.get('database') and value.get('plugin'):
slots[name] = value
continue
logger.error("Bad value for slot '%s' in permanent_slots: %s", name, permanent_slots[name])
return slots
def has_permanent_logical_slots(self, name):
slots = self.get_replication_slots(name, 'master').values()
return any(v for v in slots if v.get("type") == "logical")
@six.add_metaclass(abc.ABCMeta)
class AbstractDCS(object):
@@ -533,7 +620,7 @@ class AbstractDCS(object):
You have to use CAS (Compare And Swap) operation in order to update leader key,
for example for etcd `prevValue` parameter must be used."""
def update_leader(self, last_operation):
def update_leader(self, last_operation, access_is_restricted=False):
"""Update leader key (or session) ttl and optime/leader
:param last_operation: absolute xlog location in bytes


@@ -152,7 +152,8 @@ class Kubernetes(AbstractDCS):
# get global dynamic configuration
config = ClusterConfig.from_node(metadata and metadata.resource_version,
annotations.get(self._CONFIG) or '{}')
annotations.get(self._CONFIG) or '{}',
metadata.resource_version if self._CONFIG in annotations else 0)
# get timeline history
history = TimelineHistory.from_node(metadata and metadata.resource_version,
@@ -279,7 +280,7 @@ class Kubernetes(AbstractDCS):
def _update_leader(self):
"""Unused"""
def update_leader(self, last_operation):
def update_leader(self, last_operation, access_is_restricted=False):
now = datetime.datetime.now(tzutc).isoformat()
annotations = {self._LEADER: self._name, 'ttl': str(self._ttl), 'renewTime': now,
'acquireTime': self._leader_observed_record.get('acquireTime') or now,
@@ -287,7 +288,11 @@ class Kubernetes(AbstractDCS):
if last_operation:
annotations[self._OPTIME] = last_operation
ret = self.patch_or_create(self.leader_path, annotations, self._leader_resource_version, subsets=self.__subsets)
subsets = self.__subsets
if subsets is not None and access_is_restricted:
subsets = []
ret = self.patch_or_create(self.leader_path, annotations, self._leader_resource_version, subsets=subsets)
if ret:
self._leader_resource_version = ret.metadata.resource_version
return ret
@@ -307,7 +312,8 @@ class Kubernetes(AbstractDCS):
else:
annotations['acquireTime'] = self._leader_observed_record.get('acquireTime') or now
annotations['transitions'] = str(transitions)
ret = self.patch_or_create(self.leader_path, annotations, self._leader_resource_version, subsets=self.__subsets)
subsets = [] if self.__subsets else None
ret = self.patch_or_create(self.leader_path, annotations, self._leader_resource_version, subsets=subsets)
if ret:
self._leader_resource_version = ret.metadata.resource_version
else:


@@ -12,7 +12,7 @@ from collections import namedtuple
from multiprocessing.pool import ThreadPool
from patroni.async_executor import AsyncExecutor, CriticalTask
from patroni.exceptions import DCSError, PostgresConnectionException, PatroniException
from patroni.postgresql import ACTION_ON_START
from patroni.postgresql import ACTION_ON_START, ACTION_ON_ROLE_CHANGE
from patroni.utils import polling_loop, tzutc
from patroni.dcs import RemoteMember
from threading import RLock
@@ -61,6 +61,7 @@ class Ha(object):
self.old_cluster = None
self._is_leader = False
self._is_leader_lock = RLock()
self._leader_access_is_restricted = False
self._was_paused = False
self._leader_timeline = None
self.recovering = False
@@ -105,12 +106,16 @@ class Ha(object):
def is_leader(self):
with self._is_leader_lock:
return self._is_leader
return self._is_leader and not self._leader_access_is_restricted
def set_is_leader(self, value):
with self._is_leader_lock:
self._is_leader = value
def set_leader_access_is_restricted(self, value):
with self._is_leader_lock:
self._leader_access_is_restricted = value
def load_cluster_from_dcs(self):
cluster = self.dcs.get_cluster()
@@ -125,6 +130,7 @@ class Ha(object):
self._leader_timeline = None if cluster.is_unlocked() else cluster.leader.timeline
def acquire_lock(self):
self.set_leader_access_is_restricted(self.cluster.has_permanent_logical_slots(self.state_handler.name))
ret = self.dcs.attempt_to_acquire_leader()
self.set_is_leader(ret)
return ret
@@ -136,7 +142,7 @@ class Ha(object):
last_operation = self.state_handler.last_operation()
except Exception:
logger.exception('Exception when called state_handler.last_operation()')
ret = self.dcs.update_leader(last_operation)
ret = self.dcs.update_leader(last_operation, self._leader_access_is_restricted)
self.set_is_leader(ret)
if ret:
self.watchdog.keepalive()
@@ -163,6 +169,10 @@ class Ha(object):
'state': self.state_handler.state,
'role': self.state_handler.role
}
# following two lines are mainly necessary for consul, to avoid creation of master service
if data['role'] == 'master' and not self.is_leader():
data['role'] = 'promoted'
tags = self.get_effective_tags()
if tags:
data['tags'] = tags
@@ -499,8 +509,10 @@ class Ha(object):
return 'Postponing promotion because synchronous replication state was updated by somebody else'
self.state_handler.set_synchronous_standby('*' if self.is_synchronous_mode_strict() else None)
if self.state_handler.role != 'master':
self.set_leader_access_is_restricted(self.cluster.has_permanent_logical_slots(self.state_handler.name))
self._async_executor.schedule('promote')
self._async_executor.run_async(self.state_handler.promote, args=(self.dcs.loop_wait,))
self._async_executor.run_async(self.state_handler.promote,
args=(self.dcs.loop_wait, self._leader_access_is_restricted))
return promote_message
@staticmethod
@@ -837,6 +849,11 @@ class Ha(object):
self.dcs.reset_cluster()
return 'removed leader lock because postgres is not running as master'
if self.state_handler.is_leader() and self._leader_access_is_restricted:
self.state_handler.sync_replication_slots(self.cluster)
self.state_handler.call_nowait(ACTION_ON_ROLE_CHANGE)
self.set_leader_access_is_restricted(False)
if self.update_lock(True):
msg = self.process_manual_failover_from_leader()
if msg is not None:
@@ -1076,6 +1093,7 @@ class Ha(object):
if not self.watchdog.activate():
logger.error('Cancelling bootstrap because watchdog activation failed')
self.cancel_initialization()
self.state_handler.sync_replication_slots(self.cluster)
self.dcs.take_leader()
self.set_is_leader(True)
self.state_handler.call_nowait(ACTION_ON_START)
@@ -1234,11 +1252,11 @@ class Ha(object):
# stops PostgreSQL, therefore, we only reload replication slots if no
# asynchronous processes are running (should be always the case for the master)
if not self._async_executor.busy and not self.state_handler.is_starting():
self.state_handler.sync_replication_slots(self.cluster)
if not self.state_handler.cb_called:
if not self.state_handler.is_leader():
self.state_handler.trigger_check_diverged_lsn()
self.state_handler.call_nowait(ACTION_ON_START)
self.state_handler.sync_replication_slots(self.cluster)
except DCSError:
dcs_failed = True
logger.error('Error communicating with DCS')


@@ -15,7 +15,7 @@ from patroni.callback_executor import CallbackExecutor
from patroni.exceptions import PostgresConnectionException, PostgresException
from patroni.utils import compare_values, parse_bool, parse_int, Retry, RetryFailedError, polling_loop, split_host_port
from patroni.postmaster import PostmasterProcess
from patroni.dcs import RemoteMember
from patroni.dcs import slot_name_from_member_name, RemoteMember
from requests.structures import CaseInsensitiveDict
from six import string_types
from six.moves.urllib.parse import quote_plus
@@ -52,22 +52,6 @@ def quote_ident(value):
return value if sync_standby_name_re.match(value) else '"' + value + '"'
def slot_name_from_member_name(member_name):
"""Translate member name to valid PostgreSQL slot name.
PostgreSQL replication slot names must be valid PostgreSQL names. This function maps the wider space of
member names to valid PostgreSQL names. Names are lowercased, dashes and periods common in hostnames
are replaced with underscores, other characters are encoded as their unicode codepoint. Name is truncated
to 64 characters. Multiple different member names may map to a single slot name."""
def replace_char(match):
c = match.group(0)
return '_' if c in '-.' else "u{:04d}".format(ord(c))
slot_name = re.sub('[^a-z0-9_]', replace_char, member_name.lower())
return slot_name[0:63]
@contextmanager
def null_context():
yield
@@ -154,7 +138,7 @@ class Postgresql(object):
self._connection = None
self._cursor_holder = None
self._sysid = None
self._replication_slots = [] # list of already existing replication slots
self._replication_slots = {} # already existing replication slots
self.retry = Retry(max_tries=-1, deadline=config['retry_timeout']/2.0, max_delay=1,
retry_exceptions=PostgresConnectionException)
@@ -1259,8 +1243,9 @@ class Postgresql(object):
yield cur
@contextmanager
def _get_replication_connection_cursor(self, host='localhost', port=5432, **kwargs):
with self._get_connection_cursor(host=host, port=int(port), database=self._database, replication=1,
def _get_replication_connection_cursor(self, host='localhost', port=5432, database=None, **kwargs):
database = database or self._database
with self._get_connection_cursor(host=host, port=int(port), database=database, replication='database',
user=self._replication['username'], password=self._replication['password'],
connect_timeout=3, options='-c statement_timeout=2000') as cur:
yield cur
@@ -1511,7 +1496,7 @@ class Postgresql(object):
if data.get('Database cluster state') == 'in production':
return True
def promote(self, wait_seconds):
def promote(self, wait_seconds, access_is_restricted=False):
if self.role == 'master':
return True
ret = self.pg_ctl('promote', '-W')
@@ -1519,7 +1504,8 @@ class Postgresql(object):
self.set_role('master')
logger.info("cleared rewind state after becoming the leader")
self._rewind_state = REWIND_STATUS.INITIAL
self.call_nowait(ACTION_ON_ROLE_CHANGE)
if not access_is_restricted:
self.call_nowait(ACTION_ON_ROLE_CHANGE)
ret = self._wait_promote(wait_seconds)
return ret
@@ -1552,8 +1538,14 @@ $$""".format(name, ' '.join(options)), name, password, password)
def load_replication_slots(self):
if self.use_slots and self._schedule_load_slots:
cursor = self._query("SELECT slot_name FROM pg_replication_slots WHERE slot_type='physical'")
self._replication_slots = [r[0] for r in cursor]
replication_slots = {}
cursor = self._query('SELECT slot_name, slot_type, plugin, database FROM pg_replication_slots')
for r in cursor:
value = {'type': r[1]}
if r[1] == 'logical':
value.update({'plugin': r[2], 'database': r[3]})
replication_slots[r[0]] = value
self._replication_slots = replication_slots
self._schedule_load_slots = False
def postmaster_start_time(self):
@@ -1563,50 +1555,70 @@ $$""".format(name, ' '.join(options)), name, password, password)
except psycopg2.Error:
return None
def drop_replication_slot(self, name):
cursor = self._query(('SELECT pg_drop_replication_slot(%s) WHERE EXISTS (SELECT 1 ' +
'FROM pg_replication_slots WHERE slot_name = %s AND NOT active)'), name, name)
# In normal situation rowcount should be 1, otherwise either slot doesn't exists or it is still active
return cursor.rowcount == 1
@staticmethod
def compare_slots(s1, s2):
return s1['type'] == s2['type'] and\
(s1['type'] == 'physical' or s1['database'] == s2['database'] and s1['plugin'] == s2['plugin'])
def sync_replication_slots(self, cluster):
if self.use_slots:
try:
self.load_replication_slots()
# if the replicatefrom tag is set on the member - we should not create the replication slot for it on
# the current master, because that member would replicate from elsewhere. We still create the slot if
# the replicatefrom destination member is currently not a member of the cluster (fallback to the
# master), or if replicatefrom destination member happens to be the current master
if self.role in ('master', 'standby_leader'):
slot_members = [m.name for m in cluster.members if m.name != self.name and
(m.replicatefrom is None or m.replicatefrom == self.name or
not cluster.has_member(m.replicatefrom))]
else:
# only manage slots for replicas that replicate from this one, except for the leader among them
slot_members = [m.name for m in cluster.members if m.replicatefrom == self.name and
m.name != cluster.leader.name]
slots = set(slot_name_from_member_name(name) for name in slot_members)
if len(slots) < len(slot_members):
# Find which names are conflicting for a nicer error message
slot_conflicts = defaultdict(list)
for name in slot_members:
slot_conflicts[slot_name_from_member_name(name)].append(name)
logger.error("Following cluster members share a replication slot name: %s",
"; ".join("{} map to {}".format(", ".join(v), k)
for k, v in slot_conflicts.items() if len(v) > 1))
slots = cluster.get_replication_slots(self.name, self.role)
# drop unused slots
for slot in set(self._replication_slots) - slots:
cursor = self._query("""SELECT pg_drop_replication_slot(%s)
WHERE EXISTS(SELECT 1 FROM pg_replication_slots
WHERE slot_name = %s AND NOT active)""", slot, slot)
if cursor.rowcount != 1: # Either slot doesn't exists or it is still active
self._schedule_load_slots = True # schedule load_replication_slots on the next iteration
# drop old replication slots which are not presented in desired slots
for name in set(self._replication_slots) - set(slots):
if not self.drop_replication_slot(name):
logger.error("Failed to drop replication slot '%s'", name)
self._schedule_load_slots = True
immediately_reserve = ', true' if self._major_version >= 90600 else ''
# create new slots
for slot in slots - set(self._replication_slots):
self._query("""SELECT pg_create_physical_replication_slot(%s{0})
WHERE NOT EXISTS (SELECT 1 FROM pg_replication_slots
WHERE slot_name = %s)""".format(immediately_reserve), slot, slot)
logical_slots = defaultdict(dict)
for name, value in slots.items():
if name in self._replication_slots and not self.compare_slots(value, self._replication_slots[name]):
logger.info("Trying to drop replication slot '%s' because value is changing from %s to %s",
name, self._replication_slots[name], value)
if not self.drop_replication_slot(name):
logger.error("Failed to drop replication slot '%s'", name)
self._schedule_load_slots = True
continue
self._replication_slots.pop(name)
if name not in self._replication_slots:
if value['type'] == 'physical':
try:
self._query(("SELECT pg_create_physical_replication_slot(%s{0})" +
" WHERE NOT EXISTS (SELECT 1 FROM pg_replication_slots" +
" WHERE slot_type = 'physical' AND slot_name = %s)").format(
immediately_reserve), name, name)
except Exception:
logger.exception("Failed to create physical replication slot '%s'", name)
self._schedule_load_slots = True
elif value['type'] == 'logical' and name not in self._replication_slots:
logical_slots[value['database']][name] = value
# create new logical slots
for database, values in logical_slots.items():
conn_kwargs = self._local_connect_kwargs
conn_kwargs['database'] = database
with self._get_connection_cursor(**conn_kwargs) as cur:
for name, value in values.items():
try:
cur.execute("SELECT pg_create_logical_replication_slot(%s, %s)" +
" WHERE NOT EXISTS (SELECT 1 FROM pg_replication_slots" +
" WHERE slot_type = 'logical' AND slot_name = %s)",
(name, value['plugin'], name))
except Exception:
logger.exception("Failed to create logical replication slot '%s' plugin='%s'",
name, value['plugin'])
self._schedule_load_slots = True
self._replication_slots = slots
except Exception:
logger.exception('Exception when changing replication slots')


@@ -146,7 +146,6 @@ def run_async(self, func, args=()):
@patch.object(Postgresql, 'write_recovery_conf', Mock())
@patch.object(Postgresql, 'query', Mock())
@patch.object(Postgresql, 'checkpoint', Mock())
@patch.object(Postgresql, 'call_nowait', Mock())
@patch.object(Postgresql, 'cancellable_subprocess_call', Mock(return_value=0))
@patch.object(etcd.Client, 'write', etcd_write)
@patch.object(etcd.Client, 'read', etcd_read)
@@ -1001,3 +1000,12 @@ class TestHa(unittest.TestCase):
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action. i am the leader with the lock')
self.ha.is_paused = false
self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
@patch('psycopg2.connect', psycopg2_connect)
def test_permanent_logical_slots_after_promote(self):
config = ClusterConfig(1, {'slots': {'l': {'database': 'postgres', 'plugin': 'test_decoding'}}}, 1)
self.ha.cluster = get_cluster_initialized_without_leader(cluster_config=config)
self.assertEqual(self.ha.run_cycle(), 'acquired session lock as a leader')
self.ha.cluster = get_cluster_initialized_without_leader(leader=True, cluster_config=config)
self.ha.has_lock = true
self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')


@@ -50,6 +50,13 @@ class TestKubernetes(unittest.TestCase):
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.update_leader('123'))
@patch('kubernetes.config.load_kube_config', Mock())
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', Mock())
def test_update_leader_with_restricted_access(self):
k = Kubernetes({'ttl': 30, 'scope': 'test', 'name': 'p-0', 'retry_timeout': 10,
'labels': {'f': 'b'}, 'use_endpoints': True, 'pod_ip': '10.0.0.0'})
self.assertIsNotNone(k.update_leader('123', True))
def test_take_leader(self):
self.k.take_leader()
self.k._leader_observed_record['leader'] = 'test'


@@ -8,7 +8,7 @@ import unittest
from mock import Mock, MagicMock, PropertyMock, patch, mock_open
from patroni.async_executor import CriticalTask
from patroni.dcs import Cluster, Leader, Member, RemoteMember, SyncState
from patroni.dcs import Cluster, ClusterConfig, Leader, Member, RemoteMember, SyncState
from patroni.exceptions import PostgresConnectionException, PostgresException
from patroni.postgresql import Postgresql, STATE_REJECT, STATE_NO_RESPONSE
from patroni.postmaster import PostmasterProcess
@@ -28,12 +28,12 @@ class MockCursor(object):
def execute(self, sql, *params):
if sql.startswith('blabla'):
raise psycopg2.ProgrammingError()
elif sql == 'CHECKPOINT':
elif sql == 'CHECKPOINT' or sql.startswith('SELECT pg_create_'):
raise psycopg2.OperationalError()
elif sql.startswith('RetryFailedError'):
raise RetryFailedError('retry')
elif sql.startswith('SELECT slot_name'):
self.results = [('blabla',), ('foobar',)]
self.results = [('blabla', 'physical'), ('foobar', 'physical'), ('ls', 'logical', 'a', 'b')]
elif sql.startswith('SELECT CASE WHEN pg_is_in_recovery()'):
self.results = [(1, 2)]
elif sql.startswith('SELECT pg_is_in_recovery()'):
@@ -498,23 +498,25 @@ class TestPostgresql(unittest.TestCase):
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
def test_sync_replication_slots(self):
self.p.start()
cluster = Cluster(True, None, self.leader, 0, [self.me, self.other, self.leadermem], None, None, None)
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'},
'A': 0, 'test_3': 0, 'b': {'type': 'logical', 'plugin': '1'}}}, 1)
cluster = Cluster(True, config, self.leader, 0, [self.me, self.other, self.leadermem], None, None, None)
with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg2.OperationalError)):
self.p.sync_replication_slots(cluster)
self.p.sync_replication_slots(cluster)
with mock.patch('patroni.postgresql.Postgresql.role', new_callable=PropertyMock(return_value='replica')):
self.p.sync_replication_slots(cluster)
with mock.patch('patroni.postgresql.logger.error', new_callable=Mock()) as errorlog_mock:
with patch.object(Postgresql, 'drop_replication_slot', Mock(return_value=True)),\
patch('patroni.dcs.logger.error', new_callable=Mock()) as errorlog_mock:
self.p.query = Mock()
alias1 = Member(0, 'test-3', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
alias2 = Member(0, 'test.3', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'})
cluster.members.extend([alias1, alias2])
self.p.sync_replication_slots(cluster)
errorlog_mock.assert_called_once()
self.assertTrue("test-3" in errorlog_mock.call_args[0][1],
"non matching {0}".format(errorlog_mock.call_args[0][1]))
self.assertTrue("test.3" in errorlog_mock.call_args[0][1],
"non matching {0}".format(errorlog_mock.call_args[0][1]))
self.assertEqual(errorlog_mock.call_count, 5)
ca = errorlog_mock.call_args_list[0][0][1]
self.assertTrue("test-3" in ca, "non matching {0}".format(ca))
self.assertTrue("test.3" in ca, "non matching {0}".format(ca))
@patch.object(MockCursor, 'execute', Mock(side_effect=psycopg2.OperationalError))
def test__query(self):