Make GlobalConfig really global (#2935)
1. Extract the `GlobalConfig` class into its own module.
2. Make the module instantiate the `GlobalConfig` object on load and replace the module entry in `sys.modules` with this instance.
3. Stop passing the `GlobalConfig` object around; use the `patroni.global_config` module everywhere instead.
4. Move `ignore_slots_matchers`, `max_timelines_history`, and `permanent_slots` from `ClusterConfig` to `GlobalConfig`.
5. Add a `use_slots` property to `global_config` and remove the duplicated code from `Cluster` and `Postgresql.ConfigHandler`.

Besides that, improve the readability of a couple of checks in ha.py and the formatting of the `/config` key when it is saved from patronictl.
committed by GitHub
parent 91327f943c
commit 193c73f6b8
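Points 2 and 3 rely on the module-as-singleton pattern: because Python caches modules in `sys.modules`, a module can replace its own entry with a class instance, and every importer then shares that instance while still using plain attribute access. A minimal, self-contained sketch of the idea (the `settings` module name and its fields are hypothetical, not Patroni code):

```python
# settings.py -- minimal sketch of the sys.modules replacement trick.
import sys
import types
from typing import Any, Dict


class Settings(types.ModuleType):
    """An instance of this class replaces the module itself in sys.modules."""

    def __init__(self) -> None:
        super().__init__(__name__)
        self._config: Dict[str, Any] = {}

    def update(self, config: Dict[str, Any]) -> None:
        """Swap in a new configuration; visible to every importer at once."""
        self._config = config

    @property
    def is_paused(self) -> bool:
        # Properties resolve on the class, so `settings.is_paused` works
        # even though `settings` looks like a module to callers.
        return bool(self._config.get('pause'))


# From now on `import settings` yields this instance everywhere.
sys.modules[__name__] = Settings()
```

After `import settings`, one component can call `settings.update({'pause': True})` and every other importer immediately observes `settings.is_paused` being true, which is how the heartbeat thread propagates fresh DCS configuration in the diff below.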
@@ -26,7 +26,7 @@ from urllib.parse import urlparse, parse_qs

 from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union

-from . import psycopg
+from . import global_config, psycopg
 from .__main__ import Patroni
 from .dcs import Cluster
 from .exceptions import PostgresConnectionException, PostgresException
@@ -290,7 +290,7 @@ class RestApiHandler(BaseHTTPRequestHandler):

         patroni = self.server.patroni
         cluster = patroni.dcs.cluster
-        global_config = patroni.config.get_global_config(cluster)
+        config = global_config.from_cluster(cluster)

         leader_optime = cluster and cluster.last_lsn or 0
         replayed_location = response.get('xlog', {}).get('replayed_location', 0)
@@ -308,7 +308,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
             standby_leader_status_code = 200 if response.get('role') == 'standby_leader' else 503
         elif patroni.ha.is_leader():
             leader_status_code = 200
-            if global_config.is_standby_cluster:
+            if config.is_standby_cluster:
                 primary_status_code = replica_status_code = 503
                 standby_leader_status_code = 200 if response.get('role') in ('replica', 'standby_leader') else 503
             else:
@@ -452,9 +452,8 @@ class RestApiHandler(BaseHTTPRequestHandler):
             HTTP status ``200`` and the JSON representation of the cluster topology.
         """
         cluster = self.server.patroni.dcs.get_cluster()
-        global_config = self.server.patroni.config.get_global_config(cluster)

-        response = cluster_as_json(cluster, global_config)
+        response = cluster_as_json(cluster)
         response['scope'] = self.server.patroni.postgresql.scope
         self._write_json_response(200, response)

@@ -864,7 +863,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
         if request:
             logger.debug("received restart request: {0}".format(request))

-        if self.server.patroni.config.get_global_config(cluster).is_paused and 'schedule' in request:
+        if global_config.from_cluster(cluster).is_paused and 'schedule' in request:
             self.write_response(status_code, "Can't schedule restart in the paused state")
             return

@@ -1033,7 +1032,7 @@ class RestApiHandler(BaseHTTPRequestHandler):

         :returns: a string with the error message or ``None`` if good nodes are found.
         """
-        is_synchronous_mode = self.server.patroni.config.get_global_config(cluster).is_synchronous_mode
+        is_synchronous_mode = global_config.from_cluster(cluster).is_synchronous_mode
         if leader and (not cluster.leader or cluster.leader.name != leader):
             return 'leader name does not match'
         if candidate:
@@ -1091,7 +1090,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
         candidate = request.get('candidate') or request.get('member')
         scheduled_at = request.get('scheduled_at')
         cluster = self.server.patroni.dcs.get_cluster()
-        global_config = self.server.patroni.config.get_global_config(cluster)
+        config = global_config.from_cluster(cluster)

         logger.info("received %s request with leader=%s candidate=%s scheduled_at=%s",
                     action, leader, candidate, scheduled_at)
@@ -1104,12 +1103,12 @@ class RestApiHandler(BaseHTTPRequestHandler):
         if not data and scheduled_at:
             if action == 'failover':
                 data = "Failover can't be scheduled"
-            elif global_config.is_paused:
+            elif config.is_paused:
                 data = "Can't schedule switchover in the paused state"
             else:
                 (status_code, data, scheduled_at) = self.parse_schedule(scheduled_at, action)

-        if not data and global_config.is_paused and not candidate:
+        if not data and config.is_paused and not candidate:
             data = 'Switchover is possible only to a specific candidate in a paused state'

         if action == 'failover' and leader:
@@ -1260,7 +1259,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
         """
         postgresql = self.server.patroni.postgresql
         cluster = self.server.patroni.dcs.cluster
-        global_config = self.server.patroni.config.get_global_config(cluster)
+        config = global_config.from_cluster(cluster)
         try:

            if postgresql.state not in ('running', 'restarting', 'starting'):
@@ -1291,10 +1290,10 @@ class RestApiHandler(BaseHTTPRequestHandler):
                })
            }

-            if result['role'] == 'replica' and global_config.is_standby_cluster:
+            if result['role'] == 'replica' and config.is_standby_cluster:
                result['role'] = postgresql.role

-            if result['role'] == 'replica' and global_config.is_synchronous_mode\
+            if result['role'] == 'replica' and config.is_synchronous_mode\
                    and cluster and cluster.sync.matches(postgresql.name):
                result['sync_standby'] = True

@@ -1319,7 +1318,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
            state = 'unknown'
        result: Dict[str, Any] = {'state': state, 'role': postgresql.role}

-        if global_config.is_paused:
+        if config.is_paused:
            result['pause'] = True
        if not cluster or cluster.is_unlocked():
            result['cluster_unlocked'] = True

@@ -12,7 +12,7 @@ from typing import Any, Callable, Collection, Dict, List, Optional, Union, TYPE_

 from . import PATRONI_ENV_PREFIX
 from .collections import CaseInsensitiveDict
-from .dcs import ClusterConfig, Cluster
+from .dcs import ClusterConfig
 from .exceptions import ConfigParseError
 from .file_perm import pg_perm
 from .postgresql.config import ConfigHandler
@@ -54,154 +54,6 @@ def default_validator(conf: Dict[str, Any]) -> List[str]:
     return []


-class GlobalConfig(object):
-    """A class that wraps global configuration and provides convenient methods to access/check values.
-
-    It is instantiated either by calling :func:`get_global_config` or :meth:`Config.get_global_config`, which picks
-    either a configuration from provided :class:`Cluster` object (the most up-to-date) or from the
-    local cache if :class:`ClusterConfig` is not initialized or doesn't have a valid config.
-    """
-
-    def __init__(self, config: Dict[str, Any]) -> None:
-        """Initialize :class:`GlobalConfig` object with given *config*.
-
-        :param config: current configuration either from
-            :class:`ClusterConfig` or from :func:`Config.dynamic_configuration`.
-        """
-        self.__config = config
-
-    def get(self, name: str) -> Any:
-        """Gets global configuration value by *name*.
-
-        :param name: parameter name.
-
-        :returns: configuration value or ``None`` if it is missing.
-        """
-        return self.__config.get(name)
-
-    def check_mode(self, mode: str) -> bool:
-        """Checks whether the certain parameter is enabled.
-
-        :param mode: parameter name, e.g. ``synchronous_mode``, ``failsafe_mode``, ``pause``, ``check_timeline``, and
-            so on.
-
-        :returns: ``True`` if parameter *mode* is enabled in the global configuration.
-        """
-        return bool(parse_bool(self.__config.get(mode)))
-
-    @property
-    def is_paused(self) -> bool:
-        """``True`` if cluster is in maintenance mode."""
-        return self.check_mode('pause')
-
-    @property
-    def is_synchronous_mode(self) -> bool:
-        """``True`` if synchronous replication is requested and it is not a standby cluster config."""
-        return self.check_mode('synchronous_mode') and not self.is_standby_cluster
-
-    @property
-    def is_synchronous_mode_strict(self) -> bool:
-        """``True`` if at least one synchronous node is required."""
-        return self.check_mode('synchronous_mode_strict')
-
-    def get_standby_cluster_config(self) -> Union[Dict[str, Any], Any]:
-        """Get ``standby_cluster`` configuration.
-
-        :returns: a copy of ``standby_cluster`` configuration.
-        """
-        return deepcopy(self.get('standby_cluster'))
-
-    @property
-    def is_standby_cluster(self) -> bool:
-        """``True`` if global configuration has a valid ``standby_cluster`` section."""
-        config = self.get_standby_cluster_config()
-        return isinstance(config, dict) and\
-            bool(config.get('host') or config.get('port') or config.get('restore_command'))
-
-    def get_int(self, name: str, default: int = 0) -> int:
-        """Gets current value of *name* from the global configuration and try to return it as :class:`int`.
-
-        :param name: name of the parameter.
-        :param default: default value if *name* is not in the configuration or invalid.
-
-        :returns: currently configured value of *name* from the global configuration or *default* if it is not set or
-            invalid.
-        """
-        ret = parse_int(self.get(name))
-        return default if ret is None else ret
-
-    @property
-    def min_synchronous_nodes(self) -> int:
-        """The minimal number of synchronous nodes based on whether ``synchronous_mode_strict`` is enabled or not."""
-        return 1 if self.is_synchronous_mode_strict else 0
-
-    @property
-    def synchronous_node_count(self) -> int:
-        """Currently configured value of ``synchronous_node_count`` from the global configuration.
-
-        Assume ``1`` if it is not set or invalid.
-        """
-        return max(self.get_int('synchronous_node_count', 1), self.min_synchronous_nodes)
-
-    @property
-    def maximum_lag_on_failover(self) -> int:
-        """Currently configured value of ``maximum_lag_on_failover`` from the global configuration.
-
-        Assume ``1048576`` if it is not set or invalid.
-        """
-        return self.get_int('maximum_lag_on_failover', 1048576)
-
-    @property
-    def maximum_lag_on_syncnode(self) -> int:
-        """Currently configured value of ``maximum_lag_on_syncnode`` from the global configuration.
-
-        Assume ``-1`` if it is not set or invalid.
-        """
-        return self.get_int('maximum_lag_on_syncnode', -1)
-
-    @property
-    def primary_start_timeout(self) -> int:
-        """Currently configured value of ``primary_start_timeout`` from the global configuration.
-
-        Assume ``300`` if it is not set or invalid.
-
-        .. note::
-            ``master_start_timeout`` is still supported to keep backward compatibility.
-        """
-        default = 300
-        return self.get_int('primary_start_timeout', default)\
-            if 'primary_start_timeout' in self.__config else self.get_int('master_start_timeout', default)
-
-    @property
-    def primary_stop_timeout(self) -> int:
-        """Currently configured value of ``primary_stop_timeout`` from the global configuration.
-
-        Assume ``0`` if it is not set or invalid.
-
-        .. note::
-            ``master_stop_timeout`` is still supported to keep backward compatibility.
-        """
-        default = 0
-        return self.get_int('primary_stop_timeout', default)\
-            if 'primary_stop_timeout' in self.__config else self.get_int('master_stop_timeout', default)
-
-
-def get_global_config(cluster: Optional[Cluster], default: Optional[Dict[str, Any]] = None) -> GlobalConfig:
-    """Instantiates :class:`GlobalConfig` based on the input.
-
-    :param cluster: the currently known cluster state from DCS.
-    :param default: default configuration, which will be used if there is no valid *cluster.config*.
-
-    :returns: :class:`GlobalConfig` object.
-    """
-    # Try to protect from the case when DCS was wiped out
-    if cluster and cluster.config and cluster.config.modify_version:
-        config = cluster.config.data
-    else:
-        config = default or {}
-    return GlobalConfig(deepcopy(config))
-
-
 class Config(object):
     """Handle Patroni configuration.

@@ -949,18 +801,6 @@ class Config(object):
         """
         return deepcopy(self.__effective_configuration)

-    def get_global_config(self, cluster: Optional[Cluster]) -> GlobalConfig:
-        """Instantiate :class:`GlobalConfig` based on input.
-
-        Use the configuration from provided *cluster* (the most up-to-date) or from the
-        local cache if *cluster.config* is not initialized or doesn't have a valid config.
-
-        :param cluster: the currently known cluster state from DCS.
-
-        :returns: :class:`GlobalConfig` object.
-        """
-        return get_global_config(cluster, self._dynamic_configuration)
-
     def _validate_failover_tags(self) -> None:
         """Check ``nofailover``/``failover_priority`` config and warn user if it's contradictory.

@@ -46,7 +46,8 @@ try:
 except ImportError:  # pragma: no cover
     from cdiff import markup_to_pager, PatchStream  # pyright: ignore [reportMissingModuleSource]

-from .config import Config, get_global_config
+from . import global_config
+from .config import Config
 from .dcs import get_dcs as _get_dcs, AbstractDCS, Cluster, Member
 from .exceptions import PatroniException
 from .postgresql.misc import postgres_version_to_int
@@ -1026,7 +1027,7 @@ def reload(cluster_name: str, member_names: List[str], group: Optional[int], for
         if r.status == 200:
             click.echo('No changes to apply on member {0}'.format(member.name))
         elif r.status == 202:
-            config = get_global_config(cluster)
+            config = global_config.from_cluster(cluster)
             click.echo('Reload request received for member {0} and will be processed within {1} seconds'.format(
                 member.name, config.get('loop_wait') or dcs.loop_wait)
             )
@@ -1105,7 +1106,7 @@ def restart(cluster_name: str, group: Optional[int], member_names: List[str],
         content['postgres_version'] = version

     if scheduled_at:
-        if get_global_config(cluster).is_paused:
+        if global_config.from_cluster(cluster).is_paused:
             raise PatroniCtlException("Can't schedule restart in the paused state")
         content['schedule'] = scheduled_at.isoformat()

@@ -1228,7 +1229,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i
     dcs = get_dcs(cluster_name, group)
     cluster = dcs.get_cluster()

-    global_config = get_global_config(cluster)
+    config = global_config.from_cluster(cluster)

     # leader has to be be defined for switchover only
     if action == 'switchover':
@@ -1239,7 +1240,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i
         if force:
             leader = cluster.leader.name
         else:
-            prompt = 'Standby Leader' if global_config.is_standby_cluster else 'Primary'
+            prompt = 'Standby Leader' if config.is_standby_cluster else 'Primary'
             leader = click.prompt(prompt, type=str, default=(cluster.leader and cluster.leader.name))

         if cluster.leader.name != leader:
@@ -1268,7 +1269,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i

     if all((not force,
             action == 'failover',
-            global_config.is_synchronous_mode,
+            config.is_synchronous_mode,
             not cluster.sync.is_empty,
             not cluster.sync.matches(candidate, True))):
         if click.confirm(f'Are you sure you want to failover to the asynchronous node {candidate}'):
@@ -1285,7 +1286,7 @@ def _do_failover_or_switchover(action: str, cluster_name: str, group: Optional[i

     scheduled_at = parse_scheduled(scheduled)
     if scheduled_at:
-        if global_config.is_paused:
+        if config.is_paused:
             raise PatroniCtlException("Can't schedule switchover in the paused state")
         scheduled_at_str = scheduled_at.isoformat()

@@ -1739,7 +1740,7 @@ def wait_until_pause_is_applied(dcs: AbstractDCS, paused: bool, old_cluster: Clu
    :param old_cluster: original cluster information before pause or unpause has been requested. Used to report which
        nodes are still pending to have ``pause`` equal *paused* at a given point in time.
    """
-    config = get_global_config(old_cluster)
+    config = global_config.from_cluster(old_cluster)

    click.echo("'{0}' request sent, waiting until it is recognized by all nodes".format(paused and 'pause' or 'resume'))
    old = {m.name: m.version for m in old_cluster.members if m.api_url}
@@ -1777,7 +1778,7 @@ def toggle_pause(cluster_name: str, group: Optional[int], paused: bool, wait: bo
     """
     dcs = get_dcs(cluster_name, group)
     cluster = dcs.get_cluster()
-    if get_global_config(cluster).is_paused == paused:
+    if global_config.from_cluster(cluster).is_paused == paused:
         raise PatroniCtlException('Cluster is {0} paused'.format(paused and 'already' or 'not'))

     for member in get_all_members_leader_first(cluster):
@@ -2122,7 +2123,7 @@ def edit_config(cluster_name: str, group: Optional[int], force: bool, quiet: boo
         return

     if force or click.confirm('Apply these changes?'):
-        if not dcs.set_config_value(json.dumps(changed_data), cluster.config.version):
+        if not dcs.set_config_value(json.dumps(changed_data, separators=(',', ':')), cluster.config.version):
            raise PatroniCtlException("Config modification aborted due to concurrent changes")
        click.echo("Configuration changed")

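The `separators=(',', ':')` argument in the edit_config hunk above is what tightens the formatting of the `/config` key: by default `json.dumps` inserts a space after every comma and colon. A quick illustration using only the standard library:

```python
import json

data = {'loop_wait': 10, 'ttl': 30}
print(json.dumps(data))                         # {"loop_wait": 10, "ttl": 30}
print(json.dumps(data, separators=(',', ':')))  # {"loop_wait":10,"ttl":30}
```

The ha.py changes further down apply the same separators when Patroni itself writes the dynamic configuration, keeping both writers consistent.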
@@ -15,6 +15,7 @@ from urllib.parse import urlparse, urlunparse, parse_qsl

 import dateutil.parser

+from .. import global_config
 from ..dynamic_loader import iter_classes, iter_modules
 from ..exceptions import PatroniFatalException
 from ..utils import deep_compare, uri
@@ -538,24 +539,6 @@ class ClusterConfig(NamedTuple):
             modify_version = 0
         return ClusterConfig(version, data, version if modify_version is None else modify_version)

-    @property
-    def permanent_slots(self) -> Dict[str, Any]:
-        """Dictionary of permanent slots information looked up from :attr:`~ClusterConfig.data`."""
-        return (self.data.get('permanent_replication_slots')
-                or self.data.get('permanent_slots')
-                or self.data.get('slots')
-                or {})
-
-    @property
-    def ignore_slots_matchers(self) -> List[Dict[str, Any]]:
-        """The value for ``ignore_slots`` from :attr:`~ClusterConfig.data` if defined or an empty list."""
-        return self.data.get('ignore_slots') or []
-
-    @property
-    def max_timelines_history(self) -> int:
-        """The value for ``max_timelines_history`` from :attr:`~ClusterConfig.data` if defined or ``0``."""
-        return self.data.get('max_timelines_history', 0)


 class SyncState(NamedTuple):
     """Immutable object (namedtuple) which represents last observed synchronous replication state.

@@ -944,7 +927,7 @@ class Cluster(NamedTuple('Cluster',
     @property
     def __permanent_slots(self) -> Dict[str, Union[Dict[str, Any], Any]]:
         """Dictionary of permanent replication slots with their known LSN."""
-        ret: Dict[str, Union[Dict[str, Any], Any]] = deepcopy(self.config.permanent_slots if self.config else {})
+        ret: Dict[str, Union[Dict[str, Any], Any]] = global_config.permanent_slots

         members: Dict[str, int] = {slot_name_from_member_name(m.name): m.lsn or 0 for m in self.members}
         slots: Dict[str, int] = {k: parse_int(v) or 0 for k, v in (self.slots or {}).items()}
@@ -973,13 +956,8 @@ class Cluster(NamedTuple('Cluster',
         """Dictionary of permanent ``logical`` replication slots."""
         return {name: value for name, value in self.__permanent_slots.items() if self.is_logical_slot(value)}

-    @property
-    def use_slots(self) -> bool:
-        """``True`` if cluster is configured to use replication slots."""
-        return bool(self.config and (self.config.data.get('postgresql') or {}).get('use_slots', True))
-
     def get_replication_slots(self, my_name: str, role: str, nofailover: bool, major_version: int, *,
-                              is_standby_cluster: bool = False, show_error: bool = False) -> Dict[str, Dict[str, Any]]:
+                              show_error: bool = False) -> Dict[str, Dict[str, Any]]:
         """Lookup configured slot names in the DCS, report issues found and merge with permanent slots.

         Will log an error if:
@@ -990,15 +968,12 @@ class Cluster(NamedTuple('Cluster',
         :param role: role of this node.
         :param nofailover: ``True`` if this node is tagged to not be a failover candidate.
         :param major_version: postgresql major version.
-        :param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
-            the outside because we want to protect from the ``/config`` key removal.
         :param show_error: if ``True`` report error if any disabled logical slots or conflicting slot names are found.

         :returns: final dictionary of slot names, after merging with permanent slots and performing sanity checks.
         """
         slots: Dict[str, Dict[str, str]] = self._get_members_slots(my_name, role)
-        permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster=is_standby_cluster,
-                                                                    role=role, nofailover=nofailover,
+        permanent_slots: Dict[str, Any] = self._get_permanent_slots(role=role, nofailover=nofailover,
                                                                     major_version=major_version)

         disabled_permanent_logical_slots: List[str] = self._merge_permanent_slots(
@@ -1058,8 +1033,7 @@ class Cluster(NamedTuple('Cluster',
                 logger.error("Bad value for slot '%s' in permanent_slots: %s", name, permanent_slots[name])
         return disabled_permanent_logical_slots

-    def _get_permanent_slots(self, *, is_standby_cluster: bool, role: str,
-                             nofailover: bool, major_version: int) -> Dict[str, Any]:
+    def _get_permanent_slots(self, *, role: str, nofailover: bool, major_version: int) -> Dict[str, Any]:
         """Get configured permanent replication slots.

         .. note::
@@ -1071,18 +1045,16 @@ class Cluster(NamedTuple('Cluster',
             The returned dictionary for a non-standby cluster always contains permanent logical replication slots in
             order to show a warning if they are not supported by PostgreSQL before v11.

-        :param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
-            the outside because we want to protect from the ``/config`` key removal.
         :param role: role of this node -- ``primary``, ``standby_leader`` or ``replica``.
         :param nofailover: ``True`` if this node is tagged to not be a failover candidate.
         :param major_version: postgresql major version.

         :returns: dictionary of permanent slot names mapped to attributes.
         """
-        if not self.use_slots or nofailover:
+        if not global_config.use_slots or nofailover:
             return {}

-        if is_standby_cluster:
+        if global_config.is_standby_cluster:
             return self.__permanent_physical_slots \
                 if major_version >= SLOT_ADVANCE_AVAILABLE_VERSION or role == 'standby_leader' else {}

@@ -1108,7 +1080,7 @@ class Cluster(NamedTuple('Cluster',

         :returns: dictionary of physical replication slots that should exist on a given node.
         """
-        if not self.use_slots:
+        if not global_config.use_slots:
             return {}

         # we always want to exclude the member with our name from the list
@@ -1132,13 +1104,11 @@ class Cluster(NamedTuple('Cluster',
                                          for k, v in slot_conflicts.items() if len(v) > 1))
         return slots

-    def has_permanent_slots(self, my_name: str, *, is_standby_cluster: bool = False, nofailover: bool = False,
+    def has_permanent_slots(self, my_name: str, *, nofailover: bool = False,
                             major_version: int = SLOT_ADVANCE_AVAILABLE_VERSION) -> bool:
         """Check if the given member node has permanent replication slots configured.

         :param my_name: name of the member node to check.
-        :param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
-            the outside because we want to protect from the ``/config`` key removal.
         :param nofailover: ``True`` if this node is tagged to not be a failover candidate.
         :param major_version: postgresql major version.

@@ -1146,20 +1116,16 @@ class Cluster(NamedTuple('Cluster',
         """
         role = 'replica'
         members_slots: Dict[str, Dict[str, str]] = self._get_members_slots(my_name, role)
-        permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster=is_standby_cluster,
-                                                                    role=role, nofailover=nofailover,
+        permanent_slots: Dict[str, Any] = self._get_permanent_slots(role=role, nofailover=nofailover,
                                                                     major_version=major_version)
         slots = deepcopy(members_slots)
         self._merge_permanent_slots(slots, permanent_slots, my_name, major_version)
         return len(slots) > len(members_slots) or any(self.is_physical_slot(v) for v in permanent_slots.values())

-    def filter_permanent_slots(self, slots: Dict[str, int], is_standby_cluster: bool,
-                               major_version: int) -> Dict[str, int]:
+    def filter_permanent_slots(self, slots: Dict[str, int], major_version: int) -> Dict[str, int]:
         """Filter out all non-permanent slots from provided *slots* dict.

         :param slots: slot names with LSN values
-        :param is_standby_cluster: ``True`` if it is known that this is a standby cluster. We pass the value from
-            the outside because we want to protect from the ``/config`` key removal.
         :param major_version: postgresql major version.

         :returns: a :class:`dict` object that contains only slots that are known to be permanent.
@@ -1167,9 +1133,7 @@ class Cluster(NamedTuple('Cluster',
         if major_version < SLOT_ADVANCE_AVAILABLE_VERSION:
             return {}  # for legacy PostgreSQL we don't support permanent slots on standby nodes

-        permanent_slots: Dict[str, Any] = self._get_permanent_slots(is_standby_cluster=is_standby_cluster,
-                                                                    role='replica',
-                                                                    nofailover=False,
+        permanent_slots: Dict[str, Any] = self._get_permanent_slots(role='replica', nofailover=False,
                                                                     major_version=major_version)
         members_slots = {slot_name_from_member_name(m.name) for m in self.members}

@@ -1203,7 +1167,7 @@ class Cluster(NamedTuple('Cluster',
         if self._has_permanent_logical_slots(my_name, nofailover):
             return True

-        if self.use_slots:
+        if global_config.use_slots:
             members = [m for m in self.members if m.replicatefrom == my_name and m.name != self.leader_name]
             return any(self.should_enforce_hot_standby_feedback(m.name, m.nofailover) for m in members)
         return False

patroni/global_config.py (new file, 227 lines)
@@ -0,0 +1,227 @@
+"""Implements *global_config* facilities.
+
+The :class:`GlobalConfig` object is instantiated on import and replaces
+``patroni.global_config`` module in :data:`sys.modules`, what allows to use
+its properties and methods like they were module variables and functions.
+"""
+import sys
+import types
+
+from copy import deepcopy
+from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING
+
+from .utils import parse_bool, parse_int
+
+if TYPE_CHECKING:  # pragma: no cover
+    from .dcs import Cluster
+
+
+def __getattr__(mod: types.ModuleType, name: str) -> Any:
+    """This function exists just to make pyright happy.
+
+    Without it pyright complains about access to unknown members of global_config module.
+    """
+    return getattr(sys.modules[__name__], name)  # pragma: no cover
+
+
+class GlobalConfig(types.ModuleType):
+    """A class that wraps global configuration and provides convenient methods to access/check values."""
+
+    __file__ = __file__  # just to make unittest and pytest happy
+
+    def __init__(self) -> None:
+        """Initialize :class:`GlobalConfig` object."""
+        super().__init__(__name__)
+        self.__config = {}
+
+    @staticmethod
+    def _cluster_has_valid_config(cluster: Optional['Cluster']) -> bool:
+        """Check if provided *cluster* object has a valid global configuration.
+
+        :param cluster: the currently known cluster state from DCS.
+
+        :returns: ``True`` if provided *cluster* object has a valid global configuration, otherwise ``False``.
+        """
+        return bool(cluster and cluster.config and cluster.config.modify_version)
+
+    def update(self, cluster: Optional['Cluster']) -> None:
+        """Update with the new global configuration from the :class:`Cluster` object view.
+
+        .. note::
+            Global configuration is updated only when configuration in the *cluster* view is valid.
+
+            Update happens in-place and is executed only from the main heartbeat thread.
+
+        :param cluster: the currently known cluster state from DCS.
+        """
+        # Try to protect from the case when DCS was wiped out
+        if self._cluster_has_valid_config(cluster):
+            self.__config = cluster.config.data  # pyright: ignore [reportOptionalMemberAccess]
+
+    def from_cluster(self, cluster: Optional['Cluster']) -> 'GlobalConfig':
+        """Return :class:`GlobalConfig` instance from the provided :class:`Cluster` object view.
+
+        .. note::
+            If the provided *cluster* object doesn't have a valid global configuration we return
+            the last known valid state of the :class:`GlobalConfig` object.
+
+            This method is used when we need to have the most up-to-date values in the global configuration,
+            but we don't want to update the global object.
+
+        :param cluster: the currently known cluster state from DCS.
+
+        :returns: :class:`GlobalConfig` object.
+        """
+        if not self._cluster_has_valid_config(cluster):
+            return self
+
+        ret = GlobalConfig()
+        ret.update(cluster)
+        return ret
+
+    def get(self, name: str) -> Any:
+        """Gets global configuration value by *name*.
+
+        :param name: parameter name.
+
+        :returns: configuration value or ``None`` if it is missing.
+        """
+        return self.__config.get(name)
+
+    def check_mode(self, mode: str) -> bool:
+        """Checks whether the certain parameter is enabled.
+
+        :param mode: parameter name, e.g. ``synchronous_mode``, ``failsafe_mode``, ``pause``, ``check_timeline``, and
+            so on.
+
+        :returns: ``True`` if parameter *mode* is enabled in the global configuration.
+        """
+        return bool(parse_bool(self.__config.get(mode)))
+
+    @property
+    def is_paused(self) -> bool:
+        """``True`` if cluster is in maintenance mode."""
+        return self.check_mode('pause')
+
+    @property
+    def is_synchronous_mode(self) -> bool:
+        """``True`` if synchronous replication is requested and it is not a standby cluster config."""
+        return self.check_mode('synchronous_mode') and not self.is_standby_cluster
+
+    @property
+    def is_synchronous_mode_strict(self) -> bool:
+        """``True`` if at least one synchronous node is required."""
+        return self.check_mode('synchronous_mode_strict')
+
+    def get_standby_cluster_config(self) -> Union[Dict[str, Any], Any]:
+        """Get ``standby_cluster`` configuration.
+
+        :returns: a copy of ``standby_cluster`` configuration.
+        """
+        return deepcopy(self.get('standby_cluster'))
+
+    @property
+    def is_standby_cluster(self) -> bool:
+        """``True`` if global configuration has a valid ``standby_cluster`` section."""
+        config = self.get_standby_cluster_config()
+        return isinstance(config, dict) and\
+            bool(config.get('host') or config.get('port') or config.get('restore_command'))
+
+    def get_int(self, name: str, default: int = 0) -> int:
+        """Gets current value of *name* from the global configuration and try to return it as :class:`int`.
+
+        :param name: name of the parameter.
+        :param default: default value if *name* is not in the configuration or invalid.
+
+        :returns: currently configured value of *name* from the global configuration or *default* if it is not set or
+            invalid.
+        """
+        ret = parse_int(self.get(name))
+        return default if ret is None else ret
+
+    @property
+    def min_synchronous_nodes(self) -> int:
+        """The minimum number of synchronous nodes based on whether ``synchronous_mode_strict`` is enabled or not."""
+        return 1 if self.is_synchronous_mode_strict else 0
+
+    @property
+    def synchronous_node_count(self) -> int:
+        """Currently configured value of ``synchronous_node_count`` from the global configuration.
+
+        Assume ``1`` if it is not set or invalid.
+        """
+        return max(self.get_int('synchronous_node_count', 1), self.min_synchronous_nodes)
+
+    @property
+    def maximum_lag_on_failover(self) -> int:
+        """Currently configured value of ``maximum_lag_on_failover`` from the global configuration.
+
+        Assume ``1048576`` if it is not set or invalid.
+        """
+        return self.get_int('maximum_lag_on_failover', 1048576)
+
+    @property
+    def maximum_lag_on_syncnode(self) -> int:
+        """Currently configured value of ``maximum_lag_on_syncnode`` from the global configuration.
+
+        Assume ``-1`` if it is not set or invalid.
+        """
+        return self.get_int('maximum_lag_on_syncnode', -1)
+
+    @property
+    def primary_start_timeout(self) -> int:
+        """Currently configured value of ``primary_start_timeout`` from the global configuration.
+
+        Assume ``300`` if it is not set or invalid.
+
+        .. note::
+            ``master_start_timeout`` is still supported to keep backward compatibility.
+        """
+        default = 300
+        return self.get_int('primary_start_timeout', default)\
+            if 'primary_start_timeout' in self.__config else self.get_int('master_start_timeout', default)
+
+    @property
+    def primary_stop_timeout(self) -> int:
+        """Currently configured value of ``primary_stop_timeout`` from the global configuration.
+
+        Assume ``0`` if it is not set or invalid.
+
+        .. note::
+            ``master_stop_timeout`` is still supported to keep backward compatibility.
+        """
+        default = 0
+        return self.get_int('primary_stop_timeout', default)\
+            if 'primary_stop_timeout' in self.__config else self.get_int('master_stop_timeout', default)
+
+    @property
+    def ignore_slots_matchers(self) -> List[Dict[str, Any]]:
+        """Currently configured value of ``ignore_slots`` from the global configuration.
+
+        Assume an empty :class:`list` if not set.
+        """
+        return self.get('ignore_slots') or []
+
+    @property
+    def max_timelines_history(self) -> int:
+        """Currently configured value of ``max_timelines_history`` from the global configuration.
+
+        Assume ``0`` if not set or invalid.
+        """
+        return self.get_int('max_timelines_history', 0)
+
+    @property
+    def use_slots(self) -> bool:
+        """``True`` if cluster is configured to use replication slots."""
+        return bool(parse_bool((self.get('postgresql') or {}).get('use_slots', True)))
+
+    @property
+    def permanent_slots(self) -> Dict[str, Any]:
+        """Dictionary of permanent slots information from the global configuration."""
+        return deepcopy(self.get('permanent_replication_slots')
+                        or self.get('permanent_slots')
+                        or self.get('slots')
+                        or {})
+
+
+sys.modules[__name__] = GlobalConfig()
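With the instance installed in `sys.modules`, the rest of the code base can treat the module itself as the configuration object. A short sketch of the two access patterns the diff introduces (assuming `cluster` is a `Cluster` view obtained from the DCS):

```python
from patroni import global_config

# 1. Read the shared singleton, which the heartbeat thread refreshes
#    once per cycle via global_config.update(cluster).
if global_config.is_paused:
    print('cluster is in maintenance mode')

# 2. Derive a throwaway snapshot from a specific cluster view without
#    touching the shared state (used by the REST API and patronictl).
config = global_config.from_cluster(cluster)
if config.is_standby_cluster:
    print('this is a standby cluster')
```

`from_cluster()` returns the singleton itself when the *cluster* view has no valid configuration, so callers always get the last known good values.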
@@ -10,7 +10,7 @@ from multiprocessing.pool import ThreadPool
 from threading import RLock
 from typing import Any, Callable, Collection, Dict, List, NamedTuple, Optional, Union, Tuple, TYPE_CHECKING

-from . import psycopg
+from . import global_config, psycopg
 from .__main__ import Patroni
 from .async_executor import AsyncExecutor, CriticalTask
 from .collections import CaseInsensitiveSet
@@ -156,7 +156,6 @@ class Ha(object):
         self._rewind = Rewind(self.state_handler)
         self.dcs = patroni.dcs
         self.cluster = Cluster.empty()
-        self.global_config = self.patroni.config.get_global_config(None)
         self.old_cluster = Cluster.empty()
         self._leader_expiry = 0
         self._leader_expiry_lock = RLock()
@@ -188,20 +187,20 @@ class Ha(object):

     def primary_stop_timeout(self) -> Union[int, None]:
         """:returns: "primary_stop_timeout" from the global configuration or `None` when not in synchronous mode."""
-        ret = self.global_config.primary_stop_timeout
+        ret = global_config.primary_stop_timeout
         return ret if ret > 0 and self.is_synchronous_mode() else None

     def is_paused(self) -> bool:
         """:returns: `True` if in maintenance mode."""
-        return self.global_config.is_paused
+        return global_config.is_paused

     def check_timeline(self) -> bool:
         """:returns: `True` if should check whether the timeline is latest during the leader race."""
-        return self.global_config.check_mode('check_timeline')
+        return global_config.check_mode('check_timeline')

     def is_standby_cluster(self) -> bool:
         """:returns: `True` if global configuration has a valid "standby_cluster" section."""
-        return self.global_config.is_standby_cluster
+        return global_config.is_standby_cluster

     def is_leader(self) -> bool:
         """:returns: `True` if the current node is the leader, based on expiration set when it last held the key."""
@@ -296,7 +295,6 @@ class Ha(object):
             last_lsn = self.state_handler.last_operation()
             slots = self.cluster.filter_permanent_slots(
                 {**self.state_handler.slots(), slot_name_from_member_name(self.state_handler.name): last_lsn},
-                self.is_standby_cluster(),
                 self.state_handler.major_version)
         except Exception:
             logger.exception('Exception when called state_handler.last_operation()')
@@ -450,7 +448,7 @@ class Ha(object):
             return ret or 'trying to bootstrap {0}'.format(msg)

         # no leader, but configuration may allowed replica creation using backup tools
-        create_replica_methods = self.global_config.get_standby_cluster_config().get('create_replica_methods', []) \
+        create_replica_methods = global_config.get_standby_cluster_config().get('create_replica_methods', []) \
             if self.is_standby_cluster() else None
         can_bootstrap = self.state_handler.can_create_replica_without_replication_connection(create_replica_methods)
         concurrent_bootstrap = self.cluster.initialize == ""
@@ -525,7 +523,7 @@ class Ha(object):
         :returns: action message, describing what was performed.
         """
         if self.has_lock() and self.update_lock():
-            timeout = self.global_config.primary_start_timeout
+            timeout = global_config.primary_start_timeout
             if timeout == 0:
                 # We are requested to prefer failing over to restarting primary. But see first if there
                 # is anyone to fail over to.
@@ -622,7 +620,7 @@ class Ha(object):
         for param in params:  # It is highly unlikely to happen, but we want to protect from the case
             node_to_follow.data.pop(param, None)  # when above-mentioned params came from outside.
         if self.is_standby_cluster():
-            standby_config = self.global_config.get_standby_cluster_config()
+            standby_config = global_config.get_standby_cluster_config()
             node_to_follow.data.update({p: standby_config[p] for p in params if standby_config.get(p)})

         return node_to_follow
@@ -684,11 +682,11 @@ class Ha(object):

     def is_synchronous_mode(self) -> bool:
         """:returns: `True` if synchronous replication is requested."""
-        return self.global_config.is_synchronous_mode
+        return global_config.is_synchronous_mode

     def is_failsafe_mode(self) -> bool:
         """:returns: `True` if failsafe_mode is enabled in global configuration."""
-        return self.global_config.check_mode('failsafe_mode')
+        return global_config.check_mode('failsafe_mode')

     def process_sync_replication(self) -> None:
         """Process synchronous standby beahvior.
@@ -732,7 +730,7 @@ class Ha(object):
             return logger.info('Synchronous replication key updated by someone else.')

         # When strict mode and no suitable replication connections put "*" to synchronous_standby_names
-        if self.global_config.is_synchronous_mode_strict and not picked:
+        if global_config.is_synchronous_mode_strict and not picked:
             picked = CaseInsensitiveSet('*')
             logger.warning("No standbys available!")

@@ -805,7 +803,7 @@ class Ha(object):
         cluster_history_dict: Dict[int, List[Any]] = {line[0]: list(line) for line in cluster_history}
         history: List[List[Any]] = list(map(list, self.state_handler.get_history(primary_timeline)))
-        if self.cluster.config:
-            history = history[-self.cluster.config.max_timelines_history:]
+        history = history[-global_config.max_timelines_history:]
         for line in history:
             # enrich current history with promotion timestamps stored in DCS
             cluster_history_line = cluster_history_dict.get(line[0], [])
@@ -863,7 +861,7 @@ class Ha(object):
             # promotion until next cycle. TODO: trigger immediate retry of run_cycle
             return 'Postponing promotion because synchronous replication state was updated by somebody else'
         self.state_handler.sync_handler.set_synchronous_standby_names(
-            CaseInsensitiveSet('*') if self.global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
+            CaseInsensitiveSet('*') if global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
         if self.state_handler.role not in ('master', 'promoted', 'primary'):
             # reset failsafe state when promote
             self._failsafe.set_is_active(0)
@@ -974,7 +972,7 @@ class Ha(object):
         :returns True when node is lagging
         """
         lag = (self.cluster.last_lsn or 0) - wal_position
-        return lag > self.global_config.maximum_lag_on_failover
+        return lag > global_config.maximum_lag_on_failover

     def _is_healthiest_node(self, members: Collection[Member], check_replication_lag: bool = True) -> bool:
         """This method tries to determine whether I am healthy enough to became a new leader candidate or not."""
@@ -1541,7 +1539,7 @@ class Ha(object):

         # Now that restart is scheduled we can set timeout for startup, it will get reset
         # once async executor runs and main loop notices PostgreSQL as up.
-        timeout = restart_data.get('timeout', self.global_config.primary_start_timeout)
+        timeout = restart_data.get('timeout', global_config.primary_start_timeout)
         self.set_start_timeout(timeout)

         def before_shutdown() -> None:
@@ -1605,7 +1603,7 @@ class Ha(object):
         """Figure out what to do with the task AsyncExecutor is performing."""
         if self.has_lock() and self.update_lock():
             if self._async_executor.scheduled_action == 'doing crash recovery in a single user mode':
-                time_left = self.global_config.primary_start_timeout - (time.time() - self._crash_recovery_started)
+                time_left = global_config.primary_start_timeout - (time.time() - self._crash_recovery_started)
                 if time_left <= 0 and self.is_failover_possible():
                     logger.info("Demoting self because crash recovery is taking too long")
                     self.state_handler.cancellable.cancel(True)
@@ -1690,7 +1688,7 @@ class Ha(object):
             self.set_is_leader(True)
             if self.is_synchronous_mode():
                 self.state_handler.sync_handler.set_synchronous_standby_names(
-                    CaseInsensitiveSet('*') if self.global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
+                    CaseInsensitiveSet('*') if global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
             self.state_handler.call_nowait(CallbackAction.ON_START)
             self.load_cluster_from_dcs()

@@ -1713,7 +1711,7 @@ class Ha(object):
                 self.demote('immediate-nolock')
                 return 'stopped PostgreSQL while starting up because leader key was lost'

-            timeout = self._start_timeout or self.global_config.primary_start_timeout
+            timeout = self._start_timeout or global_config.primary_start_timeout
             time_left = timeout - self.state_handler.time_in_state()

             if time_left <= 0:
@@ -1746,8 +1744,8 @@ class Ha(object):
         try:
             try:
                 self.load_cluster_from_dcs()
-                self.global_config = self.patroni.config.get_global_config(self.cluster)
-                self.state_handler.reset_cluster_info_state(self.cluster, self.patroni.nofailover, self.global_config)
+                global_config.update(self.cluster)
+                self.state_handler.reset_cluster_info_state(self.cluster, self.patroni.nofailover)
             except Exception:
                 self.state_handler.reset_cluster_info_state(None)
                 raise
@@ -1767,10 +1765,10 @@ class Ha(object):
         self.touch_member()

         # cluster has leader key but not initialize key
-        if not (self.cluster.is_unlocked() or self.sysid_valid(self.cluster.initialize)) and self.has_lock():
+        if self.has_lock(False) and not self.sysid_valid(self.cluster.initialize):
             self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)

-        if not (self.cluster.is_unlocked() or self.cluster.config and self.cluster.config.data) and self.has_lock():
+        if self.has_lock(False) and not (self.cluster.config and self.cluster.config.data):
             self.dcs.set_config_value(json.dumps(self.patroni.config.dynamic_configuration, separators=(',', ':')))
             self.cluster = self.dcs.get_cluster()

@@ -2047,7 +2045,7 @@ class Ha(object):
            config or cluster.config.data.
        """
        data: Dict[str, Any] = {}
-        cluster_params = self.global_config.get_standby_cluster_config()
+        cluster_params = global_config.get_standby_cluster_config()

        if cluster_params:
            data.update({k: v for k, v in cluster_params.items() if k in RemoteMember.ALLOWED_KEYS})

@@ -24,7 +24,7 @@ from .misc import parse_history, parse_lsn, postgres_major_version_to_int
 from .postmaster import PostmasterProcess
 from .slots import SlotsHandler
 from .sync import SyncHandler
-from .. import psycopg
+from .. import global_config, psycopg
 from ..async_executor import CriticalTask
 from ..collections import CaseInsensitiveSet
 from ..dcs import Cluster, Leader, Member, SLOT_ADVANCE_AVAILABLE_VERSION
@@ -34,7 +34,6 @@ from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_emp
 if TYPE_CHECKING:  # pragma: no cover
     from psycopg import Connection as Connection3, Cursor
     from psycopg2 import connection as connection3, cursor
-    from ..config import GlobalConfig

 logger = logging.getLogger(__name__)

@@ -73,7 +72,6 @@ class Postgresql(object):
         self.connection_string: str
         self.proxy_url: Optional[str]
         self._major_version = self.get_major_version()
-        self._global_config = None

         self._state_lock = Lock()
         self.set_state('stopped')
@@ -217,7 +215,7 @@ class Postgresql(object):
                   "FROM pg_catalog.pg_stat_get_wal_senders() w,"
                   " pg_catalog.pg_stat_get_activity(w.pid)"
                   " WHERE w.state = 'streaming') r)").format(self.wal_name, self.lsn_name)
-                  if (not self.global_config or self.global_config.is_synchronous_mode)
+                  if global_config.is_synchronous_mode
                   and self.role in ('master', 'primary', 'promoted') else "'on', '', NULL")

         if self._major_version >= 90600:
@@ -426,12 +424,7 @@ class Postgresql(object):
         self.config.write_postgresql_conf()
         self.reload()

-    @property
-    def global_config(self) -> Optional['GlobalConfig']:
-        return self._global_config
-
-    def reset_cluster_info_state(self, cluster: Union[Cluster, None], nofailover: bool = False,
-                                 global_config: Optional['GlobalConfig'] = None) -> None:
+    def reset_cluster_info_state(self, cluster: Union[Cluster, None], nofailover: bool = False) -> None:
         """Reset monitoring query cache.

         It happens in the beginning of heart-beat loop and on change of `synchronous_standby_names`.
@@ -440,30 +433,22 @@ class Postgresql(object):
         :param nofailover: whether this node could become a new primary.
                            Important when there are logical permanent replication slots because "nofailover"
                            node could do cascading replication and should enable `hot_standby_feedback`
-        :param global_config: last known :class:`GlobalConfig` object
         """
         self._cluster_info_state = {}

-        if global_config:
-            self._global_config = global_config
-
-        if not self._global_config:
-            return
-
-        if self._global_config.is_standby_cluster:
+        if global_config.is_standby_cluster:
             # Standby cluster can't have logical replication slots, and we don't need to enforce hot_standby_feedback
             self.set_enforce_hot_standby_feedback(False)

         if cluster and cluster.config and cluster.config.modify_version:
             # We want to enable hot_standby_feedback if the replica is supposed
             # to have a logical slot or in case if it is the cascading replica.
-            self.set_enforce_hot_standby_feedback(not self._global_config.is_standby_cluster and self.can_advance_slots
+            self.set_enforce_hot_standby_feedback(not global_config.is_standby_cluster and self.can_advance_slots
                                                   and cluster.should_enforce_hot_standby_feedback(self.name,
                                                                                                   nofailover))

             self._has_permanent_slots = cluster.has_permanent_slots(
                 my_name=self.name,
-                is_standby_cluster=self._global_config.is_standby_cluster,
                 nofailover=nofailover,
                 major_version=self.major_version)

@@ -12,6 +12,7 @@ from types import TracebackType
 from typing import Any, Collection, Dict, Iterator, List, Optional, Union, Tuple, Type, TYPE_CHECKING

 from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value
+from .. import global_config
 from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
 from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name
 from ..exceptions import PatroniFatalException, PostgresConnectionException
@@ -595,7 +596,7 @@ class ConfigHandler(object):
         is_remote_member = isinstance(member, RemoteMember)
         primary_conninfo = self.primary_conninfo_params(member)
         if primary_conninfo:
-            use_slots = self.get('use_slots', True) and self._postgresql.major_version >= 90400
+            use_slots = global_config.use_slots and self._postgresql.major_version >= 90400
             if use_slots and not (is_remote_member and member.no_replication_slot):
                 primary_slot_name = member.primary_slot_name if is_remote_member else self._postgresql.name
                 recovery_params['primary_slot_name'] = slot_name_from_member_name(primary_slot_name)
@@ -930,10 +931,10 @@ class ConfigHandler(object):
         parameters = config['parameters'].copy()
         listen_addresses, port = split_host_port(config['listen'], 5432)
         parameters.update(cluster_name=self._postgresql.scope, listen_addresses=listen_addresses, port=str(port))
-        if not self._postgresql.global_config or self._postgresql.global_config.is_synchronous_mode:
+        if global_config.is_synchronous_mode:
             synchronous_standby_names = self._server_parameters.get('synchronous_standby_names')
             if synchronous_standby_names is None:
-                if self._postgresql.global_config and self._postgresql.global_config.is_synchronous_mode_strict\
+                if global_config.is_synchronous_mode_strict\
                         and self._postgresql.role in ('master', 'primary', 'promoted'):
                     parameters['synchronous_standby_names'] = '*'
                 else:

@@ -13,6 +13,7 @@ from typing import Any, Dict, Iterator, List, Optional, Union, Tuple, TYPE_CHECK

 from .connection import get_connection_cursor
 from .misc import format_lsn, fsync_dir
+from .. import global_config
 from ..dcs import Cluster, Leader
 from ..file_perm import pg_perm
 from ..psycopg import OperationalError
@@ -293,7 +294,7 @@ class SlotsHandler:
         """
         slot = self._replication_slots[name]
-        if cluster.config:
-            for matcher in cluster.config.ignore_slots_matchers:
+        for matcher in global_config.ignore_slots_matchers:
             if (
                 (matcher.get("name") is None or matcher["name"] == name)
                 and all(not matcher.get(a) or matcher[a] == slot.get(a)
@@ -510,13 +511,12 @@ class SlotsHandler:
         :returns: list of logical replication slots names that should be copied from the primary.
         """
         ret = []
-        if self._postgresql.major_version >= 90400 and self._postgresql.global_config and cluster.config:
+        if self._postgresql.major_version >= 90400 and cluster.config:
             try:
                 self.load_replication_slots()

-                slots = cluster.get_replication_slots(
-                    self._postgresql.name, self._postgresql.role, nofailover, self._postgresql.major_version,
-                    is_standby_cluster=self._postgresql.global_config.is_standby_cluster, show_error=True)
+                slots = cluster.get_replication_slots(self._postgresql.name, self._postgresql.role,
+                                                      nofailover, self._postgresql.major_version, show_error=True)

                 self._drop_incorrect_slots(cluster, slots, paused)

@@ -5,6 +5,7 @@ import time
 from copy import deepcopy
 from typing import Collection, List, NamedTuple, Tuple, TYPE_CHECKING

+from .. import global_config
 from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
 from ..dcs import Cluster
 from ..psycopg import quote_ident as _quote_ident
@@ -303,11 +304,8 @@ END;$$""")
         replica_list = _ReplicaList(self._postgresql, cluster)
         self._process_replica_readiness(cluster, replica_list)

-        if TYPE_CHECKING:  # pragma: no cover
-            assert self._postgresql.global_config is not None
-        sync_node_count = self._postgresql.global_config.synchronous_node_count\
-            if self._postgresql.supports_multiple_sync else 1
-        sync_node_maxlag = self._postgresql.global_config.maximum_lag_on_syncnode
+        sync_node_count = global_config.synchronous_node_count if self._postgresql.supports_multiple_sync else 1
+        sync_node_maxlag = global_config.maximum_lag_on_syncnode

         candidates = CaseInsensitiveSet()
         sync_nodes = CaseInsensitiveSet()

@@ -33,7 +33,6 @@ from .version import __version__
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from .dcs import Cluster
|
||||
from .config import GlobalConfig
|
||||
|
||||
tzutc = tz.tzutc()
|
||||
|
||||
@@ -759,12 +758,10 @@ def iter_response_objects(response: HTTPResponse) -> Iterator[Dict[str, Any]]:
|
||||
prev = chunk[idx:]
|
||||
|
||||
|
||||
def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig'] = None) -> Dict[str, Any]:
|
||||
def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
|
||||
"""Get a JSON representation of *cluster*.
|
||||
|
||||
:param cluster: the :class:`~patroni.dcs.Cluster` object to be parsed as JSON.
|
||||
:param global_config: optional :class:`~patroni.config.GlobalConfig` object to check the cluster state.
|
||||
if not provided will be instantiated from the `Cluster.config`.
|
||||
|
||||
:returns: JSON representation of *cluster*.
|
||||
|
||||
@@ -793,16 +790,16 @@ def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig']
|
||||
* ``from``: name of the member to be demoted;
|
||||
* ``to``: name of the member to be promoted.
|
||||
"""
|
||||
if not global_config:
|
||||
from patroni.config import get_global_config
|
||||
global_config = get_global_config(cluster)
|
||||
from . import global_config
|
||||
|
||||
config = global_config.from_cluster(cluster)
|
||||
leader_name = cluster.leader.name if cluster.leader else None
|
||||
cluster_lsn = cluster.last_lsn or 0
|
||||
|
||||
ret: Dict[str, Any] = {'members': []}
|
||||
for m in cluster.members:
|
||||
if m.name == leader_name:
|
||||
role = 'standby_leader' if global_config.is_standby_cluster else 'leader'
|
||||
role = 'standby_leader' if config.is_standby_cluster else 'leader'
|
||||
elif cluster.sync.matches(m.name):
|
||||
role = 'sync_standby'
|
||||
else:
|
||||
@@ -832,7 +829,7 @@ def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig']
|
||||
# sort members by name for consistency
|
||||
cmp: Callable[[Dict[str, Any]], bool] = lambda m: m['name']
|
||||
ret['members'].sort(key=cmp)
|
||||
if global_config.is_paused:
|
||||
if config.is_paused:
|
||||
ret['pause'] = True
|
||||
if cluster.failover and cluster.failover.scheduled_at:
|
||||
ret['scheduled_switchover'] = {'at': cluster.failover.scheduled_at.isoformat()}
|
||||
|
||||
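Taken together, the hunks above change the calling convention of `cluster_as_json`: instead of accepting an optional `GlobalConfig`, it derives a snapshot from the shared module. A hedged usage sketch, assuming `cluster_as_json` lives in `patroni.utils` as the hunk context suggests (the surrounding function is illustrative, not part of the patch):

    from patroni import global_config
    from patroni.utils import cluster_as_json

    def describe(cluster):
        info = cluster_as_json(cluster)  # no second argument any more
        # callers that need extra state build their own snapshot the same way:
        config = global_config.from_cluster(cluster)
        info['paused'] = config.is_paused
        return info
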
@@ -8,8 +8,8 @@ from io import BytesIO as IO
from mock import Mock, PropertyMock, patch
from socketserver import ThreadingMixIn

from patroni import global_config
from patroni.api import RestApiHandler, RestApiServer
from patroni.config import GlobalConfig
from patroni.dcs import ClusterConfig, Member
from patroni.exceptions import PostgresConnectionException
from patroni.ha import _MemberStatus
@@ -148,16 +148,9 @@ class MockLogger(object):
records_lost = 1


class MockConfig(object):

def get_global_config(self, _):
return GlobalConfig({})


class MockPatroni(object):

ha = MockHa()
config = MockConfig()
postgresql = ha.state_handler
dcs = Mock()
logger = MockLogger()
@@ -211,7 +204,7 @@ class TestRestApiHandler(unittest.TestCase):
def test_do_GET(self):
MockPatroni.dcs.cluster.last_lsn = 20
MockPatroni.dcs.cluster.sync.members = [MockPostgresql.name]
with patch.object(GlobalConfig, 'is_synchronous_mode', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /replica')
MockRestApiServer(RestApiHandler, 'GET /replica?lag=1M')
MockRestApiServer(RestApiHandler, 'GET /replica?lag=10MB')
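The test changes follow one rule: since the imported `global_config` is now an instance, `@property` attributes such as `is_synchronous_mode` live on its class, so a `PropertyMock` has to be installed via `global_config.__class__` rather than on the old `GlobalConfig` import. A self-contained sketch of the same mechanics:

    from unittest.mock import PropertyMock, patch

    class _Config:
        @property
        def is_paused(self):
            return False

    global_config = _Config()  # stand-in for the replaced module

    # patching the instance would fail; properties are looked up on the type
    with patch.object(global_config.__class__, 'is_paused',
                      PropertyMock(return_value=True)):
        assert global_config.is_paused is True
    assert global_config.is_paused is False
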
@@ -234,7 +227,7 @@ class TestRestApiHandler(unittest.TestCase):
with patch.object(MockHa, 'is_leader', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /replica')
MockRestApiServer(RestApiHandler, 'GET /read-only-sync')
with patch.object(GlobalConfig, 'is_standby_cluster', Mock(return_value=True)):
with patch.object(global_config.__class__, 'is_standby_cluster', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /standby_leader')
MockPatroni.dcs.cluster = None
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'primary'})):
@@ -244,8 +237,8 @@ class TestRestApiHandler(unittest.TestCase):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /primary'))
with patch.object(RestApiServer, 'query', Mock(return_value=[('', 1, '', '', '', '', False, None, None, '')])):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
with patch.object(GlobalConfig, 'is_standby_cluster', Mock(return_value=True)), \
patch.object(GlobalConfig, 'is_paused', Mock(return_value=True)):
with patch.object(global_config.__class__, 'is_standby_cluster', Mock(return_value=True)), \
patch.object(global_config.__class__, 'is_paused', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /standby_leader')

# test tags
@@ -475,7 +468,7 @@ class TestRestApiHandler(unittest.TestCase):
request = make_request(role='primary', postgres_version='9.5.2')
MockRestApiServer(RestApiHandler, request)

with patch.object(GlobalConfig, 'is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, make_request(schedule='2016-08-42 12:45TZ+1', role='primary'))
# Valid timeout
MockRestApiServer(RestApiHandler, make_request(timeout='60s'))
@@ -537,7 +530,7 @@ class TestRestApiHandler(unittest.TestCase):

# Switchover in pause mode
with patch.object(RestApiHandler, 'write_response') as response_mock, \
patch.object(GlobalConfig, 'is_paused', PropertyMock(return_value=True)):
patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(
400, 'Switchover is possible only to a specific candidate in a paused state')
@@ -546,7 +539,8 @@ class TestRestApiHandler(unittest.TestCase):
for is_synchronous_mode, response in (
(True, 'switchover is not possible: can not find sync_standby'),
(False, 'switchover is not possible: cluster does not have members except leader')):
with patch.object(GlobalConfig, 'is_synchronous_mode', PropertyMock(return_value=is_synchronous_mode)), \
with patch.object(global_config.__class__, 'is_synchronous_mode',
PropertyMock(return_value=is_synchronous_mode)), \
patch.object(RestApiHandler, 'write_response') as response_mock:
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(412, response)
@@ -571,7 +565,8 @@ class TestRestApiHandler(unittest.TestCase):
cluster.sync.matches.return_value = False
for is_synchronous_mode, response in (
(True, 'candidate name does not match with sync_standby'), (False, 'candidate does not exists')):
with patch.object(GlobalConfig, 'is_synchronous_mode', PropertyMock(return_value=is_synchronous_mode)), \
with patch.object(global_config.__class__, 'is_synchronous_mode',
PropertyMock(return_value=is_synchronous_mode)), \
patch.object(RestApiHandler, 'write_response') as response_mock:
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(412, response)
@@ -632,7 +627,7 @@ class TestRestApiHandler(unittest.TestCase):

# Schedule in paused mode
with patch.object(RestApiHandler, 'write_response') as response_mock, \
patch.object(GlobalConfig, 'is_paused', PropertyMock(return_value=True)):
patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
dcs.manual_failover.return_value = False
MockRestApiServer(RestApiHandler, request)
response_mock.assert_called_with(400, "Can't schedule switchover in the paused state")

@@ -5,7 +5,11 @@ import io

from copy import deepcopy
from mock import MagicMock, Mock, patch
from patroni.config import Config, ConfigParseError, GlobalConfig

from patroni import global_config
from patroni.config import ClusterConfig, Config, ConfigParseError

from .test_ha import get_cluster_initialized_with_only_leader


class TestConfig(unittest.TestCase):
@@ -248,4 +252,6 @@ class TestConfig(unittest.TestCase):
def test_global_config_is_synchronous_mode(self):
# we should ignore synchronous_mode setting in a standby cluster
config = {'standby_cluster': {'host': 'some_host'}, 'synchronous_mode': True}
self.assertFalse(GlobalConfig(config).is_synchronous_mode)
cluster = get_cluster_initialized_with_only_leader(cluster_config=ClusterConfig(1, config, 1))
test_config = global_config.from_cluster(cluster)
self.assertFalse(test_config.is_synchronous_mode)

@@ -7,6 +7,7 @@ import unittest
from click.testing import CliRunner
from datetime import datetime, timedelta
from mock import patch, Mock, PropertyMock
from patroni import global_config
from patroni.ctl import ctl, load_config, output_members, get_dcs, parse_dcs, \
get_all_members, get_any_member, get_cursor, query_member, PatroniCtlException, apply_config_changes, \
format_config_for_editing, show_diff, invoke_editor, format_pg_version, CONFIG_FILE_PATH, PatronictlPrettyTable
@@ -147,7 +148,7 @@ class TestCtl(unittest.TestCase):
self.assertEqual(result.exit_code, 0)

# Scheduled in pause mode
with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl, ['switchover', 'dummy', '--group', '0',
'--force', '--scheduled', '2015-01-01T12:00:00'])
self.assertEqual(result.exit_code, 1)
@@ -369,7 +370,7 @@ class TestCtl(unittest.TestCase):
result = self.runner.invoke(ctl, ['restart', 'alpha', 'other', '--force', '--scheduled', '2300-10-01T14:30'])
assert 'Failed: flush scheduled restart' in result.output

with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl,
['restart', 'alpha', 'other', '--force', '--scheduled', '2300-10-01T14:30'])
assert result.exit_code == 1
@@ -533,7 +534,7 @@ class TestCtl(unittest.TestCase):
result = self.runner.invoke(ctl, ['pause', 'dummy'])
assert 'Failed' in result.output

with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl, ['pause', 'dummy'])
assert 'Cluster is already paused' in result.output

@@ -552,11 +553,11 @@ class TestCtl(unittest.TestCase):
@patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=get_cluster_initialized_with_leader()))
def test_resume_cluster(self, mock_post):
mock_post.return_value.status = 200
with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=False)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=False)):
result = self.runner.invoke(ctl, ['resume', 'dummy'])
assert 'Cluster is not paused' in result.output

with patch('patroni.config.GlobalConfig.is_paused', PropertyMock(return_value=True)):
with patch.object(global_config.__class__, 'is_paused', PropertyMock(return_value=True)):
result = self.runner.invoke(ctl, ['resume', 'dummy'])
assert 'Success' in result.output


@@ -4,6 +4,7 @@ import os
import sys

from mock import Mock, MagicMock, PropertyMock, patch, mock_open
from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.config import Config
from patroni.dcs import Cluster, ClusterConfig, Failover, Leader, Member, get_dcs, Status, SyncState, TimelineHistory
@@ -217,6 +218,7 @@ class TestHa(PostgresInit):
self.ha = Ha(MockPatroni(self.p, self.e))
self.ha.old_cluster = self.e.get_cluster()
self.ha.cluster = get_cluster_initialized_without_leader()
global_config.update(self.ha.cluster)
self.ha.load_cluster_from_dcs = Mock()

def test_update_lock(self):
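The `setUp` hunk above shows the second recurring pattern in the test diffs: rather than assigning a fresh `ha.global_config` per test, a single `global_config.update(cluster)` refreshes the process-wide snapshot that every component consults. A toy model of why one call is enough (an assumption-laden sketch; the real `update` takes a `Cluster`, not a dict):

    class _GlobalConfig:
        def __init__(self):
            self._data = {}

        def update(self, data):
            self._data = dict(data)

        @property
        def is_paused(self):
            return bool(self._data.get('pause'))

    shared = _GlobalConfig()
    ha, rest_api = shared, shared  # both hold the same reference

    shared.update({'pause': True})
    assert ha.is_paused and rest_api.is_paused
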
@@ -251,8 +253,10 @@ class TestHa(PostgresInit):
@patch('patroni.dcs.etcd.Etcd.initialize', return_value=True)
def test_bootstrap_as_standby_leader(self, initialize):
self.p.data_directory_empty = true
self.ha.cluster = get_cluster_not_initialized_without_leader(
cluster_config=ClusterConfig(1, {"standby_cluster": {"port": 5432}}, 1))
global_config.update(self.ha.cluster)
self.ha.cluster = get_cluster_not_initialized_without_leader(cluster_config=ClusterConfig(0, {}, 0))
self.ha.patroni.config._dynamic_configuration = {"standby_cluster": {"port": 5432}}
self.assertEqual(self.ha.run_cycle(), 'trying to bootstrap a new standby leader')

def test_bootstrap_waiting_for_standby_leader(self):
@@ -318,7 +322,7 @@ class TestHa(PostgresInit):
self.ha.state_handler.cancellable._process = Mock()
self.ha._crash_recovery_started -= 600
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 10})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.run_cycle(), 'terminated crash recovery because of startup timeout')

@patch.object(Rewind, 'ensure_clean_shutdown', Mock())
@@ -509,7 +513,7 @@ class TestHa(PostgresInit):
def test_check_failsafe_topology(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.assertEqual(self.ha.run_cycle(), 'demoting self because DCS is not accessible and I was a leader')
self.ha.state_handler.name = self.ha.cluster.leader.name
@@ -529,7 +533,7 @@ class TestHa(PostgresInit):
def test_no_dcs_connection_primary_failsafe(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.dcs._last_failsafe = self.ha.cluster.failsafe
self.ha.state_handler.name = self.ha.cluster.leader.name
self.assertEqual(self.ha.run_cycle(),
@@ -546,7 +550,7 @@ class TestHa(PostgresInit):
def test_no_dcs_connection_replica_failsafe(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
self.ha.cluster = get_cluster_initialized_with_leader_and_failsafe()
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.update_failsafe({'name': 'leader', 'api_url': 'http://127.0.0.1:8008/patroni',
'conn_url': 'postgres://127.0.0.1:5432/postgres', 'slots': {'foo': 1000}})
self.p.is_primary = false
@@ -766,7 +770,7 @@ class TestHa(PostgresInit):
with patch('patroni.ha.logger.info') as mock_info:
self.ha.fetch_node_status = get_node_status(wal_position=1)
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.run_cycle(), 'no action. I am (postgresql0), the leader with the lock')
self.assertEqual(mock_info.call_args_list[0][0], ('Member %s exceeds maximum replication lag', 'leader'))

@@ -1032,7 +1036,7 @@ class TestHa(PostgresInit):
def test__is_healthiest_node(self):
self.p.is_primary = false
self.ha.cluster = get_cluster_initialized_without_leader(sync=('postgresql1', self.p.name))
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
@@ -1049,7 +1053,7 @@ class TestHa(PostgresInit):
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertTrue(self.ha._is_healthiest_node(self.ha.old_cluster.members))
self.ha.cluster.config.data.update({'maximum_lag_on_failover': 5})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
with patch('patroni.postgresql.Postgresql.last_operation', return_value=1):
self.assertFalse(self.ha._is_healthiest_node(self.ha.old_cluster.members))
with patch('patroni.postgresql.Postgresql.replica_cached_timeline', return_value=None):
@@ -1272,7 +1276,7 @@ class TestHa(PostgresInit):
self.p.is_running = false
self.ha.cluster = get_cluster_initialized_with_leader(sync=(self.p.name, 'other'))
self.ha.cluster.config.data.update({'synchronous_mode': True, 'primary_start_timeout': 0})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.ha.has_lock = true
self.ha.update_lock = true
self.ha.fetch_node_status = get_node_status() # accessible, in_recovery
@@ -1282,13 +1286,13 @@ class TestHa(PostgresInit):
def test_primary_stop_timeout(self):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data.update({'primary_stop_timeout': 30})
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=True)):
self.assertEqual(self.ha.primary_stop_timeout(), 30)
with patch.object(Ha, 'is_synchronous_mode', Mock(return_value=False)):
self.assertEqual(self.ha.primary_stop_timeout(), None)
self.ha.cluster.config.data['primary_stop_timeout'] = None
self.ha.global_config = self.ha.patroni.config.get_global_config(self.ha.cluster)
global_config.update(self.ha.cluster)
self.assertEqual(self.ha.primary_stop_timeout(), None)

@patch('patroni.postgresql.Postgresql.follow')
@@ -1380,8 +1384,9 @@ class TestHa(PostgresInit):
# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
with patch('patroni.config.GlobalConfig.is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
self.ha.cluster.config.data['synchronous_mode_strict'] = True
global_config.update(self.ha.cluster)
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))

def test_sync_replication_become_primary(self):
@@ -1514,7 +1519,6 @@ class TestHa(PostgresInit):

@patch('patroni.postgresql.mtime', Mock(return_value=1588316884))
@patch('builtins.open', Mock(side_effect=Exception))
@patch.object(Cluster, 'is_unlocked', Mock(return_value=False))
def test_restore_cluster_config(self):
self.ha.cluster.config.data.clear()
self.ha.has_lock = true

@@ -154,6 +154,7 @@ class TestPatroni(unittest.TestCase):
self.p.api.start = Mock()
self.p.logger.start = Mock()
self.p.config._dynamic_configuration = {}
self.assertRaises(SleepException, self.p.run)
with patch('patroni.dcs.Cluster.is_unlocked', Mock(return_value=True)):
self.assertRaises(SleepException, self.p.run)
with patch('patroni.config.Config.reload_local_configuration', Mock(return_value=False)):

@@ -9,9 +9,9 @@ from mock import Mock, MagicMock, PropertyMock, patch, mock_open

import patroni.psycopg as psycopg

from patroni import global_config
from patroni.async_executor import CriticalTask
from patroni.collections import CaseInsensitiveSet
from patroni.config import GlobalConfig
from patroni.dcs import RemoteMember
from patroni.exceptions import PostgresConnectionException, PatroniException
from patroni.postgresql import Postgresql, STATE_REJECT, STATE_NO_RESPONSE
@@ -692,12 +692,12 @@ class TestPostgresql(BaseTestPostgresql):

def test_get_server_parameters(self):
config = {'parameters': {'wal_level': 'hot_standby', 'max_prepared_transactions': 100}, 'listen': '0'}
self.p._global_config = GlobalConfig({'synchronous_mode': True})
self.p.config.get_server_parameters(config)
self.p._global_config = GlobalConfig({'synchronous_mode': True, 'synchronous_mode_strict': True})
self.p.config.get_server_parameters(config)
self.p.config.set_synchronous_standby_names('foo')
self.assertTrue(str(self.p.config.get_server_parameters(config)).startswith('<CaseInsensitiveDict'))
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
self.p.config.get_server_parameters(config)
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.p.config.get_server_parameters(config)
self.p.config.set_synchronous_standby_names('foo')
self.assertTrue(str(self.p.config.get_server_parameters(config)).startswith('<CaseInsensitiveDict'))

@patch('time.sleep', Mock())
def test__wait_for_connection_close(self):

@@ -6,8 +6,7 @@ import unittest
from mock import Mock, PropertyMock, patch
from threading import Thread

from patroni import psycopg
from patroni.config import GlobalConfig
from patroni import global_config, psycopg
from patroni.dcs import Cluster, ClusterConfig, Member, Status, SyncState
from patroni.postgresql import Postgresql
from patroni.postgresql.misc import fsync_dir
@@ -29,12 +28,12 @@ class TestSlotsHandler(BaseTestPostgresql):
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
def setUp(self):
super(TestSlotsHandler, self).setUp()
self.p._global_config = GlobalConfig({})
self.s = self.p.slots_handler
self.p.start()
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}, 'ls2': None}}, 1)
self.cluster = Cluster(True, config, self.leader, Status(0, {'ls': 12345, 'ls2': 12345}),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(self.cluster)

def test_sync_replication_slots(self):
config = ClusterConfig(1, {'slots': {'test_3': {'database': 'a', 'plugin': 'b'},
@@ -42,11 +41,12 @@ class TestSlotsHandler(BaseTestPostgresql):
'ignore_slots': [{'name': 'blabla'}]}, 1)
cluster = Cluster(True, config, self.leader, Status(0, {'test_3': 10}),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(cluster)
with mock.patch('patroni.postgresql.Postgresql._query', Mock(side_effect=psycopg.OperationalError)):
self.s.sync_replication_slots(cluster, False)
self.p.set_role('standby_leader')
with patch.object(SlotsHandler, 'drop_replication_slot', Mock(return_value=(True, False))), \
patch.object(GlobalConfig, 'is_standby_cluster', PropertyMock(return_value=True)), \
patch.object(global_config.__class__, 'is_standby_cluster', PropertyMock(return_value=True)), \
patch('patroni.postgresql.slots.logger.debug') as mock_debug:
self.s.sync_replication_slots(cluster, False)
mock_debug.assert_called_once()
@@ -94,6 +94,7 @@ class TestSlotsHandler(BaseTestPostgresql):
'ignore_slots': [{'name': 'blabla'}]}, 1)
cluster = Cluster(True, config, self.leader, Status.empty(), [self.me, self.other, self.leadermem],
None, SyncState.empty(), None, None)
global_config.update(cluster)

self.s.sync_replication_slots(cluster, False)
with patch.object(Postgresql, '_query') as mock_query:
@@ -189,6 +190,7 @@ class TestSlotsHandler(BaseTestPostgresql):
config = ClusterConfig(1, {'slots': {'blabla': {'type': 'physical'}, 'leader': None}}, 1)
cluster = Cluster(True, config, self.leader, Status(0, {'blabla': 12346}),
[self.me, self.other, self.leadermem], None, SyncState.empty(), None, None)
global_config.update(cluster)
self.s.sync_replication_slots(cluster, False)
with patch.object(SlotsHandler, '_query', Mock(side_effect=[[('blabla', 'physical', 12345, None, None, None,
None, None)], Exception])) as mock_query, \

@@ -1,9 +1,9 @@
import os

from mock import Mock, patch
from mock import Mock, patch, PropertyMock

from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.config import GlobalConfig
from patroni.dcs import Cluster, SyncState
from patroni.postgresql import Postgresql

@@ -13,6 +13,7 @@ from . import BaseTestPostgresql, psycopg_connect, mock_available_gucs
@patch('subprocess.call', Mock(return_value=0))
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'available_gucs', mock_available_gucs)
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
class TestSync(BaseTestPostgresql):

@patch('subprocess.call', Mock(return_value=0))
@@ -24,7 +25,6 @@ class TestSync(BaseTestPostgresql):
def setUp(self):
super(TestSync, self).setUp()
self.p.config.write_postgresql_conf()
self.p._global_config = GlobalConfig({'synchronous_mode': True})
self.s = self.p.sync_handler

@patch.object(Postgresql, 'last_operation', Mock(return_value=1))
@@ -96,7 +96,6 @@ class TestSync(BaseTestPostgresql):
self.assertEqual(value_in_conf(), None)

mock_reload.reset_mock()
self.p._global_config = GlobalConfig({'synchronous_mode': True})
self.s.set_synchronous_standby_names(CaseInsensitiveSet('*'))
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = '*'")