From 7543e64000e897e36be46d162e3770195149235d Mon Sep 17 00:00:00 2001 From: Polina Bungina <27892524+hughcapet@users.noreply.github.com> Date: Fri, 14 Mar 2025 09:53:56 +0100 Subject: [PATCH] Convert states to enums (#3293) - Postgresql._state - pg_isready state --- patroni/api.py | 23 ++++----- patroni/ctl.py | 9 ++-- patroni/dcs/__init__.py | 6 ++- patroni/dcs/consul.py | 5 +- patroni/dcs/kubernetes.py | 8 +-- patroni/ha.py | 22 ++++---- patroni/postgresql/__init__.py | 91 ++++++++++++++++++--------------- patroni/postgresql/bootstrap.py | 13 ++--- patroni/postgresql/config.py | 8 +-- patroni/postgresql/misc.py | 25 +++++++++ patroni/postgresql/mpp/citus.py | 4 +- patroni/postgresql/sync.py | 4 +- patroni/utils.py | 5 +- tests/__init__.py | 7 +-- tests/test_api.py | 10 ++-- tests/test_bootstrap.py | 3 +- tests/test_consul.py | 12 +++-- tests/test_ctl.py | 6 ++- tests/test_ha.py | 5 +- tests/test_kubernetes.py | 21 ++++---- tests/test_patroni.py | 3 +- tests/test_postgresql.py | 45 ++++++++-------- tests/test_slots.py | 18 +++---- 23 files changed, 202 insertions(+), 151 deletions(-) diff --git a/patroni/api.py b/patroni/api.py index 73d9b1c9..f2effde5 100644 --- a/patroni/api.py +++ b/patroni/api.py @@ -30,7 +30,7 @@ from . import global_config, psycopg from .__main__ import Patroni from .dcs import Cluster from .exceptions import PostgresConnectionException, PostgresException -from .postgresql.misc import postgres_version_to_int +from .postgresql.misc import postgres_version_to_int, PostgresqlState from .utils import cluster_as_json, deep_compare, enable_keepalive, parse_bool, \ parse_int, patch_config, Retry, RetryFailedError, split_host_port, tzutc, uri @@ -324,7 +324,7 @@ class RestApiHandler(BaseHTTPRequestHandler): is_lagging = leader_optime and leader_optime > replayed_location + max_replica_lag replica_status_code = 200 if not patroni.noloadbalance and not is_lagging and \ - response.get('role') == 'replica' and response.get('state') == 'running' else 503 + response.get('role') == 'replica' and response.get('state') == PostgresqlState.RUNNING else 503 if not cluster and response.get('pause'): leader_status_code = 200 if response.get('role') in ('primary', 'standby_leader') else 503 @@ -358,7 +358,7 @@ class RestApiHandler(BaseHTTPRequestHandler): elif 'read-only' in path and 'sync' not in path and 'quorum' not in path: status_code = 200 if 200 in (primary_status_code, standby_leader_status_code) else replica_status_code elif 'health' in path: - status_code = 200 if response.get('state') == 'running' else 503 + status_code = 200 if response.get('state') == PostgresqlState.RUNNING else 503 elif cluster: # dcs is available is_quorum = response.get('quorum_standby') is_synchronous = response.get('sync_standby') @@ -462,7 +462,7 @@ class RestApiHandler(BaseHTTPRequestHandler): patroni = self.server.patroni if patroni.ha.is_leader(): status_code = 200 - elif patroni.postgresql.state == 'running': + elif patroni.postgresql.state == PostgresqlState.RUNNING: status_code = 200 if patroni.dcs.cluster else 503 else: status_code = 503 @@ -573,7 +573,8 @@ class RestApiHandler(BaseHTTPRequestHandler): metrics.append("# HELP patroni_postgres_running Value is 1 if Postgres is running, 0 otherwise.") metrics.append("# TYPE patroni_postgres_running gauge") - metrics.append("patroni_postgres_running{0} {1}".format(labels, int(postgres['state'] == 'running'))) + metrics.append("patroni_postgres_running{0} {1}".format( + labels, int(postgres['state'] == PostgresqlState.RUNNING))) metrics.append("# HELP patroni_postmaster_start_time Epoch seconds since Postgres started.") metrics.append("# TYPE patroni_postmaster_start_time gauge") @@ -889,7 +890,7 @@ class RestApiHandler(BaseHTTPRequestHandler): If it's not able to parse the request body, then the request is silently discarded. """ status_code = 500 - data = 'restart failed' + data = PostgresqlState.RESTART_FAILED request = self._read_json_content(body_is_optional=True) cluster = self.server.patroni.dcs.get_cluster() if request is None: @@ -1263,10 +1264,7 @@ class RestApiHandler(BaseHTTPRequestHandler): :returns: a dict with the status of Postgres/Patroni. The keys are: - * ``state``: Postgres state among ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, - ``starting``, ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, - ``initdb failed``, ``running custom bootstrap script``, ``starting after custom bootstrap``, - ``custom bootstrap failed``, ``creating replica``, or ``unknown``; + * ``state``: one of :class:`~patroni.postgresql.misc.PostgresqlState` or ``unknown``; * ``postmaster_start_time``: ``pg_postmaster_start_time()``; * ``role``: ``replica`` or ``primary`` based on ``pg_is_in_recovery()`` output; * ``server_version``: Postgres version without periods, e.g. ``150002`` for Postgres ``15.2``; @@ -1308,7 +1306,8 @@ class RestApiHandler(BaseHTTPRequestHandler): config = global_config.from_cluster(cluster) try: - if postgresql.state not in ('running', 'restarting', 'starting'): + if postgresql.state not in (PostgresqlState.RUNNING, PostgresqlState.RESTARTING, + PostgresqlState.STARTING): raise RetryFailedError('') replication_state = ("pg_catalog.pg_{0}_{1}_diff(wr.latest_end_lsn, '0/0')::bigint, wr.status" if postgresql.major_version >= 90600 else "NULL, NULL") + ", " +\ @@ -1365,7 +1364,7 @@ class RestApiHandler(BaseHTTPRequestHandler): except (psycopg.Error, RetryFailedError, PostgresConnectionException): state = postgresql.state - if state == 'running': + if state == PostgresqlState.RUNNING: logger.exception('get_postgresql_status') state = 'unknown' result: Dict[str, Any] = {'state': state, 'role': postgresql.role} diff --git a/patroni/ctl.py b/patroni/ctl.py index ea28c56c..5c363d1d 100644 --- a/patroni/ctl.py +++ b/patroni/ctl.py @@ -64,7 +64,7 @@ from . import global_config from .config import Config from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Member from .exceptions import PatroniException -from .postgresql.misc import postgres_version_to_int +from .postgresql.misc import postgres_version_to_int, PostgresqlState from .postgresql.mpp import get_mpp from .request import PatroniRequest from .utils import cluster_as_json, patch_config, polling_loop @@ -1210,7 +1210,7 @@ def reinit(cluster_name: str, group: Optional[int], member_names: List[str], for time.sleep(2) for member in wait_on_members: data = json.loads(request_patroni(member, 'get', 'patroni').data.decode('utf-8')) - if data.get('state') != 'creating replica': + if data.get('state') != PostgresqlState.CREATING_REPLICA: click.echo('Reinitialize is completed on: {0}'.format(member.name)) wait_on_members.remove(member) @@ -1533,10 +1533,7 @@ def output_members(cluster: Cluster, name: str, extended: bool = False, * ``Member``: name of the Patroni node, as per ``name`` configuration; * ``Host``: hostname (or IP) and port, as per ``postgresql.listen`` configuration; * ``Role``: ``Leader``, ``Standby Leader``, ``Sync Standby`` or ``Replica``; - * ``State``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``, - ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``, - ``running custom bootstrap script``, ``starting after custom bootstrap``, ``custom bootstrap failed``, - ``creating replica``, ``streaming``, ``in archive recovery``, and so on; + * ``State``: one of :class:`~patroni.postgresql.misc.PostgresqlState`; * ``TL``: current timeline in Postgres; ``Lag in MB``: replication lag. diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 7414c1eb..1aeb5eae 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -295,8 +295,10 @@ class Member(Tags, NamedTuple('Member', @property def is_running(self) -> bool: - """``True`` if the member :attr:`~Member.state` is ``running``.""" - return self.state == 'running' + """``True`` if the member :attr:`~Member.state` is :class:`~patroni.postgresql.misc.PostgresqlState.RUNNING`.""" + from ..postgresql.misc import PostgresqlState + + return self.state == PostgresqlState.RUNNING @property def patroni_version(self) -> Optional[Tuple[int, ...]]: diff --git a/patroni/dcs/consul.py b/patroni/dcs/consul.py index dbf3aff1..db4738d7 100644 --- a/patroni/dcs/consul.py +++ b/patroni/dcs/consul.py @@ -19,6 +19,7 @@ from consul import base, Check, ConsulException, NotFound from urllib3.exceptions import HTTPError from ..exceptions import DCSError +from ..postgresql.misc import PostgresqlState from ..postgresql.mpp import AbstractMPP from ..utils import deep_compare, parse_bool, Retry, RetryFailedError, split_host_port, uri, USER_AGENT from . import AbstractDCS, catch_return_false_exception, Cluster, ClusterConfig, \ @@ -546,13 +547,13 @@ class Consul(AbstractDCS): 'enable_tag_override': True, } - if state == 'stopped' or (not self._register_service and self._previous_loop_register_service): + if state == PostgresqlState.STOPPED or (not self._register_service and self._previous_loop_register_service): self._previous_loop_register_service = self._register_service return self.deregister_service(params['service_id']) self._previous_loop_register_service = self._register_service if role in ['primary', 'replica', 'standby-leader']: - if state != 'running': + if state != PostgresqlState.RUNNING: return return self.register_service(service_name, **params) diff --git a/patroni/dcs/kubernetes.py b/patroni/dcs/kubernetes.py index 0200e27e..77fc9590 100644 --- a/patroni/dcs/kubernetes.py +++ b/patroni/dcs/kubernetes.py @@ -23,6 +23,7 @@ from urllib3.exceptions import HTTPError from ..collections import EMPTY_DICT from ..exceptions import DCSError +from ..postgresql.misc import PostgresqlState from ..postgresql.mpp import AbstractMPP from ..utils import deep_compare, iter_response_objects, \ keepalive_socket_options, Retry, RetryFailedError, tzutc, uri, USER_AGENT @@ -1320,7 +1321,7 @@ class Kubernetes(AbstractDCS): if cluster and cluster.leader and cluster.leader.name == self._name: role = self._standby_leader_label_value if data['role'] == 'standby_leader' else self._leader_label_value tmp_role = 'primary' - elif data['state'] == 'running' and data['role'] != 'primary': + elif data['state'] == PostgresqlState.RUNNING and data['role'] != 'primary': role = {'replica': self._follower_label_value}.get(data['role'], data['role']) tmp_role = data['role'] else: @@ -1332,9 +1333,8 @@ class Kubernetes(AbstractDCS): updated_labels[self._tmp_role_label] = tmp_role if self._bootstrap_labels: - if data['state'] in ('initializing new cluster', - 'running custom bootstrap script', 'starting after custom bootstrap', - 'creating replica'): + if data['state'] in (PostgresqlState.INITDB, PostgresqlState.CUSTOM_BOOTSTRAP, + PostgresqlState.BOOTSTRAP_STARTING, PostgresqlState.CREATING_REPLICA): updated_labels.update(self._bootstrap_labels) else: updated_labels.update({k: None for k, _ in self._bootstrap_labels.items()}) diff --git a/patroni/ha.py b/patroni/ha.py index 89ecf04f..f136ea55 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -17,7 +17,7 @@ from .collections import CaseInsensitiveSet from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, SyncState from .exceptions import DCSError, PatroniFatalException, PostgresConnectionException from .postgresql.callback_executor import CallbackAction -from .postgresql.misc import postgres_version_to_int +from .postgresql.misc import postgres_version_to_int, PostgresqlState from .postgresql.postmaster import PostmasterProcess from .postgresql.rewind import Rewind from .quorum import QuorumStateResolver @@ -458,7 +458,8 @@ class Ha(object): data['pending_restart'] = True data['pending_restart_reason'] = dict(self.state_handler.pending_restart_reason) if self._async_executor.scheduled_action in (None, 'promote') \ - and data['state'] in ['running', 'restarting', 'starting']: + and data['state'] in [PostgresqlState.RUNNING, PostgresqlState.RESTARTING, + PostgresqlState.STARTING]: try: timeline, wal_position, pg_control_timeline = self.state_handler.timeline_wal_position() data['xlog_location'] = self._last_wal_lsn = wal_position @@ -495,7 +496,7 @@ class Ha(object): ret = self.dcs.touch_member(data) if ret: new_state = (data['state'], data['role']) - if self._last_state != new_state and new_state == ('running', 'primary'): + if self._last_state != new_state and new_state == (PostgresqlState.RUNNING, 'primary'): self.notify_mpp_coordinator('after_promote') self._last_state = new_state return ret @@ -636,7 +637,7 @@ class Ha(object): if timeout\ and data.get('Database cluster state') in ('in production', 'in crash recovery', 'shutting down', 'shut down')\ - and self.state_handler.state == 'crashed'\ + and self.state_handler.state == PostgresqlState.CRASHED\ and self.state_handler.role == 'primary'\ and not self.state_handler.config.recovery_conf_exists(): # We know 100% that we were running as a primary a few moments ago, therefore could just start postgres @@ -1158,7 +1159,7 @@ class Ha(object): :returns: the reason why caller shouldn't continue as a primary or the current value of received/replayed LSN. """ - if self.state_handler.state == 'running' and self.state_handler.role == 'primary': + if self.state_handler.state == PostgresqlState.RUNNING and self.state_handler.role == 'primary': return 'Running as a leader' self._failsafe.update(data) return self._last_wal_lsn @@ -1898,7 +1899,7 @@ class Ha(object): elif res is None: return (False, 'postgres is still starting') else: - return (False, 'restart failed') + return (False, PostgresqlState.RESTART_FAILED) def _do_reinitialize(self, cluster: Cluster) -> Optional[bool]: self.state_handler.stop('immediate', stop_timeout=self.patroni.config['retry_timeout']) @@ -2035,7 +2036,8 @@ class Ha(object): if not self.state_handler.check_for_startup() or self.is_paused(): self.set_start_timeout(None) if self.is_paused(): - self.state_handler.set_state(self.state_handler.is_running() and 'running' or 'stopped') + self.state_handler.set_state(PostgresqlState.RUNNING if self.state_handler.is_running() + else PostgresqlState.STOPPED) return None # state_handler.state == 'starting' here @@ -2201,7 +2203,7 @@ class Ha(object): if not self.state_handler.is_healthy(): if self.is_paused(): - self.state_handler.set_state('stopped') + self.state_handler.set_state(PostgresqlState.STOPPED) if self.has_lock(): self._delete_leader() return 'removed leader lock because postgres is not running' @@ -2214,8 +2216,8 @@ class Ha(object): (self._rewind.is_needed and self._rewind.can_rewind_or_reinitialize_allowed): return 'postgres is not running' - if self.state_handler.state in ('running', 'starting'): - self.state_handler.set_state('crashed') + if self.state_handler.state in (PostgresqlState.RUNNING, PostgresqlState.STARTING): + self.state_handler.set_state(PostgresqlState.CRASHED) # try to start dead postgres return self.recover() diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index f06cf4e7..74a9520e 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -9,6 +9,7 @@ import time from contextlib import contextmanager from copy import deepcopy from datetime import datetime +from enum import IntEnum from threading import current_thread, Lock from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union @@ -27,7 +28,7 @@ from .callback_executor import CallbackAction, CallbackExecutor from .cancellable import CancellableSubprocess from .config import ConfigHandler, mtime from .connection import ConnectionPool, get_connection_cursor -from .misc import parse_history, parse_lsn, postgres_major_version_to_int +from .misc import parse_history, parse_lsn, postgres_major_version_to_int, PostgresqlState from .mpp import AbstractMPP from .postmaster import PostmasterProcess from .slots import SlotsHandler @@ -39,14 +40,24 @@ if TYPE_CHECKING: # pragma: no cover logger = logging.getLogger(__name__) -STATE_RUNNING = 'running' -STATE_REJECT = 'rejecting connections' -STATE_NO_RESPONSE = 'not responding' -STATE_UNKNOWN = 'unknown' - STOP_POLLING_INTERVAL = 1 +class PgIsReadyStatus(IntEnum): + """Possible PostgreSQL connection status ``pg_isready`` utility can report. + + :cvar RUNNING: return code 0. PostgreSQL is accepting connections normally. + :cvar REJECT: return code 1. PostgreSQL is rejecting connections. + :cvar NO_RESPONSE: return code 2. There was no response to the connection attempt. + :cvar UNKNOWN: Return code 3. No connection attempt was made, something went wrong. + """ + + RUNNING = 0 + REJECT = 1 + NO_RESPONSE = 2 + UNKNOWN = 3 + + @contextmanager def null_context(): yield @@ -76,7 +87,7 @@ class Postgresql(object): self._major_version = self.get_major_version() self._state_lock = Lock() - self.set_state('stopped') + self.set_state(PostgresqlState.STOPPED) self._pending_restart_reason = CaseInsensitiveDict() self.connection_pool = ConnectionPool() @@ -123,10 +134,10 @@ class Postgresql(object): if self.is_running(): # If we found postmaster process we need to figure out whether postgres is accepting connections - self.set_state('starting') + self.set_state(PostgresqlState.STARTING) self.check_startup_state_changed() - if self.state == 'running': # we are "joining" already running postgres + if self.state == PostgresqlState.RUNNING: # we are "joining" already running postgres # we know that PostgreSQL is accepting connections and can read some GUC's from pg_settings self.config.load_current_server_parameters() @@ -303,10 +314,10 @@ class Postgresql(object): initdb = [self.pgcommand('initdb')] + list(args) + [self.data_dir] return subprocess.call(initdb, **kwargs) == 0 - def pg_isready(self) -> str: + def pg_isready(self) -> PgIsReadyStatus: """Runs pg_isready to see if PostgreSQL is accepting connections. - :returns: 'ok' if PostgreSQL is up, 'reject' if starting up, 'no_response' if not up.""" + :returns: one of :class:`PgIsReadyStatus` values.""" r = self.connection_pool.conn_kwargs cmd = [self.pgcommand('pg_isready'), '-p', r['port'], '-d', self._database] @@ -320,11 +331,10 @@ class Postgresql(object): cmd.extend(['-U', r['user']]) ret = subprocess.call(cmd) - return_codes = {0: STATE_RUNNING, - 1: STATE_REJECT, - 2: STATE_NO_RESPONSE, - 3: STATE_UNKNOWN} - return return_codes.get(ret, STATE_UNKNOWN) + try: + return PgIsReadyStatus(ret) + except ValueError: + return PgIsReadyStatus.UNKNOWN def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None: self.config.reload_config(config, sighup) @@ -388,7 +398,7 @@ class Postgresql(object): try: return self._connection.query(sql, *params) except PostgresConnectionException as exc: - if self.state == 'restarting': + if self.state == PostgresqlState.RESTARTING: raise RetryFailedError('cluster is being restarted') from exc raise @@ -489,8 +499,8 @@ class Postgresql(object): self._cluster_info_state = cluster_info_state except RetryFailedError as e: # SELECT failed two times self._cluster_info_state = {'error': str(e)} - if not self.is_starting() and self.pg_isready() == STATE_REJECT: - self.set_state('starting') + if not self.is_starting() and self.pg_isready() == PgIsReadyStatus.REJECT: + self.set_state(PostgresqlState.STARTING) if 'error' in self._cluster_info_state: raise PostgresConnectionException(self._cluster_info_state['error']) @@ -700,11 +710,11 @@ class Postgresql(object): self._role = value @property - def state(self) -> str: + def state(self) -> PostgresqlState: with self._state_lock: return self._state - def set_state(self, value: str) -> None: + def set_state(self, value: PostgresqlState) -> None: with self._state_lock: self._state = value self._state_entry_timestamp = time.time() @@ -713,7 +723,7 @@ class Postgresql(object): return time.time() - self._state_entry_timestamp def is_starting(self) -> bool: - return self.state in ('starting', 'starting after custom bootstrap') + return self.state in (PostgresqlState.STARTING, PostgresqlState.BOOTSTRAP_STARTING) def wait_for_port_open(self, postmaster: PostmasterProcess, timeout: float) -> bool: """Waits until PostgreSQL opens ports.""" @@ -723,12 +733,12 @@ class Postgresql(object): if not postmaster.is_running(): logger.error('postmaster is not running') - self.set_state('start failed') + self.set_state(PostgresqlState.START_FAILED) return False isready = self.pg_isready() - if isready != STATE_NO_RESPONSE: - if isready not in [STATE_REJECT, STATE_RUNNING]: + if isready != PgIsReadyStatus.NO_RESPONSE: + if isready not in [PgIsReadyStatus.REJECT, PgIsReadyStatus.RUNNING]: logger.warning("Can't determine PostgreSQL startup status, assuming running") return True @@ -751,8 +761,8 @@ class Postgresql(object): # patroni. self.connection_pool.close() - state = 'starting after custom bootstrap' if self.bootstrap.running_custom_bootstrap else 'starting' - + state = (PostgresqlState.BOOTSTRAP_STARTING if self.bootstrap.running_custom_bootstrap + else PostgresqlState.STARTING) if self.is_running(): logger.error('Cannot start PostgreSQL because one is already running.') self.set_state(state) @@ -862,12 +872,12 @@ class Postgresql(object): # block_callbacks is used during restart to avoid # running start/stop callbacks in addition to restart ones if not block_callbacks: - self.set_state('stopped') + self.set_state(PostgresqlState.STOPPED) if pg_signaled: self.call_nowait(CallbackAction.ON_STOP) else: logger.warning('pg_ctl stop failed') - self.set_state('stop failed') + self.set_state(PostgresqlState.STOP_FAILED) return success def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool, @@ -883,7 +893,7 @@ class Postgresql(object): self.checkpoint(timeout=stop_timeout) if not block_callbacks: - self.set_state('stopping') + self.set_state(PostgresqlState.STOPPING) # invoke user-directed before stop script self._before_stop() @@ -979,22 +989,22 @@ class Postgresql(object): """ ready = self.pg_isready() - if ready == STATE_REJECT: + if ready == PgIsReadyStatus.REJECT: return False - elif ready == STATE_NO_RESPONSE: + elif ready == PgIsReadyStatus.NO_RESPONSE: ret = not self.is_running() if ret: - self.set_state('start failed') + self.set_state(PostgresqlState.START_FAILED) self.slots_handler.schedule(False) # TODO: can remove this? self.config.save_configuration_files(True) # TODO: maybe remove this? return ret else: - if ready != STATE_RUNNING: + if ready != PgIsReadyStatus.RUNNING: # Bad configuration or unexpected OS error. No idea of PostgreSQL status. # Let the main loop of run cycle clean up the mess. logger.warning("%s status returned from pg_isready", - "Unknown" if ready == STATE_UNKNOWN else "Invalid") - self.set_state('running') + "Unknown" if ready == PgIsReadyStatus.UNKNOWN else "Invalid") + self.set_state(PostgresqlState.RUNNING) self.slots_handler.schedule() self.config.save_configuration_files(True) # TODO: __cb_pending can be None here after PostgreSQL restarts on its own. Do we want to call the callback? @@ -1018,7 +1028,7 @@ class Postgresql(object): return None time.sleep(1) - return self.state == 'running' + return self.state == PostgresqlState.RUNNING def restart(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = None, block_callbacks: bool = False, role: Optional[str] = None, @@ -1031,13 +1041,14 @@ class Postgresql(object): :returns: True when restart was successful and timeout did not expire when waiting. """ - self.set_state('restarting') + self.set_state(PostgresqlState.RESTARTING) if not block_callbacks: self.__cb_pending = CallbackAction.ON_RESTART ret = self.stop(block_callbacks=True, before_shutdown=before_shutdown)\ and self.start(timeout, task, True, role, after_start) if not ret and not self.is_starting(): - self.set_state('restart failed ({0})'.format(self.state)) + logger.warning('restart failed (%r)', self.state) + self.set_state(PostgresqlState.RESTART_FAILED) return ret def is_healthy(self) -> bool: @@ -1059,7 +1070,7 @@ class Postgresql(object): def controldata(self) -> Dict[str, str]: """ return the contents of pg_controldata, or non-True value if pg_controldata call failed """ # Don't try to call pg_controldata during backup restore - if self._version_file_exists() and self.state != 'creating replica': + if self._version_file_exists() and self.state != PostgresqlState.CREATING_REPLICA: try: env = {**os.environ, 'LANG': 'C', 'LC_ALL': 'C'} data = subprocess.check_output([self.pgcommand('pg_controldata'), self._data_dir], env=env) diff --git a/patroni/postgresql/bootstrap.py b/patroni/postgresql/bootstrap.py index 94fa0ba7..e93a00d8 100644 --- a/patroni/postgresql/bootstrap.py +++ b/patroni/postgresql/bootstrap.py @@ -11,6 +11,7 @@ from ..collections import EMPTY_DICT from ..dcs import Leader, Member, RemoteMember from ..psycopg import quote_ident, quote_literal from ..utils import deep_compare, unquote +from .misc import PostgresqlState if TYPE_CHECKING: # pragma: no cover from . import Postgresql @@ -114,7 +115,7 @@ class Bootstrap(object): return user_options def _initdb(self, config: Any) -> bool: - self._postgresql.set_state('initializing new cluster') + self._postgresql.set_state(PostgresqlState.INITDB) not_allowed_options = ('pgdata', 'nosync', 'pwfile', 'sync-only', 'version') def error_handler(e: str) -> None: @@ -138,7 +139,7 @@ class Bootstrap(object): if ret: self._postgresql.configure_server_parameters() else: - self._postgresql.set_state('initdb failed') + self._postgresql.set_state(PostgresqlState.INITDB_FAILED) return ret def _post_restore(self) -> None: @@ -186,7 +187,7 @@ class Bootstrap(object): :returns: ``True`` if the bootstrap was successful, i.e. the execution of the custom ``command`` from *config* exited with code ``0``, ``False`` otherwise. """ - self._postgresql.set_state('running custom bootstrap script') + self._postgresql.set_state(PostgresqlState.CUSTOM_BOOTSTRAP) params = [] if config.get('no_params') else ['--scope=' + self._postgresql.scope, '--datadir=' + self._postgresql.data_dir] # Add custom parameters specified by the user @@ -196,7 +197,7 @@ class Bootstrap(object): try: logger.info('Running custom bootstrap script: %s', config['command']) if self._postgresql.cancellable.call(shlex.split(config['command']) + params) != 0: - self._postgresql.set_state('custom bootstrap failed') + self._postgresql.set_state(PostgresqlState.CUSTOM_BOOTSTRAP_FAILED) return False except Exception: logger.exception('Exception during custom bootstrap') @@ -241,7 +242,7 @@ class Bootstrap(object): loop through all methods the user supplies """ - self._postgresql.set_state('creating replica') + self._postgresql.set_state(PostgresqlState.CREATING_REPLICA) self._postgresql.schedule_sanity_checks_after_pause() is_remote_member = isinstance(clone_member, RemoteMember) @@ -322,7 +323,7 @@ class Bootstrap(object): if not method_config.get('keep_data', False) and not self._postgresql.data_directory_empty(): self._postgresql.remove_data_directory() - self._postgresql.set_state('stopped') + self._postgresql.set_state(PostgresqlState.STOPPED) return ret def basebackup(self, conn_url: str, env: Dict[str, str], options: Dict[str, Any]) -> Optional[int]: diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index 6b361b9c..21d3b8e4 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -16,11 +16,11 @@ from ..collections import CaseInsensitiveDict, CaseInsensitiveSet, EMPTY_DICT from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name from ..exceptions import PatroniFatalException, PostgresConnectionException from ..file_perm import pg_perm -from ..postgresql.misc import get_major_from_minor_version, postgres_version_to_int from ..psycopg import parse_conninfo from ..utils import compare_values, get_postgres_version, is_subpath, \ maybe_convert_from_base_unit, parse_bool, parse_int, split_host_port, uri, validate_directory from ..validator import EnumValidator, IntValidator +from .misc import get_major_from_minor_version, postgres_version_to_int, PostgresqlState from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value if TYPE_CHECKING: # pragma: no cover @@ -455,7 +455,7 @@ class ConfigHandler(object): in data directory. If it is not the case, we should use major version from the ``PG_VERSION`` file. """ - if self._postgresql.state == 'running': + if self._postgresql.state == PostgresqlState.RUNNING: try: return self._postgresql.server_version except AttributeError: @@ -1193,7 +1193,7 @@ class ConfigHandler(object): conf_changed = hba_changed = ident_changed = local_connection_address_changed = False param_diff = CaseInsensitiveDict() - if self._postgresql.state == 'running': + if self._postgresql.state == PostgresqlState.RUNNING: changes = CaseInsensitiveDict({p: v for p, v in server_parameters.items() if p not in params_skip_changes}) changes.update({p: None for p in self._server_parameters.keys() @@ -1311,7 +1311,7 @@ class ConfigHandler(object): self._server_parameters.pop('synchronous_standby_names', None) else: self._server_parameters['synchronous_standby_names'] = value - if self._postgresql.state == 'running': + if self._postgresql.state == PostgresqlState.RUNNING: self.write_postgresql_conf() self._postgresql.reload() return True diff --git a/patroni/postgresql/misc.py b/patroni/postgresql/misc.py index 089e0eb3..3fca51cf 100644 --- a/patroni/postgresql/misc.py +++ b/patroni/postgresql/misc.py @@ -2,6 +2,7 @@ import errno import logging import os +from enum import Enum from typing import Iterable, Tuple from ..exceptions import PostgresException @@ -9,6 +10,30 @@ from ..exceptions import PostgresException logger = logging.getLogger(__name__) +class PostgresqlState(str, Enum): + """Possible values of :attr:`Postgresql.state`.""" + + INITDB = 'initializing new cluster' + INITDB_FAILED = 'initdb failed' + CUSTOM_BOOTSTRAP = 'running custom bootstrap script' + CUSTOM_BOOTSTRAP_FAILED = 'custom bootstrap failed' + CREATING_REPLICA = 'creating replica' + RUNNING = 'running' + STARTING = 'starting' + BOOTSTRAP_STARTING = 'starting after custom bootstrap' + START_FAILED = 'start failed' + RESTARTING = 'restarting' + RESTART_FAILED = 'restart failed' + STOPPING = 'stopping' + STOPPED = 'stopped' + STOP_FAILED = 'stop failed' + CRASHED = 'crashed' + + def __repr__(self) -> str: + """Get a string representation of a :class:`PostgresqlState` member.""" + return self.value + + def postgres_version_to_int(pg_version: str) -> int: """Convert the server_version to integer diff --git a/patroni/postgresql/mpp/citus.py b/patroni/postgresql/mpp/citus.py index fe945084..a00de868 100644 --- a/patroni/postgresql/mpp/citus.py +++ b/patroni/postgresql/mpp/citus.py @@ -9,6 +9,7 @@ from urllib.parse import urlparse from ...dcs import Cluster from ...psycopg import connect, ProgrammingError, quote_ident from ...utils import parse_int +from ..misc import PostgresqlState from . import AbstractMPP, AbstractMPPHandler if TYPE_CHECKING: # pragma: no cover @@ -477,7 +478,8 @@ class CitusHandler(Citus, AbstractMPPHandler, Thread): for groupid, worker in cluster.workers.items(): leader = worker.leader if leader and leader.conn_url\ - and leader.data.get('role') in ('master', 'primary') and leader.data.get('state') == 'running': + and leader.data.get('role') in ('master', 'primary')\ + and leader.data.get('state') == PostgresqlState.RUNNING: self.add_task('after_promote', groupid, worker, leader.name, leader.conn_url) def find_task_by_groupid(self, groupid: int) -> Optional[int]: diff --git a/patroni/postgresql/sync.py b/patroni/postgresql/sync.py index ff435807..2fc2aaa4 100644 --- a/patroni/postgresql/sync.py +++ b/patroni/postgresql/sync.py @@ -9,6 +9,7 @@ from .. import global_config from ..collections import CaseInsensitiveDict, CaseInsensitiveSet from ..dcs import Cluster from ..psycopg import quote_ident +from .misc import PostgresqlState if TYPE_CHECKING: # pragma: no cover from . import Postgresql @@ -413,7 +414,8 @@ END;$$""") sync_param = f'{prefix}{num} ({sync_param})' if not (self._postgresql.config.set_synchronous_standby_names(sync_param) - and self._postgresql.state == 'running' and self._postgresql.is_primary()) or has_asterisk: + and self._postgresql.state == PostgresqlState.RUNNING + and self._postgresql.is_primary()) or has_asterisk: return time.sleep(0.1) # Usually it takes 1ms to reload postgresql.conf, but we will give it 100ms diff --git a/patroni/utils.py b/patroni/utils.py index 9800715e..42c0b49c 100644 --- a/patroni/utils.py +++ b/patroni/utils.py @@ -923,10 +923,7 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]: * ``name``: the name of the host (unique in the cluster). The ``members`` list is sorted by this key; * ``role``: ``leader``, ``standby_leader``, ``sync_standby``, ``quorum_standby``, or ``replica``; - * ``state``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``, - ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``, - ``running custom bootstrap script``, ``starting after custom bootstrap``, ``custom bootstrap failed``, - or ``creating replica``; + * ``state``: one of :class:`~patroni.postgresql.misc.PostgresqlState`; * ``api_url``: REST API URL based on ``restapi->connect_address`` configuration; * ``host``: PostgreSQL host based on ``postgresql->connect_address``; * ``port``: PostgreSQL port based on ``postgresql->connect_address``; diff --git a/tests/__init__.py b/tests/__init__.py index c84436b4..018b7990 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,6 +12,7 @@ import patroni.psycopg as psycopg from patroni.dcs import Leader, Member from patroni.postgresql import Postgresql from patroni.postgresql.config import ConfigHandler +from patroni.postgresql.misc import PostgresqlState from patroni.postgresql.mpp import get_mpp from patroni.utils import RetryFailedError, tzutc @@ -283,13 +284,13 @@ class BaseTestPostgresql(PostgresInit): if not os.path.exists(self.p.data_dir): os.makedirs(self.p.data_dir) - self.leadermem = Member(0, 'leader', 28, {'xlog_location': 100, 'state': 'running', + self.leadermem = Member(0, 'leader', 28, {'xlog_location': 100, 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres'}) self.leader = Leader(-1, 28, self.leadermem) self.other = Member(0, 'test-1', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5433/postgres', - 'state': 'running', 'tags': {'replicatefrom': 'leader'}}) + 'state': PostgresqlState.RUNNING, 'tags': {'replicatefrom': 'leader'}}) self.me = Member(0, 'test0', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5434/postgres'}) + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5434/postgres'}) def tearDown(self): if os.path.exists(self.p.data_dir): diff --git a/tests/test_api.py b/tests/test_api.py index 63cffad3..65d1c20e 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -14,6 +14,7 @@ from patroni.dcs import ClusterConfig, Member from patroni.exceptions import PostgresConnectionException from patroni.ha import _MemberStatus from patroni.postgresql.config import get_param_diff +from patroni.postgresql.misc import PostgresqlState from patroni.psycopg import OperationalError from patroni.utils import RetryFailedError, tzutc @@ -49,7 +50,7 @@ class MockPostgresql: connection_pool = MockConnectionPool() name = 'test' - state = 'running' + state = PostgresqlState.RUNNING role = 'primary' server_version = 90625 major_version = 90600 @@ -215,7 +216,8 @@ class TestRestApiHandler(unittest.TestCase): MockRestApiServer(RestApiHandler, 'GET /replica') with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'primary'})): MockRestApiServer(RestApiHandler, 'GET /replica') - with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'state': 'running'})): + with patch.object(RestApiHandler, 'get_postgresql_status', + Mock(return_value={'state': PostgresqlState.RUNNING})): MockRestApiServer(RestApiHandler, 'GET /health') MockRestApiServer(RestApiHandler, 'GET /leader') with patch.object(RestApiHandler, 'get_postgresql_status', @@ -349,10 +351,10 @@ class TestRestApiHandler(unittest.TestCase): self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0')) with patch.object(MockHa, 'is_leader', Mock(return_value=True)): self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0')) - with patch.object(MockPostgresql, 'state', PropertyMock(return_value='stopped')): + with patch.object(MockPostgresql, 'state', PropertyMock(return_value=PostgresqlState.STOPPED)): self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0')) - @patch.object(MockPostgresql, 'state', PropertyMock(return_value='stopped')) + @patch.object(MockPostgresql, 'state', PropertyMock(return_value=PostgresqlState.STOPPED)) def test_do_GET_patroni(self): self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni')) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 60004dc6..7d7f1bf2 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -9,6 +9,7 @@ from patroni.postgresql import Postgresql from patroni.postgresql.bootstrap import Bootstrap from patroni.postgresql.cancellable import CancellableSubprocess from patroni.postgresql.config import ConfigHandler, get_param_diff +from patroni.postgresql.misc import PostgresqlState from . import BaseTestPostgresql, mock_available_gucs, psycopg_connect @@ -269,7 +270,7 @@ class TestBootstrap(BaseTestPostgresql): mock_restart.assert_called_once() self.b.bootstrap(config) - self.p.set_state('stopped') + self.p.set_state(PostgresqlState.STOPPED) self.p.reload_config({'authentication': {'superuser': {'username': 'p', 'password': 'p'}, 'replication': {'username': 'r', 'password': 'r'}, 'rewind': {'username': 'rw', 'password': 'rw'}}, diff --git a/tests/test_consul.py b/tests/test_consul.py index 8b35457c..8fb38602 100644 --- a/tests/test_consul.py +++ b/tests/test_consul.py @@ -9,6 +9,7 @@ from consul import ConsulException, NotFound from patroni.dcs import get_dcs from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulClient, ConsulError, \ ConsulInternalError, HTTPClient, InvalidSession, InvalidSessionTTL, RetryFailedError +from patroni.postgresql.misc import PostgresqlState from patroni.postgresql.mpp import get_mpp from . import SleepException @@ -256,16 +257,16 @@ class TestConsul(unittest.TestCase): @patch.object(Agent.Service, 'register', Mock(side_effect=(False, True, True, True))) @patch.object(Agent.Service, 'deregister', Mock(return_value=True)) def test_update_service(self): - d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'} + d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': PostgresqlState.RUNNING} self.assertIsNone(self.c.update_service({}, {})) self.assertFalse(self.c.update_service({}, d)) self.assertTrue(self.c.update_service(d, d)) self.assertIsNone(self.c.update_service(d, d)) - d['state'] = 'stopped' + d['state'] = PostgresqlState.STOPPED self.assertTrue(self.c.update_service(d, d, force=True)) - d['state'] = 'unknown' + d['state'] = PostgresqlState.STARTING self.assertIsNone(self.c.update_service({}, d)) - d['state'] = 'running' + d['state'] = PostgresqlState.RUNNING d['role'] = 'bla' self.assertIsNone(self.c.update_service({}, d)) d['role'] = 'primary' @@ -280,7 +281,8 @@ class TestConsul(unittest.TestCase): self.c.refresh_session = Mock(return_value=False) - d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'} + d = {'role': 'replica', 'api_url': 'http://a/t', + 'conn_url': 'pg://c:1', 'state': PostgresqlState.RUNNING} # Changing register_service from True to False calls deregister() self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10}) diff --git a/tests/test_ctl.py b/tests/test_ctl.py index 4cdfc947..93aab2f4 100644 --- a/tests/test_ctl.py +++ b/tests/test_ctl.py @@ -11,6 +11,8 @@ import etcd from click.testing import CliRunner from prettytable import PrettyTable +from patroni.postgresql.misc import PostgresqlState + try: from prettytable import HRuleStyle hrule_all = HRuleStyle.ALL @@ -526,11 +528,11 @@ class TestCtl(unittest.TestCase): cluster = get_cluster_initialized_with_leader() cluster.members.append(Member(0, 'cascade', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5437/postgres', - 'api_url': 'http://127.0.0.1:8012/patroni', 'state': 'running', + 'api_url': 'http://127.0.0.1:8012/patroni', 'state': PostgresqlState.RUNNING, 'tags': {'replicatefrom': 'other'}})) cluster.members.append(Member(0, 'wrong_cascade', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5438/postgres', - 'api_url': 'http://127.0.0.1:8013/patroni', 'state': 'running', + 'api_url': 'http://127.0.0.1:8013/patroni', 'state': PostgresqlState.RUNNING, 'tags': {'replicatefrom': 'nonexistinghost'}})) with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=cluster)): result = self.runner.invoke(ctl, ['topology', 'dummy']) diff --git a/tests/test_ha.py b/tests/test_ha.py index b5803982..a260478b 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -18,6 +18,7 @@ from patroni.postgresql.bootstrap import Bootstrap from patroni.postgresql.callback_executor import CallbackAction from patroni.postgresql.cancellable import CancellableSubprocess from patroni.postgresql.config import ConfigHandler +from patroni.postgresql.misc import PostgresqlState from patroni.postgresql.postmaster import PostmasterProcess from patroni.postgresql.rewind import Rewind from patroni.postgresql.slots import SlotsHandler @@ -211,7 +212,7 @@ class TestHa(PostgresInit): @patch.object(Config, '_load_cache', Mock()) def setUp(self): super(TestHa, self).setUp() - self.p.set_state('running') + self.p.set_state(PostgresqlState.RUNNING) self.p.set_role('replica') self.p.postmaster_start_time = MagicMock(return_value=str(postmaster_start_time)) self.p.can_create_replica_without_replication_connection = MagicMock(return_value=False) @@ -678,7 +679,7 @@ class TestHa(PostgresInit): self.p.restart = Mock(return_value=None) self.assertEqual(self.ha.restart({}), (False, 'postgres is still starting')) self.p.restart = false - self.assertEqual(self.ha.restart({}), (False, 'restart failed')) + self.assertEqual(self.ha.restart({}), (False, PostgresqlState.RESTART_FAILED)) self.ha.cluster = get_cluster_initialized_with_leader() self.ha._async_executor.schedule('reinitialize') self.assertEqual(self.ha.restart({}), (False, 'reinitialize already in progress')) diff --git a/tests/test_kubernetes.py b/tests/test_kubernetes.py index a32c348a..48828266 100644 --- a/tests/test_kubernetes.py +++ b/tests/test_kubernetes.py @@ -15,6 +15,7 @@ from patroni.dcs import get_dcs from patroni.dcs.kubernetes import Cluster, k8s_client, k8s_config, K8sConfig, K8sConnectionFailed, \ K8sException, K8sObject, Kubernetes, KubernetesError, KubernetesRetriableException, Retry, \ RetryFailedError, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME +from patroni.postgresql.misc import PostgresqlState from patroni.postgresql.mpp import get_mpp from . import MockResponse, SleepException @@ -321,19 +322,19 @@ class TestKubernetesConfigMaps(BaseTestKubernetes): mock_patch_namespaced_pod.return_value.metadata.resource_version = '10' self.k._name = 'p-1' - self.k.touch_member({'role': 'replica', 'state': 'initializing new cluster'}) + self.k.touch_member({'role': 'replica', 'state': PostgresqlState.INITDB}) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar') - self.k.touch_member({'state': 'running', 'role': 'replica'}) + self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'replica'}) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None) - self.k.touch_member({'role': 'replica', 'state': 'running custom bootstrap script'}) + self.k.touch_member({'role': 'replica', 'state': PostgresqlState.CUSTOM_BOOTSTRAP}) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar') - self.k.touch_member({'role': 'replica', 'state': 'starting after custom bootstrap'}) + self.k.touch_member({'role': 'replica', 'state': PostgresqlState.BOOTSTRAP_STARTING}) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar') - self.k.touch_member({'state': 'stopped', 'role': 'primary'}) + self.k.touch_member({'state': PostgresqlState.STOPPED, 'role': 'primary'}) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None) self.k._role_label = 'isMaster' @@ -342,23 +343,23 @@ class TestKubernetesConfigMaps(BaseTestKubernetes): self.k._standby_leader_label_value = 'false' self.k._tmp_role_label = 'tmp_role' - self.k.touch_member({'state': 'creating replica', 'role': 'replica'}) + self.k.touch_member({'state': PostgresqlState.CREATING_REPLICA, 'role': 'replica'}) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar') - self.k.touch_member({'state': 'running', 'role': 'replica'}) + self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'replica'}) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None) self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false') self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'replica') mock_patch_namespaced_pod.rest_mock() self.k._name = 'p-0' - self.k.touch_member({'state': 'running', 'role': 'standby_leader'}) + self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'standby_leader'}) mock_patch_namespaced_pod.assert_called() self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false') self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary') mock_patch_namespaced_pod.rest_mock() - self.k.touch_member({'state': 'running', 'role': 'primary'}) + self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'primary'}) mock_patch_namespaced_pod.assert_called() self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'true') self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary') @@ -494,7 +495,7 @@ class TestKubernetesEndpoints(BaseTestKubernetes): self.assertEqual(('create_config_service failed',), mock_logger_exception.call_args[0]) mock_logger_exception.reset_mock() - self.k.touch_member({'state': 'running', 'role': 'replica'}) + self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'replica'}) mock_logger_exception.assert_called_once() self.assertEqual(('create_config_service failed',), mock_logger_exception.call_args[0]) diff --git a/tests/test_patroni.py b/tests/test_patroni.py index f8299dae..53010d56 100644 --- a/tests/test_patroni.py +++ b/tests/test_patroni.py @@ -20,6 +20,7 @@ from patroni.dcs.etcd import AbstractEtcdClientWithFailover from patroni.exceptions import DCSError from patroni.postgresql import Postgresql from patroni.postgresql.config import ConfigHandler +from patroni.postgresql.misc import PostgresqlState from . import psycopg_connect, SleepException from .test_etcd import etcd_read, etcd_write @@ -159,7 +160,7 @@ class TestPatroni(unittest.TestCase): @patch('patroni.config.Config.save_cache', Mock()) @patch('patroni.config.Config.reload_local_configuration', Mock(return_value=True)) @patch('patroni.ha.Ha.is_leader', Mock(return_value=True)) - @patch.object(Postgresql, 'state', PropertyMock(return_value='running')) + @patch.object(Postgresql, 'state', PropertyMock(return_value=PostgresqlState.RUNNING)) @patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False)) def test_run(self): self.p.postgresql.set_role('replica') diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index b811ff25..3ab78cac 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -18,10 +18,11 @@ from patroni.async_executor import CriticalTask from patroni.collections import CaseInsensitiveDict, CaseInsensitiveSet from patroni.dcs import RemoteMember from patroni.exceptions import PatroniException, PostgresConnectionException -from patroni.postgresql import Postgresql, STATE_NO_RESPONSE, STATE_REJECT +from patroni.postgresql import PgIsReadyStatus, Postgresql from patroni.postgresql.bootstrap import Bootstrap from patroni.postgresql.callback_executor import CallbackAction from patroni.postgresql.config import _false_validator, get_param_diff +from patroni.postgresql.misc import PostgresqlState from patroni.postgresql.postmaster import PostmasterProcess from patroni.postgresql.validator import _get_postgres_guc_validators, _load_postgres_gucs_validators, \ _read_postgres_gucs_validators_file, Bool, Enum, EnumBool, Integer, InvalidGucValidatorsFile, \ @@ -159,7 +160,7 @@ class TestPostgresql(BaseTestPostgresql): @patch.object(Postgresql, 'pg_isready') @patch('patroni.postgresql.polling_loop', Mock(return_value=range(1))) def test_wait_for_port_open(self, mock_pg_isready): - mock_pg_isready.return_value = STATE_NO_RESPONSE + mock_pg_isready.return_value = PgIsReadyStatus.NO_RESPONSE mock_postmaster = MockPostmaster() mock_postmaster.is_running.return_value = None @@ -257,7 +258,7 @@ class TestPostgresql(BaseTestPostgresql): def test_restart(self): self.p.start = Mock(return_value=False) self.assertFalse(self.p.restart()) - self.assertEqual(self.p.state, 'restart failed (restarting)') + self.assertEqual(self.p.state, PostgresqlState.RESTART_FAILED) @patch('os.chmod', Mock()) @patch('builtins.open', MagicMock()) @@ -378,7 +379,7 @@ class TestPostgresql(BaseTestPostgresql): @patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError)) def test__query(self): self.assertRaises(PostgresConnectionException, self.p._query, 'blabla') - self.p._state = 'restarting' + self.p._state = PostgresqlState.RESTARTING self.assertRaises(RetryFailedError, self.p._query, 'blabla') def test_query(self): @@ -386,7 +387,7 @@ class TestPostgresql(BaseTestPostgresql): self.assertRaises(PostgresConnectionException, self.p.query, 'RetryFailedError') self.assertRaises(psycopg.ProgrammingError, self.p.query, 'blabla') - @patch.object(Postgresql, 'pg_isready', Mock(return_value=STATE_REJECT)) + @patch.object(Postgresql, 'pg_isready', Mock(return_value=PgIsReadyStatus.REJECT)) def test_is_primary(self): self.assertTrue(self.p.is_primary()) self.p.reset_cluster_info_state(None) @@ -744,33 +745,33 @@ class TestPostgresql(BaseTestPostgresql): def test_check_for_startup(self): with patch('subprocess.call', return_value=0): - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING self.assertFalse(self.p.check_for_startup()) - self.assertEqual(self.p.state, 'running') + self.assertEqual(self.p.state, PostgresqlState.RUNNING) with patch('subprocess.call', return_value=1): - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING self.assertTrue(self.p.check_for_startup()) - self.assertEqual(self.p.state, 'starting') + self.assertEqual(self.p.state, PostgresqlState.STARTING) with patch('subprocess.call', return_value=2): - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING self.assertFalse(self.p.check_for_startup()) - self.assertEqual(self.p.state, 'start failed') + self.assertEqual(self.p.state, PostgresqlState.START_FAILED) with patch('subprocess.call', return_value=0): - self.p._state = 'running' + self.p._state = PostgresqlState.RUNNING self.assertFalse(self.p.check_for_startup()) - self.assertEqual(self.p.state, 'running') + self.assertEqual(self.p.state, PostgresqlState.RUNNING) with patch('subprocess.call', return_value=127): - self.p._state = 'running' + self.p._state = PostgresqlState.RUNNING self.assertFalse(self.p.check_for_startup()) - self.assertEqual(self.p.state, 'running') + self.assertEqual(self.p.state, PostgresqlState.RUNNING) - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING self.assertFalse(self.p.check_for_startup()) - self.assertEqual(self.p.state, 'running') + self.assertEqual(self.p.state, PostgresqlState.RUNNING) def test_wait_for_startup(self): state = {'sleeps': 0, 'num_rejects': 0, 'final_return': 0} @@ -793,21 +794,21 @@ class TestPostgresql(BaseTestPostgresql): with patch('time.sleep', side_effect=increment_sleeps): self.p.time_in_state = Mock(side_effect=time_in_state) - self.p._state = 'stopped' + self.p._state = PostgresqlState.STOPPED self.assertTrue(self.p.wait_for_startup()) self.assertEqual(state['sleeps'], 0) - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING state['num_rejects'] = 5 self.assertTrue(self.p.wait_for_startup()) self.assertEqual(state['sleeps'], 5) - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING state['sleeps'] = 0 state['final_return'] = 2 self.assertFalse(self.p.wait_for_startup()) - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING state['sleeps'] = 0 state['final_return'] = 0 self.assertFalse(self.p.wait_for_startup(timeout=2)) @@ -815,7 +816,7 @@ class TestPostgresql(BaseTestPostgresql): with patch.object(Postgresql, 'check_startup_state_changed', Mock(return_value=False)): self.p.cancellable.cancel() - self.p._state = 'starting' + self.p._state = PostgresqlState.STARTING self.assertIsNone(self.p.wait_for_startup()) def test_get_server_parameters(self): diff --git a/tests/test_slots.py b/tests/test_slots.py index 4a7afa27..8be9864e 100644 --- a/tests/test_slots.py +++ b/tests/test_slots.py @@ -8,7 +8,7 @@ from unittest.mock import Mock, patch, PropertyMock from patroni import global_config, psycopg from patroni.dcs import Cluster, ClusterConfig, Member, Status, SyncState from patroni.postgresql import Postgresql -from patroni.postgresql.misc import fsync_dir +from patroni.postgresql.misc import fsync_dir, PostgresqlState from patroni.postgresql.slots import SlotsAdvanceThread, SlotsHandler from patroni.tags import Tags @@ -86,7 +86,7 @@ class TestSlotsHandler(BaseTestPostgresql): """Test sync with a cascading replica so physical slots are present on a replica.""" config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}}}, 1) cascading_replica = Member(0, 'test-2', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'tags': {'replicatefrom': 'postgresql0'} }) cluster = Cluster(True, config, self.leader, Status(0, {'ls': 10}, []), @@ -127,17 +127,17 @@ class TestSlotsHandler(BaseTestPostgresql): config = ClusterConfig( 1, {'slots': {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}}, 1) nostream_node = Member(0, 'test-2', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'tags': {'nostream': 'True'}, 'xlog_location': 10, }) cascade_node = Member(0, 'test-3', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'tags': {'replicatefrom': 'test-2'}, 'xlog_location': 98 }) stream_node = Member(0, 'test-4', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'xlog_location': 99}) cluster = Cluster( True, config, self.leader, Status(100, {'leader': 99, 'test_2': 98, 'test_3': 97, 'test_4': 98}, []), @@ -193,11 +193,11 @@ class TestSlotsHandler(BaseTestPostgresql): def test_get_slot_name_on_primary(self): node1 = Member(0, 'node1', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'tags': {'replicatefrom': 'node2'} }) node2 = Member(0, 'node2', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'tags': {'replicatefrom': 'node1'} }) cluster = Cluster(True, None, self.leader, Status.empty(), [self.leadermem, node1, node2], @@ -206,11 +206,11 @@ class TestSlotsHandler(BaseTestPostgresql): def test_should_enforce_hot_standby_feedback(self): node1 = Member(0, 'postgresql0', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'tags': {'replicatefrom': 'postgresql1'} }) node2 = Member(0, 'postgresql1', 28, { - 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', 'tags': {'replicatefrom': 'postgresql0'} }) cluster = Cluster(True, None, self.leader, Status.empty(), [self.leadermem, node1, node2],