@@ -30,7 +30,7 @@ from . import global_config, psycopg
from .__main__ import Patroni
from .dcs import Cluster
from .exceptions import PostgresConnectionException, PostgresException
-from .postgresql.misc import postgres_version_to_int
+from .postgresql.misc import postgres_version_to_int, PostgresqlState
from .utils import cluster_as_json, deep_compare, enable_keepalive, parse_bool, \
    parse_int, patch_config, Retry, RetryFailedError, split_host_port, tzutc, uri

@@ -324,7 +324,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
is_lagging = leader_optime and leader_optime > replayed_location + max_replica_lag

replica_status_code = 200 if not patroni.noloadbalance and not is_lagging and \
-    response.get('role') == 'replica' and response.get('state') == 'running' else 503
+    response.get('role') == 'replica' and response.get('state') == PostgresqlState.RUNNING else 503

if not cluster and response.get('pause'):
    leader_status_code = 200 if response.get('role') in ('primary', 'standby_leader') else 503
@@ -358,7 +358,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
elif 'read-only' in path and 'sync' not in path and 'quorum' not in path:
    status_code = 200 if 200 in (primary_status_code, standby_leader_status_code) else replica_status_code
elif 'health' in path:
-    status_code = 200 if response.get('state') == 'running' else 503
+    status_code = 200 if response.get('state') == PostgresqlState.RUNNING else 503
elif cluster:  # dcs is available
    is_quorum = response.get('quorum_standby')
    is_synchronous = response.get('sync_standby')
@@ -462,7 +462,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
patroni = self.server.patroni
if patroni.ha.is_leader():
    status_code = 200
-elif patroni.postgresql.state == 'running':
+elif patroni.postgresql.state == PostgresqlState.RUNNING:
    status_code = 200 if patroni.dcs.cluster else 503
else:
    status_code = 503
@@ -573,7 +573,8 @@ class RestApiHandler(BaseHTTPRequestHandler):

metrics.append("# HELP patroni_postgres_running Value is 1 if Postgres is running, 0 otherwise.")
metrics.append("# TYPE patroni_postgres_running gauge")
-metrics.append("patroni_postgres_running{0} {1}".format(labels, int(postgres['state'] == 'running')))
+metrics.append("patroni_postgres_running{0} {1}".format(
+    labels, int(postgres['state'] == PostgresqlState.RUNNING)))

metrics.append("# HELP patroni_postmaster_start_time Epoch seconds since Postgres started.")
metrics.append("# TYPE patroni_postmaster_start_time gauge")
@@ -889,7 +890,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
If it's not able to parse the request body, then the request is silently discarded.
"""
status_code = 500
-data = 'restart failed'
+data = PostgresqlState.RESTART_FAILED
request = self._read_json_content(body_is_optional=True)
cluster = self.server.patroni.dcs.get_cluster()
if request is None:
@@ -1263,10 +1264,7 @@ class RestApiHandler(BaseHTTPRequestHandler):

:returns: a dict with the status of Postgres/Patroni. The keys are:

-* ``state``: Postgres state among ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``,
-  ``starting``, ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``,
-  ``initdb failed``, ``running custom bootstrap script``, ``starting after custom bootstrap``,
-  ``custom bootstrap failed``, ``creating replica``, or ``unknown``;
+* ``state``: one of :class:`~patroni.postgresql.misc.PostgresqlState` or ``unknown``;
* ``postmaster_start_time``: ``pg_postmaster_start_time()``;
* ``role``: ``replica`` or ``primary`` based on ``pg_is_in_recovery()`` output;
* ``server_version``: Postgres version without periods, e.g. ``150002`` for Postgres ``15.2``;
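Reviewer note, illustration only (not part of the diff): a hypothetical payload matching the keys listed in the docstring above; the concrete values are invented.

# Hypothetical example of the status dict described above (values invented):
example_patroni_status = {
    'state': 'running',            # PostgresqlState.RUNNING serializes to its plain string value
    'postmaster_start_time': '2024-01-01 00:00:00.000000+00:00',
    'role': 'replica',
    'server_version': 150002,
}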
@@ -1308,7 +1306,8 @@ class RestApiHandler(BaseHTTPRequestHandler):
config = global_config.from_cluster(cluster)
try:

-    if postgresql.state not in ('running', 'restarting', 'starting'):
+    if postgresql.state not in (PostgresqlState.RUNNING, PostgresqlState.RESTARTING,
+                                PostgresqlState.STARTING):
        raise RetryFailedError('')
    replication_state = ("pg_catalog.pg_{0}_{1}_diff(wr.latest_end_lsn, '0/0')::bigint, wr.status"
                         if postgresql.major_version >= 90600 else "NULL, NULL") + ", " +\
@@ -1365,7 +1364,7 @@ class RestApiHandler(BaseHTTPRequestHandler):

except (psycopg.Error, RetryFailedError, PostgresConnectionException):
    state = postgresql.state
-    if state == 'running':
+    if state == PostgresqlState.RUNNING:
        logger.exception('get_postgresql_status')
        state = 'unknown'
result: Dict[str, Any] = {'state': state, 'role': postgresql.role}

@@ -64,7 +64,7 @@ from . import global_config
from .config import Config
from .dcs import AbstractDCS, Cluster, get_dcs as _get_dcs, Member
from .exceptions import PatroniException
-from .postgresql.misc import postgres_version_to_int
+from .postgresql.misc import postgres_version_to_int, PostgresqlState
from .postgresql.mpp import get_mpp
from .request import PatroniRequest
from .utils import cluster_as_json, patch_config, polling_loop
@@ -1210,7 +1210,7 @@ def reinit(cluster_name: str, group: Optional[int], member_names: List[str], for
time.sleep(2)
for member in wait_on_members:
    data = json.loads(request_patroni(member, 'get', 'patroni').data.decode('utf-8'))
-    if data.get('state') != 'creating replica':
+    if data.get('state') != PostgresqlState.CREATING_REPLICA:
        click.echo('Reinitialize is completed on: {0}'.format(member.name))
        wait_on_members.remove(member)

@@ -1533,10 +1533,7 @@ def output_members(cluster: Cluster, name: str, extended: bool = False,
* ``Member``: name of the Patroni node, as per ``name`` configuration;
* ``Host``: hostname (or IP) and port, as per ``postgresql.listen`` configuration;
* ``Role``: ``Leader``, ``Standby Leader``, ``Sync Standby`` or ``Replica``;
-* ``State``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``,
-  ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``,
-  ``running custom bootstrap script``, ``starting after custom bootstrap``, ``custom bootstrap failed``,
-  ``creating replica``, ``streaming``, ``in archive recovery``, and so on;
+* ``State``: one of :class:`~patroni.postgresql.misc.PostgresqlState`;
* ``TL``: current timeline in Postgres;
* ``Lag in MB``: replication lag.

@@ -295,8 +295,10 @@ class Member(Tags, NamedTuple('Member',

@property
def is_running(self) -> bool:
-    """``True`` if the member :attr:`~Member.state` is ``running``."""
-    return self.state == 'running'
+    """``True`` if the member :attr:`~Member.state` is :class:`~patroni.postgresql.misc.PostgresqlState.RUNNING`."""
+    from ..postgresql.misc import PostgresqlState
+
+    return self.state == PostgresqlState.RUNNING

@property
def patroni_version(self) -> Optional[Tuple[int, ...]]:

@@ -19,6 +19,7 @@ from consul import base, Check, ConsulException, NotFound
from urllib3.exceptions import HTTPError

from ..exceptions import DCSError
+from ..postgresql.misc import PostgresqlState
from ..postgresql.mpp import AbstractMPP
from ..utils import deep_compare, parse_bool, Retry, RetryFailedError, split_host_port, uri, USER_AGENT
from . import AbstractDCS, catch_return_false_exception, Cluster, ClusterConfig, \
@@ -546,13 +547,13 @@ class Consul(AbstractDCS):
    'enable_tag_override': True,
}

-if state == 'stopped' or (not self._register_service and self._previous_loop_register_service):
+if state == PostgresqlState.STOPPED or (not self._register_service and self._previous_loop_register_service):
    self._previous_loop_register_service = self._register_service
    return self.deregister_service(params['service_id'])

self._previous_loop_register_service = self._register_service
if role in ['primary', 'replica', 'standby-leader']:
-    if state != 'running':
+    if state != PostgresqlState.RUNNING:
        return
    return self.register_service(service_name, **params)

@@ -23,6 +23,7 @@ from urllib3.exceptions import HTTPError
|
||||
|
||||
from ..collections import EMPTY_DICT
|
||||
from ..exceptions import DCSError
|
||||
from ..postgresql.misc import PostgresqlState
|
||||
from ..postgresql.mpp import AbstractMPP
|
||||
from ..utils import deep_compare, iter_response_objects, \
|
||||
keepalive_socket_options, Retry, RetryFailedError, tzutc, uri, USER_AGENT
|
||||
@@ -1320,7 +1321,7 @@ class Kubernetes(AbstractDCS):
|
||||
if cluster and cluster.leader and cluster.leader.name == self._name:
|
||||
role = self._standby_leader_label_value if data['role'] == 'standby_leader' else self._leader_label_value
|
||||
tmp_role = 'primary'
|
||||
elif data['state'] == 'running' and data['role'] != 'primary':
|
||||
elif data['state'] == PostgresqlState.RUNNING and data['role'] != 'primary':
|
||||
role = {'replica': self._follower_label_value}.get(data['role'], data['role'])
|
||||
tmp_role = data['role']
|
||||
else:
|
||||
@@ -1332,9 +1333,8 @@ class Kubernetes(AbstractDCS):
|
||||
updated_labels[self._tmp_role_label] = tmp_role
|
||||
|
||||
if self._bootstrap_labels:
|
||||
if data['state'] in ('initializing new cluster',
|
||||
'running custom bootstrap script', 'starting after custom bootstrap',
|
||||
'creating replica'):
|
||||
if data['state'] in (PostgresqlState.INITDB, PostgresqlState.CUSTOM_BOOTSTRAP,
|
||||
PostgresqlState.BOOTSTRAP_STARTING, PostgresqlState.CREATING_REPLICA):
|
||||
updated_labels.update(self._bootstrap_labels)
|
||||
else:
|
||||
updated_labels.update({k: None for k, _ in self._bootstrap_labels.items()})
|
||||
|
||||
@@ -17,7 +17,7 @@ from .collections import CaseInsensitiveSet
|
||||
from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, SyncState
|
||||
from .exceptions import DCSError, PatroniFatalException, PostgresConnectionException
|
||||
from .postgresql.callback_executor import CallbackAction
|
||||
from .postgresql.misc import postgres_version_to_int
|
||||
from .postgresql.misc import postgres_version_to_int, PostgresqlState
|
||||
from .postgresql.postmaster import PostmasterProcess
|
||||
from .postgresql.rewind import Rewind
|
||||
from .quorum import QuorumStateResolver
|
||||
@@ -458,7 +458,8 @@ class Ha(object):
|
||||
data['pending_restart'] = True
|
||||
data['pending_restart_reason'] = dict(self.state_handler.pending_restart_reason)
|
||||
if self._async_executor.scheduled_action in (None, 'promote') \
|
||||
and data['state'] in ['running', 'restarting', 'starting']:
|
||||
and data['state'] in [PostgresqlState.RUNNING, PostgresqlState.RESTARTING,
|
||||
PostgresqlState.STARTING]:
|
||||
try:
|
||||
timeline, wal_position, pg_control_timeline = self.state_handler.timeline_wal_position()
|
||||
data['xlog_location'] = self._last_wal_lsn = wal_position
|
||||
@@ -495,7 +496,7 @@ class Ha(object):
|
||||
ret = self.dcs.touch_member(data)
|
||||
if ret:
|
||||
new_state = (data['state'], data['role'])
|
||||
if self._last_state != new_state and new_state == ('running', 'primary'):
|
||||
if self._last_state != new_state and new_state == (PostgresqlState.RUNNING, 'primary'):
|
||||
self.notify_mpp_coordinator('after_promote')
|
||||
self._last_state = new_state
|
||||
return ret
|
||||
@@ -636,7 +637,7 @@ class Ha(object):
|
||||
if timeout\
|
||||
and data.get('Database cluster state') in ('in production', 'in crash recovery',
|
||||
'shutting down', 'shut down')\
|
||||
and self.state_handler.state == 'crashed'\
|
||||
and self.state_handler.state == PostgresqlState.CRASHED\
|
||||
and self.state_handler.role == 'primary'\
|
||||
and not self.state_handler.config.recovery_conf_exists():
|
||||
# We know 100% that we were running as a primary a few moments ago, therefore could just start postgres
|
||||
@@ -1158,7 +1159,7 @@ class Ha(object):
|
||||
|
||||
:returns: the reason why caller shouldn't continue as a primary or the current value of received/replayed LSN.
|
||||
"""
|
||||
if self.state_handler.state == 'running' and self.state_handler.role == 'primary':
|
||||
if self.state_handler.state == PostgresqlState.RUNNING and self.state_handler.role == 'primary':
|
||||
return 'Running as a leader'
|
||||
self._failsafe.update(data)
|
||||
return self._last_wal_lsn
|
||||
@@ -1898,7 +1899,7 @@ class Ha(object):
|
||||
elif res is None:
|
||||
return (False, 'postgres is still starting')
|
||||
else:
|
||||
return (False, 'restart failed')
|
||||
return (False, PostgresqlState.RESTART_FAILED)
|
||||
|
||||
def _do_reinitialize(self, cluster: Cluster) -> Optional[bool]:
|
||||
self.state_handler.stop('immediate', stop_timeout=self.patroni.config['retry_timeout'])
|
||||
@@ -2035,7 +2036,8 @@ class Ha(object):
|
||||
if not self.state_handler.check_for_startup() or self.is_paused():
|
||||
self.set_start_timeout(None)
|
||||
if self.is_paused():
|
||||
self.state_handler.set_state(self.state_handler.is_running() and 'running' or 'stopped')
|
||||
self.state_handler.set_state(PostgresqlState.RUNNING if self.state_handler.is_running()
|
||||
else PostgresqlState.STOPPED)
|
||||
return None
|
||||
|
||||
# state_handler.state == 'starting' here
|
||||
@@ -2201,7 +2203,7 @@ class Ha(object):
|
||||
|
||||
if not self.state_handler.is_healthy():
|
||||
if self.is_paused():
|
||||
self.state_handler.set_state('stopped')
|
||||
self.state_handler.set_state(PostgresqlState.STOPPED)
|
||||
if self.has_lock():
|
||||
self._delete_leader()
|
||||
return 'removed leader lock because postgres is not running'
|
||||
@@ -2214,8 +2216,8 @@ class Ha(object):
|
||||
(self._rewind.is_needed and self._rewind.can_rewind_or_reinitialize_allowed):
|
||||
return 'postgres is not running'
|
||||
|
||||
if self.state_handler.state in ('running', 'starting'):
|
||||
self.state_handler.set_state('crashed')
|
||||
if self.state_handler.state in (PostgresqlState.RUNNING, PostgresqlState.STARTING):
|
||||
self.state_handler.set_state(PostgresqlState.CRASHED)
|
||||
# try to start dead postgres
|
||||
return self.recover()
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import time
from contextlib import contextmanager
from copy import deepcopy
from datetime import datetime
+from enum import IntEnum
from threading import current_thread, Lock
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union

@@ -27,7 +28,7 @@ from .callback_executor import CallbackAction, CallbackExecutor
from .cancellable import CancellableSubprocess
from .config import ConfigHandler, mtime
from .connection import ConnectionPool, get_connection_cursor
-from .misc import parse_history, parse_lsn, postgres_major_version_to_int
+from .misc import parse_history, parse_lsn, postgres_major_version_to_int, PostgresqlState
from .mpp import AbstractMPP
from .postmaster import PostmasterProcess
from .slots import SlotsHandler
@@ -39,14 +40,24 @@ if TYPE_CHECKING: # pragma: no cover

logger = logging.getLogger(__name__)

-STATE_RUNNING = 'running'
-STATE_REJECT = 'rejecting connections'
-STATE_NO_RESPONSE = 'not responding'
-STATE_UNKNOWN = 'unknown'

STOP_POLLING_INTERVAL = 1


+class PgIsReadyStatus(IntEnum):
+    """Possible PostgreSQL connection status ``pg_isready`` utility can report.
+
+    :cvar RUNNING: return code 0. PostgreSQL is accepting connections normally.
+    :cvar REJECT: return code 1. PostgreSQL is rejecting connections.
+    :cvar NO_RESPONSE: return code 2. There was no response to the connection attempt.
+    :cvar UNKNOWN: Return code 3. No connection attempt was made, something went wrong.
+    """
+
+    RUNNING = 0
+    REJECT = 1
+    NO_RESPONSE = 2
+    UNKNOWN = 3


@contextmanager
def null_context():
    yield
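Reviewer note, illustration only (not part of the change): because ``PgIsReadyStatus`` is an ``IntEnum``, the ``pg_isready`` exit code converts directly into a member, and an unexpected code raises ``ValueError``, which ``Postgresql.pg_isready()`` maps to ``UNKNOWN``. A minimal standalone sketch:

from enum import IntEnum

class PgIsReadyStatus(IntEnum):  # redeclared here only to keep the sketch self-contained
    RUNNING = 0
    REJECT = 1
    NO_RESPONSE = 2
    UNKNOWN = 3

def status_from_exit_code(ret: int) -> PgIsReadyStatus:
    # Exit codes 0..3 map one-to-one onto the enum; anything else is treated as UNKNOWN.
    try:
        return PgIsReadyStatus(ret)
    except ValueError:
        return PgIsReadyStatus.UNKNOWN

assert status_from_exit_code(1) is PgIsReadyStatus.REJECT
assert status_from_exit_code(127) is PgIsReadyStatus.UNKNOWN
assert PgIsReadyStatus.RUNNING == 0  # IntEnum members still compare equal to the raw integer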
@@ -76,7 +87,7 @@ class Postgresql(object):
|
||||
self._major_version = self.get_major_version()
|
||||
|
||||
self._state_lock = Lock()
|
||||
self.set_state('stopped')
|
||||
self.set_state(PostgresqlState.STOPPED)
|
||||
|
||||
self._pending_restart_reason = CaseInsensitiveDict()
|
||||
self.connection_pool = ConnectionPool()
|
||||
@@ -123,10 +134,10 @@ class Postgresql(object):
|
||||
|
||||
if self.is_running():
|
||||
# If we found postmaster process we need to figure out whether postgres is accepting connections
|
||||
self.set_state('starting')
|
||||
self.set_state(PostgresqlState.STARTING)
|
||||
self.check_startup_state_changed()
|
||||
|
||||
if self.state == 'running': # we are "joining" already running postgres
|
||||
if self.state == PostgresqlState.RUNNING: # we are "joining" already running postgres
|
||||
# we know that PostgreSQL is accepting connections and can read some GUC's from pg_settings
|
||||
self.config.load_current_server_parameters()
|
||||
|
||||
@@ -303,10 +314,10 @@ class Postgresql(object):
|
||||
initdb = [self.pgcommand('initdb')] + list(args) + [self.data_dir]
|
||||
return subprocess.call(initdb, **kwargs) == 0
|
||||
|
||||
def pg_isready(self) -> str:
|
||||
def pg_isready(self) -> PgIsReadyStatus:
|
||||
"""Runs pg_isready to see if PostgreSQL is accepting connections.
|
||||
|
||||
:returns: 'ok' if PostgreSQL is up, 'reject' if starting up, 'no_response' if not up."""
|
||||
:returns: one of :class:`PgIsReadyStatus` values."""
|
||||
|
||||
r = self.connection_pool.conn_kwargs
|
||||
cmd = [self.pgcommand('pg_isready'), '-p', r['port'], '-d', self._database]
|
||||
@@ -320,11 +331,10 @@ class Postgresql(object):
|
||||
cmd.extend(['-U', r['user']])
|
||||
|
||||
ret = subprocess.call(cmd)
|
||||
return_codes = {0: STATE_RUNNING,
|
||||
1: STATE_REJECT,
|
||||
2: STATE_NO_RESPONSE,
|
||||
3: STATE_UNKNOWN}
|
||||
return return_codes.get(ret, STATE_UNKNOWN)
|
||||
try:
|
||||
return PgIsReadyStatus(ret)
|
||||
except ValueError:
|
||||
return PgIsReadyStatus.UNKNOWN
|
||||
|
||||
def reload_config(self, config: Dict[str, Any], sighup: bool = False) -> None:
|
||||
self.config.reload_config(config, sighup)
|
||||
@@ -388,7 +398,7 @@ class Postgresql(object):
|
||||
try:
|
||||
return self._connection.query(sql, *params)
|
||||
except PostgresConnectionException as exc:
|
||||
if self.state == 'restarting':
|
||||
if self.state == PostgresqlState.RESTARTING:
|
||||
raise RetryFailedError('cluster is being restarted') from exc
|
||||
raise
|
||||
|
||||
@@ -489,8 +499,8 @@ class Postgresql(object):
|
||||
self._cluster_info_state = cluster_info_state
|
||||
except RetryFailedError as e: # SELECT failed two times
|
||||
self._cluster_info_state = {'error': str(e)}
|
||||
if not self.is_starting() and self.pg_isready() == STATE_REJECT:
|
||||
self.set_state('starting')
|
||||
if not self.is_starting() and self.pg_isready() == PgIsReadyStatus.REJECT:
|
||||
self.set_state(PostgresqlState.STARTING)
|
||||
|
||||
if 'error' in self._cluster_info_state:
|
||||
raise PostgresConnectionException(self._cluster_info_state['error'])
|
||||
@@ -700,11 +710,11 @@ class Postgresql(object):
|
||||
self._role = value
|
||||
|
||||
@property
|
||||
def state(self) -> str:
|
||||
def state(self) -> PostgresqlState:
|
||||
with self._state_lock:
|
||||
return self._state
|
||||
|
||||
def set_state(self, value: str) -> None:
|
||||
def set_state(self, value: PostgresqlState) -> None:
|
||||
with self._state_lock:
|
||||
self._state = value
|
||||
self._state_entry_timestamp = time.time()
|
||||
@@ -713,7 +723,7 @@ class Postgresql(object):
|
||||
return time.time() - self._state_entry_timestamp
|
||||
|
||||
def is_starting(self) -> bool:
|
||||
return self.state in ('starting', 'starting after custom bootstrap')
|
||||
return self.state in (PostgresqlState.STARTING, PostgresqlState.BOOTSTRAP_STARTING)
|
||||
|
||||
def wait_for_port_open(self, postmaster: PostmasterProcess, timeout: float) -> bool:
|
||||
"""Waits until PostgreSQL opens ports."""
|
||||
@@ -723,12 +733,12 @@ class Postgresql(object):
|
||||
|
||||
if not postmaster.is_running():
|
||||
logger.error('postmaster is not running')
|
||||
self.set_state('start failed')
|
||||
self.set_state(PostgresqlState.START_FAILED)
|
||||
return False
|
||||
|
||||
isready = self.pg_isready()
|
||||
if isready != STATE_NO_RESPONSE:
|
||||
if isready not in [STATE_REJECT, STATE_RUNNING]:
|
||||
if isready != PgIsReadyStatus.NO_RESPONSE:
|
||||
if isready not in [PgIsReadyStatus.REJECT, PgIsReadyStatus.RUNNING]:
|
||||
logger.warning("Can't determine PostgreSQL startup status, assuming running")
|
||||
return True
|
||||
|
||||
@@ -751,8 +761,8 @@ class Postgresql(object):
|
||||
# patroni.
|
||||
self.connection_pool.close()
|
||||
|
||||
state = 'starting after custom bootstrap' if self.bootstrap.running_custom_bootstrap else 'starting'
|
||||
|
||||
state = (PostgresqlState.BOOTSTRAP_STARTING if self.bootstrap.running_custom_bootstrap
|
||||
else PostgresqlState.STARTING)
|
||||
if self.is_running():
|
||||
logger.error('Cannot start PostgreSQL because one is already running.')
|
||||
self.set_state(state)
|
||||
@@ -862,12 +872,12 @@ class Postgresql(object):
|
||||
# block_callbacks is used during restart to avoid
|
||||
# running start/stop callbacks in addition to restart ones
|
||||
if not block_callbacks:
|
||||
self.set_state('stopped')
|
||||
self.set_state(PostgresqlState.STOPPED)
|
||||
if pg_signaled:
|
||||
self.call_nowait(CallbackAction.ON_STOP)
|
||||
else:
|
||||
logger.warning('pg_ctl stop failed')
|
||||
self.set_state('stop failed')
|
||||
self.set_state(PostgresqlState.STOP_FAILED)
|
||||
return success
|
||||
|
||||
def _do_stop(self, mode: str, block_callbacks: bool, checkpoint: bool,
|
||||
@@ -883,7 +893,7 @@ class Postgresql(object):
|
||||
self.checkpoint(timeout=stop_timeout)
|
||||
|
||||
if not block_callbacks:
|
||||
self.set_state('stopping')
|
||||
self.set_state(PostgresqlState.STOPPING)
|
||||
|
||||
# invoke user-directed before stop script
|
||||
self._before_stop()
|
||||
@@ -979,22 +989,22 @@ class Postgresql(object):
|
||||
"""
|
||||
ready = self.pg_isready()
|
||||
|
||||
if ready == STATE_REJECT:
|
||||
if ready == PgIsReadyStatus.REJECT:
|
||||
return False
|
||||
elif ready == STATE_NO_RESPONSE:
|
||||
elif ready == PgIsReadyStatus.NO_RESPONSE:
|
||||
ret = not self.is_running()
|
||||
if ret:
|
||||
self.set_state('start failed')
|
||||
self.set_state(PostgresqlState.START_FAILED)
|
||||
self.slots_handler.schedule(False) # TODO: can remove this?
|
||||
self.config.save_configuration_files(True) # TODO: maybe remove this?
|
||||
return ret
|
||||
else:
|
||||
if ready != STATE_RUNNING:
|
||||
if ready != PgIsReadyStatus.RUNNING:
|
||||
# Bad configuration or unexpected OS error. No idea of PostgreSQL status.
|
||||
# Let the main loop of run cycle clean up the mess.
|
||||
logger.warning("%s status returned from pg_isready",
|
||||
"Unknown" if ready == STATE_UNKNOWN else "Invalid")
|
||||
self.set_state('running')
|
||||
"Unknown" if ready == PgIsReadyStatus.UNKNOWN else "Invalid")
|
||||
self.set_state(PostgresqlState.RUNNING)
|
||||
self.slots_handler.schedule()
|
||||
self.config.save_configuration_files(True)
|
||||
# TODO: __cb_pending can be None here after PostgreSQL restarts on its own. Do we want to call the callback?
|
||||
@@ -1018,7 +1028,7 @@ class Postgresql(object):
|
||||
return None
|
||||
time.sleep(1)
|
||||
|
||||
return self.state == 'running'
|
||||
return self.state == PostgresqlState.RUNNING
|
||||
|
||||
def restart(self, timeout: Optional[float] = None, task: Optional[CriticalTask] = None,
|
||||
block_callbacks: bool = False, role: Optional[str] = None,
|
||||
@@ -1031,13 +1041,14 @@ class Postgresql(object):
|
||||
|
||||
:returns: True when restart was successful and timeout did not expire when waiting.
|
||||
"""
|
||||
self.set_state('restarting')
|
||||
self.set_state(PostgresqlState.RESTARTING)
|
||||
if not block_callbacks:
|
||||
self.__cb_pending = CallbackAction.ON_RESTART
|
||||
ret = self.stop(block_callbacks=True, before_shutdown=before_shutdown)\
|
||||
and self.start(timeout, task, True, role, after_start)
|
||||
if not ret and not self.is_starting():
|
||||
self.set_state('restart failed ({0})'.format(self.state))
|
||||
logger.warning('restart failed (%r)', self.state)
|
||||
self.set_state(PostgresqlState.RESTART_FAILED)
|
||||
return ret
|
||||
|
||||
def is_healthy(self) -> bool:
|
||||
@@ -1059,7 +1070,7 @@ class Postgresql(object):
|
||||
def controldata(self) -> Dict[str, str]:
|
||||
""" return the contents of pg_controldata, or non-True value if pg_controldata call failed """
|
||||
# Don't try to call pg_controldata during backup restore
|
||||
if self._version_file_exists() and self.state != 'creating replica':
|
||||
if self._version_file_exists() and self.state != PostgresqlState.CREATING_REPLICA:
|
||||
try:
|
||||
env = {**os.environ, 'LANG': 'C', 'LC_ALL': 'C'}
|
||||
data = subprocess.check_output([self.pgcommand('pg_controldata'), self._data_dir], env=env)
|
||||
|
||||
@@ -11,6 +11,7 @@ from ..collections import EMPTY_DICT
|
||||
from ..dcs import Leader, Member, RemoteMember
|
||||
from ..psycopg import quote_ident, quote_literal
|
||||
from ..utils import deep_compare, unquote
|
||||
from .misc import PostgresqlState
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from . import Postgresql
|
||||
@@ -114,7 +115,7 @@ class Bootstrap(object):
|
||||
return user_options
|
||||
|
||||
def _initdb(self, config: Any) -> bool:
|
||||
self._postgresql.set_state('initializing new cluster')
|
||||
self._postgresql.set_state(PostgresqlState.INITDB)
|
||||
not_allowed_options = ('pgdata', 'nosync', 'pwfile', 'sync-only', 'version')
|
||||
|
||||
def error_handler(e: str) -> None:
|
||||
@@ -138,7 +139,7 @@ class Bootstrap(object):
|
||||
if ret:
|
||||
self._postgresql.configure_server_parameters()
|
||||
else:
|
||||
self._postgresql.set_state('initdb failed')
|
||||
self._postgresql.set_state(PostgresqlState.INITDB_FAILED)
|
||||
return ret
|
||||
|
||||
def _post_restore(self) -> None:
|
||||
@@ -186,7 +187,7 @@ class Bootstrap(object):
|
||||
:returns: ``True`` if the bootstrap was successful, i.e. the execution of the custom ``command`` from *config*
|
||||
exited with code ``0``, ``False`` otherwise.
|
||||
"""
|
||||
self._postgresql.set_state('running custom bootstrap script')
|
||||
self._postgresql.set_state(PostgresqlState.CUSTOM_BOOTSTRAP)
|
||||
params = [] if config.get('no_params') else ['--scope=' + self._postgresql.scope,
|
||||
'--datadir=' + self._postgresql.data_dir]
|
||||
# Add custom parameters specified by the user
|
||||
@@ -196,7 +197,7 @@ class Bootstrap(object):
|
||||
try:
|
||||
logger.info('Running custom bootstrap script: %s', config['command'])
|
||||
if self._postgresql.cancellable.call(shlex.split(config['command']) + params) != 0:
|
||||
self._postgresql.set_state('custom bootstrap failed')
|
||||
self._postgresql.set_state(PostgresqlState.CUSTOM_BOOTSTRAP_FAILED)
|
||||
return False
|
||||
except Exception:
|
||||
logger.exception('Exception during custom bootstrap')
|
||||
@@ -241,7 +242,7 @@ class Bootstrap(object):
|
||||
loop through all methods the user supplies
|
||||
"""
|
||||
|
||||
self._postgresql.set_state('creating replica')
|
||||
self._postgresql.set_state(PostgresqlState.CREATING_REPLICA)
|
||||
self._postgresql.schedule_sanity_checks_after_pause()
|
||||
|
||||
is_remote_member = isinstance(clone_member, RemoteMember)
|
||||
@@ -322,7 +323,7 @@ class Bootstrap(object):
|
||||
if not method_config.get('keep_data', False) and not self._postgresql.data_directory_empty():
|
||||
self._postgresql.remove_data_directory()
|
||||
|
||||
self._postgresql.set_state('stopped')
|
||||
self._postgresql.set_state(PostgresqlState.STOPPED)
|
||||
return ret
|
||||
|
||||
def basebackup(self, conn_url: str, env: Dict[str, str], options: Dict[str, Any]) -> Optional[int]:
|
||||
|
||||
@@ -16,11 +16,11 @@ from ..collections import CaseInsensitiveDict, CaseInsensitiveSet, EMPTY_DICT
|
||||
from ..dcs import Leader, Member, RemoteMember, slot_name_from_member_name
|
||||
from ..exceptions import PatroniFatalException, PostgresConnectionException
|
||||
from ..file_perm import pg_perm
|
||||
from ..postgresql.misc import get_major_from_minor_version, postgres_version_to_int
|
||||
from ..psycopg import parse_conninfo
|
||||
from ..utils import compare_values, get_postgres_version, is_subpath, \
|
||||
maybe_convert_from_base_unit, parse_bool, parse_int, split_host_port, uri, validate_directory
|
||||
from ..validator import EnumValidator, IntValidator
|
||||
from .misc import get_major_from_minor_version, postgres_version_to_int, PostgresqlState
|
||||
from .validator import recovery_parameters, transform_postgresql_parameter_value, transform_recovery_parameter_value
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
@@ -455,7 +455,7 @@ class ConfigHandler(object):
|
||||
in data directory. If it is not the case, we should use major version from the ``PG_VERSION``
|
||||
file.
|
||||
"""
|
||||
if self._postgresql.state == 'running':
|
||||
if self._postgresql.state == PostgresqlState.RUNNING:
|
||||
try:
|
||||
return self._postgresql.server_version
|
||||
except AttributeError:
|
||||
@@ -1193,7 +1193,7 @@ class ConfigHandler(object):
|
||||
|
||||
conf_changed = hba_changed = ident_changed = local_connection_address_changed = False
|
||||
param_diff = CaseInsensitiveDict()
|
||||
if self._postgresql.state == 'running':
|
||||
if self._postgresql.state == PostgresqlState.RUNNING:
|
||||
changes = CaseInsensitiveDict({p: v for p, v in server_parameters.items()
|
||||
if p not in params_skip_changes})
|
||||
changes.update({p: None for p in self._server_parameters.keys()
|
||||
@@ -1311,7 +1311,7 @@ class ConfigHandler(object):
|
||||
self._server_parameters.pop('synchronous_standby_names', None)
|
||||
else:
|
||||
self._server_parameters['synchronous_standby_names'] = value
|
||||
if self._postgresql.state == 'running':
|
||||
if self._postgresql.state == PostgresqlState.RUNNING:
|
||||
self.write_postgresql_conf()
|
||||
self._postgresql.reload()
|
||||
return True
|
||||
|
||||
@@ -2,6 +2,7 @@ import errno
import logging
import os

+from enum import Enum
from typing import Iterable, Tuple

from ..exceptions import PostgresException
@@ -9,6 +10,30 @@ from ..exceptions import PostgresException
logger = logging.getLogger(__name__)


+class PostgresqlState(str, Enum):
+    """Possible values of :attr:`Postgresql.state`."""
+
+    INITDB = 'initializing new cluster'
+    INITDB_FAILED = 'initdb failed'
+    CUSTOM_BOOTSTRAP = 'running custom bootstrap script'
+    CUSTOM_BOOTSTRAP_FAILED = 'custom bootstrap failed'
+    CREATING_REPLICA = 'creating replica'
+    RUNNING = 'running'
+    STARTING = 'starting'
+    BOOTSTRAP_STARTING = 'starting after custom bootstrap'
+    START_FAILED = 'start failed'
+    RESTARTING = 'restarting'
+    RESTART_FAILED = 'restart failed'
+    STOPPING = 'stopping'
+    STOPPED = 'stopped'
+    STOP_FAILED = 'stop failed'
+    CRASHED = 'crashed'
+
+    def __repr__(self) -> str:
+        """Get a string representation of a :class:`PostgresqlState` member."""
+        return self.value
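Reviewer note, illustration only (not part of the change): since the enum mixes in ``str``, members compare equal to the legacy literal state strings and serialize to them, which is presumably what keeps member data already stored in the DCS and existing REST API consumers working unchanged. A minimal standalone sketch:

import json
from enum import Enum

class State(str, Enum):  # hypothetical stand-in with the same shape as PostgresqlState above
    RUNNING = 'running'
    STOPPED = 'stopped'

# Comparisons against the old plain-string values still hold ...
assert State.RUNNING == 'running'
assert {'state': 'running'}.get('state') == State.RUNNING
# ... and JSON payloads (touch_member(), REST API responses) keep the old wire format.
assert json.dumps({'state': State.STOPPED}) == '{"state": "stopped"}'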


def postgres_version_to_int(pg_version: str) -> int:
    """Convert the server_version to integer

@@ -9,6 +9,7 @@ from urllib.parse import urlparse
|
||||
from ...dcs import Cluster
|
||||
from ...psycopg import connect, ProgrammingError, quote_ident
|
||||
from ...utils import parse_int
|
||||
from ..misc import PostgresqlState
|
||||
from . import AbstractMPP, AbstractMPPHandler
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
@@ -477,7 +478,8 @@ class CitusHandler(Citus, AbstractMPPHandler, Thread):
|
||||
for groupid, worker in cluster.workers.items():
|
||||
leader = worker.leader
|
||||
if leader and leader.conn_url\
|
||||
and leader.data.get('role') in ('master', 'primary') and leader.data.get('state') == 'running':
|
||||
and leader.data.get('role') in ('master', 'primary')\
|
||||
and leader.data.get('state') == PostgresqlState.RUNNING:
|
||||
self.add_task('after_promote', groupid, worker, leader.name, leader.conn_url)
|
||||
|
||||
def find_task_by_groupid(self, groupid: int) -> Optional[int]:
|
||||
|
||||
@@ -9,6 +9,7 @@ from .. import global_config
|
||||
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
|
||||
from ..dcs import Cluster
|
||||
from ..psycopg import quote_ident
|
||||
from .misc import PostgresqlState
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from . import Postgresql
|
||||
@@ -413,7 +414,8 @@ END;$$""")
|
||||
sync_param = f'{prefix}{num} ({sync_param})'
|
||||
|
||||
if not (self._postgresql.config.set_synchronous_standby_names(sync_param)
|
||||
and self._postgresql.state == 'running' and self._postgresql.is_primary()) or has_asterisk:
|
||||
and self._postgresql.state == PostgresqlState.RUNNING
|
||||
and self._postgresql.is_primary()) or has_asterisk:
|
||||
return
|
||||
|
||||
time.sleep(0.1) # Usually it takes 1ms to reload postgresql.conf, but we will give it 100ms
|
||||
|
||||
@@ -923,10 +923,7 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:

* ``name``: the name of the host (unique in the cluster). The ``members`` list is sorted by this key;
* ``role``: ``leader``, ``standby_leader``, ``sync_standby``, ``quorum_standby``, or ``replica``;
-* ``state``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``,
-  ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``,
-  ``running custom bootstrap script``, ``starting after custom bootstrap``, ``custom bootstrap failed``,
-  or ``creating replica``;
+* ``state``: one of :class:`~patroni.postgresql.misc.PostgresqlState`;
* ``api_url``: REST API URL based on ``restapi->connect_address`` configuration;
* ``host``: PostgreSQL host based on ``postgresql->connect_address``;
* ``port``: PostgreSQL port based on ``postgresql->connect_address``;

@@ -12,6 +12,7 @@ import patroni.psycopg as psycopg
|
||||
from patroni.dcs import Leader, Member
|
||||
from patroni.postgresql import Postgresql
|
||||
from patroni.postgresql.config import ConfigHandler
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
from patroni.postgresql.mpp import get_mpp
|
||||
from patroni.utils import RetryFailedError, tzutc
|
||||
|
||||
@@ -283,13 +284,13 @@ class BaseTestPostgresql(PostgresInit):
|
||||
if not os.path.exists(self.p.data_dir):
|
||||
os.makedirs(self.p.data_dir)
|
||||
|
||||
self.leadermem = Member(0, 'leader', 28, {'xlog_location': 100, 'state': 'running',
|
||||
self.leadermem = Member(0, 'leader', 28, {'xlog_location': 100, 'state': PostgresqlState.RUNNING,
|
||||
'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5435/postgres'})
|
||||
self.leader = Leader(-1, 28, self.leadermem)
|
||||
self.other = Member(0, 'test-1', 28, {'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5433/postgres',
|
||||
'state': 'running', 'tags': {'replicatefrom': 'leader'}})
|
||||
'state': PostgresqlState.RUNNING, 'tags': {'replicatefrom': 'leader'}})
|
||||
self.me = Member(0, 'test0', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5434/postgres'})
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5434/postgres'})
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.p.data_dir):
|
||||
|
||||
@@ -14,6 +14,7 @@ from patroni.dcs import ClusterConfig, Member
|
||||
from patroni.exceptions import PostgresConnectionException
|
||||
from patroni.ha import _MemberStatus
|
||||
from patroni.postgresql.config import get_param_diff
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
from patroni.psycopg import OperationalError
|
||||
from patroni.utils import RetryFailedError, tzutc
|
||||
|
||||
@@ -49,7 +50,7 @@ class MockPostgresql:
|
||||
|
||||
connection_pool = MockConnectionPool()
|
||||
name = 'test'
|
||||
state = 'running'
|
||||
state = PostgresqlState.RUNNING
|
||||
role = 'primary'
|
||||
server_version = 90625
|
||||
major_version = 90600
|
||||
@@ -215,7 +216,8 @@ class TestRestApiHandler(unittest.TestCase):
|
||||
MockRestApiServer(RestApiHandler, 'GET /replica')
|
||||
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'primary'})):
|
||||
MockRestApiServer(RestApiHandler, 'GET /replica')
|
||||
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'state': 'running'})):
|
||||
with patch.object(RestApiHandler, 'get_postgresql_status',
|
||||
Mock(return_value={'state': PostgresqlState.RUNNING})):
|
||||
MockRestApiServer(RestApiHandler, 'GET /health')
|
||||
MockRestApiServer(RestApiHandler, 'GET /leader')
|
||||
with patch.object(RestApiHandler, 'get_postgresql_status',
|
||||
@@ -349,10 +351,10 @@ class TestRestApiHandler(unittest.TestCase):
|
||||
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0'))
|
||||
with patch.object(MockHa, 'is_leader', Mock(return_value=True)):
|
||||
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0'))
|
||||
with patch.object(MockPostgresql, 'state', PropertyMock(return_value='stopped')):
|
||||
with patch.object(MockPostgresql, 'state', PropertyMock(return_value=PostgresqlState.STOPPED)):
|
||||
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /readiness HTTP/1.0'))
|
||||
|
||||
@patch.object(MockPostgresql, 'state', PropertyMock(return_value='stopped'))
|
||||
@patch.object(MockPostgresql, 'state', PropertyMock(return_value=PostgresqlState.STOPPED))
|
||||
def test_do_GET_patroni(self):
|
||||
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from patroni.postgresql import Postgresql
|
||||
from patroni.postgresql.bootstrap import Bootstrap
|
||||
from patroni.postgresql.cancellable import CancellableSubprocess
|
||||
from patroni.postgresql.config import ConfigHandler, get_param_diff
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
|
||||
from . import BaseTestPostgresql, mock_available_gucs, psycopg_connect
|
||||
|
||||
@@ -269,7 +270,7 @@ class TestBootstrap(BaseTestPostgresql):
|
||||
mock_restart.assert_called_once()
|
||||
|
||||
self.b.bootstrap(config)
|
||||
self.p.set_state('stopped')
|
||||
self.p.set_state(PostgresqlState.STOPPED)
|
||||
self.p.reload_config({'authentication': {'superuser': {'username': 'p', 'password': 'p'},
|
||||
'replication': {'username': 'r', 'password': 'r'},
|
||||
'rewind': {'username': 'rw', 'password': 'rw'}},
|
||||
|
||||
@@ -9,6 +9,7 @@ from consul import ConsulException, NotFound
|
||||
from patroni.dcs import get_dcs
|
||||
from patroni.dcs.consul import AbstractDCS, Cluster, Consul, ConsulClient, ConsulError, \
|
||||
ConsulInternalError, HTTPClient, InvalidSession, InvalidSessionTTL, RetryFailedError
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
from patroni.postgresql.mpp import get_mpp
|
||||
|
||||
from . import SleepException
|
||||
@@ -256,16 +257,16 @@ class TestConsul(unittest.TestCase):
|
||||
@patch.object(Agent.Service, 'register', Mock(side_effect=(False, True, True, True)))
|
||||
@patch.object(Agent.Service, 'deregister', Mock(return_value=True))
|
||||
def test_update_service(self):
|
||||
d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'}
|
||||
d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': PostgresqlState.RUNNING}
|
||||
self.assertIsNone(self.c.update_service({}, {}))
|
||||
self.assertFalse(self.c.update_service({}, d))
|
||||
self.assertTrue(self.c.update_service(d, d))
|
||||
self.assertIsNone(self.c.update_service(d, d))
|
||||
d['state'] = 'stopped'
|
||||
d['state'] = PostgresqlState.STOPPED
|
||||
self.assertTrue(self.c.update_service(d, d, force=True))
|
||||
d['state'] = 'unknown'
|
||||
d['state'] = PostgresqlState.STARTING
|
||||
self.assertIsNone(self.c.update_service({}, d))
|
||||
d['state'] = 'running'
|
||||
d['state'] = PostgresqlState.RUNNING
|
||||
d['role'] = 'bla'
|
||||
self.assertIsNone(self.c.update_service({}, d))
|
||||
d['role'] = 'primary'
|
||||
@@ -280,7 +281,8 @@ class TestConsul(unittest.TestCase):
|
||||
|
||||
self.c.refresh_session = Mock(return_value=False)
|
||||
|
||||
d = {'role': 'replica', 'api_url': 'http://a/t', 'conn_url': 'pg://c:1', 'state': 'running'}
|
||||
d = {'role': 'replica', 'api_url': 'http://a/t',
|
||||
'conn_url': 'pg://c:1', 'state': PostgresqlState.RUNNING}
|
||||
|
||||
# Changing register_service from True to False calls deregister()
|
||||
self.c.reload_config({'consul': {'register_service': False}, 'loop_wait': 10, 'ttl': 30, 'retry_timeout': 10})
|
||||
|
||||
@@ -11,6 +11,8 @@ import etcd
|
||||
from click.testing import CliRunner
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
|
||||
try:
|
||||
from prettytable import HRuleStyle
|
||||
hrule_all = HRuleStyle.ALL
|
||||
@@ -526,11 +528,11 @@ class TestCtl(unittest.TestCase):
|
||||
cluster = get_cluster_initialized_with_leader()
|
||||
cluster.members.append(Member(0, 'cascade', 28,
|
||||
{'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5437/postgres',
|
||||
'api_url': 'http://127.0.0.1:8012/patroni', 'state': 'running',
|
||||
'api_url': 'http://127.0.0.1:8012/patroni', 'state': PostgresqlState.RUNNING,
|
||||
'tags': {'replicatefrom': 'other'}}))
|
||||
cluster.members.append(Member(0, 'wrong_cascade', 28,
|
||||
{'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5438/postgres',
|
||||
'api_url': 'http://127.0.0.1:8013/patroni', 'state': 'running',
|
||||
'api_url': 'http://127.0.0.1:8013/patroni', 'state': PostgresqlState.RUNNING,
|
||||
'tags': {'replicatefrom': 'nonexistinghost'}}))
|
||||
with patch('patroni.dcs.AbstractDCS.get_cluster', Mock(return_value=cluster)):
|
||||
result = self.runner.invoke(ctl, ['topology', 'dummy'])
|
||||
|
||||
@@ -18,6 +18,7 @@ from patroni.postgresql.bootstrap import Bootstrap
|
||||
from patroni.postgresql.callback_executor import CallbackAction
|
||||
from patroni.postgresql.cancellable import CancellableSubprocess
|
||||
from patroni.postgresql.config import ConfigHandler
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
from patroni.postgresql.postmaster import PostmasterProcess
|
||||
from patroni.postgresql.rewind import Rewind
|
||||
from patroni.postgresql.slots import SlotsHandler
|
||||
@@ -211,7 +212,7 @@ class TestHa(PostgresInit):
|
||||
@patch.object(Config, '_load_cache', Mock())
|
||||
def setUp(self):
|
||||
super(TestHa, self).setUp()
|
||||
self.p.set_state('running')
|
||||
self.p.set_state(PostgresqlState.RUNNING)
|
||||
self.p.set_role('replica')
|
||||
self.p.postmaster_start_time = MagicMock(return_value=str(postmaster_start_time))
|
||||
self.p.can_create_replica_without_replication_connection = MagicMock(return_value=False)
|
||||
@@ -678,7 +679,7 @@ class TestHa(PostgresInit):
|
||||
self.p.restart = Mock(return_value=None)
|
||||
self.assertEqual(self.ha.restart({}), (False, 'postgres is still starting'))
|
||||
self.p.restart = false
|
||||
self.assertEqual(self.ha.restart({}), (False, 'restart failed'))
|
||||
self.assertEqual(self.ha.restart({}), (False, PostgresqlState.RESTART_FAILED))
|
||||
self.ha.cluster = get_cluster_initialized_with_leader()
|
||||
self.ha._async_executor.schedule('reinitialize')
|
||||
self.assertEqual(self.ha.restart({}), (False, 'reinitialize already in progress'))
|
||||
|
||||
@@ -15,6 +15,7 @@ from patroni.dcs import get_dcs
|
||||
from patroni.dcs.kubernetes import Cluster, k8s_client, k8s_config, K8sConfig, K8sConnectionFailed, \
|
||||
K8sException, K8sObject, Kubernetes, KubernetesError, KubernetesRetriableException, Retry, \
|
||||
RetryFailedError, SERVICE_HOST_ENV_NAME, SERVICE_PORT_ENV_NAME
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
from patroni.postgresql.mpp import get_mpp
|
||||
|
||||
from . import MockResponse, SleepException
|
||||
@@ -321,19 +322,19 @@ class TestKubernetesConfigMaps(BaseTestKubernetes):
|
||||
mock_patch_namespaced_pod.return_value.metadata.resource_version = '10'
|
||||
|
||||
self.k._name = 'p-1'
|
||||
self.k.touch_member({'role': 'replica', 'state': 'initializing new cluster'})
|
||||
self.k.touch_member({'role': 'replica', 'state': PostgresqlState.INITDB})
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')
|
||||
|
||||
self.k.touch_member({'state': 'running', 'role': 'replica'})
|
||||
self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'replica'})
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)
|
||||
|
||||
self.k.touch_member({'role': 'replica', 'state': 'running custom bootstrap script'})
|
||||
self.k.touch_member({'role': 'replica', 'state': PostgresqlState.CUSTOM_BOOTSTRAP})
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')
|
||||
|
||||
self.k.touch_member({'role': 'replica', 'state': 'starting after custom bootstrap'})
|
||||
self.k.touch_member({'role': 'replica', 'state': PostgresqlState.BOOTSTRAP_STARTING})
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')
|
||||
|
||||
self.k.touch_member({'state': 'stopped', 'role': 'primary'})
|
||||
self.k.touch_member({'state': PostgresqlState.STOPPED, 'role': 'primary'})
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)
|
||||
|
||||
self.k._role_label = 'isMaster'
|
||||
@@ -342,23 +343,23 @@ class TestKubernetesConfigMaps(BaseTestKubernetes):
|
||||
self.k._standby_leader_label_value = 'false'
|
||||
self.k._tmp_role_label = 'tmp_role'
|
||||
|
||||
self.k.touch_member({'state': 'creating replica', 'role': 'replica'})
|
||||
self.k.touch_member({'state': PostgresqlState.CREATING_REPLICA, 'role': 'replica'})
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], 'bar')
|
||||
|
||||
self.k.touch_member({'state': 'running', 'role': 'replica'})
|
||||
self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'replica'})
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['foo'], None)
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false')
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'replica')
|
||||
mock_patch_namespaced_pod.rest_mock()
|
||||
|
||||
self.k._name = 'p-0'
|
||||
self.k.touch_member({'state': 'running', 'role': 'standby_leader'})
|
||||
self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'standby_leader'})
|
||||
mock_patch_namespaced_pod.assert_called()
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'false')
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary')
|
||||
mock_patch_namespaced_pod.rest_mock()
|
||||
|
||||
self.k.touch_member({'state': 'running', 'role': 'primary'})
|
||||
self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'primary'})
|
||||
mock_patch_namespaced_pod.assert_called()
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['isMaster'], 'true')
|
||||
self.assertEqual(mock_patch_namespaced_pod.call_args[0][2].metadata.labels['tmp_role'], 'primary')
|
||||
@@ -494,7 +495,7 @@ class TestKubernetesEndpoints(BaseTestKubernetes):
|
||||
self.assertEqual(('create_config_service failed',), mock_logger_exception.call_args[0])
|
||||
mock_logger_exception.reset_mock()
|
||||
|
||||
self.k.touch_member({'state': 'running', 'role': 'replica'})
|
||||
self.k.touch_member({'state': PostgresqlState.RUNNING, 'role': 'replica'})
|
||||
mock_logger_exception.assert_called_once()
|
||||
self.assertEqual(('create_config_service failed',), mock_logger_exception.call_args[0])
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ from patroni.dcs.etcd import AbstractEtcdClientWithFailover
|
||||
from patroni.exceptions import DCSError
|
||||
from patroni.postgresql import Postgresql
|
||||
from patroni.postgresql.config import ConfigHandler
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
|
||||
from . import psycopg_connect, SleepException
|
||||
from .test_etcd import etcd_read, etcd_write
|
||||
@@ -159,7 +160,7 @@ class TestPatroni(unittest.TestCase):
|
||||
@patch('patroni.config.Config.save_cache', Mock())
|
||||
@patch('patroni.config.Config.reload_local_configuration', Mock(return_value=True))
|
||||
@patch('patroni.ha.Ha.is_leader', Mock(return_value=True))
|
||||
@patch.object(Postgresql, 'state', PropertyMock(return_value='running'))
|
||||
@patch.object(Postgresql, 'state', PropertyMock(return_value=PostgresqlState.RUNNING))
|
||||
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))
|
||||
def test_run(self):
|
||||
self.p.postgresql.set_role('replica')
|
||||
|
||||
@@ -18,10 +18,11 @@ from patroni.async_executor import CriticalTask
|
||||
from patroni.collections import CaseInsensitiveDict, CaseInsensitiveSet
|
||||
from patroni.dcs import RemoteMember
|
||||
from patroni.exceptions import PatroniException, PostgresConnectionException
|
||||
from patroni.postgresql import Postgresql, STATE_NO_RESPONSE, STATE_REJECT
|
||||
from patroni.postgresql import PgIsReadyStatus, Postgresql
|
||||
from patroni.postgresql.bootstrap import Bootstrap
|
||||
from patroni.postgresql.callback_executor import CallbackAction
|
||||
from patroni.postgresql.config import _false_validator, get_param_diff
|
||||
from patroni.postgresql.misc import PostgresqlState
|
||||
from patroni.postgresql.postmaster import PostmasterProcess
|
||||
from patroni.postgresql.validator import _get_postgres_guc_validators, _load_postgres_gucs_validators, \
|
||||
_read_postgres_gucs_validators_file, Bool, Enum, EnumBool, Integer, InvalidGucValidatorsFile, \
|
||||
@@ -159,7 +160,7 @@ class TestPostgresql(BaseTestPostgresql):
|
||||
@patch.object(Postgresql, 'pg_isready')
|
||||
@patch('patroni.postgresql.polling_loop', Mock(return_value=range(1)))
|
||||
def test_wait_for_port_open(self, mock_pg_isready):
|
||||
mock_pg_isready.return_value = STATE_NO_RESPONSE
|
||||
mock_pg_isready.return_value = PgIsReadyStatus.NO_RESPONSE
|
||||
mock_postmaster = MockPostmaster()
|
||||
mock_postmaster.is_running.return_value = None
|
||||
|
||||
@@ -257,7 +258,7 @@ class TestPostgresql(BaseTestPostgresql):
|
||||
def test_restart(self):
|
||||
self.p.start = Mock(return_value=False)
|
||||
self.assertFalse(self.p.restart())
|
||||
self.assertEqual(self.p.state, 'restart failed (restarting)')
|
||||
self.assertEqual(self.p.state, PostgresqlState.RESTART_FAILED)
|
||||
|
||||
@patch('os.chmod', Mock())
|
||||
@patch('builtins.open', MagicMock())
|
||||
@@ -378,7 +379,7 @@ class TestPostgresql(BaseTestPostgresql):
|
||||
@patch.object(MockCursor, 'execute', Mock(side_effect=psycopg.OperationalError))
|
||||
def test__query(self):
|
||||
self.assertRaises(PostgresConnectionException, self.p._query, 'blabla')
|
||||
self.p._state = 'restarting'
|
||||
self.p._state = PostgresqlState.RESTARTING
|
||||
self.assertRaises(RetryFailedError, self.p._query, 'blabla')
|
||||
|
||||
def test_query(self):
|
||||
@@ -386,7 +387,7 @@ class TestPostgresql(BaseTestPostgresql):
|
||||
self.assertRaises(PostgresConnectionException, self.p.query, 'RetryFailedError')
|
||||
self.assertRaises(psycopg.ProgrammingError, self.p.query, 'blabla')
|
||||
|
||||
@patch.object(Postgresql, 'pg_isready', Mock(return_value=STATE_REJECT))
|
||||
@patch.object(Postgresql, 'pg_isready', Mock(return_value=PgIsReadyStatus.REJECT))
|
||||
def test_is_primary(self):
|
||||
self.assertTrue(self.p.is_primary())
|
||||
self.p.reset_cluster_info_state(None)
|
||||
@@ -744,33 +745,33 @@ class TestPostgresql(BaseTestPostgresql):
|
||||
|
||||
def test_check_for_startup(self):
|
||||
with patch('subprocess.call', return_value=0):
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
self.assertFalse(self.p.check_for_startup())
|
||||
self.assertEqual(self.p.state, 'running')
|
||||
self.assertEqual(self.p.state, PostgresqlState.RUNNING)
|
||||
|
||||
with patch('subprocess.call', return_value=1):
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
self.assertTrue(self.p.check_for_startup())
|
||||
self.assertEqual(self.p.state, 'starting')
|
||||
self.assertEqual(self.p.state, PostgresqlState.STARTING)
|
||||
|
||||
with patch('subprocess.call', return_value=2):
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
self.assertFalse(self.p.check_for_startup())
|
||||
self.assertEqual(self.p.state, 'start failed')
|
||||
self.assertEqual(self.p.state, PostgresqlState.START_FAILED)
|
||||
|
||||
with patch('subprocess.call', return_value=0):
|
||||
self.p._state = 'running'
|
||||
self.p._state = PostgresqlState.RUNNING
|
||||
self.assertFalse(self.p.check_for_startup())
|
||||
self.assertEqual(self.p.state, 'running')
|
||||
self.assertEqual(self.p.state, PostgresqlState.RUNNING)
|
||||
|
||||
with patch('subprocess.call', return_value=127):
|
||||
self.p._state = 'running'
|
||||
self.p._state = PostgresqlState.RUNNING
|
||||
self.assertFalse(self.p.check_for_startup())
|
||||
self.assertEqual(self.p.state, 'running')
|
||||
self.assertEqual(self.p.state, PostgresqlState.RUNNING)
|
||||
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
self.assertFalse(self.p.check_for_startup())
|
||||
self.assertEqual(self.p.state, 'running')
|
||||
self.assertEqual(self.p.state, PostgresqlState.RUNNING)
|
||||
|
||||
def test_wait_for_startup(self):
|
||||
state = {'sleeps': 0, 'num_rejects': 0, 'final_return': 0}
|
||||
@@ -793,21 +794,21 @@ class TestPostgresql(BaseTestPostgresql):
|
||||
with patch('time.sleep', side_effect=increment_sleeps):
|
||||
self.p.time_in_state = Mock(side_effect=time_in_state)
|
||||
|
||||
self.p._state = 'stopped'
|
||||
self.p._state = PostgresqlState.STOPPED
|
||||
self.assertTrue(self.p.wait_for_startup())
|
||||
self.assertEqual(state['sleeps'], 0)
|
||||
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
state['num_rejects'] = 5
|
||||
self.assertTrue(self.p.wait_for_startup())
|
||||
self.assertEqual(state['sleeps'], 5)
|
||||
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
state['sleeps'] = 0
|
||||
state['final_return'] = 2
|
||||
self.assertFalse(self.p.wait_for_startup())
|
||||
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
state['sleeps'] = 0
|
||||
state['final_return'] = 0
|
||||
self.assertFalse(self.p.wait_for_startup(timeout=2))
|
||||
@@ -815,7 +816,7 @@ class TestPostgresql(BaseTestPostgresql):
|
||||
|
||||
with patch.object(Postgresql, 'check_startup_state_changed', Mock(return_value=False)):
|
||||
self.p.cancellable.cancel()
|
||||
self.p._state = 'starting'
|
||||
self.p._state = PostgresqlState.STARTING
|
||||
self.assertIsNone(self.p.wait_for_startup())
|
||||
|
||||
def test_get_server_parameters(self):
|
||||
|
||||
@@ -8,7 +8,7 @@ from unittest.mock import Mock, patch, PropertyMock
|
||||
from patroni import global_config, psycopg
|
||||
from patroni.dcs import Cluster, ClusterConfig, Member, Status, SyncState
|
||||
from patroni.postgresql import Postgresql
|
||||
from patroni.postgresql.misc import fsync_dir
|
||||
from patroni.postgresql.misc import fsync_dir, PostgresqlState
|
||||
from patroni.postgresql.slots import SlotsAdvanceThread, SlotsHandler
|
||||
from patroni.tags import Tags
|
||||
|
||||
@@ -86,7 +86,7 @@ class TestSlotsHandler(BaseTestPostgresql):
|
||||
"""Test sync with a cascading replica so physical slots are present on a replica."""
|
||||
config = ClusterConfig(1, {'slots': {'ls': {'database': 'a', 'plugin': 'b'}}}, 1)
|
||||
cascading_replica = Member(0, 'test-2', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'tags': {'replicatefrom': 'postgresql0'}
|
||||
})
|
||||
cluster = Cluster(True, config, self.leader, Status(0, {'ls': 10}, []),
|
||||
@@ -127,17 +127,17 @@ class TestSlotsHandler(BaseTestPostgresql):
|
||||
config = ClusterConfig(
|
||||
1, {'slots': {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}}, 1)
|
||||
nostream_node = Member(0, 'test-2', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'tags': {'nostream': 'True'},
|
||||
'xlog_location': 10,
|
||||
})
|
||||
cascade_node = Member(0, 'test-3', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'tags': {'replicatefrom': 'test-2'},
|
||||
'xlog_location': 98
|
||||
})
|
||||
stream_node = Member(0, 'test-4', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'xlog_location': 99})
|
||||
cluster = Cluster(
|
||||
True, config, self.leader, Status(100, {'leader': 99, 'test_2': 98, 'test_3': 97, 'test_4': 98}, []),
|
||||
@@ -193,11 +193,11 @@ class TestSlotsHandler(BaseTestPostgresql):
|
||||
|
||||
def test_get_slot_name_on_primary(self):
|
||||
node1 = Member(0, 'node1', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'tags': {'replicatefrom': 'node2'}
|
||||
})
|
||||
node2 = Member(0, 'node2', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'tags': {'replicatefrom': 'node1'}
|
||||
})
|
||||
cluster = Cluster(True, None, self.leader, Status.empty(), [self.leadermem, node1, node2],
|
||||
@@ -206,11 +206,11 @@ class TestSlotsHandler(BaseTestPostgresql):
|
||||
|
||||
def test_should_enforce_hot_standby_feedback(self):
|
||||
node1 = Member(0, 'postgresql0', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'tags': {'replicatefrom': 'postgresql1'}
|
||||
})
|
||||
node2 = Member(0, 'postgresql1', 28, {
|
||||
'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'state': PostgresqlState.RUNNING, 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres',
|
||||
'tags': {'replicatefrom': 'postgresql0'}
|
||||
})
|
||||
cluster = Cluster(True, None, self.leader, Status.empty(), [self.leadermem, node1, node2],
|
||||
|
||||