From d46ca88e6bb4b4788ec86689026c2d2e4da68c79 Mon Sep 17 00:00:00 2001
From: Alexander Kukushkin
Date: Thu, 13 Jul 2023 09:24:20 +0200
Subject: [PATCH] Make replication state visible on standbys (#2733)

To do that we use the `pg_stat_get_wal_receiver()` function, which is
available since PostgreSQL 9.6. For older versions the `patronictl list`
output and REST API responses remain as before.

If there is no WAL receiver process, we check whether `restore_command`
is set and report the state as `in archive recovery`.

Example of `patronictl list` output:

```bash
$ patronictl list
+ Cluster: batman -------------+---------+---------------------+----+-----------+
|    Member   |      Host      |   Role  |        State        | TL | Lag in MB |
+-------------+----------------+---------+---------------------+----+-----------+
| postgresql0 | 127.0.0.1:5432 |  Leader | running             | 12 |           |
| postgresql1 | 127.0.0.1:5433 | Replica | in archive recovery | 12 |         0 |
+-------------+----------------+---------+---------------------+----+-----------+

$ patronictl list
+ Cluster: batman -------------+---------+-----------+----+-----------+
|    Member   |      Host      |   Role  |   State   | TL | Lag in MB |
+-------------+----------------+---------+-----------+----+-----------+
| postgresql0 | 127.0.0.1:5432 |  Leader | running   | 12 |           |
| postgresql1 | 127.0.0.1:5433 | Replica | streaming | 12 |         0 |
+-------------+----------------+---------+-----------+----+-----------+
```

Example of REST API response:

```bash
$ curl -s localhost:8009 | jq .
{
  "state": "running",
  "postmaster_start_time": "2023-07-06 13:12:00.595118+02:00",
  "role": "replica",
  "server_version": 150003,
  "xlog": {
    "received_location": 335544480,
    "replayed_location": 335544480,
    "replayed_timestamp": null,
    "paused": false
  },
  "timeline": 12,
  "replication_state": "in archive recovery",
  "dcs_last_seen": 1688642069,
  "database_system_identifier": "7252327498286490579",
  "patroni": {
    "version": "3.0.3",
    "scope": "batman"
  }
}

$ curl -s localhost:8009 | jq .
{
  "state": "running",
  "postmaster_start_time": "2023-07-06 13:12:00.595118+02:00",
  "role": "replica",
  "server_version": 150003,
  "xlog": {
    "received_location": 335544816,
    "replayed_location": 335544816,
    "replayed_timestamp": null,
    "paused": false
  },
  "timeline": 12,
  "replication_state": "streaming",
  "dcs_last_seen": 1688642089,
  "database_system_identifier": "7252327498286490579",
  "patroni": {
    "version": "3.0.3",
    "scope": "batman"
  }
}
```
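
The decision logic itself is small. Below is a minimal, illustrative Python
sketch of what `replication_state_from_parameters()` (added to
`patroni/postgresql/__init__.py` in this patch) computes; the standalone
function and its name are simplified for illustration:

```python
from typing import Optional

def derive_replication_state(major_version: int, is_leader: bool,
                             receiver_status: Optional[str],
                             restore_command: Optional[str]) -> Optional[str]:
    """Illustrative stand-in for Postgresql.replication_state_from_parameters()."""
    # pg_stat_get_wal_receiver() only exists since 9.6, and the state is
    # only meaningful for nodes running in recovery.
    if major_version < 90600 or is_leader:
        return None
    # The walreceiver reports 'streaming' while it is connected to a primary.
    if receiver_status == 'streaming':
        return 'streaming'
    # No (active) walreceiver: with restore_command set, the standby is
    # replaying WAL from the archive.
    if restore_command:
        return 'in archive recovery'
    return None
```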
---
 docs/rest_api.rst                | 10 +++++--
 features/standby_cluster.feature | 10 +++++++
 patroni/api.py                   | 25 ++++++++++++++--
 patroni/ctl.py                   |  3 +-
 patroni/ha.py                    |  7 +++--
 patroni/postgresql/__init__.py   | 51 +++++++++++++++++++++++++++-----
 patroni/postgresql/config.py     |  3 ++
 patroni/utils.py                 |  3 +-
 tests/__init__.py                |  4 +--
 tests/test_api.py                |  9 ++++--
 tests/test_ha.py                 |  5 +++-
 tests/test_rewind.py             |  7 +++--
 tests/test_slots.py              |  4 +--
 13 files changed, 115 insertions(+), 26 deletions(-)

diff --git a/docs/rest_api.rst b/docs/rest_api.rst
index 7bde651f..9deb3f5e 100644
--- a/docs/rest_api.rst
+++ b/docs/rest_api.rst
@@ -141,8 +141,8 @@ Retrieve the Patroni metrics in Prometheus format through the ``GET /metrics`` e
     # TYPE patroni_replica gauge
     patroni_replica{scope="batman"} 0
     # HELP patroni_sync_standby Value is 1 if this node is a sync standby replica, 0 otherwise.
-    # TYPE patroni_sync_standby gauge
-    patroni_sync_standby{scope="batman"} 0
+    # TYPE patroni_sync_standby gauge
+    patroni_sync_standby{scope="batman"} 0
     # HELP patroni_xlog_received_location Current location of the received Postgres transaction log, 0 if this node is not a replica.
     # TYPE patroni_xlog_received_location counter
     patroni_xlog_received_location{scope="batman"} 0
@@ -155,6 +155,12 @@ Retrieve the Patroni metrics in Prometheus format through the ``GET /metrics`` e
     # HELP patroni_xlog_paused Value is 1 if the Postgres xlog is paused, 0 otherwise.
     # TYPE patroni_xlog_paused gauge
     patroni_xlog_paused{scope="batman"} 0
+    # HELP patroni_postgres_streaming Value is 1 if Postgres is streaming, 0 otherwise.
+    # TYPE patroni_postgres_streaming gauge
+    patroni_postgres_streaming{scope="batman"} 1
+    # HELP patroni_postgres_in_archive_recovery Value is 1 if Postgres is replicating from archive, 0 otherwise.
+    # TYPE patroni_postgres_in_archive_recovery gauge
+    patroni_postgres_in_archive_recovery{scope="batman"} 0
     # HELP patroni_postgres_server_version Version of Postgres (if running), 0 otherwise.
     # TYPE patroni_postgres_server_version gauge
     patroni_postgres_server_version {scope="batman"} 140004
diff --git a/features/standby_cluster.feature b/features/standby_cluster.feature
index 4e3bd5f0..850c7970 100644
--- a/features/standby_cluster.feature
+++ b/features/standby_cluster.feature
@@ -13,6 +13,10 @@ Feature: standby cluster
     When I start postgres0
     Then "members/postgres0" key in DCS has state=running after 10 seconds
     And replication works from postgres1 to postgres0 after 15 seconds
+    When I issue a GET request to http://127.0.0.1:8008/patroni
+    Then I receive a response code 200
+    And I receive a response replication_state streaming
+    And "members/postgres0" key in DCS has replication_state=streaming after 10 seconds

   @slot-advance
   Scenario: check permanent logical slots are synced to the replica
@@ -34,6 +38,9 @@ Feature: standby cluster
     Then postgres1 is a leader of batman1 after 10 seconds
     When I add the table foo to postgres0
     Then table foo is present on postgres1 after 20 seconds
+    When I issue a GET request to http://127.0.0.1:8009/patroni
+    Then I receive a response code 200
+    And I receive a response replication_state streaming
     And I sleep for 3 seconds
     When I issue a GET request to http://127.0.0.1:8009/primary
     Then I receive a response code 503
@@ -44,6 +51,9 @@ Feature: standby cluster
     When I start postgres2 in a cluster batman1
     Then postgres2 role is the replica after 24 seconds
     And table foo is present on postgres2 after 20 seconds
+    When I issue a GET request to http://127.0.0.1:8010/patroni
+    Then I receive a response code 200
+    And I receive a response replication_state streaming
     And postgres1 does not have a logical replication slot named test_logical

   Scenario: check failover
diff --git a/patroni/api.py b/patroni/api.py
index 7be2a049..836a8673 100644
--- a/patroni/api.py
+++ b/patroni/api.py
@@ -535,6 +535,18 @@ class RestApiHandler(BaseHTTPRequestHandler):
         metrics.append("patroni_xlog_paused{0} {1}"
                        .format(scope_label, int(postgres.get('xlog', {}).get('paused', False) is True)))

+        if postgres.get('server_version', 0) >= 90600:
+            metrics.append("# HELP patroni_postgres_streaming Value is 1 if Postgres is streaming, 0 otherwise.")
+            metrics.append("# TYPE patroni_postgres_streaming gauge")
+            metrics.append("patroni_postgres_streaming{0} {1}"
+                           .format(scope_label, int(postgres.get('replication_state') == 'streaming')))
+
+            metrics.append("# HELP patroni_postgres_in_archive_recovery Value is 1"
+                           " if Postgres is replicating from archive, 0 otherwise.")
+            metrics.append("# TYPE patroni_postgres_in_archive_recovery gauge")
+            metrics.append("patroni_postgres_in_archive_recovery{0} {1}"
+                           .format(scope_label, int(postgres.get('replication_state') == 'in archive recovery')))
+
         metrics.append("# HELP patroni_postgres_server_version Version of Postgres (if running), 0 otherwise.")
         metrics.append("# TYPE patroni_postgres_server_version gauge")
         metrics.append("patroni_postgres_server_version {0} {1}".format(scope_label, postgres.get('server_version', 0)))
@@ -1151,8 +1163,11 @@ class RestApiHandler(BaseHTTPRequestHandler):
             if postgresql.state not in ('running', 'restarting', 'starting'):
                 raise RetryFailedError('')

+            replication_state = ('(pg_catalog.pg_stat_get_wal_receiver()).status'
+                                 if postgresql.major_version >= 90600 else 'NULL') + ", " +\
+                ("pg_catalog.current_setting('restore_command')" if postgresql.major_version >= 120000 else "NULL")
             stmt = ("SELECT " + postgresql.POSTMASTER_START_TIME + ", " + postgresql.TL_LSN + ","
-                    " pg_catalog.pg_last_xact_replay_timestamp(),"
+                    " pg_catalog.pg_last_xact_replay_timestamp(), " + replication_state + ","
                     " pg_catalog.array_to_json(pg_catalog.array_agg(pg_catalog.row_to_json(ri))) "
                     "FROM (SELECT (SELECT rolname FROM pg_catalog.pg_authid WHERE oid = usesysid) AS usename,"
                     " application_name, client_addr, w.state, sync_state, sync_priority"
@@ -1188,8 +1203,12 @@ class RestApiHandler(BaseHTTPRequestHandler):
                     if not cluster or cluster.is_unlocked() or not cluster.leader else cluster.leader.timeline
                 result['timeline'] = postgresql.replica_cached_timeline(leader_timeline)

-            if row[7]:
-                result['replication'] = row[7]
+            replication_state = postgresql.replication_state_from_parameters(row[1] > 0, row[7], row[8])
+            if replication_state:
+                result['replication_state'] = replication_state
+
+            if row[9]:
+                result['replication'] = row[9]

         except (psycopg.Error, RetryFailedError, PostgresConnectionException):
             state = postgresql.state
diff --git a/patroni/ctl.py b/patroni/ctl.py
index 12463aad..55a054c1 100644
--- a/patroni/ctl.py
+++ b/patroni/ctl.py
@@ -1490,7 +1490,8 @@ def output_members(obj: Dict[str, Any], cluster: Cluster, name: str,
     * ``Role``: ``Leader``, ``Standby Leader``, ``Sync Standby`` or ``Replica``;
     * ``State``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``,
       ``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``,
-      ``running custom bootstrap script``, ``custom bootstrap failed``, or ``creating replica``, and so on;
+      ``running custom bootstrap script``, ``custom bootstrap failed``, ``creating replica``, ``streaming``,
+      ``in archive recovery``, and so on;
     * ``TL``: current timeline in Postgres;
     * ``Lag in MB``: replication lag.
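
As a quick illustration of the two new gauges, a scrape of a streaming
replica's `/metrics` endpoint would look roughly like this (hypothetical
output; it assumes the replica's REST API listens on port 8009, as in the
examples above):

```bash
$ curl -s localhost:8009/metrics | grep 'patroni_postgres_streaming\|patroni_postgres_in_archive_recovery'
# HELP patroni_postgres_streaming Value is 1 if Postgres is streaming, 0 otherwise.
# TYPE patroni_postgres_streaming gauge
patroni_postgres_streaming{scope="batman"} 1
# HELP patroni_postgres_in_archive_recovery Value is 1 if Postgres is replicating from archive, 0 otherwise.
# TYPE patroni_postgres_in_archive_recovery gauge
patroni_postgres_in_archive_recovery{scope="batman"} 0
```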
diff --git a/patroni/ha.py b/patroni/ha.py
index 5f5de4fa..1148802a 100644
--- a/patroni/ha.py
+++ b/patroni/ha.py
@@ -306,10 +306,13 @@ class Ha(object):
         if self._async_executor.scheduled_action in (None, 'promote') \
                 and data['state'] in ['running', 'restarting', 'starting']:
             try:
-                timeline: Optional[int]
                 timeline, wal_position, pg_control_timeline = self.state_handler.timeline_wal_position()
                 data['xlog_location'] = wal_position
-                if not timeline:  # try pg_stat_wal_receiver to get the timeline
+                if not timeline:  # running as a standby
+                    replication_state = self.state_handler.replication_state()
+                    if replication_state:
+                        data['replication_state'] = replication_state
+                    # try pg_stat_wal_receiver to get the timeline
                     timeline = self.state_handler.received_timeline()
                 if not timeline:
                     # So far the only way to get the current timeline on the standby is from
diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py
index b2423c89..632cbee7 100644
--- a/patroni/postgresql/__init__.py
+++ b/patroni/postgresql/__init__.py
@@ -197,18 +197,19 @@ class Postgresql(object):
                  and self.role in ('master', 'primary', 'promoted') else "'on', '', NULL")
         if self._major_version >= 90600:
-            extra = ("(SELECT pg_catalog.json_agg(s.*) FROM (SELECT slot_name, slot_type as type, datoid::bigint, "
-                     "plugin, catalog_xmin, pg_catalog.pg_wal_lsn_diff(confirmed_flush_lsn, '0/0')::bigint"
-                     " AS confirmed_flush_lsn FROM pg_catalog.pg_get_replication_slots()) AS s)"
-                     if self._has_permanent_logical_slots and self._major_version >= 110000 else "NULL") + extra
+            extra = ("pg_catalog.current_setting('restore_command')" if self._major_version >= 120000 else "NULL") +\
+                ", " + ("(SELECT pg_catalog.json_agg(s.*) FROM (SELECT slot_name, slot_type as type, datoid::bigint, "
+                        "plugin, catalog_xmin, pg_catalog.pg_wal_lsn_diff(confirmed_flush_lsn, '0/0')::bigint"
+                        " AS confirmed_flush_lsn FROM pg_catalog.pg_get_replication_slots()) AS s)"
+                        if self._has_permanent_logical_slots and self._major_version >= 110000 else "NULL") + extra
             extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END,"
-                     " slot_name, conninfo, {0} FROM pg_catalog.pg_stat_get_wal_receiver()").format(extra)
+                     " slot_name, conninfo, status, {0} FROM pg_catalog.pg_stat_get_wal_receiver()").format(extra)
             if self.role == 'standby_leader':
                 extra = "timeline_id" + extra + ", pg_catalog.pg_control_checkpoint()"
             else:
                 extra = "0" + extra
         else:
-            extra = "0, NULL, NULL, NULL, NULL" + extra
+            extra = "0, NULL, NULL, NULL, NULL, NULL, NULL" + extra
         return ("SELECT " + self.TL_LSN + ", {2}").format(self.wal_name, self.lsn_name, extra)
@@ -426,7 +427,8 @@ class Postgresql(object):
             result = self._is_leader_retry(self._query, self.cluster_info_query).fetchone()
             cluster_info_state = dict(zip(['timeline', 'wal_position', 'replayed_location',
                                            'received_location', 'replay_paused', 'pg_control_timeline',
-                                           'received_tli', 'slot_name', 'conninfo', 'slots', 'synchronous_commit',
+                                           'received_tli', 'slot_name', 'conninfo', 'receiver_state',
+                                           'restore_command', 'slots', 'synchronous_commit',
                                            'synchronous_standby_names', 'pg_stat_replication'], result))
             if self._has_permanent_logical_slots:
                 cluster_info_state['slots'] =\
@@ -472,6 +474,41 @@ class Postgresql(object):
         """:returns: a result set of 'SELECT * FROM pg_stat_replication'."""
         return self._cluster_info_state_get('pg_stat_replication') or []

+    def replication_state_from_parameters(self, is_leader: bool, receiver_state: Optional[str],
+                                          restore_command: Optional[str]) -> Optional[str]:
"""Figure out the replication state from input parameters. + + .. note:: + This method could be only called when Postgres is up, running and queries are successfuly executed. + + :is_leader: `True` is postgres is not running in recovery + :receiver_state: value from `pg_stat_get_wal_receiver.state` or None if Postgres is older than 9.6 + :restore_command: value of ``restore_command`` GUC for PostgreSQL 12+ or + `postgresql.recovery_conf.restore_command` if it is set in Patroni configuration + + :returns: - `None` for the primary and for Postgres older than 9.6; + - 'streaming' if replica is streaming according to the `pg_stat_wal_receiver` view; + - 'in archive recovery' if replica isn't streaming and there is a `restore_command` + """ + if self._major_version >= 90600 and not is_leader: + if receiver_state == 'streaming': + return 'streaming' + # For Postgres older than 12 we get `restore_command` from Patroni config, otherwise we check GUC + if self._major_version < 120000 and self.config.restore_command() or restore_command: + return 'in archive recovery' + + def replication_state(self) -> Optional[str]: + """Checks replication state from `pg_stat_get_wal_receiver()`. + + .. note:: + Available only since 9.6 + + :returns: ``streaming``, ``in archive recovery``, or ``None`` + """ + return self.replication_state_from_parameters(self.is_leader(), + self._cluster_info_state_get('receiver_state'), + self._cluster_info_state_get('restore_command')) + def is_leader(self) -> bool: try: return bool(self._cluster_info_state_get('timeline')) diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index ef3f5c36..9461714f 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -1177,3 +1177,6 @@ class ConfigHandler(object): def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]: return self._config.get(key, default) + + def restore_command(self) -> Optional[str]: + return (self.get('recovery_conf') or {}).get('restore_command') diff --git a/patroni/utils.py b/patroni/utils.py index ee9a3d21..b7694d17 100644 --- a/patroni/utils.py +++ b/patroni/utils.py @@ -773,7 +773,8 @@ def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig'] else: role = 'replica' - member = {'name': m.name, 'role': role, 'state': m.data.get('state', ''), 'api_url': m.api_url} + state = (m.data.get('replication_state', '') if role != 'leader' else '') or m.data.get('state', '') + member = {'name': m.name, 'role': role, 'state': state, 'api_url': m.api_url} conn_kwargs = m.conn_kwargs() if conn_kwargs.get('host'): member['host'] = conn_kwargs['host'] diff --git a/tests/__init__.py b/tests/__init__.py index 03598ae1..dcf9ed9e 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -108,7 +108,7 @@ class MockCursor(object): elif sql.startswith('WITH slots AS (SELECT slot_name, active'): self.results = [(False, True)] if self.rowcount == 1 else [None] elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'): - self.results = [(1, 2, 1, 0, False, 1, 1, None, None, + self.results = [(1, 2, 1, 0, False, 1, 1, None, None, 'streaming', '', [{"slot_name": "ls", "confirmed_flush_lsn": 12345}], 'on', 'n1', None)] elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'): @@ -117,7 +117,7 @@ class MockCursor(object): replication_info = '[{"application_name":"walreceiver","client_addr":"1.2.3.4",' +\ '"state":"streaming","sync_state":"async","sync_priority":0}]' now = datetime.datetime.now(tzutc) - self.results = [(now, 0, '', 0, '', False, 
+            self.results = [(now, 0, '', 0, '', False, now, 'streaming', None, replication_info)]
         elif sql.startswith('SELECT name, setting'):
             self.results = [('wal_segment_size', '2048', '8kB', 'integer', 'internal'),
                             ('wal_block_size', '8192', None, 'integer', 'internal'),
diff --git a/tests/test_api.py b/tests/test_api.py
index 25342dca..f9d0b1c1 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -29,7 +29,8 @@ class MockPostgresql(object):
     name = 'test'
     state = 'running'
     role = 'primary'
-    server_version = '999999'
+    server_version = 90625
+    major_version = 90600
     sysid = 'dummysysid'
     scope = 'dummy'
     pending_restart = True
@@ -55,6 +56,10 @@ class MockPostgresql(object):
     def is_running():
         return True

+    @staticmethod
+    def replication_state_from_parameters(*args):
+        return 'streaming'
+

 class MockWatchdog(object):
     is_healthy = False
@@ -219,7 +224,7 @@ class TestRestApiHandler(unittest.TestCase):
         with patch.object(MockHa, 'restart_scheduled', Mock(return_value=True)):
             MockRestApiServer(RestApiHandler, 'GET /primary')
         self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /primary'))
-        with patch.object(RestApiServer, 'query', Mock(return_value=[('', 1, '', '', '', '', False, '')])):
+        with patch.object(RestApiServer, 'query', Mock(return_value=[('', 1, '', '', '', '', False, None, None, '')])):
            self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
         with patch.object(GlobalConfig, 'is_standby_cluster', Mock(return_value=True)),\
                 patch.object(GlobalConfig, 'is_paused', Mock(return_value=True)):
diff --git a/tests/test_ha.py b/tests/test_ha.py
index a621b03b..dad9562f 100644
--- a/tests/test_ha.py
+++ b/tests/test_ha.py
@@ -223,9 +223,12 @@ class TestHa(PostgresInit):

     @patch.object(Postgresql, 'received_timeline', Mock(return_value=None))
     def test_touch_member(self):
+        self.p._major_version = 110000
+        self.p.is_leader = false
         self.p.timeline_wal_position = Mock(return_value=(0, 1, 0))
         self.p.replica_cached_timeline = Mock(side_effect=Exception)
-        self.ha.touch_member()
+        with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')):
+            self.ha.touch_member()
         self.p.timeline_wal_position = Mock(return_value=(0, 1, 1))
         self.p.set_role('standby_leader')
         self.ha.touch_member()
diff --git a/tests/test_rewind.py b/tests/test_rewind.py
index 3d921621..b5858d8e 100644
--- a/tests/test_rewind.py
+++ b/tests/test_rewind.py
@@ -91,9 +91,10 @@ class TestRewind(BaseTestPostgresql):
                                                        'Latest checkpoint location': '0/'})):
                 self.r.rewind_or_reinitialize_needed_and_possible(self.leader)

-        with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
-            with patch.object(MockCursor, 'fetchone', Mock(side_effect=[(0, 0, 1, 1, 0, 0, 0, 0, 0, None), Exception])):
-                self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
+        with patch.object(Postgresql, 'is_running', Mock(return_value=True)),\
+                patch.object(MockCursor, 'fetchone',
+                             Mock(side_effect=[(0, 0, 1, 1, 0, 0, 0, 0, 0, None, None, None), Exception])):
+            self.r.rewind_or_reinitialize_needed_and_possible(self.leader)

     @patch.object(CancellableSubprocess, 'call', mock_cancellable_call)
     @patch.object(Postgresql, 'checkpoint', side_effect=['', '1'],)
diff --git a/tests/test_slots.py b/tests/test_slots.py
index bdb4bc5d..a962dbe3 100644
--- a/tests/test_slots.py
+++ b/tests/test_slots.py
@@ -77,14 +77,14 @@ class TestSlotsHandler(BaseTestPostgresql):
         with patch.object(Postgresql, '_query') as mock_query:
             self.p.reset_cluster_info_state(None)
             mock_query.return_value.fetchone.return_value = (
-                1, 0, 0, 0, 0, 0, 0, 0, 0,
+                1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
                 [{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b",
                   "confirmed_flush_lsn": 12345, "catalog_xmin": 105}])
             self.assertEqual(self.p.slots(), {'ls': 12345})

             self.p.reset_cluster_info_state(None)
             mock_query.return_value.fetchone.return_value = (
-                1, 0, 0, 0, 0, 0, 0, 0, 0,
+                1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
                 [{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b",
                   "confirmed_flush_lsn": 12345, "catalog_xmin": 105}])
             self.assertEqual(self.p.slots(), {})
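
Since `cluster_as_json()` now prefers `replication_state` for replicas, the
same information also surfaces through the `GET /cluster` endpoint and in
`patronictl list`. A hypothetical check against the cluster from the examples
above (output illustrative, not captured from a real run):

```bash
$ curl -s localhost:8008/cluster | jq '.members[] | {name, role, state}'
{
  "name": "postgresql0",
  "role": "leader",
  "state": "running"
}
{
  "name": "postgresql1",
  "role": "replica",
  "state": "streaming"
}
```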