Make GlobalConfig really global (#2935)

1. extract the `GlobalConfig` class into its own module.
2. make the module instantiate the `GlobalConfig` object on load and replace the module in `sys.modules` with this instance (a minimal sketch of the pattern follows the commit metadata below).
3. don't pass the `GlobalConfig` object around, but use the `patroni.global_config` module everywhere.
4. move `ignore_slots_matchers`, `max_timelines_history`, and `permanent_slots` from `ClusterConfig` to `GlobalConfig`.
5. add the `use_slots` property to `global_config` and remove duplicated code from `Cluster` and `Postgresql.ConfigHandler`.

Besides that, improve the readability of a couple of checks in ha.py and the formatting of the `/config` key when it is saved from patronictl.
Author: Alexander Kukushkin
Date: 2023-11-24 09:26:05 +01:00
Committed by: GitHub
Parent commit: 91327f943c
Commit: 193c73f6b8
19 changed files with 372 additions and 354 deletions
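The module-replacement trick behind item 2 is a standard Python pattern: entries in `sys.modules` may be any object, so a module can swap itself for a class instance at import time, after which instance properties read like module attributes. A minimal, self-contained sketch of the pattern (hypothetical `myconfig` module, not Patroni's actual code):

    # myconfig.py -- minimal sketch of the sys.modules self-replacement pattern
    import sys
    import types

    class _Config(types.ModuleType):
        def __init__(self) -> None:
            super().__init__(__name__)
            self._data = {}

        def update(self, data: dict) -> None:
            self._data = data

        @property
        def is_paused(self) -> bool:
            return bool(self._data.get('pause'))

    # After this line ``import myconfig`` yields the instance,
    # so ``myconfig.is_paused`` reads like a module attribute.
    sys.modules[__name__] = _Config()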

patroni/api.py

@@ -26,7 +26,7 @@ from urllib.parse import urlparse, parse_qs
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
from . import psycopg
from . import global_config, psycopg
from .__main__ import Patroni
from .dcs import Cluster
from .exceptions import PostgresConnectionException, PostgresException
@@ -290,7 +290,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
patroni = self.server.patroni
cluster = patroni.dcs.cluster
global_config = patroni.config.get_global_config(cluster)
config = global_config.from_cluster(cluster)
leader_optime = cluster and cluster.last_lsn or 0
replayed_location = response.get('xlog', {}).get('replayed_location', 0)
@@ -308,7 +308,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
standby_leader_status_code = 200 if response.get('role') == 'standby_leader' else 503
elif patroni.ha.is_leader():
leader_status_code = 200
if global_config.is_standby_cluster:
if config.is_standby_cluster:
primary_status_code = replica_status_code = 503
standby_leader_status_code = 200 if response.get('role') in ('replica', 'standby_leader') else 503
else:
@@ -452,9 +452,8 @@ class RestApiHandler(BaseHTTPRequestHandler):
HTTP status ``200`` and the JSON representation of the cluster topology.
"""
cluster = self.server.patroni.dcs.get_cluster()
global_config = self.server.patroni.config.get_global_config(cluster)
response = cluster_as_json(cluster, global_config)
response = cluster_as_json(cluster)
response['scope'] = self.server.patroni.postgresql.scope
self._write_json_response(200, response)
@@ -864,7 +863,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
if request:
logger.debug("received restart request: {0}".format(request))
if self.server.patroni.config.get_global_config(cluster).is_paused and 'schedule' in request:
if global_config.from_cluster(cluster).is_paused and 'schedule' in request:
self.write_response(status_code, "Can't schedule restart in the paused state")
return
@@ -1033,7 +1032,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
:returns: a string with the error message or ``None`` if good nodes are found.
"""
is_synchronous_mode = self.server.patroni.config.get_global_config(cluster).is_synchronous_mode
is_synchronous_mode = global_config.from_cluster(cluster).is_synchronous_mode
if leader and (not cluster.leader or cluster.leader.name != leader):
return 'leader name does not match'
if candidate:
@@ -1091,7 +1090,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
candidate = request.get('candidate') or request.get('member')
scheduled_at = request.get('scheduled_at')
cluster = self.server.patroni.dcs.get_cluster()
global_config = self.server.patroni.config.get_global_config(cluster)
config = global_config.from_cluster(cluster)
logger.info("received %s request with leader=%s candidate=%s scheduled_at=%s",
action, leader, candidate, scheduled_at)
@@ -1104,12 +1103,12 @@ class RestApiHandler(BaseHTTPRequestHandler):
if not data and scheduled_at:
if action == 'failover':
data = "Failover can't be scheduled"
elif global_config.is_paused:
elif config.is_paused:
data = "Can't schedule switchover in the paused state"
else:
(status_code, data, scheduled_at) = self.parse_schedule(scheduled_at, action)
if not data and global_config.is_paused and not candidate:
if not data and config.is_paused and not candidate:
data = 'Switchover is possible only to a specific candidate in a paused state'
if action == 'failover' and leader:
@@ -1260,7 +1259,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
"""
postgresql = self.server.patroni.postgresql
cluster = self.server.patroni.dcs.cluster
global_config = self.server.patroni.config.get_global_config(cluster)
config = global_config.from_cluster(cluster)
try:
if postgresql.state not in ('running', 'restarting', 'starting'):
@@ -1291,10 +1290,10 @@ class RestApiHandler(BaseHTTPRequestHandler):
})
}
if result['role'] == 'replica' and global_config.is_standby_cluster:
if result['role'] == 'replica' and config.is_standby_cluster:
result['role'] = postgresql.role
if result['role'] == 'replica' and global_config.is_synchronous_mode\
if result['role'] == 'replica' and config.is_synchronous_mode\
and cluster and cluster.sync.matches(postgresql.name):
result['sync_standby'] = True
@@ -1319,7 +1318,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
state = 'unknown'
result: Dict[str, Any] = {'state': state, 'role': postgresql.role}
if global_config.is_paused:
if config.is_paused:
result['pause'] = True
if not cluster or cluster.is_unlocked():
result['cluster_unlocked'] = True
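Throughout the handlers above, `global_config.from_cluster(cluster)` hands back a point-in-time snapshot instead of mutating the shared singleton, so a REST request never races the heartbeat thread's view. A hedged usage sketch (handler shape simplified, not Patroni's actual handler):

    from patroni import global_config

    def handle_request(cluster):
        # Detached snapshot built from the DCS view; the process-wide
        # singleton is only refreshed by the heartbeat thread via update().
        config = global_config.from_cluster(cluster)
        return 503 if config.is_paused else 200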

patroni/config.py

@@ -12,7 +12,7 @@ from typing import Any, Callable, Collection, Dict, List, Optional, Union, TYPE_
from . import PATRONI_ENV_PREFIX
from .collections import CaseInsensitiveDict
from .dcs import ClusterConfig, Cluster
from .dcs import ClusterConfig
from .exceptions import ConfigParseError
from .file_perm import pg_perm
from .postgresql.config import ConfigHandler
@@ -54,154 +54,6 @@ def default_validator(conf: Dict[str, Any]) -> List[str]:
return []
class GlobalConfig(object):
"""A class that wraps global configuration and provides convenient methods to access/check values.
It is instantiated either by calling :func:`get_global_config` or :meth:`Config.get_global_config`, which pick
the configuration either from the provided :class:`Cluster` object (the most up-to-date) or from the
local cache if :class:`ClusterConfig` is not initialized or doesn't have a valid config.
"""
def __init__(self, config: Dict[str, Any]) -> None:
"""Initialize :class:`GlobalConfig` object with given *config*.
:param config: current configuration either from
:class:`ClusterConfig` or from :func:`Config.dynamic_configuration`.
"""
self.__config = config
def get(self, name: str) -> Any:
"""Gets global configuration value by *name*.
:param name: parameter name.
:returns: configuration value or ``None`` if it is missing.
"""
return self.__config.get(name)
def check_mode(self, mode: str) -> bool:
"""Checks whether the certain parameter is enabled.
:param mode: parameter name, e.g. ``synchronous_mode``, ``failsafe_mode``, ``pause``, ``check_timeline``, and
so on.
:returns: ``True`` if parameter *mode* is enabled in the global configuration.
"""
return bool(parse_bool(self.__config.get(mode)))
@property
def is_paused(self) -> bool:
"""``True`` if cluster is in maintenance mode."""
return self.check_mode('pause')
@property
def is_synchronous_mode(self) -> bool:
"""``True`` if synchronous replication is requested and it is not a standby cluster config."""
return self.check_mode('synchronous_mode') and not self.is_standby_cluster
@property
def is_synchronous_mode_strict(self) -> bool:
"""``True`` if at least one synchronous node is required."""
return self.check_mode('synchronous_mode_strict')
def get_standby_cluster_config(self) -> Union[Dict[str, Any], Any]:
"""Get ``standby_cluster`` configuration.
:returns: a copy of ``standby_cluster`` configuration.
"""
return deepcopy(self.get('standby_cluster'))
@property
def is_standby_cluster(self) -> bool:
"""``True`` if global configuration has a valid ``standby_cluster`` section."""
config = self.get_standby_cluster_config()
return isinstance(config, dict) and\
bool(config.get('host') or config.get('port') or config.get('restore_command'))
def get_int(self, name: str, default: int = 0) -> int:
"""Gets current value of *name* from the global configuration and try to return it as :class:`int`.
:param name: name of the parameter.
:param default: default value if *name* is not in the configuration or invalid.
:returns: currently configured value of *name* from the global configuration or *default* if it is not set or
invalid.
"""
ret = parse_int(self.get(name))
return default if ret is None else ret
@property
def min_synchronous_nodes(self) -> int:
"""The minimal number of synchronous nodes based on whether ``synchronous_mode_strict`` is enabled or not."""
return 1 if self.is_synchronous_mode_strict else 0
@property
def synchronous_node_count(self) -> int:
"""Currently configured value of ``synchronous_node_count`` from the global configuration.
Assume ``1`` if it is not set or invalid.
"""
return max(self.get_int('synchronous_node_count', 1), self.min_synchronous_nodes)
@property
def maximum_lag_on_failover(self) -> int:
"""Currently configured value of ``maximum_lag_on_failover`` from the global configuration.
Assume ``1048576`` if it is not set or invalid.
"""
return self.get_int('maximum_lag_on_failover', 1048576)
@property
def maximum_lag_on_syncnode(self) -> int:
"""Currently configured value of ``maximum_lag_on_syncnode`` from the global configuration.
Assume ``-1`` if it is not set or invalid.
"""
return self.get_int('maximum_lag_on_syncnode', -1)
@property
def primary_start_timeout(self) -> int:
"""Currently configured value of ``primary_start_timeout`` from the global configuration.
Assume ``300`` if it is not set or invalid.
.. note::
``master_start_timeout`` is still supported to keep backward compatibility.
"""
default = 300
return self.get_int('primary_start_timeout', default)\
if 'primary_start_timeout' in self.__config else self.get_int('master_start_timeout', default)
@property
def primary_stop_timeout(self) -> int:
"""Currently configured value of ``primary_stop_timeout`` from the global configuration.
Assume ``0`` if it is not set or invalid.
.. note::
``master_stop_timeout`` is still supported to keep backward compatibility.
"""
default = 0
return self.get_int('primary_stop_timeout', default)\
if 'primary_stop_timeout' in self.__config else self.get_int('master_stop_timeout', default)
def get_global_config(cluster: Optional[Cluster], default: Optional[Dict[str, Any]] = None) -> GlobalConfig:
"""Instantiates :class:`GlobalConfig` based on the input.
:param cluster: the currently known cluster state from DCS.
:param default: default configuration, which will be used if there is no valid *cluster.config*.
:returns: :class:`GlobalConfig` object.
"""
# Try to protect from the case when DCS was wiped out
if cluster and cluster.config and cluster.config.modify_version:
config = cluster.config.data
else:
config = default or {}
return GlobalConfig(deepcopy(config))
class Config(object):
"""Handle Patroni configuration.
@@ -949,18 +801,6 @@ class Config(object):
"""
return deepcopy(self.__effective_configuration)
def get_global_config(self, cluster: Optional[Cluster]) -> GlobalConfig:
"""Instantiate :class:`GlobalConfig` based on input.
Use the configuration from provided *cluster* (the most up-to-date) or from the
local cache if *cluster.config* is not initialized or doesn't have a valid config.
:param cluster: the currently known cluster state from DCS.
:returns: :class:`GlobalConfig` object.
"""
return get_global_config(cluster, self._dynamic_configuration)
def _validate_failover_tags(self) -> None:
"""Check ``nofailover``/``failover_priority`` config and warn user if it's contradictory.

patroni/ctl.py

@@ -46,7 +46,8 @@ try:
except ImportError: # pragma: no cover
from cdiff import markup_to_pager, PatchStream # pyright: ignore [reportMissingModuleSource]
from .config import Config, get_global_config
from . import global_config
from .config import Config
from .dcs import get_dcs as _get_dcs, AbstractDCS, Cluster, Member
from .exceptions import PatroniException
from .postgresql.misc import postgres_version_to_int
@@ -1026,7 +1027,7 @@ def reload(cluster_name: str, member_names: List[str], group: Optional[int], for
if r.status == 200:
click.echo('No changes to apply on member {0}'.format(member.name))
elif r.status == 202:
config = get_global_config(cluster)
config = global_config.from_cluster(cluster)
click.echo('Reload request received for member {0} and will be processed within {1} seconds'.format(
member.name, config.get('loop_wait') or dcs.loop_wait)
)
@@ -1105,7 +1106,7 @@ def restart(cluster_name: str, group: Optional[int], member_names: List[str],
content['postgres_version'] = version
if scheduled_at:
if get_global_config(cluster).is_paused:
if global_config.from_cluster(cluster).is_paused:
raise PatroniCtlException("Can't schedule restart in the paused state")
content['schedule'] = scheduled_at.isoformat()
@@ -1228,7 +1229,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i
dcs = get_dcs(cluster_name, group)
cluster = dcs.get_cluster()
global_config = get_global_config(cluster)
config = global_config.from_cluster(cluster)
# leader has to be defined for switchover only
if action == 'switchover':
@@ -1239,7 +1240,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i
if force:
leader = cluster.leader.name
else:
prompt = 'Standby Leader' if global_config.is_standby_cluster else 'Primary'
prompt = 'Standby Leader' if config.is_standby_cluster else 'Primary'
leader = click.prompt(prompt, type=str, default=(cluster.leader and cluster.leader.name))
if cluster.leader.name != leader:
@@ -1268,7 +1269,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i
if all((not force,
action == 'failover',
global_config.is_synchronous_mode,
config.is_synchronous_mode,
not cluster.sync.is_empty,
not cluster.sync.matches(candidate, True))):
if click.confirm(f'Are you sure you want to failover to the asynchronous node {candidate}'):
@@ -1285,7 +1286,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i
scheduled_at = parse_scheduled(scheduled)
if scheduled_at:
if global_config.is_paused:
if config.is_paused:
raise PatroniCtlException("Can't schedule switchover in the paused state")
scheduled_at_str = scheduled_at.isoformat()
@@ -1739,7 +1740,7 @@ def wait_until_pause_is_applied(dcs: AbstractDCS, paused: bool, old_cluster: Clu
:param old_cluster: original cluster information before pause or unpause has been requested. Used to report which
nodes are still pending to have ``pause`` equal *paused* at a given point in time.
"""
config = get_global_config(old_cluster)
config = global_config.from_cluster(old_cluster)
click.echo("'{0}' request sent, waiting until it is recognized by all nodes".format(paused and 'pause' or 'resume'))
old = {m.name: m.version for m in old_cluster.members if m.api_url}
@@ -1777,7 +1778,7 @@ def toggle_pause(cluster_name: str, group: Optional[int], paused: bool, wait: bo
"""
dcs = get_dcs(cluster_name, group)
cluster = dcs.get_cluster()
if get_global_config(cluster).is_paused == paused:
if global_config.from_cluster(cluster).is_paused == paused:
raise PatroniCtlException('Cluster is {0} paused'.format(paused and 'already' or 'not'))
for member in get_all_members_leader_first(cluster):
@@ -2122,7 +2123,7 @@ def edit_config(cluster_name: str, group: Optional[int], force: bool, quiet: boo
return
if force or click.confirm('Apply these changes?'):
if not dcs.set_config_value(json.dumps(changed_data), cluster.config.version):
if not dcs.set_config_value(json.dumps(changed_data, separators=(',', ':')), cluster.config.version):
raise PatroniCtlException("Config modification aborted due to concurrent changes")
click.echo("Configuration changed")

patroni/dcs/__init__.py

@@ -15,6 +15,7 @@ from urllib.parse import urlparse, urlunparse, parse_qsl
import dateutil.parser
from .. import global_config
from ..dynamic_loader import iter_classes, iter_modules
from ..exceptions import PatroniFatalException
from ..utils import deep_compare, uri
@@ -538,24 +539,6 @@ class ClusterConfig(NamedTuple):
modify_version = 0
return ClusterConfig(version, data, version if modify_version is None else modify_version)
@property
def permanent_slots(self) -> Dict[str, Any]:
"""Dictionary of permanent slots information looked up from :attr:`~ClusterConfig.data`."""
return (self.data.get('permanent_replication_slots')
or self.data.get('permanent_slots')
or self.data.get('slots')
or {})
@property
def ignore_slots_matchers(self) -> List[Dict[str, Any]]:
"""The value for ``ignore_slots`` from :attr:`~ClusterConfig.data` if defined or an empty list."""
return self.data.get('ignore_slots') or []
@property
def max_timelines_history(self) -> int:
"""The value for ``max_timelines_history`` from :attr:`~ClusterConfig.data` if defined or ``0``."""
return self.data.get('max_timelines_history', 0)
class SyncState(NamedTuple):
"""Immutable object (namedtuple) which represents last observed synchronous replication state.
@@ -944,7 +927,7 @@ class Cluster(NamedTuple('Cluster',
@property
def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
"""Dictionary of permanent replication slots with their known LSN."""
ret: Dict[str, Union[Dict[str, Any], Any]] = deepcopy(self.config.permanent_slots if self.config else {})
ret: Dict[str, Union[Dict[str, Any], Any]] = global_config.permanent_slots
members: Dict[str, int] = {slot_name_from_member_name(m.name): m.lsn or 0 for m in self.members}
slots: Dict[str, int] = {k: parse_int(v) or 0 for k, v in (self.slots or {}).items()}
@@ -973,13 +956,8 @@ class Cluster(NamedTuple('Cluster',
"""Dictionary of permanent ``logical`` replication slots."""
return {name: value for name, value in self.__permanent_slots.items() if self.is_logical_slot(value)}
@property
def use_slots(self) -> bool:
"""``True`` if cluster is configured to use replication slots."""
return bool(self.config and (self.config.data.get('postgresql') or {}).get('use_slots', True))
def get_replication_slots(self, my_name: str, role: str, nofailover: bool, major_version: int, *,
is_standby_cluster: bool = False, show_error: bool = False) -> Dict[str, Dict[str, Any]]:
show_error: bool = False) -> Dict[str, Dict[str, Any]]:
"""Lookup configured slot names in the DCS, report issues found and merge with permanent slots.
Will log an error if:
@@ -990,15 +968,12 @@ class Cluster(NamedTuple('Cluster',
:param role: role of this node.
:param nofailover: ``True`` if this node is tagged to not be a failover candidate.
:param major_version: postgresql major version.
:param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
the outside because we want to protect from the ``/config`` key removal.
:param show_error: if ``True`` report error if any disabled logical slots or conflicting slot names are found.
:returns: final dictionary of slot names, after merging with permanent slots and performing sanity checks.
"""
slots: Dict[str, Dict[str, str]] = self._get_members_slots(my_name, role)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster=is_standby_cluster,
role=role, nofailover=nofailover,
permanent_slots: Dict[str, Any] = self._get_permanent_slots(role=role, nofailover=nofailover,
major_version=major_version)
disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
@@ -1058,8 +1033,7 @@ class Cluster(NamedTuple('Cluster',
logger.error("Bad value for slot '%s' in permanent_slots: %s", name, permanent_slots[name])
return disabled_permanent_logical_slots
def _get_permanent_slots(self, *, is_standby_cluster: bool, role: str,
nofailover: bool, major_version: int) -> Dict[str, Any]:
def _get_permanent_slots(self, *, role: str, nofailover: bool, major_version: int) -> Dict[str, Any]:
"""Get configured permanent replication slots.
.. note::
@@ -1071,18 +1045,16 @@ class Cluster(NamedTuple('Cluster',
The returned dictionary for a non-standby cluster always contains permanent logical replication slots in
order to show a warning if they are not supported by PostgreSQL before v11.
:param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
the outside because we want to protect from the ``/config`` key removal.
:param role: role of this node -- ``primary``, ``standby_leader`` or ``replica``.
:param nofailover: ``True`` if this node is tagged to not be a failover candidate.
:param major_version: postgresql major version.
:returns: dictionary of permanent slot names mapped to attributes.
"""
if not self.use_slots or nofailover:
if not global_config.use_slots or nofailover:
return {}
if is_standby_cluster:
if global_config.is_standby_cluster:
return self.__permanent_physical_slots \
if major_version >= SLOT_ADVANCE_AVAILABLE_VERSION or role == 'standby_leader' else {}
@@ -1108,7 +1080,7 @@ class Cluster(NamedTuple('Cluster',
:returns: dictionary of physical replication slots that should exist on a given node.
"""
if not self.use_slots:
if not global_config.use_slots:
return {}
# we always want to exclude the member with our name from the list
@@ -1132,13 +1104,11 @@ class Cluster(NamedTuple('Cluster',
for k, v in slot_conflicts.items() if len(v) > 1))
return slots
def has_permanent_slots(self, my_name: str, *, is_standby_cluster: bool = False, nofailover: bool = False,
def has_permanent_slots(self, my_name: str, *, nofailover: bool = False,
major_version: int = SLOT_ADVANCE_AVAILABLE_VERSION) -> bool:
"""Check if the given member node has permanent replication slots configured.
:param my_name: name of the member node to check.
:param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
the outside because we want to protect from the ``/config`` key removal.
:param nofailover: ``True`` if this node is tagged to not be a failover candidate.
:param major_version: postgresql major version.
@@ -1146,20 +1116,16 @@ class Cluster(NamedTuple('Cluster',
"""
role = 'replica'
members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(my_name, role)
permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster=is_standby_cluster,
role=role, nofailover=nofailover,
permanent_slots: Dict[str, Any] = self._get_permanent_slots(role=role, nofailover=nofailover,
major_version=major_version)
slots = deepcopy(members_slots)
self._merge_permanent_slots(slots, permanent_slots, my_name, major_version)
return len(slots) > len(members_slots) or any(self.is_physical_slot(v) for v in permanent_slots.values())
def filter_permanent_slots(self, slots: Dict[str, int], is_standby_cluster: bool,
major_version: int) -> Dict[str, int]:
def filter_permanent_slots(self, slots: Dict[str, int], major_version: int) -> Dict[str, int]:
"""Filter out all non-permanent slots from provided *slots* dict.
:param slots: slot names with LSN values
:param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
the outside because we want to protect from the ``/config`` key removal.
:param major_version: postgresql major version.
:returns: a :class:`dict` object that contains only slots that are known to be permanent.
@@ -1167,9 +1133,7 @@ class Cluster(NamedTuple('Cluster',
if major_version < SLOT_ADVANCE_AVAILABLE_VERSION:
return {} # for legacy PostgreSQL we don't support permanent slots on standby nodes
permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster=is_standby_cluster,
role='replica',
nofailover=False,
permanent_slots: Dict[str, Any] = self._get_permanent_slots(role='replica', nofailover=False,
major_version=major_version)
members_slots = {slot_name_from_member_name(m.name) for m in self.members}
@@ -1203,7 +1167,7 @@ class Cluster(NamedTuple('Cluster',
if self._has_permanent_logical_slots(my_name, nofailover):
return True
if self.use_slots:
if global_config.use_slots:
members = [m for m in self.members if m.replicatefrom == my_name and m.name != self.leader_name]
return any(self.should_enforce_hot_standby_feedback(m.name, m.nofailover) for m in members)
return False
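One consequence of the move is that the three accepted spellings of the permanent-slots key now live in a single place, with the fallback order preserved. A small standalone sketch of the lookup (mirroring the `permanent_slots` property in the new module below):

    def permanent_slots(data):
        # First non-empty key wins: all three spellings are equivalent.
        return (data.get('permanent_replication_slots')
                or data.get('permanent_slots')
                or data.get('slots')
                or {})

    assert permanent_slots({'slots': {'s1': {'type': 'physical'}}}) == \
           permanent_slots({'permanent_slots': {'s1': {'type': 'physical'}}})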

patroni/global_config.py (new file, 227 lines)

@@ -0,0 +1,227 @@
"""Implements *global_config* facilities.
The :class:`GlobalConfig` object is instantiated on import and replaces the
``patroni.global_config`` module in :data:`sys.modules`, which allows its
properties and methods to be used as if they were module variables and functions.
"""
import sys
import types
from copy import deepcopy
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING
from .utils import parse_bool, parse_int
if TYPE_CHECKING: # pragma: no cover
from .dcs import Cluster
def __getattr__(mod: types.ModuleType, name: str) -> Any:
"""This function exists just to make pyright happy.
Without it, pyright complains about access to unknown members of the global_config module.
"""
return getattr(sys.modules[__name__], name) # pragma: no cover
class GlobalConfig(types.ModuleType):
"""A class that wraps global configuration and provides convenient methods to access/check values."""
__file__ = __file__ # just to make unittest and pytest happy
def __init__(self) -> None:
"""Initialize :class:`GlobalConfig` object."""
super().__init__(__name__)
self.__config = {}
@staticmethod
def _cluster_has_valid_config(cluster: Optional['Cluster']) -> bool:
"""Check if provided *cluster* object has a valid global configuration.
:param cluster: the currently known cluster state from DCS.
:returns: ``True`` if provided *cluster* object has a valid global configuration, otherwise ``False``.
"""
return bool(cluster and cluster.config and cluster.config.modify_version)
def update(self, cluster: Optional['Cluster']) -> None:
"""Update with the new global configuration from the :class:`Cluster` object view.
.. note::
The global configuration is updated only when the configuration in the *cluster* view is valid.
Update happens in-place and is executed only from the main heartbeat thread.
:param cluster: the currently known cluster state from DCS.
"""
# Try to protect from the case when DCS was wiped out
if self._cluster_has_valid_config(cluster):
self.__config = cluster.config.data # pyright: ignore [reportOptionalMemberAccess]
def from_cluster(self, cluster: Optional['Cluster']) -> 'GlobalConfig':
"""Return :class:`GlobalConfig` instance from the provided :class:`Cluster` object view.
.. note::
If the provided *cluster* object doesn't have a valid global configuration, we return
the last known valid state of the :class:`GlobalConfig` object.
This method is used when we need to have the most up-to-date values in the global configuration,
but we don't want to update the global object.
:param cluster: the currently known cluster state from DCS.
:returns: :class:`GlobalConfig` object.
"""
if not self._cluster_has_valid_config(cluster):
return self
ret = GlobalConfig()
ret.update(cluster)
return ret
def get(self, name: str) -> Any:
"""Gets global configuration value by *name*.
:param name: parameter name.
:returns: configuration value or ``None`` if it is missing.
"""
return self.__config.get(name)
def check_mode(self, mode: str) -> bool:
"""Checks whether the certain parameter is enabled.
:param mode: parameter name, e.g. ``synchronous_mode``, ``failsafe_mode``, ``pause``, ``check_timeline``, and
so on.
:returns: ``True`` if parameter *mode* is enabled in the global configuration.
"""
return bool(parse_bool(self.__config.get(mode)))
@property
def is_paused(self) -> bool:
"""``True`` if cluster is in maintenance mode."""
return self.check_mode('pause')
@property
def is_synchronous_mode(self) -> bool:
"""``True`` if synchronous replication is requested and it is not a standby cluster config."""
return self.check_mode('synchronous_mode') and not self.is_standby_cluster
@property
def is_synchronous_mode_strict(self) -> bool:
"""``True`` if at least one synchronous node is required."""
return self.check_mode('synchronous_mode_strict')
def get_standby_cluster_config(self) -> Union[Dict[str, Any], Any]:
"""Get ``standby_cluster`` configuration.
:returns: a copy of ``standby_cluster`` configuration.
"""
return deepcopy(self.get('standby_cluster'))
@property
def is_standby_cluster(self) -> bool:
"""``True`` if global configuration has a valid ``standby_cluster`` section."""
config = self.get_standby_cluster_config()
return isinstance(config, dict) and\
bool(config.get('host') or config.get('port') or config.get('restore_command'))
def get_int(self, name: str, default: int = 0) -> int:
"""Gets current value of *name* from the global configuration and try to return it as :class:`int`.
:param name: name of the parameter.
:param default: default value if *name* is not in the configuration or invalid.
:returns: currently configured value of *name* from the global configuration or *default* if it is not set or
invalid.
"""
ret = parse_int(self.get(name))
return default if ret is None else ret
@property
def min_synchronous_nodes(self) -> int:
"""The minimum number of synchronous nodes based on whether ``synchronous_mode_strict`` is enabled or not."""
return 1 if self.is_synchronous_mode_strict else 0
@property
def synchronous_node_count(self) -> int:
"""Currently configured value of ``synchronous_node_count`` from the global configuration.
Assume ``1`` if it is not set or invalid.
"""
return max(self.get_int('synchronous_node_count', 1), self.min_synchronous_nodes)
@property
def maximum_lag_on_failover(self) -> int:
"""Currently configured value of ``maximum_lag_on_failover`` from the global configuration.
Assume ``1048576`` if it is not set or invalid.
"""
return self.get_int('maximum_lag_on_failover', 1048576)
@property
def maximum_lag_on_syncnode(self) -> int:
"""Currently configured value of ``maximum_lag_on_syncnode`` from the global configuration.
Assume ``-1`` if it is not set or invalid.
"""
return self.get_int('maximum_lag_on_syncnode', -1)
@property
def primary_start_timeout(self) -> int:
"""Currently configured value of ``primary_start_timeout`` from the global configuration.
Assume ``300`` if it is not set or invalid.
.. note::
``master_start_timeout`` is still supported to keep backward compatibility.
"""
default = 300
return self.get_int('primary_start_timeout', default)\
if 'primary_start_timeout' in self.__config else self.get_int('master_start_timeout', default)
@property
def primary_stop_timeout(self) -> int:
"""Currently configured value of ``primary_stop_timeout`` from the global configuration.
Assume ``0`` if it is not set or invalid.
.. note::
``master_stop_timeout`` is still supported to keep backward compatibility.
"""
default = 0
return self.get_int('primary_stop_timeout', default)\
if 'primary_stop_timeout' in self.__config else self.get_int('master_stop_timeout', default)
@property
def ignore_slots_matchers(self) -> List[Dict[str, Any]]:
"""Currently configured value of ``ignore_slots`` from the global configuration.
Assume an empty :class:`list` if not set.
"""
return self.get('ignore_slots') or []
@property
def max_timelines_history(self) -> int:
"""Currently configured value of ``max_timelines_history`` from the global configuration.
Assume ``0`` if not set or invalid.
"""
return self.get_int('max_timelines_history', 0)
@property
def use_slots(self) -> bool:
"""``True`` if cluster is configured to use replication slots."""
return bool(parse_bool((self.get('postgresql') or {}).get('use_slots', True)))
@property
def permanent_slots(self) -> Dict[str, Any]:
"""Dictionary of permanent slots information from the global configuration."""
return deepcopy(self.get('permanent_replication_slots')
or self.get('permanent_slots')
or self.get('slots')
or {})
sys.modules[__name__] = GlobalConfig()
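With the instance installed into `sys.modules`, call sites simply import the module and read properties directly; roughly:

    from patroni import global_config

    # Properties read like module-level variables:
    if global_config.is_synchronous_mode and not global_config.is_paused:
        nodes = global_config.synchronous_node_count

    # The heartbeat thread refreshes the singleton in-place once per cycle:
    #     global_config.update(cluster)
    # and code that needs a point-in-time view gets a detached copy:
    #     config = global_config.from_cluster(cluster)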

patroni/ha.py

@@ -10,7 +10,7 @@ from multiprocessing.pool import ThreadPool
from threading import RLock
from typing import Any, Callable, Collection, Dict, List, NamedTuple, Optional, Union, Tuple, TYPE_CHECKING
from . import psycopg
from . import global_config, psycopg
from .__main__ import Patroni
from .async_executor import AsyncExecutor, CriticalTask
from .collections import CaseInsensitiveSet
@@ -156,7 +156,6 @@ class Ha(object):
self._rewind = Rewind(self.state_handler)
self.dcs = patroni.dcs
self.cluster = Cluster.empty()
self.global_config = self.patroni.config.get_global_config(None)
self.old_cluster = Cluster.empty()
self._leader_expiry = 0
self._leader_expiry_lock = RLock()
@@ -188,20 +187,20 @@ class Ha(object):
def primary_stop_timeout(self) -> Union[int, None]:
""":returns: "primary_stop_timeout" from the global configuration or `None` when not in synchronous mode."""
ret = self.global_config.primary_stop_timeout
ret = global_config.primary_stop_timeout
return ret if ret > 0 and self.is_synchronous_mode() else None
def is_paused(self) -> bool:
""":returns: `True` if in maintenance mode."""
return self.global_config.is_paused
return global_config.is_paused
def check_timeline(self) -> bool:
""":returns: `True` if should check whether the timeline is latest during the leader race."""
return self.global_config.check_mode('check_timeline')
return global_config.check_mode('check_timeline')
def is_standby_cluster(self) -> bool:
""":returns: `True` if global configuration has a valid "standby_cluster" section."""
return self.global_config.is_standby_cluster
return global_config.is_standby_cluster
def is_leader(self) -> bool:
""":returns: `True` if the current node is the leader, based on expiration set when it last held the key."""
@@ -296,7 +295,6 @@ class Ha(object):
last_lsn = self.state_handler.last_operation()
slots = self.cluster.filter_permanent_slots(
{**self.state_handler.slots(), slot_name_from_member_name(self.state_handler.name): last_lsn},
self.is_standby_cluster(),
self.state_handler.major_version)
except Exception:
logger.exception('Exception when called state_handler.last_operation()')
@@ -450,7 +448,7 @@ class Ha(object):
return ret or 'trying to bootstrap {0}'.format(msg)
# no leader, but configuration may allowed replica creation using backup tools
create_replica_methods = self.global_config.get_standby_cluster_config().get('create_replica_methods', []) \
create_replica_methods = global_config.get_standby_cluster_config().get('create_replica_methods', []) \
if self.is_standby_cluster() else None
can_bootstrap = self.state_handler.can_create_replica_without_replication_connection(create_replica_methods)
concurrent_bootstrap = self.cluster.initialize == ""
@@ -525,7 +523,7 @@ class Ha(object):
:returns: action message, describing what was performed.
"""
if self.has_lock() and self.update_lock():
timeout = self.global_config.primary_start_timeout
timeout = global_config.primary_start_timeout
if timeout == 0:
# We are requested to prefer failing over to restarting primary. But see first if there
# is anyone to fail over to.
@@ -622,7 +620,7 @@ class Ha(object):
for param in params: # It is highly unlikely to happen, but we want to protect from the case
node_to_follow.data.pop(param, None) # when above-mentioned params came from outside.
if self.is_standby_cluster():
standby_config = self.global_config.get_standby_cluster_config()
standby_config = global_config.get_standby_cluster_config()
node_to_follow.data.update({p: standby_config[p] for p in params if standby_config.get(p)})
return node_to_follow
@@ -684,11 +682,11 @@ class Ha(object):
def is_synchronous_mode(self) -> bool:
""":returns: `True` if synchronous replication is requested."""
return self.global_config.is_synchronous_mode
return global_config.is_synchronous_mode
def is_failsafe_mode(self) -> bool:
""":returns: `True` if failsafe_mode is enabled in global configuration."""
return self.global_config.check_mode('failsafe_mode')
return global_config.check_mode('failsafe_mode')
def process_sync_replication(self) -> None:
"""Process synchronous standby beahvior.
@@ -732,7 +730,7 @@ class Ha(object):
return logger.info('Synchronous replication key updated by someone else.')
# In strict mode, when there are no suitable replication connections, put "*" into synchronous_standby_names
if self.global_config.is_synchronous_mode_strict and not picked:
if global_config.is_synchronous_mode_strict and not picked:
picked = CaseInsensitiveSet('*')
logger.warning("No standbys available!")
@@ -805,7 +803,7 @@ class Ha(object):
cluster_history_dict: Dict[int, List[Any]] = {line[0]: list(line) for line in cluster_history}
history: List[List[Any]] = list(map(list, self.state_handler.get_history(primary_timeline)))
if self.cluster.config:
history = history[-self.cluster.config.max_timelines_history:]
history = history[-global_config.max_timelines_history:]
for line in history:
# enrich current history with promotion timestamps stored in DCS
cluster_history_line = cluster_history_dict.get(line[0], [])
@@ -863,7 +861,7 @@ class Ha(object):
# promotion until next cycle. TODO: trigger immediate retry of run_cycle
return 'Postponing promotion because synchronous replication state was updated by somebody else'
self.state_handler.sync_handler.set_synchronous_standby_names(
CaseInsensitiveSet('*') if self.global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
CaseInsensitiveSet('*') if global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
if self.state_handler.role not in ('master', 'promoted', 'primary'):
# reset failsafe state when promote
self._failsafe.set_is_active(0)
@@ -974,7 +972,7 @@ class Ha(object):
:returns: ``True`` when node is lagging
"""
lag = (self.cluster.last_lsn or 0) - wal_position
return lag > self.global_config.maximum_lag_on_failover
return lag > global_config.maximum_lag_on_failover
def _is_healthiest_node(self, members: Collection[Member], check_replication_lag: bool = True) -> bool:
"""This method tries to determine whether I am healthy enough to became a new leader candidate or not."""
@@ -1541,7 +1539,7 @@ class Ha(object):
# Now that restart is scheduled we can set timeout for startup, it will get reset
# once async executor runs and main loop notices PostgreSQL as up.
timeout = restart_data.get('timeout', self.global_config.primary_start_timeout)
timeout = restart_data.get('timeout', global_config.primary_start_timeout)
self.set_start_timeout(timeout)
def before_shutdown() -> None:
@@ -1605,7 +1603,7 @@ class Ha(object):
"""Figure out what to do with the task AsyncExecutor is performing."""
if self.has_lock() and self.update_lock():
if self._async_executor.scheduled_action == 'doing crash recovery in a single user mode':
time_left = self.global_config.primary_start_timeout - (time.time() - self._crash_recovery_started)
time_left = global_config.primary_start_timeout - (time.time() - self._crash_recovery_started)
if time_left <= 0 and self.is_failover_possible():
logger.info("Demoting self because crash recovery is taking too long")
self.state_handler.cancellable.cancel(True)
@@ -1690,7 +1688,7 @@ class Ha(object):
self.set_is_leader(True)
if self.is_synchronous_mode():
self.state_handler.sync_handler.set_synchronous_standby_names(
CaseInsensitiveSet('*') if self.global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
CaseInsensitiveSet('*') if global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
self.state_handler.call_nowait(CallbackAction.ON_START)
self.load_cluster_from_dcs()
@@ -1713,7 +1711,7 @@ class Ha(object):
self.demote('immediate-nolock')
return 'stopped PostgreSQL while starting up because leader key was lost'
timeout = self._start_timeout or self.global_config.primary_start_timeout
timeout = self._start_timeout or global_config.primary_start_timeout
time_left = timeout - self.state_handler.time_in_state()
if time_left <= 0:
@@ -1746,8 +1744,8 @@ class Ha(object):
try:
try:
self.load_cluster_from_dcs()
self.global_config = self.patroni.config.get_global_config(self.cluster)
self.state_handler.reset_cluster_info_state(self.cluster, self.patroni.nofailover, self.global_config)
global_config.update(self.cluster)
self.state_handler.reset_cluster_info_state(self.cluster, self.patroni.nofailover)
except Exception:
self.state_handler.reset_cluster_info_state(None)
raise
@@ -1767,10 +1765,10 @@ class Ha(object):
self.touch_member()
# cluster has leader key but not initialize key
if not (self.cluster.is_unlocked() or self.sysid_valid(self.cluster.initialize)) and self.has_lock():
if self.has_lock(False) and not self.sysid_valid(self.cluster.initialize):
self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)
if not (self.cluster.is_unlocked() or self.cluster.config and self.cluster.config.data) and self.has_lock():
if self.has_lock(False) and not (self.cluster.config and self.cluster.config.data):
self.dcs.set_config_value(json.dumps(self.patroni.config.dynamic_configuration, separators=(',', ':')))
self.cluster = self.dcs.get_cluster()
@@ -2047,7 +2045,7 @@ class Ha(object):
config or cluster.config.data.
"""
data: Dict[str, Any] = {}
cluster_params = self.global_config.get_standby_cluster_config()
cluster_params = global_config.get_standby_cluster_config()
if cluster_params:
data.update({k: v for k, v in cluster_params.items() if k in RemoteMember.ALLOWED_KEYS})
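The key ordering constraint in the hunks above: `global_config.update(self.cluster)` runs right after `load_cluster_from_dcs()` and before anything consults the configuration, so all readers within one heartbeat cycle see the same values. A condensed restatement (hypothetical free-function form; in Patroni these are `Ha` methods):

    def refresh_state(ha, global_config):
        ha.load_cluster_from_dcs()          # 1. fetch the fresh /config view from DCS
        global_config.update(ha.cluster)    # 2. refresh the singleton in-place
        # 3. reset caches that depend on the possibly-changed configuration
        ha.state_handler.reset_cluster_info_state(ha.cluster, ha.patroni.nofailover)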

patroni/postgresql/__init__.py

@@ -24,7 +24,7 @@ from .misc import parse_history, parse_lsn, postgres_major_version_to_int
from .postmaster import PostmasterProcess
from .slots import SlotsHandler
from .sync import SyncHandler
from .. import psycopg
from .. import global_config, psycopg
from ..async_executor import CriticalTask
from ..collections import CaseInsensitiveSet
from ..dcs import Cluster, Leader, Member, SLOT_ADVANCE_AVAILABLE_VERSION
@@ -34,7 +34,6 @@ from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_emp
if TYPE_CHECKING: # pragma: no cover
from psycopg import Connection as Connection3, Cursor
from psycopg2 import connection as connection3, cursor
from ..config import GlobalConfig
logger = logging.getLogger(__name__)
@@ -73,7 +72,6 @@ class Postgresql(object):
self.connection_string: str
self.proxy_url: Optional[str]
self._major_version = self.get_major_version()
self._global_config = None
self._state_lock = Lock()
self.set_state('stopped')
@@ -217,7 +215,7 @@ class Postgresql(object):
"FROM pg_catalog.pg_stat_get_wal_senders() w,"
" pg_catalog.pg_stat_get_activity(w.pid)"
" WHERE w.state = 'streaming') r)").format(self.wal_name, self.lsn_name)
if (not self.global_config or self.global_config.is_synchronous_mode)
if global_config.is_synchronous_mode
and self.role in ('master', 'primary', 'promoted') else "'on', '', NULL")
if self._major_version >= 90600:
@@ -426,12 +424,7 @@ class Postgresql(object):
self.config.write_postgresql_conf()
self.reload()
@property
def global_config(self) -> Optional['GlobalConfig']:
return self._global_config
def reset_cluster_info_state(self, cluster: Union[Cluster, None], nofailover: bool = False,
global_config: Optional['GlobalConfig'] = None) -> None:
def reset_cluster_info_state(self, cluster: Union[Cluster, None], nofailover: bool = False) -> None:
"""Reset monitoring query cache.
It happens at the beginning of the heartbeat loop and on changes of `synchronous_standby_names`.
@@ -440,30 +433,22 @@ class Postgresql(object):
:param nofailover: whether this node could become a new primary.
Important when there are logical permanent replication slots because "nofailover"
node could do cascading replication and should enable `hot_standby_feedback`
:param global_config: last known :class:`GlobalConfig` object
"""
self._cluster_info_state = {}
if global_config:
self._global_config = global_config
if not self._global_config:
return
if self._global_config.is_standby_cluster:
if global_config.is_standby_cluster:
# Standby cluster can't have logical replication slots, and we don't need to enforce hot_standby_feedback
self.set_enforce_hot_standby_feedback(False)
if cluster and cluster.config and cluster.config.modify_version:
# We want to enable hot_standby_feedback if the replica is supposed
# to have a logical slot or in case it is a cascading replica.
self.set_enforce_hot_standby_feedback(not self._global_config.is_standby_cluster and self.can_advance_slots
self.set_enforce_hot_standby_feedback(not global_config.is_standby_cluster and self.can_advance_slots
and cluster.should_enforce_hot_standby_feedback(self.name,
nofailover))
self._has_permanent_slots = cluster.has_permanent_slots(
my_name=self.name,
is_standby_cluster=self._global_config.is_standby_cluster,
nofailover=nofailover,
major_version=self.major_version)

patroni/postgresql/config.py

@@ -12,6 +12,7 @@ from types import TracebackType
from typing import Any, Collection, Dict, Iterator, List, Optional, Union, Tuple, Type, TYPE_CHECKING
from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value
from .. import global_config
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name
from ..exceptions import PatroniFatalException, PostgresConnectionException
@@ -595,7 +596,7 @@ class ConfigHandler(object):
is_remote_member = isinstance(member, RemoteMember)
primary_conninfo = self.primary_conninfo_params(member)
if primary_conninfo:
use_slots = self.get('use_slots', True) and self._postgresql.major_version >= 90400
use_slots = global_config.use_slots and self._postgresql.major_version >= 90400
if use_slots and not (is_remote_member and member.no_replication_slot):
primary_slot_name = member.primary_slot_name if is_remote_member else self._postgresql.name
recovery_params['primary_slot_name'] = slot_name_from_member_name(primary_slot_name)
@@ -930,10 +931,10 @@ class ConfigHandler(object):
parameters = config['parameters'].copy()
listen_addresses, port = split_host_port(config['listen'], 5432)
parameters.update(cluster_name=self._postgresql.scope, listen_addresses=listen_addresses, port=str(port))
if not self._postgresql.global_config or self._postgresql.global_config.is_synchronous_mode:
if global_config.is_synchronous_mode:
synchronous_standby_names = self._server_parameters.get('synchronous_standby_names')
if synchronous_standby_names is None:
if self._postgresql.global_config and self._postgresql.global_config.is_synchronous_mode_strict\
if global_config.is_synchronous_mode_strict\
and self._postgresql.role in ('master', 'primary', 'promoted'):
parameters['synchronous_standby_names'] = '*'
else:

patroni/postgresql/slots.py

@@ -13,6 +13,7 @@ from typing import Any, Dict, Iterator, List, Optional, Union, Tuple, TYPE_CHECK
from .connection import get_connection_cursor
from .misc import format_lsn, fsync_dir
from .. import global_config
from ..dcs import Cluster, Leader
from ..file_perm import pg_perm
from ..psycopg import OperationalError
@@ -293,7 +294,7 @@ class SlotsHandler:
"""
slot = self._replication_slots[name]
if cluster.config:
for matcher in cluster.config.ignore_slots_matchers:
for matcher in global_config.ignore_slots_matchers:
if (
(matcher.get("name") is None or matcher["name"] == name)
and all(not matcher.get(a) or matcher[a] == slot.get(a)
@@ -510,13 +511,12 @@ class SlotsHandler:
:returns: list of logical replication slots names that should be copied from the primary.
"""
ret = []
if self._postgresql.major_version >= 90400 and self._postgresql.global_config and cluster.config:
if self._postgresql.major_version >= 90400 and cluster.config:
try:
self.load_replication_slots()
slots = cluster.get_replication_slots(
self._postgresql.name, self._postgresql.role, nofailover, self._postgresql.major_version,
is_standby_cluster=self._postgresql.global_config.is_standby_cluster, show_error=True)
slots = cluster.get_replication_slots(self._postgresql.name, self._postgresql.role,
nofailover, self._postgresql.major_version, show_error=True)
self._drop_incorrect_slots(cluster, slots, paused)
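The `ignore_slots` matcher loop shown above uses "unset field matches anything" semantics; restated as a standalone predicate (hypothetical helper, attribute tuple assumed since it is elided in the hunk):

    def slot_is_ignored(name, slot, matchers):
        # A matcher matches when every key it sets equals the slot's value.
        return any(
            (m.get('name') is None or m['name'] == name)
            and all(not m.get(a) or m[a] == slot.get(a)
                    for a in ('database', 'plugin', 'type'))  # assumed attrs
            for m in matchers
        )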

patroni/postgresql/sync.py

@@ -5,6 +5,7 @@ import time
from copy import deepcopy
from typing import Collection, List, NamedTuple, Tuple, TYPE_CHECKING
from .. import global_config
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
from ..dcs import Cluster
from ..psycopg import quote_ident as _quote_ident
@@ -303,11 +304,8 @@ END;$$""")
replica_list = _ReplicaList(self._postgresql, cluster)
self._process_replica_readiness(cluster, replica_list)
if TYPE_CHECKING: # pragma: no cover
assert self._postgresql.global_config is not None
sync_node_count = self._postgresql.global_config.synchronous_node_count\
if self._postgresql.supports_multiple_sync else 1
sync_node_maxlag = self._postgresql.global_config.maximum_lag_on_syncnode
sync_node_count = global_config.synchronous_node_count if self._postgresql.supports_multiple_sync else 1
sync_node_maxlag = global_config.maximum_lag_on_syncnode
candidates = CaseInsensitiveSet()
sync_nodes = CaseInsensitiveSet()

patroni/utils.py

@@ -33,7 +33,6 @@ from .version import __version__
if TYPE_CHECKING: # pragma: no cover
from .dcs import Cluster
from .config import GlobalConfig
tzutc = tz.tzutc()
@@ -759,12 +758,10 @@ def iter_response_objects(response: HTTPResponse) -> Iterator[Dict[str, Any]]:
prev = chunk[idx:]
def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig'] = None) -> Dict[str, Any]:
def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
"""Get a JSON representation of *cluster*.
:param cluster: the :class:`~patroni.dcs.Cluster` object to be parsed as JSON.
:param global_config: optional :class:`~patroni.config.GlobalConfig` object to check the cluster state.
If not provided, it will be instantiated from `Cluster.config`.
:returns: JSON representation of *cluster*.
@@ -793,16 +790,16 @@ def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig']
* ``from``: name of the member to be demoted;
* ``to``: name of the member to be promoted.
"""
if not global_config:
from patroni.config import get_global_config
global_config = get_global_config(cluster)
from . import global_config
config = global_config.from_cluster(cluster)
leader_name = cluster.leader.name if cluster.leader else None
cluster_lsn = cluster.last_lsn or 0
ret: Dict[str, Any] = {'members': []}
for m in cluster.members:
if m.name == leader_name:
role = 'standby_leader' if global_config.is_standby_cluster else 'leader'
role = 'standby_leader' if config.is_standby_cluster else 'leader'
elif cluster.sync.matches(m.name):
role = 'sync_standby'
else:
@@ -832,7 +829,7 @@ def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig']
# sort members by name for consistency
cmp: Callable[[Dict[str, Any]], bool] = lambda m: m['name']
ret['members'].sort(key=cmp)
if global_config.is_paused:
if config.is_paused:
ret['pause'] = True
if cluster.failover and cluster.failover.scheduled_at:
ret['scheduled_switchover'] = {'at': cluster.failover.scheduled_at.isoformat()}

tests/test_api.py

@@ -8,8 +8,8 @@ from io import BytesIO as IO
from mock import Mock, PropertyMock, patch
from socketserver import ThreadingMixIn
from patroni import global_config
from patroni.api import RestApiHandler, RestApiServer
from patroni.config import GlobalConfig
from patroni.dcs import ClusterConfig, Member
from patroni.exceptions import PostgresConnectionException
from patroni.ha import _MemberStatus
@@ -148,16 +148,9 @@ class MockLogger(object):
records_lost = 1
class MockConfig(object):
def get_global_config(self, _):
return GlobalConfig({})
class MockPatroni(object):
ha = MockHa()
config = MockConfig()
postgresql = ha.state_handler
dcs = Mock()
logger = MockLogger()
@@ -211,7 +204,7 @@ class TestRestApiHandler(unittest.TestCase):
def test_do_GET(self):
MockPatroni.dcs.cluster.last_lsn = 20
MockPatroni.dcs.cluster.sync.members = [MockPostgresql.name]
with patch.object(GlobalConfig, 'is_synchronous_mode', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /replica')
MockRestApiServer(RestApiHandler, 'GET /replica?lag=1M')
MockRestApiServer(RestApiHandler, 'GET /replica?lag=10MB')
@@ -234,7 +227,7 @@ class TestRestApiHandler(unittest.TestCase):
with patch.object(MockHa, 'is_leader', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /replica')
MockRestApiServer(RestApiHandler, 'GET /read-only-sync')
with patch.object(GlobalConfig, 'is_standby_cluster', Mock(return_value=True)):
with patch.object(global_config.__class__, 'is_standby_cluster', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /standby_leader')
MockPatroni.dcs.cluster = None
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'primary'})):
@@ -244,8 +237,8 @@ class TestRestApiHandler(unittest.TestCase):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /primary'))
with patch.object(RestApiServer, 'query', Mock(return_value=[('', 1, '', '', '', '', False, None, None, '')])):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
with patch.object(GlobalConfig, 'is_standby_cluster', Mock(return_value=True)), \
patch.object(GlobalConfig, 'is_paused', Mock(return_value=True)):
with patch.object(global_config.__class__, 'is_standby_cluster', Mock(return_value=True)), \
patch.object(global_config.__class__, 'is_paused', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /standby_leader')
# test tags
@@ -475,7 +468,7 @@ class TestRestApiHandler(unittest.TestCase):
request = make_request(role='primary', postgres_version='9.5.2')
MockRestApiServer(RestApiHandler, request)
with patch.object(GlobalConfig, 'is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, make_request(schedule='2016-08-42 12:45TZ+1', role='primary'))
# Valid timeout
MockRestApiServer(RestApiHandler, make_request(timeout='60s'))
@@ -537,7 +530,7 @@ class TestRestApiHandler(unittest.TestCase):
# Switchover in pause mode
with patch.object(RestApiHandler, 'write_response') as response_mock, \
patch.object(GlobalConfig, 'is_paused', PropertyMock(return_value=True)):
patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(
400, 'Switchover is possible only to a specific candidate in a paused state')
@@ -546,7 +539,8 @@ class TestRestApiHandler(unittest.TestCase):
for is_synchronous_mode, response in (
(True, 'switchover is not possible: can not find sync_standby'),
(False, 'switchover is not possible: cluster does not have members except leader')):
with patch.object(GlobalConfig, 'is_synchronous_mode', PropertyMock(return_value=is_synchronous_mode)), \
with patch.object(global_config.__class__, 'is_synchronous_mode',
PropertyMock(return_value=is_synchronous_mode)), \
patch.object(RestApiHandler, 'write_response') as response_mock:
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(412, response)
@@ -571,7 +565,8 @@ class TestRestApiHandler(unittest.TestCase):
cluster.sync.matches.return_value = False
for is_synchronous_mode, response in (
(True, 'candidate name does not match with sync_standby'), (False, 'candidate does not exists')):
with patch.object(GlobalConfig, 'is_synchronous_mode', PropertyMock(return_value=is_synchronous_mode)), \
with patch.object(global_config.__class__, 'is_synchronous_mode',
PropertyMock(return_value=is_synchronous_mode)), \
patch.object(RestApiHandler, 'write_response') as response_mock:
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(412, response)
@@ -632,7 +627,7 @@ class TestRestApiHandler(unittest.TestCase):
# Schedule in paused mode
with patch.object(RestApiHandler, 'write_response') as response_mock, \
patch.object(GlobalConfig, 'is_paused', PropertyMock(return_value=True)):
patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
dcs.manual_failover.return_value = False
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(400, "Can't schedule switchover in the paused state")
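The recurring test change above is worth spelling out: since `patroni.global_config` is now an instance, `is_paused` and friends are properties defined on its class, so tests must patch `global_config.__class__` rather than a dotted module path. The pattern, extracted:

    from mock import PropertyMock, patch
    from patroni import global_config

    # Properties live on type(global_config), hence the __class__ indirection:
    with patch.object(global_config.__class__, 'is_paused',
                      PropertyMock(return_value=True)):
        assert global_config.is_paused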

tests/test_config.py

@@ -5,7 +5,11 @@ import io
from copy import deepcopy
from mock import MagicMock, Mock, patch
from patroni.config import Config, ConfigParseError, GlobalConfig
from patroni import global_config
from patroni.config import ClusterConfig, Config, ConfigParseError
from .test_ha import get_cluster_initialized_with_only_leader
class TestConfig(unittest.TestCase):
@@ -248,4 +252,6 @@ class TestConfig(unittest.TestCase):
def test_global_config_is_synchronous_mode(self):
# we should ignore synchronous_mode setting in a standby cluster
config = {'standby_cluster': {'host': 'some_host'}, 'synchronous_mode': True}
self.assertFalse(GlobalConfig(config).is_synchronous_mode)
cluster = get_cluster_initialized_with_only_leader(cluster_config=ClusterConfig(1, config, 1))
test_config = global_config.from_cluster(cluster)
self.assertFalse(test_config.is_synchronous_mode)
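
A rough sketch of what the rewritten assertion exercises, assuming `cluster` is a Cluster fixture carrying the dynamic configuration above (from_cluster() derives a GlobalConfig from a given cluster; the tests call update() instead when they want the shared instance refreshed):

from patroni import global_config

# Standby clusters ignore synchronous_mode, so the derived config
# reports it as disabled even though the dict sets it to True.
config = global_config.from_cluster(cluster)  # `cluster` is an assumed fixture
assert not config.is_synchronous_mode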


@@ -7,6 +7,7 @@ import unittest
from click.testing import CliRunner
from datetime import datetime, timedelta
from mock import patch, Mock, PropertyMock
from patroni import global_config
from patroni.ctl import ctl, load_config, output_members, get_dcs, parse_dcs, \
get_all_members, get_any_member, get_cursor, query_member, PatroniCtlException, apply_config_changes, \
format_config_for_editing, show_diff, invoke_editor, format_pg_version, CONFIG_FILE_PATH, PatronictlPrettyTable
@@ -147,7 +148,7 @@ class TestCtl(unittest.TestCase):
self.assertEqual(result.exit_code, 0)
# Scheduled in pause mode
with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl, ['switchover', 'dummy', '--group', '0',
'--force', '--scheduled', '2015-01-01T12:00:00'])
self.assertEqual(result.exit_code, 1)
@@ -369,7 +370,7 @@ class TestCtl(unittest.TestCase):
result = self.runner.invoke(ctl, ['restart', 'alpha', 'other', '--force', '--scheduled', '2300-10-01T14:30'])
assert 'Failed: flush scheduled restart' in result.output
with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl,
['restart', 'alpha', 'other', '--force', '--scheduled', '2300-10-01T14:30'])
assert result.exit_code == 1
@@ -533,7 +534,7 @@ class TestCtl(unittest.TestCase):
result = self.runner.invoke(ctl, ['pause', 'dummy'])
assert 'Failed' in result.output
with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl, ['pause', 'dummy'])
assert 'Cluster is already paused' in result.output
@@ -552,11 +553,11 @@ class TestCtl(unittest.TestCase):
@patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
def test_resume_cluster(self, mock_post):
mock_post.return_value.status = 200
with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=False)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=False)):
result = self.runner.invoke(ctl, ['resume', 'dummy'])
assert 'Cluster is not paused' in result.output
with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl, ['resume', 'dummy'])
assert 'Success' in result.output


@@ -4,6 +4,7 @@ import os
import sys
from mock import Mock, MagicMock, PropertyMock, patch, mock_open
from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.config import Config
from patroni.dcs import Cluster, ClusterConfig, Failover, Leader, Member, get_dcs, Status, SyncState, TimelineHistory
@@ -217,6 +218,7 @@ class TestHa(PostgresInit):
self.ha = Ha(MockPatroni(self.p, self.e))
self.ha.old_cluster = self.e.get_cluster()
self.ha.cluster = get_cluster_initialized_without_leader()
global_config.update(self.ha.cluster)
self.ha.load_cluster_from_dcs = Mock()
def test_update_lock(self):
@@ -251,8 +253,10 @@ class TestHa(PostgresInit):
@patch('patroni.dcs.etcd.Etcd.initialize', return_value=True)
def test_bootstrap_as_standby_leader(self, initialize):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_not_initialized_without_leader(
cluster_config=ClusterConfig(1, {"standby_cluster": {"port": 5432}}, 1))
global_config.update(self.ha.cluster)
self.ha.cluster = get_cluster_not_initialized_without_leader(cluster_config=ClusterConfig(0, {}, 0))
self.ha.patroni.config._dynamic_configuration = {"standby_cluster": {"port": 5432}}
self.assertEqual(self.ha.run_cycle(), 'trying to bootstrap a new standby leader')
def test_bootstrap_waiting_for_standby_leader(self):
@@ -318,7 +322,7 @@ class TestHa(PostgresInit):
self.ha.state_handler.cancellable._process = Mock()
self.ha._crash_recovery_started -= 600
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 10})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.run_cycle(), 'terminated crash recovery because of startup timeout')
@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
@@ -509,7 +513,7 @@ class TestHa(PostgresInit):
def test_check_failsafe_topology(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.ha.state_handler.name = self.ha.cluster.leader.name
@@ -529,7 +533,7 @@ class TestHa(PostgresInit):
def test_no_dcs_connection_primary_failsafe(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertEqual(self.ha.run_cycle(),
@@ -546,7 +550,7 @@ class TestHa(PostgresInit):
def test_no_dcs_connection_replica_failsafe(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
self.p.is_primary = false
@@ -766,7 +770,7 @@ class TestHa(PostgresInit):
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'leader'))
@@ -1032,7 +1036,7 @@ class TestHa(PostgresInit):
def test__is_healthiest_node(self):
self.p.is_primary = false
self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name))
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
@@ -1049,7 +1053,7 @@ class TestHa(PostgresInit):
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
with patch('patroni.postgresql.Postgresql.last_operation', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=None):
@@ -1272,7 +1276,7 @@ class TestHa(PostgresInit):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader(sync=(self.p.name, 'other'))
self.ha.cluster.config.data.update({'synchronous_mode': True, 'primary_start_timeout': 0})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.has_lock = true
self.ha.update_lock = true
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
@@ -1282,13 +1286,13 @@ class TestHa(PostgresInit):
def test_primary_stop_timeout(self):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data.update({'primary_stop_timeout': 30})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertEqual(self.ha.primary_stop_timeout(), 30)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=False)):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data['primary_stop_timeout'] = None
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.primary_stop_timeout(), None)
@patch('patroni.postgresql.Postgresql.follow')
@@ -1380,8 +1384,9 @@ class TestHa(PostgresInit):
# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
with patch('patroni.config.GlobalConfig.is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
self.ha.cluster.config.data['synchronous_mode_strict'] = True
global_config.update(self.ha.cluster)
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
def test_sync_replication_become_primary(self):
@@ -1514,7 +1519,6 @@ class TestHa(PostgresInit):
@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', Mock(side_effect=Exception))
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_restore_cluster_config(self):
self.ha.cluster.config.data.clear()
self.ha.has_lock = true
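
The churn in test_ha.py above is one idiom repeated: mutate the cluster's dynamic configuration in place, then push it into the shared instance before driving run_cycle(). A compressed sketch, with `cluster` standing in for the test fixtures:

from patroni import global_config

cluster.config.data.update({'maximum_lag_on_failover': 5})  # tweak dynamic config
global_config.update(cluster)  # refresh the shared GlobalConfig from this cluster
# code under test now observes the new value through patroni.global_config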


@@ -154,6 +154,7 @@ class TestPatroni(unittest.TestCase):
self.p.api.start = Mock()
self.p.logger.start = Mock()
self.p.config._dynamic_configuration = {}
self.assertRaises(SleepException, self.p.run)
with patch('patroni.dcs.Cluster.is_unlocked', Mock(return_value=True)):
self.assertRaises(SleepException, self.p.run)
with patch('patroni.config.Config.reload_local_configuration', Mock(return_value=False)):


@@ -9,9 +9,9 @@ from mock import Mock, MagicMock, PropertyMock, patch, mock_open
import patroni.psycopg as psycopg
from patroni import global_config
from patroni.async_executor import CriticalTask
from patroni.collections import CaseInsensitiveSet
from patroni.config import GlobalConfig
from patroni.dcs import RemoteMember
from patroni.exceptions import PostgresConnectionException, PatroniException
from patroni.postgresql import Postgresql, STATE_REJECT, STATE_NO_RESPONSE
@@ -692,12 +692,12 @@ class TestPostgresql(BaseTestPostgresql):
def test_get_server_parameters(self):
config = {'parameters': {'wal_level': 'hot_standby', 'max_prepared_transactions': 100}, 'listen': '0'}
self.p._global_config = GlobalConfig({'synchronous_mode': True})
self.p.config.get_server_parameters(config)
self.p._global_config = GlobalConfig({'synchronous_mode': True, 'synchronous_mode_strict': True})
self.p.config.get_server_parameters(config)
self.p.config.set_synchronous_standby_names('foo')
self.assertTrue(str(self.p.config.get_server_parameters(config)).startswith('<CaseInsensitiveDict'))
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
self.p.config.get_server_parameters(config)
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.p.config.get_server_parameters(config)
self.p.config.set_synchronous_standby_names('foo')
self.assertTrue(str(self.p.config.get_server_parameters(config)).startswith('<CaseInsensitiveDict'))
@patch('time.sleep', Mock())
def test__wait_for_connection_close(self):


@@ -6,8 +6,7 @@ import unittest
from mock import Mock, PropertyMock, patch
from threading import Thread
from patroni import psycopg
from patroni.config import GlobalConfig
from patroni import global_config, psycopg
from patroni.dcs import Cluster, ClusterConfig, Member, Status, SyncState
from patroni.postgresql import Postgresql
from patroni.postgresql.misc import fsync_dir
@@ -29,12 +28,12 @@ class TestSlotsHandler(BaseTestPostgresql):
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
def setUp(self):
super(TestSlotsHandler, self).setUp()
self.p._global_config = GlobalConfig({})
self.s = self.p.slots_handler
self.p.start()
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}}, 1)
self.cluster = Cluster(True, config, self.leader, Status(0, {'ls': 12345, 'ls2': 12345}),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(self.cluster)
def test_sync_replication_slots(self):
config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
@@ -42,11 +41,12 @@ class TestSlotsHandler(BaseTestPostgresql):
'ignore_slots': [{'name': 'blabla'}]}, 1)
cluster = Cluster(True, config, self.leader, Status(0, {'test_3': 10}),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(cluster)
with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
self.s.sync_replication_slots(cluster, False)
self.p.set_role('standby_leader')
with patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(True, False))), \
patch.object(GlobalConfig, 'is_standby_cluster', PropertyMock(return_value=True)), \
patch.object(global_config.__class__, 'is_standby_cluster', PropertyMock(return_value=True)), \
patch('patroni.postgresql.slots.logger.debug') as mock_debug:
self.s.sync_replication_slots(cluster, False)
mock_debug.assert_called_once()
@@ -94,6 +94,7 @@ class TestSlotsHandler(BaseTestPostgresql):
'ignore_slots': [{'name': 'blabla'}]}, 1)
cluster = Cluster(True, config, self.leader, Status.empty(), [self.me, self.other, self.leadermem],
None, SyncState.empty(), None, None)
global_config.update(cluster)
self.s.sync_replication_slots(cluster, False)
with patch.object(Postgresql, '_query') as mock_query:
@@ -189,6 +190,7 @@ class TestSlotsHandler(BaseTestPostgresql):
config = ClusterConfig(1, {'slots': {'blabla': {'type': 'physical'}, 'leader': None}}, 1)
cluster = Cluster(True, config, self.leader, Status(0, {'blabla': 12346}),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(cluster)
self.s.sync_replication_slots(cluster, False)
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('blabla', 'physical', 12345, None, None, None,
None, None)], Exception])) as mock_query, \


@@ -1,9 +1,9 @@
import os
from mock import Mock, patch
from mock import Mock, patch, PropertyMock
from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.config import GlobalConfig
from patroni.dcs import Cluster, SyncState
from patroni.postgresql import Postgresql
@@ -13,6 +13,7 @@ from . import BaseTestPostgresql, psycopg_connect, mock_available_gucs
@patch('subprocess.call', Mock(return_value=0))
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'available_gucs', mock_available_gucs)
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
class TestSync(BaseTestPostgresql):
@patch('subprocess.call', Mock(return_value=0))
@@ -24,7 +25,6 @@ class TestSync(BaseTestPostgresql):
def setUp(self):
super(TestSync, self).setUp()
self.p.config.write_postgresql_conf()
self.p._global_config = GlobalConfig({'synchronous_mode': True})
self.s = self.p.sync_handler
@patch.object(Postgresql, 'last_operation', Mock(return_value=1))
@@ -96,7 +96,6 @@ class TestSync(BaseTestPostgresql):
self.assertEqual(value_in_conf(), None)
mock_reload.reset_mock()
self.p._global_config = GlobalConfig({'synchronous_mode': True})
self.s.set_synchronous_standby_names(CaseInsensitiveSet('*'))
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = '*'")