Make it visible replication state on standbys (#2733)

To do that we use `pg_stat_get_wal_receiver()` function, which is available since 9.6. For older versions the `patronictl list` output and REST API responses remain as before.

If there is no wal receiver process, we check whether `restore_command` is set and show the state as `in archive recovery`.

Example of `patronictl list` output:
```bash
$ patronictl list
+ Cluster: batman -------------+---------+---------------------+----+-----------+
| Member      | Host           | Role    | State               | TL | Lag in MB |
+-------------+----------------+---------+---------------------+----+-----------+
| postgresql0 | 127.0.0.1:5432 | Leader  | running             | 12 |           |
| postgresql1 | 127.0.0.1:5433 | Replica | in archive recovery | 12 |         0 |
+-------------+----------------+---------+---------------------+----+-----------+

$ patronictl list
+ Cluster: batman -------------+---------+-----------+----+-----------+
| Member      | Host           | Role    | State     | TL | Lag in MB |
+-------------+----------------+---------+-----------+----+-----------+
| postgresql0 | 127.0.0.1:5432 | Leader  | running   | 12 |           |
| postgresql1 | 127.0.0.1:5433 | Replica | streaming | 12 |         0 |
+-------------+----------------+---------+-----------+----+-----------+
```

Example of REST API response:
```bash
$ curl -s localhost:8009 | jq .
{
  "state": "running",
  "postmaster_start_time": "2023-07-06 13:12:00.595118+02:00",
  "role": "replica",
  "server_version": 150003,
  "xlog": {
    "received_location": 335544480,
    "replayed_location": 335544480,
    "replayed_timestamp": null,
    "paused": false
  },
  "timeline": 12,
  "replication_state": "in archive recovery",
  "dcs_last_seen": 1688642069,
  "database_system_identifier": "7252327498286490579",
  "patroni": {
    "version": "3.0.3",
    "scope": "batman"
  }
}

$ curl -s localhost:8009 | jq .
{
  "state": "running",
  "postmaster_start_time": "2023-07-06 13:12:00.595118+02:00",
  "role": "replica",
  "server_version": 150003,
  "xlog": {
    "received_location": 335544816,
    "replayed_location": 335544816,
    "replayed_timestamp": null,
    "paused": false
  },
  "timeline": 12,
  "replication_state": "streaming",
  "dcs_last_seen": 1688642089,
  "database_system_identifier": "7252327498286490579",
  "patroni": {
    "version": "3.0.3",
    "scope": "batman"
  }
}
```
This commit is contained in:
Alexander Kukushkin
2023-07-13 09:24:20 +02:00
committed by GitHub
parent 665f49b320
commit d46ca88e6b
13 changed files with 115 additions and 26 deletions

View File

@@ -141,8 +141,8 @@ Retrieve the Patroni metrics in Prometheus format through the ``GET /metrics`` e
# TYPE patroni_replica gauge
patroni_replica{scope="batman"} 0
# HELP patroni_sync_standby Value is 1 if this node is a sync standby replica, 0 otherwise.
# TYPE patroni_sync_standby gauge
patroni_sync_standby{scope="batman"} 0
# TYPE patroni_sync_standby gauge
patroni_sync_standby{scope="batman"} 0
# HELP patroni_xlog_received_location Current location of the received Postgres transaction log, 0 if this node is not a replica.
# TYPE patroni_xlog_received_location counter
patroni_xlog_received_location{scope="batman"} 0
@@ -155,6 +155,12 @@ Retrieve the Patroni metrics in Prometheus format through the ``GET /metrics`` e
# HELP patroni_xlog_paused Value is 1 if the Postgres xlog is paused, 0 otherwise.
# TYPE patroni_xlog_paused gauge
patroni_xlog_paused{scope="batman"} 0
# HELP patroni_postgres_streaming Value is 1 if Postgres is streaming, 0 otherwise.
# TYPE patroni_postgres_streaming gauge
patroni_postgres_streaming{scope="batman"} 1
# HELP patroni_postgres_in_archive_recovery Value is 1 if Postgres is replicating from archive, 0 otherwise.
# TYPE patroni_postgres_in_archive_recovery gauge
patroni_postgres_in_archive_recovery{scope="batman"} 0
# HELP patroni_postgres_server_version Version of Postgres (if running), 0 otherwise.
# TYPE patroni_postgres_server_version gauge
patroni_postgres_server_version {scope="batman"} 140004

View File

@@ -13,6 +13,10 @@ Feature: standby cluster
When I start postgres0
Then "members/postgres0" key in DCS has state=running after 10 seconds
And replication works from postgres1 to postgres0 after 15 seconds
When I issue a GET request to http://127.0.0.1:8008/patroni
Then I receive a response code 200
And I receive a response replication_state streaming
And "members/postgres0" key in DCS has replication_state=streaming after 10 seconds
@slot-advance
Scenario: check permanent logical slots are synced to the replica
@@ -34,6 +38,9 @@ Feature: standby cluster
Then postgres1 is a leader of batman1 after 10 seconds
When I add the table foo to postgres0
Then table foo is present on postgres1 after 20 seconds
When I issue a GET request to http://127.0.0.1:8009/patroni
Then I receive a response code 200
And I receive a response replication_state streaming
And I sleep for 3 seconds
When I issue a GET request to http://127.0.0.1:8009/primary
Then I receive a response code 503
@@ -44,6 +51,9 @@ Feature: standby cluster
When I start postgres2 in a cluster batman1
Then postgres2 role is the replica after 24 seconds
And table foo is present on postgres2 after 20 seconds
When I issue a GET request to http://127.0.0.1:8010/patroni
Then I receive a response code 200
And I receive a response replication_state streaming
And postgres1 does not have a logical replication slot named test_logical
Scenario: check failover

View File

@@ -535,6 +535,18 @@ class RestApiHandler(BaseHTTPRequestHandler):
metrics.append("patroni_xlog_paused{0} {1}"
.format(scope_label, int(postgres.get('xlog', {}).get('paused', False) is True)))
if postgres.get('server_version', 0) >= 90600:
metrics.append("# HELP patroni_postgres_streaming Value is 1 if Postgres is streaming, 0 otherwise.")
metrics.append("# TYPE patroni_postgres_streaming gauge")
metrics.append("patroni_postgres_streaming{0} {1}"
.format(scope_label, int(postgres.get('replication_state') == 'streaming')))
metrics.append("# HELP patroni_postgres_in_archive_recovery Value is 1"
" if Postgres is replicating from archive, 0 otherwise.")
metrics.append("# TYPE patroni_postgres_in_archive_recovery gauge")
metrics.append("patroni_postgres_in_archive_recovery{0} {1}"
.format(scope_label, int(postgres.get('replication_state') == 'in archive recovery')))
metrics.append("# HELP patroni_postgres_server_version Version of Postgres (if running), 0 otherwise.")
metrics.append("# TYPE patroni_postgres_server_version gauge")
metrics.append("patroni_postgres_server_version {0} {1}".format(scope_label, postgres.get('server_version', 0)))
@@ -1151,8 +1163,11 @@ class RestApiHandler(BaseHTTPRequestHandler):
if postgresql.state not in ('running', 'restarting', 'starting'):
raise RetryFailedError('')
replication_state = ('(pg_catalog.pg_stat_get_wal_receiver()).status'
if postgresql.major_version >= 90600 else 'NULL') + ", " +\
("pg_catalog.current_setting('restore_command')" if postgresql.major_version >= 120000 else "NULL")
stmt = ("SELECT " + postgresql.POSTMASTER_START_TIME + ", " + postgresql.TL_LSN + ","
" pg_catalog.pg_last_xact_replay_timestamp(),"
" pg_catalog.pg_last_xact_replay_timestamp(), " + replication_state + ","
" pg_catalog.array_to_json(pg_catalog.array_agg(pg_catalog.row_to_json(ri))) "
"FROM (SELECT (SELECT rolname FROM pg_catalog.pg_authid WHERE oid = usesysid) AS usename,"
" application_name, client_addr, w.state, sync_state, sync_priority"
@@ -1188,8 +1203,12 @@ class RestApiHandler(BaseHTTPRequestHandler):
if not cluster or cluster.is_unlocked() or not cluster.leader else cluster.leader.timeline
result['timeline'] = postgresql.replica_cached_timeline(leader_timeline)
if row[7]:
result['replication'] = row[7]
replication_state = postgresql.replication_state_from_parameters(row[1] > 0, row[7], row[8])
if replication_state:
result['replication_state'] = replication_state
if row[9]:
result['replication'] = row[9]
except (psycopg.Error, RetryFailedError, PostgresConnectionException):
state = postgresql.state

View File

@@ -1490,7 +1490,8 @@ def output_members(obj: Dict[str, Any], cluster: Cluster, name: str,
* ``Role``: ``Leader``, ``Standby Leader``, ``Sync Standby`` or ``Replica``;
* ``State``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``,
``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``,
``running custom bootstrap script``, ``custom bootstrap failed``, or ``creating replica``, and so on;
``running custom bootstrap script``, ``custom bootstrap failed``, ``creating replica``, ``streaming``,
``in archive recovery``, and so on;
* ``TL``: current timeline in Postgres;
* ``Lag in MB``: replication lag.

View File

@@ -306,10 +306,13 @@ class Ha(object):
if self._async_executor.scheduled_action in (None, 'promote') \
and data['state'] in ['running', 'restarting', 'starting']:
try:
timeline: Optional[int]
timeline, wal_position, pg_control_timeline = self.state_handler.timeline_wal_position()
data['xlog_location'] = wal_position
if not timeline: # try pg_stat_wal_receiver to get the timeline
if not timeline: # running as a standby
replication_state = self.state_handler.replication_state()
if replication_state:
data['replication_state'] = replication_state
# try pg_stat_wal_receiver to get the timeline
timeline = self.state_handler.received_timeline()
if not timeline:
# So far the only way to get the current timeline on the standby is from

View File

@@ -197,18 +197,19 @@ class Postgresql(object):
and self.role in ('master', 'primary', 'promoted') else "'on', '', NULL")
if self._major_version >= 90600:
extra = ("(SELECT pg_catalog.json_agg(s.*) FROM (SELECT slot_name, slot_type as type, datoid::bigint, "
"plugin, catalog_xmin, pg_catalog.pg_wal_lsn_diff(confirmed_flush_lsn, '0/0')::bigint"
" AS confirmed_flush_lsn FROM pg_catalog.pg_get_replication_slots()) AS s)"
if self._has_permanent_logical_slots and self._major_version >= 110000 else "NULL") + extra
extra = ("pg_catalog.current_setting('restore_command')" if self._major_version >= 120000 else "NULL") +\
", " + ("(SELECT pg_catalog.json_agg(s.*) FROM (SELECT slot_name, slot_type as type, datoid::bigint, "
"plugin, catalog_xmin, pg_catalog.pg_wal_lsn_diff(confirmed_flush_lsn, '0/0')::bigint"
" AS confirmed_flush_lsn FROM pg_catalog.pg_get_replication_slots()) AS s)"
if self._has_permanent_logical_slots and self._major_version >= 110000 else "NULL") + extra
extra = (", CASE WHEN latest_end_lsn IS NULL THEN NULL ELSE received_tli END,"
" slot_name, conninfo, {0} FROM pg_catalog.pg_stat_get_wal_receiver()").format(extra)
" slot_name, conninfo, status, {0} FROM pg_catalog.pg_stat_get_wal_receiver()").format(extra)
if self.role == 'standby_leader':
extra = "timeline_id" + extra + ", pg_catalog.pg_control_checkpoint()"
else:
extra = "0" + extra
else:
extra = "0, NULL, NULL, NULL, NULL" + extra
extra = "0, NULL, NULL, NULL, NULL, NULL, NULL" + extra
return ("SELECT " + self.TL_LSN + ", {2}").format(self.wal_name, self.lsn_name, extra)
@@ -426,7 +427,8 @@ class Postgresql(object):
result = self._is_leader_retry(self._query, self.cluster_info_query).fetchone()
cluster_info_state = dict(zip(['timeline', 'wal_position', 'replayed_location',
'received_location', 'replay_paused', 'pg_control_timeline',
'received_tli', 'slot_name', 'conninfo', 'slots', 'synchronous_commit',
'received_tli', 'slot_name', 'conninfo', 'receiver_state',
'restore_command', 'slots', 'synchronous_commit',
'synchronous_standby_names', 'pg_stat_replication'], result))
if self._has_permanent_logical_slots:
cluster_info_state['slots'] =\
@@ -472,6 +474,41 @@ class Postgresql(object):
""":returns: a result set of 'SELECT * FROM pg_stat_replication'."""
return self._cluster_info_state_get('pg_stat_replication') or []
def replication_state_from_parameters(self, is_leader: bool, receiver_state: Optional[str],
restore_command: Optional[str]) -> Optional[str]:
"""Figure out the replication state from input parameters.
.. note::
This method could be only called when Postgres is up, running and queries are successfully executed.
:param is_leader: ``True`` if postgres is not running in recovery
:param receiver_state: value from ``pg_stat_get_wal_receiver().status`` or ``None`` if Postgres is older than 9.6
:param restore_command: value of ``restore_command`` GUC for PostgreSQL 12+ or
``postgresql.recovery_conf.restore_command`` if it is set in Patroni configuration
:returns: - ``None`` for the primary and for Postgres older than 9.6;
- 'streaming' if replica is streaming according to the ``pg_stat_wal_receiver`` view;
- 'in archive recovery' if replica isn't streaming and there is a ``restore_command``
"""
if self._major_version >= 90600 and not is_leader:
if receiver_state == 'streaming':
return 'streaming'
# For Postgres older than 12 we get `restore_command` from Patroni config, otherwise we check GUC.
# Precedence is (major_version < 12 and config value) or GUC value: the GUC is only queried on 12+.
if self._major_version < 120000 and self.config.restore_command() or restore_command:
return 'in archive recovery'
def replication_state(self) -> Optional[str]:
"""Checks replication state from `pg_stat_get_wal_receiver()`.
.. note::
Available only since 9.6. The wal receiver state and the ``restore_command`` GUC value
are taken from the cached result of the cluster info query.
:returns: ``streaming``, ``in archive recovery``, or ``None``
"""
return self.replication_state_from_parameters(self.is_leader(),
self._cluster_info_state_get('receiver_state'),
self._cluster_info_state_get('restore_command'))
def is_leader(self) -> bool:
try:
return bool(self._cluster_info_state_get('timeline'))

View File

@@ -1177,3 +1177,6 @@ class ConfigHandler(object):
def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
"""Return the value of *key* from the postgresql configuration section, or *default* if absent."""
return self._config.get(key, default)
def restore_command(self) -> Optional[str]:
"""Return ``postgresql.recovery_conf.restore_command`` from the Patroni configuration, if set."""
return (self.get('recovery_conf') or {}).get('restore_command')

View File

@@ -773,7 +773,8 @@ def cluster_as_json(cluster: 'Cluster', global_config: Optional['GlobalConfig']
else:
role = 'replica'
member = {'name': m.name, 'role': role, 'state': m.data.get('state', ''), 'api_url': m.api_url}
state = (m.data.get('replication_state', '') if role != 'leader' else '') or m.data.get('state', '')
member = {'name': m.name, 'role': role, 'state': state, 'api_url': m.api_url}
conn_kwargs = m.conn_kwargs()
if conn_kwargs.get('host'):
member['host'] = conn_kwargs['host']

View File

@@ -108,7 +108,7 @@ class MockCursor(object):
elif sql.startswith('WITH slots AS (SELECT slot_name, active'):
self.results = [(False, True)] if self.rowcount == 1 else [None]
elif sql.startswith('SELECT CASE WHEN pg_catalog.pg_is_in_recovery()'):
self.results = [(1, 2, 1, 0, False, 1, 1, None, None,
self.results = [(1, 2, 1, 0, False, 1, 1, None, None, 'streaming', '',
[{"slot_name": "ls", "confirmed_flush_lsn": 12345}],
'on', 'n1', None)]
elif sql.startswith('SELECT pg_catalog.pg_is_in_recovery()'):
@@ -117,7 +117,7 @@ class MockCursor(object):
replication_info = '[{"application_name":"walreceiver","client_addr":"1.2.3.4",' +\
'"state":"streaming","sync_state":"async","sync_priority":0}]'
now = datetime.datetime.now(tzutc)
self.results = [(now, 0, '', 0, '', False, now, replication_info)]
self.results = [(now, 0, '', 0, '', False, now, 'streaming', None, replication_info)]
elif sql.startswith('SELECT name, setting'):
self.results = [('wal_segment_size', '2048', '8kB', 'integer', 'internal'),
('wal_block_size', '8192', None, 'integer', 'internal'),

View File

@@ -29,7 +29,8 @@ class MockPostgresql(object):
name = 'test'
state = 'running'
role = 'primary'
server_version = '999999'
server_version = 90625
major_version = 90600
sysid = 'dummysysid'
scope = 'dummy'
pending_restart = True
@@ -55,6 +56,10 @@ class MockPostgresql(object):
def is_running():
return True
@staticmethod
def replication_state_from_parameters(*args):
# Test stub: always report the replica as streaming, regardless of inputs.
return 'streaming'
class MockWatchdog(object):
is_healthy = False
@@ -219,7 +224,7 @@ class TestRestApiHandler(unittest.TestCase):
with patch.object(MockHa, 'restart_scheduled', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /primary')
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /primary'))
with patch.object(RestApiServer, 'query', Mock(return_value=[('', 1, '', '', '', '', False, '')])):
with patch.object(RestApiServer, 'query', Mock(return_value=[('', 1, '', '', '', '', False, None, None, '')])):
self.assertIsNotNone(MockRestApiServer(RestApiHandler, 'GET /patroni'))
with patch.object(GlobalConfig, 'is_standby_cluster', Mock(return_value=True)),\
patch.object(GlobalConfig, 'is_paused', Mock(return_value=True)):

View File

@@ -223,9 +223,12 @@ class TestHa(PostgresInit):
@patch.object(Postgresql, 'received_timeline', Mock(return_value=None))
def test_touch_member(self):
self.p._major_version = 110000
self.p.is_leader = false
self.p.timeline_wal_position = Mock(return_value=(0, 1, 0))
self.p.replica_cached_timeline = Mock(side_effect=Exception)
self.ha.touch_member()
with patch.object(Postgresql, '_cluster_info_state_get', Mock(return_value='streaming')):
self.ha.touch_member()
self.p.timeline_wal_position = Mock(return_value=(0, 1, 1))
self.p.set_role('standby_leader')
self.ha.touch_member()

View File

@@ -91,9 +91,10 @@ class TestRewind(BaseTestPostgresql):
'Latest checkpoint location': '0/'})):
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[(0, 0, 1, 1, 0, 0, 0, 0, 0, None), Exception])):
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
with patch.object(Postgresql, 'is_running', Mock(return_value=True)),\
patch.object(MockCursor, 'fetchone',
Mock(side_effect=[(0, 0, 1, 1, 0, 0, 0, 0, 0, None, None, None), Exception])):
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
@patch.object(CancellableSubprocess, 'call', mock_cancellable_call)
@patch.object(Postgresql, 'checkpoint', side_effect=['', '1'],)

View File

@@ -77,14 +77,14 @@ class TestSlotsHandler(BaseTestPostgresql):
with patch.object(Postgresql, '_query') as mock_query:
self.p.reset_cluster_info_state(None)
mock_query.return_value.fetchone.return_value = (
1, 0, 0, 0, 0, 0, 0, 0, 0,
1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
[{"slot_name": "ls", "type": "logical", "datoid": 5, "plugin": "b",
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])
self.assertEqual(self.p.slots(), {'ls': 12345})
self.p.reset_cluster_info_state(None)
mock_query.return_value.fetchone.return_value = (
1, 0, 0, 0, 0, 0, 0, 0, 0,
1, 0, 0, 0, 0, 0, 0, 0, 0, None, None,
[{"slot_name": "ls", "type": "logical", "datoid": 6, "plugin": "b",
"confirmed_flush_lsn": 12345, "catalog_xmin": 105}])
self.assertEqual(self.p.slots(), {})