mirror of
https://github.com/outbackdingo/patroni.git
synced 2026-01-27 10:20:10 +00:00
Merge branch 'release/v3.3.0' of github.com:zalando/patroni into feature/citus-secondaries
This commit is contained in:
2
.github/workflows/tests.yaml
vendored
2
.github/workflows/tests.yaml
vendored
@@ -174,7 +174,7 @@ jobs:
|
||||
|
||||
- uses: jakebailey/pyright-action@v1
|
||||
with:
|
||||
version: 1.1.347
|
||||
version: 1.1.356
|
||||
|
||||
docs:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -3,6 +3,67 @@
|
||||
Release notes
|
||||
=============
|
||||
|
||||
Version 3.3.0
|
||||
-------------
|
||||
|
||||
**New features**
|
||||
|
||||
- Add ability to pass ``auth_data`` to Zookeeper client (Aras Mumcuyan)
|
||||
|
||||
It allows to specify the authentication credentials to use for the connection.
|
||||
|
||||
- Add a contrib script for ``Barman`` integration (Israel Barth Rubio)
|
||||
|
||||
Provide an application ``patroni_barman`` that allows to perform ``Barman`` operations remotely and can be used as a custom bootstrap/custom replica method or as an ``on_role_change`` callback. Please check :ref:`here <tools_integration>` for more information.
|
||||
|
||||
- Support JSON log format (alisalemmi)
|
||||
|
||||
Apart from ``plain``, Patroni now also supports ``json`` log format. Requires ``python-json-logger`` library to be installed.
|
||||
|
||||
- Show ``pending_restart_reason`` information (Polina Bungina)
|
||||
|
||||
Provide extended information about the PostgreSQL parameters that caused ``pending_restart`` flag to be set. Both ``patronictl list`` and ``/patroni`` REST API endpoint now show the parameters names and their "diff" as ``pending_restart_reason``.
|
||||
|
||||
- Implement ``nostream`` tag (Grigory Smolkin)
|
||||
|
||||
If ``nostream`` tag is set to ``true``, the node will not use replication protocol to stream WAL but instead rely on archive recovery (if ``restore_command`` is configured). It also disables copying and synchronization of permanent logical replication slots on the node itself and all its cascading replicas.
|
||||
|
||||
|
||||
**Improvements**
|
||||
|
||||
- Implement validation of the log section (Alexander Kukushkin)
|
||||
|
||||
Until now validator was not checking the correctness of the logging configuration provided.
|
||||
|
||||
- Improve logging for PostgreSQL parameters change (Polina Bungina)
|
||||
|
||||
Convert old values to a human-readable format and log information about the ``pg_controldata`` vs Patroni global configuration mismatch.
|
||||
|
||||
|
||||
**Bugfixes**
|
||||
|
||||
- Properly filter out not allowed ``pg_basebackup`` options (Israel Barth Rubio)
|
||||
|
||||
Due to a bug, Patroni was not properly filtering out the not allowed options configured for the ``basebackup`` replica bootstrap method, when provided in the ``- setting: value`` format.
|
||||
|
||||
- Fix ``etcd3`` authentication error handling (Alexander Kukushkin)
|
||||
|
||||
Always retry one time on ``etcd3`` authentication error if authentication was not done right before executing the request. Also, do not restart watchers on reauthentication.
|
||||
|
||||
- Improve logic of the validator files discovery (Waynerv)
|
||||
|
||||
Use ``importlib`` library to discover the files with available configuration parameters when possible (for Python 3.9+). This implementation is more stable and doesn't break the Patroni distributions based on ``zip`` archives.
|
||||
|
||||
- Use ``target_session_attrs`` only when multiple hosts are specified in the ``standby_cluster`` section (Alexander Kukushkin)
|
||||
|
||||
``target_session_attrs=read-write`` is now added to the ``primary_conninfo`` on the standby leader node only when ``standby_cluster.host`` section contains multiple hosts separated by commas.
|
||||
|
||||
- Add compatibility code for ``ydiff`` library version 1.3+ (Alexander Kukushkin)
|
||||
|
||||
.. warning::
|
||||
All older Partoni versions are not compatible with ``ydiff`` 1.3+. Please upgrade Patroni, use ``ydiff`` version <1.3, or install ``cdiff``.
|
||||
|
||||
|
||||
Version 3.2.2
|
||||
-------------
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
.. _tools_integration:
|
||||
|
||||
Integration with other tools
|
||||
============================
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
"""Patroni custom object types somewhat like :mod:`collections` module.
|
||||
|
||||
Provides a case insensitive :class:`dict` and :class:`set` object types.
|
||||
Provides a case insensitive :class:`dict` and :class:`set` object types, and `EMPTY_DICT` frozen dictionary object.
|
||||
"""
|
||||
from collections import OrderedDict
|
||||
from typing import Any, Collection, Dict, Iterator, KeysView, MutableMapping, MutableSet, Optional
|
||||
from copy import deepcopy
|
||||
from typing import Any, Collection, Dict, Iterator, KeysView, Mapping, MutableMapping, MutableSet, Optional
|
||||
|
||||
|
||||
class CaseInsensitiveSet(MutableSet[str]):
|
||||
@@ -48,7 +49,7 @@ class CaseInsensitiveSet(MutableSet[str]):
|
||||
"""
|
||||
return str(set(self._values.values()))
|
||||
|
||||
def __contains__(self, value: str) -> bool:
|
||||
def __contains__(self, value: object) -> bool:
|
||||
"""Check if set contains *value*.
|
||||
|
||||
The check is performed case-insensitively.
|
||||
@@ -57,7 +58,7 @@ class CaseInsensitiveSet(MutableSet[str]):
|
||||
|
||||
:returns: ``True`` if *value* is already in the set, ``False`` otherwise.
|
||||
"""
|
||||
return value.lower() in self._values
|
||||
return isinstance(value, str) and value.lower() in self._values
|
||||
|
||||
def __iter__(self) -> Iterator[str]:
|
||||
"""Iterate over the values in this set.
|
||||
@@ -207,3 +208,47 @@ class CaseInsensitiveDict(MutableMapping[str, Any]):
|
||||
"<CaseInsensitiveDict{'A': 'B', 'c': 'd'} at ..."
|
||||
"""
|
||||
return '<{0}{1} at {2:x}>'.format(type(self).__name__, dict(self.items()), id(self))
|
||||
|
||||
|
||||
class _FrozenDict(Mapping[str, Any]):
|
||||
"""Frozen dictionary object."""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""Create a new instance of :class:`_FrozenDict` with given data."""
|
||||
self.__values: Dict[str, Any] = dict(*args, **kwargs)
|
||||
|
||||
def __iter__(self) -> Iterator[str]:
|
||||
"""Iterate over keys of this dict.
|
||||
|
||||
:yields: each key present in the dict. Yields each key with its last case that has been stored.
|
||||
"""
|
||||
return iter(self.__values)
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""Get the length of this dict.
|
||||
|
||||
:returns: number of keys in the dict.
|
||||
|
||||
:Example:
|
||||
|
||||
>>> len(_FrozenDict())
|
||||
0
|
||||
"""
|
||||
return len(self.__values)
|
||||
|
||||
def __getitem__(self, key: str) -> Any:
|
||||
"""Get the value corresponding to *key*.
|
||||
|
||||
:returns: value corresponding to *key*.
|
||||
"""
|
||||
return self.__values[key]
|
||||
|
||||
def copy(self) -> Dict[str, Any]:
|
||||
"""Create a copy of this dict.
|
||||
|
||||
:return: a new dict object with the same keys and values of this dict.
|
||||
"""
|
||||
return deepcopy(self.__values)
|
||||
|
||||
|
||||
EMPTY_DICT = _FrozenDict()
|
||||
|
||||
@@ -12,7 +12,7 @@ from copy import deepcopy
|
||||
from typing import Any, Callable, Collection, Dict, List, Optional, Union, TYPE_CHECKING
|
||||
|
||||
from . import PATRONI_ENV_PREFIX
|
||||
from .collections import CaseInsensitiveDict
|
||||
from .collections import CaseInsensitiveDict, EMPTY_DICT
|
||||
from .dcs import ClusterConfig
|
||||
from .exceptions import ConfigParseError
|
||||
from .file_perm import pg_perm
|
||||
@@ -445,14 +445,14 @@ class Config(object):
|
||||
|
||||
for name, value in dynamic_configuration.items():
|
||||
if name == 'postgresql':
|
||||
for name, value in (value or {}).items():
|
||||
for name, value in (value or EMPTY_DICT).items():
|
||||
if name == 'parameters':
|
||||
config['postgresql'][name].update(self._process_postgresql_parameters(value))
|
||||
elif name not in ('connect_address', 'proxy_address', 'listen',
|
||||
'config_dir', 'data_dir', 'pgpass', 'authentication'):
|
||||
config['postgresql'][name] = deepcopy(value)
|
||||
elif name == 'standby_cluster':
|
||||
for name, value in (value or {}).items():
|
||||
for name, value in (value or EMPTY_DICT).items():
|
||||
if name in self.__DEFAULT_CONFIG['standby_cluster']:
|
||||
config['standby_cluster'][name] = deepcopy(value)
|
||||
elif name in config: # only variables present in __DEFAULT_CONFIG allowed to be overridden from DCS
|
||||
|
||||
@@ -15,6 +15,7 @@ if TYPE_CHECKING: # pragma: no cover
|
||||
from psycopg2 import cursor
|
||||
|
||||
from . import psycopg
|
||||
from .collections import EMPTY_DICT
|
||||
from .config import Config
|
||||
from .exceptions import PatroniException
|
||||
from .log import PatroniLogger
|
||||
@@ -244,7 +245,8 @@ class SampleConfigGenerator(AbstractConfigGenerator):
|
||||
See :func:`~patroni.postgresql.misc.postgres_major_version_to_int` and
|
||||
:func:`~patroni.utils.get_major_version`.
|
||||
"""
|
||||
postgres_bin = ((self.config.get('postgresql') or {}).get('bin_name') or {}).get('postgres', 'postgres')
|
||||
postgres_bin = ((self.config.get('postgresql')
|
||||
or EMPTY_DICT).get('bin_name') or EMPTY_DICT).get('postgres', 'postgres')
|
||||
return postgres_major_version_to_int(get_major_version(self.config['postgresql'].get('bin_dir'), postgres_bin))
|
||||
|
||||
def generate(self) -> None:
|
||||
@@ -411,8 +413,10 @@ class RunningClusterConfigGenerator(AbstractConfigGenerator):
|
||||
val = self.parsed_dsn.get(conn_param, os.getenv(env_var))
|
||||
if val:
|
||||
su_params[conn_param] = val
|
||||
patroni_env_su_username = ((self.config.get('authentication') or {}).get('superuser') or {}).get('username')
|
||||
patroni_env_su_pwd = ((self.config.get('authentication') or {}).get('superuser') or {}).get('password')
|
||||
patroni_env_su_username = ((self.config.get('authentication')
|
||||
or EMPTY_DICT).get('superuser') or EMPTY_DICT).get('username')
|
||||
patroni_env_su_pwd = ((self.config.get('authentication')
|
||||
or EMPTY_DICT).get('superuser') or EMPTY_DICT).get('password')
|
||||
# because we use "username" in the config for some reason
|
||||
su_params['username'] = su_params.pop('user', patroni_env_su_username) or getuser()
|
||||
su_params['password'] = su_params.get('password', patroni_env_su_pwd) or \
|
||||
|
||||
@@ -85,6 +85,8 @@ def dcs_modules() -> List[str]:
|
||||
|
||||
:returns: list of known module names with absolute python module path namespace, e.g. ``patroni.dcs.etcd``.
|
||||
"""
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
assert isinstance(__package__, str)
|
||||
return iter_modules(__package__)
|
||||
|
||||
|
||||
@@ -101,6 +103,8 @@ def iter_dcs_classes(
|
||||
|
||||
:returns: an iterator of tuples, each containing the module ``name`` and the imported DCS class object.
|
||||
"""
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
assert isinstance(__package__, str)
|
||||
return iter_classes(__package__, AbstractDCS, config)
|
||||
|
||||
|
||||
|
||||
@@ -444,8 +444,9 @@ class Consul(AbstractDCS):
|
||||
|
||||
:returns: all MPP groups as :class:`dict`, with group IDs as keys and :class:`Cluster` objects as values.
|
||||
"""
|
||||
results: Optional[List[Dict[str, Any]]]
|
||||
_, results = self.retry(self._client.kv.get, path, recurse=True, consistency=self._consistency)
|
||||
clusters: Dict[int, Dict[str, Cluster]] = defaultdict(dict)
|
||||
clusters: Dict[int, Dict[str, Dict[str, Any]]] = defaultdict(dict)
|
||||
for node in results or []:
|
||||
key = node['Key'][len(path):].split('/', 1)
|
||||
if len(key) == 2 and self._mpp.group_re.match(key[0]):
|
||||
|
||||
@@ -20,6 +20,7 @@ from threading import Condition, Lock, Thread
|
||||
from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, Union, TYPE_CHECKING
|
||||
|
||||
from . import AbstractDCS, Cluster, ClusterConfig, Failover, Leader, Member, Status, SyncState, TimelineHistory
|
||||
from ..collections import EMPTY_DICT
|
||||
from ..exceptions import DCSError
|
||||
from ..postgresql.mpp import AbstractMPP
|
||||
from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \
|
||||
@@ -470,7 +471,7 @@ class K8sClient(object):
|
||||
if len(args) == 3: # name, namespace, body
|
||||
body = args[2]
|
||||
elif action == 'create': # namespace, body
|
||||
body = args[1]
|
||||
body = args[1] # pyright: ignore [reportGeneralTypeIssues]
|
||||
elif action == 'delete': # name, namespace
|
||||
body = kwargs.pop('body', None)
|
||||
else:
|
||||
@@ -509,7 +510,7 @@ class KubernetesRetriableException(k8s_client.rest.ApiException):
|
||||
@property
|
||||
def sleeptime(self) -> Optional[int]:
|
||||
try:
|
||||
return int((self.headers or {}).get('retry-after', ''))
|
||||
return int((self.headers or EMPTY_DICT).get('retry-after', ''))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@@ -654,7 +655,7 @@ class ObjectCache(Thread):
|
||||
obj = K8sObject(obj)
|
||||
success, old_value = self.set(name, obj)
|
||||
if success:
|
||||
new_value = (obj.metadata.annotations or {}).get(self._annotations_map.get(name))
|
||||
new_value = (obj.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, ''))
|
||||
elif ev_type == 'DELETED':
|
||||
success, old_value = self.delete(name, obj['metadata']['resourceVersion'])
|
||||
else:
|
||||
@@ -662,7 +663,7 @@ class ObjectCache(Thread):
|
||||
|
||||
if success and obj.get('kind') != 'Pod':
|
||||
if old_value:
|
||||
old_value = (old_value.metadata.annotations or {}).get(self._annotations_map.get(name))
|
||||
old_value = (old_value.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, ''))
|
||||
|
||||
value_changed = old_value != new_value and \
|
||||
(name != self._dcs.config_path or old_value is not None and new_value is not None)
|
||||
@@ -844,7 +845,7 @@ class Kubernetes(AbstractDCS):
|
||||
|
||||
@staticmethod
|
||||
def member(pod: K8sObject) -> Member:
|
||||
annotations = pod.metadata.annotations or {}
|
||||
annotations = pod.metadata.annotations or EMPTY_DICT
|
||||
member = Member.from_node(pod.metadata.resource_version, pod.metadata.name, None, annotations.get('status', ''))
|
||||
member.data['pod_labels'] = pod.metadata.labels
|
||||
return member
|
||||
@@ -925,7 +926,7 @@ class Kubernetes(AbstractDCS):
|
||||
failover = nodes.get(path + self._FAILOVER)
|
||||
metadata = failover and failover.metadata
|
||||
failover = metadata and Failover.from_node(metadata.resource_version,
|
||||
(metadata.annotations or {}).copy())
|
||||
(metadata.annotations or EMPTY_DICT).copy())
|
||||
|
||||
# get synchronization state
|
||||
sync = nodes.get(path + self._SYNC)
|
||||
@@ -1047,8 +1048,9 @@ class Kubernetes(AbstractDCS):
|
||||
|
||||
def __target_ref(self, leader_ip: str, latest_subsets: List[K8sObject], pod: K8sObject) -> K8sObject:
|
||||
# we want to re-use existing target_ref if possible
|
||||
empty_addresses: List[K8sObject] = []
|
||||
for subset in latest_subsets:
|
||||
for address in subset.addresses or []:
|
||||
for address in subset.addresses or empty_addresses:
|
||||
if address.ip == leader_ip and address.target_ref and address.target_ref.name == self._name:
|
||||
return address.target_ref
|
||||
return k8s_client.V1ObjectReference(kind='Pod', uid=pod.metadata.uid, namespace=self._namespace,
|
||||
@@ -1056,7 +1058,8 @@ class Kubernetes(AbstractDCS):
|
||||
|
||||
def _map_subsets(self, endpoints: Dict[str, Any], ips: List[str]) -> None:
|
||||
leader = self._kinds.get(self.leader_path)
|
||||
latest_subsets = leader and leader.subsets or []
|
||||
empty_addresses: List[K8sObject] = []
|
||||
latest_subsets = leader and leader.subsets or empty_addresses
|
||||
if not ips:
|
||||
# We want to have subsets empty
|
||||
if latest_subsets:
|
||||
@@ -1212,7 +1215,7 @@ class Kubernetes(AbstractDCS):
|
||||
if not retry.ensure_deadline(0.5):
|
||||
return False
|
||||
|
||||
kind_annotations = kind and kind.metadata.annotations or {}
|
||||
kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
|
||||
kind_resource_version = kind and kind.metadata.resource_version
|
||||
|
||||
# There is different leader or resource_version in cache didn't change
|
||||
@@ -1225,7 +1228,7 @@ class Kubernetes(AbstractDCS):
|
||||
def update_leader(self, leader: Leader, last_lsn: Optional[int],
|
||||
slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool:
|
||||
kind = self._kinds.get(self.leader_path)
|
||||
kind_annotations = kind and kind.metadata.annotations or {}
|
||||
kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
|
||||
|
||||
if kind and kind_annotations.get(self._LEADER) != self._name:
|
||||
return False
|
||||
@@ -1346,7 +1349,7 @@ class Kubernetes(AbstractDCS):
|
||||
def delete_leader(self, leader: Optional[Leader], last_lsn: Optional[int] = None) -> bool:
|
||||
ret = False
|
||||
kind = self._kinds.get(self.leader_path)
|
||||
if kind and (kind.metadata.annotations or {}).get(self._LEADER) == self._name:
|
||||
if kind and (kind.metadata.annotations or EMPTY_DICT).get(self._LEADER) == self._name:
|
||||
annotations: Dict[str, Optional[str]] = {self._LEADER: None}
|
||||
if last_lsn:
|
||||
annotations[self._OPTIME] = str(last_lsn)
|
||||
|
||||
@@ -10,6 +10,7 @@ import types
|
||||
from copy import deepcopy
|
||||
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING
|
||||
|
||||
from .collections import EMPTY_DICT
|
||||
from .utils import parse_bool, parse_int
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
@@ -214,7 +215,7 @@ class GlobalConfig(types.ModuleType):
|
||||
@property
|
||||
def use_slots(self) -> bool:
|
||||
"""``True`` if cluster is configured to use replication slots."""
|
||||
return bool(parse_bool((self.get('postgresql') or {}).get('use_slots', True)))
|
||||
return bool(parse_bool((self.get('postgresql') or EMPTY_DICT).get('use_slots', True)))
|
||||
|
||||
@property
|
||||
def permanent_slots(self) -> Dict[str, Any]:
|
||||
@@ -222,7 +223,7 @@ class GlobalConfig(types.ModuleType):
|
||||
return deepcopy(self.get('permanent_replication_slots')
|
||||
or self.get('permanent_slots')
|
||||
or self.get('slots')
|
||||
or {})
|
||||
or EMPTY_DICT.copy())
|
||||
|
||||
|
||||
sys.modules[__name__] = GlobalConfig()
|
||||
|
||||
@@ -413,7 +413,8 @@ class PatroniLogger(Thread):
|
||||
if not isinstance(handler, RotatingFileHandler):
|
||||
handler = RotatingFileHandler(os.path.join(config['dir'], __name__))
|
||||
|
||||
handler.maxBytes = int(config.get('file_size', 25000000)) # pyright: ignore [reportGeneralTypeIssues]
|
||||
max_file_size = int(config.get('file_size', 25000000))
|
||||
handler.maxBytes = max_file_size # pyright: ignore [reportAttributeAccessIssue]
|
||||
handler.backupCount = int(config.get('file_num', 4))
|
||||
# we can't use `if not isinstance(handler, logging.StreamHandler)` below,
|
||||
# because RotatingFileHandler is a child of StreamHandler!!!
|
||||
|
||||
@@ -26,7 +26,7 @@ from .slots import SlotsHandler
|
||||
from .sync import SyncHandler
|
||||
from .. import global_config, psycopg
|
||||
from ..async_executor import CriticalTask
|
||||
from ..collections import CaseInsensitiveSet, CaseInsensitiveDict
|
||||
from ..collections import CaseInsensitiveSet, CaseInsensitiveDict, EMPTY_DICT
|
||||
from ..dcs import Cluster, Leader, Member, SLOT_ADVANCE_AVAILABLE_VERSION
|
||||
from ..exceptions import PostgresConnectionException
|
||||
from ..utils import Retry, RetryFailedError, polling_loop, data_directory_is_empty, parse_int
|
||||
@@ -272,7 +272,7 @@ class Postgresql(object):
|
||||
|
||||
:returns: path to Postgres binary named *cmd*.
|
||||
"""
|
||||
return os.path.join(self._bin_dir, (self.config.get('bin_name', {}) or {}).get(cmd, cmd))
|
||||
return os.path.join(self._bin_dir, (self.config.get('bin_name', {}) or EMPTY_DICT).get(cmd, cmd))
|
||||
|
||||
def pg_ctl(self, cmd: str, *args: str, **kwargs: Any) -> bool:
|
||||
"""Builds and executes pg_ctl command
|
||||
@@ -414,7 +414,7 @@ class Postgresql(object):
|
||||
return data_directory_is_empty(self._data_dir)
|
||||
|
||||
def replica_method_options(self, method: str) -> Dict[str, Any]:
|
||||
return deepcopy(self.config.get(method, {}) or {})
|
||||
return deepcopy(self.config.get(method, {}) or EMPTY_DICT.copy())
|
||||
|
||||
def replica_method_can_work_without_replication_connection(self, method: str) -> bool:
|
||||
return method != 'basebackup' and bool(self.replica_method_options(method).get('no_master')
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if sys.version_info < (3, 9):
|
||||
if sys.version_info < (3, 9): # pragma: no cover
|
||||
from pathlib import Path
|
||||
|
||||
PathLikeObj = Path
|
||||
conf_dir = Path(__file__).parent
|
||||
else:
|
||||
|
||||
@@ -7,6 +7,7 @@ import time
|
||||
from typing import Any, Callable, Dict, List, Optional, Union, Tuple, TYPE_CHECKING
|
||||
|
||||
from ..async_executor import CriticalTask
|
||||
from ..collections import EMPTY_DICT
|
||||
from ..dcs import Leader, Member, RemoteMember
|
||||
from ..psycopg import quote_ident, quote_literal
|
||||
from ..utils import deep_compare, unquote
|
||||
@@ -146,7 +147,7 @@ class Bootstrap(object):
|
||||
|
||||
# make sure there is no trigger file or postgres will be automatically promoted
|
||||
trigger_file = self._postgresql.config.triggerfile_good_name
|
||||
trigger_file = (self._postgresql.config.get('recovery_conf') or {}).get(trigger_file) or 'promote'
|
||||
trigger_file = (self._postgresql.config.get('recovery_conf') or EMPTY_DICT).get(trigger_file) or 'promote'
|
||||
trigger_file = os.path.abspath(os.path.join(self._postgresql.data_dir, trigger_file))
|
||||
if os.path.exists(trigger_file):
|
||||
os.unlink(trigger_file)
|
||||
@@ -441,7 +442,7 @@ END;$$""".format(f, quote_ident(rewind['username'], postgresql.connection()))
|
||||
if config.get('users'):
|
||||
logger.warning('User creation via "bootstrap.users" will be removed in v4.0.0')
|
||||
|
||||
for name, value in (config.get('users') or {}).items():
|
||||
for name, value in (config.get('users') or EMPTY_DICT).items():
|
||||
if all(name != a.get('username') for a in (superuser, replication, rewind)):
|
||||
self.create_or_update_role(name, value.get('password'), value.get('options', []))
|
||||
|
||||
|
||||
@@ -100,7 +100,8 @@ class CancellableSubprocess(CancellableExecutor):
|
||||
|
||||
if started and self._process is not None:
|
||||
if isinstance(communicate, dict):
|
||||
communicate['stdout'], communicate['stderr'] = self._process.communicate(input_data)
|
||||
communicate['stdout'], communicate['stderr'] = \
|
||||
self._process.communicate(input_data) # pyright: ignore [reportGeneralTypeIssues]
|
||||
return self._process.wait()
|
||||
finally:
|
||||
with self._lock:
|
||||
|
||||
@@ -13,7 +13,7 @@ from typing import Any, Callable, Collection, Dict, Iterator, List, Optional, Un
|
||||
|
||||
from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value
|
||||
from .. import global_config
|
||||
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
|
||||
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet, EMPTY_DICT
|
||||
from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name
|
||||
from ..exceptions import PatroniFatalException, PostgresConnectionException
|
||||
from ..file_perm import pg_perm
|
||||
@@ -619,7 +619,8 @@ class ConfigHandler(object):
|
||||
fd.write_param(name, value)
|
||||
|
||||
def build_recovery_params(self, member: Union[Leader, Member, None]) -> CaseInsensitiveDict:
|
||||
recovery_params = CaseInsensitiveDict({p: v for p, v in (self.get('recovery_conf') or {}).items()
|
||||
default: Dict[str, Any] = {}
|
||||
recovery_params = CaseInsensitiveDict({p: v for p, v in (self.get('recovery_conf') or default).items()
|
||||
if not p.lower().startswith('recovery_target')
|
||||
and p.lower() not in ('primary_conninfo', 'primary_slot_name')})
|
||||
recovery_params.update({'standby_mode': 'on', 'recovery_target_timeline': 'latest'})
|
||||
@@ -845,7 +846,7 @@ class ConfigHandler(object):
|
||||
required['restart' if mtype else 'reload'] += 1
|
||||
|
||||
wanted_recovery_params = self.build_recovery_params(member)
|
||||
for param, value in (self._current_recovery_params or {}).items():
|
||||
for param, value in (self._current_recovery_params or EMPTY_DICT).items():
|
||||
# Skip certain parameters defined in the included postgres config files
|
||||
# if we know that they are not specified in the patroni configuration.
|
||||
if len(value) > 2 and value[2] not in (self._postgresql_conf, self._auto_conf) and \
|
||||
@@ -1324,4 +1325,4 @@ class ConfigHandler(object):
|
||||
return self._config.get(key, default)
|
||||
|
||||
def restore_command(self) -> Optional[str]:
|
||||
return (self.get('recovery_conf') or {}).get('restore_command')
|
||||
return (self.get('recovery_conf') or EMPTY_DICT).get('restore_command')
|
||||
|
||||
@@ -299,6 +299,8 @@ def iter_mpp_classes(
|
||||
|
||||
:yields: tuples, each containing the module ``name`` and the imported MPP class object.
|
||||
"""
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
assert isinstance(__package__, str)
|
||||
yield from iter_classes(__package__, AbstractMPP, config)
|
||||
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ from . import Postgresql
|
||||
from .connection import get_connection_cursor
|
||||
from .misc import format_lsn, fsync_dir, parse_history, parse_lsn
|
||||
from ..async_executor import CriticalTask
|
||||
from ..collections import EMPTY_DICT
|
||||
from ..dcs import Leader, RemoteMember
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -418,7 +419,7 @@ class Rewind(object):
|
||||
dsn = self._postgresql.config.format_dsn(r, True)
|
||||
logger.info('running pg_rewind from %s', dsn)
|
||||
|
||||
restore_command = (self._postgresql.config.get('recovery_conf') or {}).get('restore_command') \
|
||||
restore_command = (self._postgresql.config.get('recovery_conf') or EMPTY_DICT).get('restore_command') \
|
||||
if self._postgresql.major_version < 120000 else self._postgresql.get_guc_value('restore_command')
|
||||
|
||||
# Until v15 pg_rewind expected postgresql.conf to be inside $PGDATA, which is not the case on e.g. Debian
|
||||
|
||||
@@ -42,7 +42,8 @@ try:
|
||||
value.prepare(conn)
|
||||
return value.getquoted().decode('utf-8')
|
||||
except ImportError:
|
||||
from psycopg import connect as __connect, sql, Error, DatabaseError, OperationalError, ProgrammingError
|
||||
from psycopg import connect as __connect # pyright: ignore [reportUnknownVariableType]
|
||||
from psycopg import sql, Error, DatabaseError, OperationalError, ProgrammingError
|
||||
|
||||
def _connect(dsn: Optional[str] = None, **kwargs: Any) -> 'Connection[Any]':
|
||||
"""Call :func:`psycopg.connect` with *dsn* and ``**kwargs``.
|
||||
@@ -56,7 +57,7 @@ except ImportError:
|
||||
|
||||
:returns: a connection to the database.
|
||||
"""
|
||||
ret = __connect(dsn or "", **kwargs)
|
||||
ret: 'Connection[Any]' = __connect(dsn or "", **kwargs)
|
||||
setattr(ret, 'server_version', ret.pgconn.server_version) # compatibility with psycopg2
|
||||
return ret
|
||||
|
||||
|
||||
@@ -11,8 +11,7 @@ import socket
|
||||
|
||||
from typing import Any, Dict, Union, Iterator, List, Optional as OptionalType, Tuple, TYPE_CHECKING
|
||||
|
||||
from .collections import CaseInsensitiveSet
|
||||
|
||||
from .collections import CaseInsensitiveSet, EMPTY_DICT
|
||||
from .dcs import dcs_modules
|
||||
from .exceptions import ConfigParseError
|
||||
from .utils import parse_int, split_host_port, data_directory_is_empty, get_major_version
|
||||
@@ -245,7 +244,7 @@ def get_bin_name(bin_name: str) -> str:
|
||||
"""
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
assert isinstance(schema.data, dict)
|
||||
return (schema.data.get('postgresql', {}).get('bin_name', {}) or {}).get(bin_name, bin_name)
|
||||
return (schema.data.get('postgresql', {}).get('bin_name', {}) or EMPTY_DICT).get(bin_name, bin_name)
|
||||
|
||||
|
||||
def validate_data_dir(data_dir: str) -> bool:
|
||||
|
||||
@@ -160,12 +160,10 @@ class TestConfig(unittest.TestCase):
|
||||
@patch('patroni.config.logger')
|
||||
def test__validate_failover_tags(self, mock_logger, mock_get):
|
||||
"""Ensures that only one of `nofailover` or `failover_priority` can be provided"""
|
||||
config = Config("postgres0.yml")
|
||||
|
||||
# Providing one of `nofailover` or `failover_priority` is fine
|
||||
for single_param in ({"nofailover": True}, {"failover_priority": 1}, {"failover_priority": 0}):
|
||||
mock_get.side_effect = [single_param] * 2
|
||||
self.assertIsNone(config._validate_failover_tags())
|
||||
self.assertIsNone(self.config._validate_failover_tags())
|
||||
mock_logger.warning.assert_not_called()
|
||||
|
||||
# Providing both `nofailover` and `failover_priority` is fine if consistent
|
||||
@@ -175,7 +173,7 @@ class TestConfig(unittest.TestCase):
|
||||
{"nofailover": "False", "failover_priority": 0}
|
||||
):
|
||||
mock_get.side_effect = [consistent_state] * 2
|
||||
self.assertIsNone(config._validate_failover_tags())
|
||||
self.assertIsNone(self.config._validate_failover_tags())
|
||||
mock_logger.warning.assert_not_called()
|
||||
|
||||
# Providing both inconsistently should log a warning
|
||||
@@ -186,7 +184,7 @@ class TestConfig(unittest.TestCase):
|
||||
{"nofailover": "", "failover_priority": 0}
|
||||
):
|
||||
mock_get.side_effect = [inconsistent_state] * 2
|
||||
self.assertIsNone(config._validate_failover_tags())
|
||||
self.assertIsNone(self.config._validate_failover_tags())
|
||||
mock_logger.warning.assert_called_once_with(
|
||||
'Conflicting configuration between nofailover: %s and failover_priority: %s.'
|
||||
+ ' Defaulting to nofailover: %s',
|
||||
|
||||
@@ -80,7 +80,7 @@ def mock_namespaced_kind(*args, **kwargs):
|
||||
|
||||
|
||||
def mock_load_k8s_config(self, *args, **kwargs):
|
||||
self._server = ''
|
||||
self._server = 'http://localhost'
|
||||
|
||||
|
||||
class TestK8sConfig(unittest.TestCase):
|
||||
@@ -242,6 +242,7 @@ class BaseTestKubernetes(unittest.TestCase):
|
||||
self.k.get_cluster()
|
||||
|
||||
|
||||
@patch('urllib3.PoolManager.request', Mock())
|
||||
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_config_map', mock_namespaced_kind, create=True)
|
||||
class TestKubernetesConfigMaps(BaseTestKubernetes):
|
||||
|
||||
@@ -374,6 +375,7 @@ class TestKubernetesConfigMaps(BaseTestKubernetes):
|
||||
mock_warning.assert_called_once()
|
||||
|
||||
|
||||
@patch('urllib3.PoolManager.request', Mock())
|
||||
class TestKubernetesEndpointsNoPodIP(BaseTestKubernetes):
|
||||
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True)
|
||||
def setUp(self, config=None):
|
||||
@@ -388,6 +390,7 @@ class TestKubernetesEndpointsNoPodIP(BaseTestKubernetes):
|
||||
self.assertEqual(args[2].subsets[0].addresses[0].ip, '10.0.0.1')
|
||||
|
||||
|
||||
@patch('urllib3.PoolManager.request', Mock())
|
||||
class TestKubernetesEndpoints(BaseTestKubernetes):
|
||||
|
||||
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_endpoints', mock_list_namespaced_endpoints, create=True)
|
||||
@@ -478,6 +481,7 @@ def mock_watch(*args):
|
||||
return urllib3.HTTPResponse()
|
||||
|
||||
|
||||
@patch('urllib3.PoolManager.request', Mock())
|
||||
class TestCacheBuilder(BaseTestKubernetes):
|
||||
|
||||
@patch.object(k8s_client.CoreV1Api, 'list_namespaced_config_map', mock_list_namespaced_config_map, create=True)
|
||||
|
||||
Reference in New Issue
Block a user