Quorum based failover (#2668)

To enable quorum commit:
```diff
$ patronictl.py edit-config
--- 
+++ 
@@ -5,3 +5,4 @@
   use_pg_rewind: true
 retry_timeout: 10
 ttl: 30
+synchronous_mode: quorum

Apply these changes? [y/N]: y
Configuration changed
```

By default Patroni will use `ANY 1 (list,of,standbys)` in `synchronous_standby_names`. That is, only one node out of the listed replicas has to confirm a commit. If you want to increase the number of nodes required for quorum, set `synchronous_node_count`:
```diff
$ patronictl edit-config
--- 
+++ 
@@ -6,3 +6,4 @@
 retry_timeout: 10
 synchronous_mode: quorum
 ttl: 30
+synchronous_node_count: 2

Apply these changes? [y/N]: y
Configuration changed
```

Good old `synchronous_mode: on` is still supported.
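
A rough sketch of the difference (not Patroni's code; `build_ssn` and the node names are made up, based on the examples in the docs changed below): in quorum mode all eligible standbys are listed with the `ANY` method, while classic sync mode lists only the chosen standbys with `FIRST`.

```python
# Illustrative sketch only: how the value Patroni writes into
# synchronous_standby_names differs between quorum and classic sync mode.

def build_ssn(mode: str, node_count: int, standbys: list) -> str:
    # In quorum mode all eligible standbys are listed and ANY <node_count>
    # of them must confirm; in classic mode only <node_count> standbys are
    # listed with the FIRST method.
    method = 'ANY' if mode == 'quorum' else 'FIRST'
    return '{0} {1} ({2})'.format(method, node_count, ','.join(sorted(standbys)))

print(build_ssn('quorum', 1, ['node1', 'node2', 'node3']))  # ANY 1 (node1,node2,node3)
print(build_ssn('on', 2, ['node1', 'node2']))               # FIRST 2 (node1,node2)
```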

Close https://github.com/patroni/patroni/issues/664
Close https://github.com/zalando/patroni/pull/672
Alexander Kukushkin authored on 2024-08-13 08:51:01 +02:00, committed by GitHub
parent 56dba93c55, commit 384705ad97
23 changed files with 1878 additions and 241 deletions


@@ -34,6 +34,8 @@ There are only a few simple rules you need to follow:
After that you just need to start Patroni and it will handle the rest:
0. Patroni will set ``bootstrap.dcs.synchronous_mode`` to :ref:`quorum <quorum_mode>`
if it is not explicitly set to any other value.
1. ``citus`` extension will be automatically added to ``shared_preload_libraries``.
2. If ``max_prepared_transactions`` isn't explicitly set in the global
:ref:`dynamic configuration <dynamic_configuration>` Patroni will
@@ -77,36 +79,36 @@ It results in two major differences in :ref:`patronictl` behaviour when
An example of :ref:`patronictl_list` output for the Citus cluster::
postgres@coord1:~$ patronictl list demo
+ Citus cluster: demo ----------+--------------+---------+----+-----------+
| Group | Member | Host | Role | State | TL | Lag in MB |
+-------+---------+-------------+--------------+---------+----+-----------+
| 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| 0 | coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 |
| 0 | coord3 | 172.27.0.4 | Leader | running | 1 | |
| 1 | work1-1 | 172.27.0.8 | Sync Standby | running | 1 | 0 |
| 1 | work1-2 | 172.27.0.2 | Leader | running | 1 | |
| 2 | work2-1 | 172.27.0.5 | Sync Standby | running | 1 | 0 |
| 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | |
+-------+---------+-------------+--------------+---------+----+-----------+
+ Citus cluster: demo ----------+----------------+---------+----+-----------+
| Group | Member | Host | Role | State | TL | Lag in MB |
+-------+---------+-------------+----------------+---------+----+-----------+
| 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| 0 | coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 |
| 0 | coord3 | 172.27.0.4 | Leader | running | 1 | |
| 1 | work1-1 | 172.27.0.8 | Quorum Standby | running | 1 | 0 |
| 1 | work1-2 | 172.27.0.2 | Leader | running | 1 | |
| 2 | work2-1 | 172.27.0.5 | Quorum Standby | running | 1 | 0 |
| 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | |
+-------+---------+-------------+----------------+---------+----+-----------+
If we add the ``--group`` option, the output will change to::
postgres@coord1:~$ patronictl list demo --group 0
+ Citus cluster: demo (group: 0, 7179854923829112860) -----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+-------------+--------------+---------+----+-----------+
| coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 |
| coord3 | 172.27.0.4 | Leader | running | 1 | |
+--------+-------------+--------------+---------+----+-----------+
+ Citus cluster: demo (group: 0, 7179854923829112860) -+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+--------+-------------+----------------+---------+----+-----------+
| coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 |
| coord3 | 172.27.0.4 | Leader | running | 1 | |
+--------+-------------+----------------+---------+----+-----------+
postgres@coord1:~$ patronictl list demo --group 1
+ Citus cluster: demo (group: 1, 7179854923881963547) -----------+
| Member | Host | Role | State | TL | Lag in MB |
+---------+------------+--------------+---------+----+-----------+
| work1-1 | 172.27.0.8 | Sync Standby | running | 1 | 0 |
| work1-2 | 172.27.0.2 | Leader | running | 1 | |
+---------+------------+--------------+---------+----+-----------+
+ Citus cluster: demo (group: 1, 7179854923881963547) -+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+---------+------------+----------------+---------+----+-----------+
| work1-1 | 172.27.0.8 | Quorum Standby | running | 1 | 0 |
| work1-2 | 172.27.0.2 | Leader | running | 1 | |
+---------+------------+----------------+---------+----+-----------+
Citus worker switchover
-----------------------
@@ -122,28 +124,28 @@ new primary worker node is ready to accept read-write queries.
An example of :ref:`patronictl_switchover` on the worker cluster::
postgres@coord1:~$ patronictl switchover demo
+ Citus cluster: demo ----------+--------------+---------+----+-----------+
| Group | Member | Host | Role | State | TL | Lag in MB |
+-------+---------+-------------+--------------+---------+----+-----------+
| 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| 0 | coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 |
| 0 | coord3 | 172.27.0.4 | Leader | running | 1 | |
| 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | |
| 1 | work1-2 | 172.27.0.2 | Sync Standby | running | 1 | 0 |
| 2 | work2-1 | 172.27.0.5 | Sync Standby | running | 1 | 0 |
| 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | |
+-------+---------+-------------+--------------+---------+----+-----------+
+ Citus cluster: demo ----------+----------------+---------+----+-----------+
| Group | Member | Host | Role | State | TL | Lag in MB |
+-------+---------+-------------+----------------+---------+----+-----------+
| 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| 0 | coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 |
| 0 | coord3 | 172.27.0.4 | Leader | running | 1 | |
| 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | |
| 1 | work1-2 | 172.27.0.2 | Quorum Standby | running | 1 | 0 |
| 2 | work2-1 | 172.27.0.5 | Quorum Standby | running | 1 | 0 |
| 2 | work2-2 | 172.27.0.7 | Leader | running | 1 | |
+-------+---------+-------------+----------------+---------+----+-----------+
Citus group: 2
Primary [work2-2]:
Candidate ['work2-1'] []:
When should the switchover take place (e.g. 2022-12-22T08:02 ) [now]:
Current cluster topology
+ Citus cluster: demo (group: 2, 7179854924063375386) -----------+
| Member | Host | Role | State | TL | Lag in MB |
+---------+------------+--------------+---------+----+-----------+
| work2-1 | 172.27.0.5 | Sync Standby | running | 1 | 0 |
| work2-2 | 172.27.0.7 | Leader | running | 1 | |
+---------+------------+--------------+---------+----+-----------+
+ Citus cluster: demo (group: 2, 7179854924063375386) -+-----------+
| Member | Host | Role | State | TL | Lag in MB |
+---------+------------+----------------+---------+----+-----------+
| work2-1 | 172.27.0.5 | Quorum Standby | running | 1 | 0 |
| work2-2 | 172.27.0.7 | Leader | running | 1 | |
+---------+------------+----------------+---------+----+-----------+
Are you sure you want to switchover cluster demo, demoting current primary work2-2? [y/N]: y
2022-12-22 07:02:40.33003 Successfully switched over to "work2-1"
+ Citus cluster: demo (group: 2, 7179854924063375386) ------+
@@ -154,17 +156,17 @@ An example of :ref:`patronictl_switchover` on the worker cluster::
+---------+------------+---------+---------+----+-----------+
postgres@coord1:~$ patronictl list demo
+ Citus cluster: demo ----------+--------------+---------+----+-----------+
| Group | Member | Host | Role | State | TL | Lag in MB |
+-------+---------+-------------+--------------+---------+----+-----------+
| 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| 0 | coord2 | 172.27.0.6 | Sync Standby | running | 1 | 0 |
| 0 | coord3 | 172.27.0.4 | Leader | running | 1 | |
| 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | |
| 1 | work1-2 | 172.27.0.2 | Sync Standby | running | 1 | 0 |
| 2 | work2-1 | 172.27.0.5 | Leader | running | 2 | |
| 2 | work2-2 | 172.27.0.7 | Sync Standby | running | 2 | 0 |
+-------+---------+-------------+--------------+---------+----+-----------+
+ Citus cluster: demo ----------+----------------+---------+----+-----------+
| Group | Member | Host | Role | State | TL | Lag in MB |
+-------+---------+-------------+----------------+---------+----+-----------+
| 0 | coord1 | 172.27.0.10 | Replica | running | 1 | 0 |
| 0 | coord2 | 172.27.0.6 | Quorum Standby | running | 1 | 0 |
| 0 | coord3 | 172.27.0.4 | Leader | running | 1 | |
| 1 | work1-1 | 172.27.0.8 | Leader | running | 1 | |
| 1 | work1-2 | 172.27.0.2 | Quorum Standby | running | 1 | 0 |
| 2 | work2-1 | 172.27.0.5 | Leader | running | 2 | |
| 2 | work2-2 | 172.27.0.7 | Quorum Standby | running | 2 | 0 |
+-------+---------+-------------+----------------+---------+----+-----------+
And this is how it looks on the coordinator side::


@@ -25,7 +25,7 @@ In order to change the dynamic configuration you can use either :ref:`patronictl
- **max\_timelines\_history**: maximum number of timeline history items kept in DCS. Default value: 0. When set to 0, it keeps the full history in DCS.
- **primary\_start\_timeout**: the amount of time a primary is allowed to recover from failures before failover is triggered (in seconds). Default is 300 seconds. When set to 0 failover is done immediately after a crash is detected if possible. When using asynchronous replication a failover can cause lost transactions. Worst case failover time for primary failure is: loop\_wait + primary\_start\_timeout + loop\_wait, unless primary\_start\_timeout is zero, in which case it's just loop\_wait. Set the value according to your durability/availability tradeoff.
- **primary\_stop\_timeout**: The number of seconds Patroni is allowed to wait when stopping Postgres and effective only when synchronous_mode is enabled. When set to > 0 and the synchronous_mode is enabled, Patroni sends SIGKILL to the postmaster if the stop operation is running for more than the value set by primary\_stop\_timeout. Set the value according to your durability/availability tradeoff. If the parameter is not set or set <= 0, primary\_stop\_timeout does not apply.
- **synchronous\_mode**: turns on synchronous replication mode. In this mode a replica will be chosen as synchronous and only the latest leader and synchronous replica are able to participate in leader election. Synchronous mode makes sure that successfully committed transactions will not be lost at failover, at the cost of losing availability for writes when Patroni cannot ensure transaction durability. See :ref:`replication modes documentation <replication_modes>` for details.
- **synchronous\_mode**: turns on synchronous replication mode. Possible values: ``off``, ``on``, ``quorum``. In this mode the leader manages ``synchronous_standby_names``, and only the last known leader or one of the synchronous replicas is allowed to participate in the leader race. Synchronous mode makes sure that successfully committed transactions will not be lost at failover, at the cost of losing availability for writes when Patroni cannot ensure transaction durability. See :ref:`replication modes documentation <replication_modes>` for details.
- **synchronous\_mode\_strict**: prevents disabling synchronous replication if no synchronous replicas are available, blocking all client writes to the primary. See :ref:`replication modes documentation <replication_modes>` for details.
- **failsafe\_mode**: Enables :ref:`DCS Failsafe Mode <dcs_failsafe_mode>`. Defaults to `false`.
- **postgresql**:


@@ -6,8 +6,9 @@ Replication modes
Patroni uses PostgreSQL streaming replication. For more information about streaming replication, see the `Postgres documentation <http://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION>`__. By default Patroni configures PostgreSQL for asynchronous replication. Choosing your replication schema is dependent on your business considerations. Investigate both async and sync replication, as well as other HA solutions, to determine which solution is best for you.
Asynchronous mode durability
----------------------------
============================
In asynchronous mode the cluster is allowed to lose some committed transactions to ensure availability. When the primary server fails or becomes unavailable for any other reason Patroni will automatically promote a sufficiently healthy standby to primary. Any transactions that have not been replicated to that standby remain in a "forked timeline" on the primary, and are effectively unrecoverable [1]_.
@@ -15,10 +16,11 @@ The amount of transactions that can be lost is controlled via ``maximum_lag_on_f
By default, when running leader elections, Patroni does not take into account the current timeline of replicas, which in some cases could be undesirable behavior. You can prevent a node that does not have the same timeline as the former primary from becoming the new leader by changing the value of the ``check_timeline`` parameter to ``true``.
PostgreSQL synchronous replication
----------------------------------
You can use Postgres's `synchronous replication <http://www.postgresql.org/docs/current/static/warm-standby.html#SYNCHRONOUS-REPLICATION>`__ with Patroni. Synchronous replication ensures consistency across a cluster by confirming that writes are written to a secondary before returning to the connecting client with a success. The cost of synchronous replication: reduced throughput on writes. This throughput will be entirely based on network performance.
PostgreSQL synchronous replication
==================================
You can use Postgres's `synchronous replication <http://www.postgresql.org/docs/current/static/warm-standby.html#SYNCHRONOUS-REPLICATION>`__ with Patroni. Synchronous replication ensures consistency across a cluster by confirming that writes are written to a secondary before returning to the connecting client with a success. The cost of synchronous replication: increased latency and reduced throughput on writes. This throughput will be entirely based on network performance.
In hosted datacenter environments (like AWS, Rackspace, or any network you do not control), synchronous replication significantly increases the variability of write performance. If followers become inaccessible from the leader, the leader effectively becomes read-only.
@@ -33,10 +35,11 @@ When using PostgreSQL synchronous replication, use at least three Postgres data
Using PostgreSQL synchronous replication does not guarantee zero lost transactions under all circumstances. When the primary and the secondary that is currently acting as a synchronous replica fail simultaneously a third node that might not contain all transactions will be promoted.
.. _synchronous_mode:
Synchronous mode
----------------
================
For use cases where losing committed transactions is not permissible you can turn on Patroni's ``synchronous_mode``. When ``synchronous_mode`` is turned on Patroni will not promote a standby unless it is certain that the standby contains all transactions that may have returned a successful commit status to client [2]_. This means that the system may be unavailable for writes even though some servers are available. System administrators can still use manual failover commands to promote a standby even if it results in transaction loss.
@@ -55,28 +58,122 @@ up.
You can ensure that a standby never becomes the synchronous standby by setting the ``nosync`` tag to true. This is recommended for standbys that are behind slow network connections and would cause performance degradation when becoming a synchronous standby. Setting the ``nostream`` tag to true will also have the same effect.
Synchronous mode can be switched on and off via Patroni REST interface. See :ref:`dynamic configuration <dynamic_configuration>` for instructions.
Synchronous mode can be switched on and off using ``patronictl edit-config`` command or via Patroni REST interface. See :ref:`dynamic configuration <dynamic_configuration>` for instructions.
Note: Because of the way synchronous replication is implemented in PostgreSQL it is still possible to lose transactions even when using ``synchronous_mode_strict``. If the PostgreSQL backend is cancelled while waiting to acknowledge replication (as a result of packet cancellation due to client timeout or backend failure) transaction changes become visible for other backends. Such changes are not yet replicated and may be lost in case of standby promotion.
Synchronous Replication Factor
------------------------------
The parameter ``synchronous_node_count`` is used by Patroni to manage number of synchronous standby databases. It is set to 1 by default. It has no effect when ``synchronous_mode`` is set to off. When enabled, Patroni manages precise number of synchronous standby databases based on parameter ``synchronous_node_count`` and adjusts the state in DCS & synchronous_standby_names as members join and leave.
==============================
The parameter ``synchronous_node_count`` is used by Patroni to manage the number of synchronous standby databases. It is set to ``1`` by default. It has no effect when ``synchronous_mode`` is set to ``off``. When enabled, Patroni manages the precise number of synchronous standby databases based on parameter ``synchronous_node_count`` and adjusts the state in DCS & ``synchronous_standby_names`` in PostgreSQL as members join and leave. If the parameter is set to a value higher than the number of eligible nodes it will be automatically reduced by Patroni.
Maximum lag on synchronous node
===============================
By default Patroni sticks to nodes that are declared as ``synchronous``, according to the ``pg_stat_replication`` view, even when there are other nodes ahead of it. This is done to minimize the number of changes of ``synchronous_standby_names``. To change this behavior one may use ``maximum_lag_on_syncnode`` parameter. It controls how much lag the replica can have to still be considered as "synchronous".
Patroni utilizes the max replica LSN if there is more than one standby, otherwise it will use leader's current wal LSN. The default is ``-1``, and Patroni will not take action to swap a synchronous unhealthy standby when the value is set to ``0`` or less. Please set the value high enough so that Patroni won't swap synchronous standbys frequently during high transaction volume.
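
A minimal sketch of the lag check described above (the function, variable names, and LSN values are hypothetical and not Patroni's implementation):

```python
# Hypothetical sketch of the maximum_lag_on_syncnode check described above
# (names are made up; LSN values are in bytes).

def sync_standby_is_healthy(standby_lsn: int, all_replica_lsns: list,
                            leader_lsn: int, maximum_lag_on_syncnode: int) -> bool:
    """Return True if the standby may keep its synchronous status."""
    if maximum_lag_on_syncnode <= 0:
        # default -1: never swap a synchronous standby based on lag
        return True
    # With more than one standby the max replica LSN is the reference point,
    # otherwise the leader's current WAL LSN is used.
    reference = max(all_replica_lsns) if len(all_replica_lsns) > 1 else leader_lsn
    return reference - standby_lsn <= maximum_lag_on_syncnode

# A sync standby that is 32 MB behind the most advanced replica loses its
# status when maximum_lag_on_syncnode is set to 16 MB:
print(sync_standby_is_healthy(0, [0, 32 * 1024 * 1024],
                              48 * 1024 * 1024, 16 * 1024 * 1024))  # False
```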
Synchronous mode implementation
-------------------------------
===============================
When in synchronous mode Patroni maintains synchronization state in the DCS, containing the latest primary and current synchronous standby databases. This state is updated with strict ordering constraints to ensure the following invariants:
When in synchronous mode Patroni maintains synchronization state in the DCS (``/sync`` key), containing the latest primary and current synchronous standby databases. This state is updated with strict ordering constraints to ensure the following invariants:
- A node must be marked as the latest leader whenever it can accept write transactions. Patroni crashing or PostgreSQL not shutting down can cause violations of this invariant.
- A node must be set as the synchronous standby in PostgreSQL as long as it is published as the synchronous standby.
- A node must be set as the synchronous standby in PostgreSQL as long as it is published as the synchronous standby in the ``/sync`` key in DCS.
- A node that is not the leader or current synchronous standby is not allowed to promote itself automatically.
Patroni will only assign one or more synchronous standby nodes based on ``synchronous_node_count`` parameter to ``synchronous_standby_names``.
On each HA loop iteration Patroni re-evaluates synchronous standby nodes choice. If the current list of synchronous standby nodes are connected and has not requested its synchronous status to be removed it remains picked. Otherwise the cluster member available for sync that is furthest ahead in replication is picked.
On each HA loop iteration Patroni re-evaluates synchronous standby nodes choice. If the current list of synchronous standby nodes are connected and has not requested its synchronous status to be removed it remains picked. Otherwise the cluster members available for sync that are furthest ahead in replication are picked.
Example:
---------
``/config`` key in DCS
^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: YAML
synchronous_mode: on
synchronous_node_count: 2
...
``/sync`` key in DCS
^^^^^^^^^^^^^^^^^^^^
.. code-block:: JSON
{
"leader": "node0",
"sync_standby": "node1,node2"
}
postgresql.conf
^^^^^^^^^^^^^^^
.. code-block:: INI
synchronous_standby_names = 'FIRST 2 (node1,node2)'
In the above examples only nodes ``node1`` and ``node2`` are known to be synchronous and allowed to be automatically promoted if the primary (``node0``) fails.
.. _quorum_mode:
Quorum commit mode
==================
Starting from PostgreSQL v10 Patroni supports quorum-based synchronous replication.
In this mode, Patroni maintains synchronization state in the DCS, containing the latest known primary, the number of nodes required for quorum, and the nodes currently eligible to vote on quorum. In steady state, the nodes voting on quorum are the leader and all synchronous standbys. This state is updated with strict ordering constraints, with regards to node promotion and ``synchronous_standby_names``, to ensure that at all times any subset of voters that can achieve quorum includes at least one node with the latest successful commit.
On each iteration of HA loop, Patroni re-evaluates synchronous standby choices and quorum, based on node availability and requested cluster configuration. In PostgreSQL versions above 9.6 all eligible nodes are added as synchronous standbys as soon as their replication catches up to leader.
Quorum commit helps to reduce worst case latencies, even during normal operation, as a higher latency of replicating to one standby can be compensated by other standbys.
The quorum-based synchronous mode could be enabled by setting ``synchronous_mode`` to ``quorum`` using ``patronictl edit-config`` command or via Patroni REST interface. See :ref:`dynamic configuration <dynamic_configuration>` for instructions.
Other parameters, like ``synchronous_node_count``, ``maximum_lag_on_syncnode``, and ``synchronous_mode_strict`` continue to work the same way as with ``synchronous_mode=on``.
Example:
---------
``/config`` key in DCS
^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: YAML
synchronous_mode: quorum
synchronous_node_count: 2
...
``/sync`` key in DCS
^^^^^^^^^^^^^^^^^^^^
.. code-block:: JSON
{
"leader": "node0",
"sync_standby": "node1,node2,node3",
"quorum": 1
}
postgresql.conf
^^^^^^^^^^^^^^^
.. code-block:: INI
synchronous_standby_names = 'ANY 2 (node1,node2,node3)'
If the primary (``node0``) failed, in the above example two of ``node1``, ``node2``, and ``node3`` will have received the latest transaction, but we don't know which ones. To figure out whether ``node1`` has received the latest transaction, we need to compare its LSN with the LSN on **at least** one node (``quorum=1`` in the ``/sync`` key) among ``node2`` and ``node3``. If ``node1`` is not behind at least one of them, we can guarantee that there will be no user-visible data loss if ``node1`` is promoted.
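
The promotion check described above can be sketched as follows (illustrative only; the `may_promote` helper and the LSN values are made up):

```python
# Illustrative sketch of the leader-race check described above; the function
# and the LSN values are made up and simplified.

def may_promote(candidate_lsn: int, other_member_lsns: dict, quorum: int) -> bool:
    """True if the candidate is not behind at least `quorum` other members
    of the sync_standby + leader list, i.e. promotion loses no visible data."""
    not_behind = sum(1 for lsn in other_member_lsns.values() if lsn <= candidate_lsn)
    return not_behind >= quorum

# /sync: {"leader": "node0", "sync_standby": "node1,node2,node3", "quorum": 1}
# node0 failed and node1 compares itself with node2 and node3:
print(may_promote(150, {'node2': 150, 'node3': 120}, quorum=1))  # True
print(may_promote(100, {'node2': 150, 'node3': 120}, quorum=1))  # False
```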
.. [1] The data is still there, but recovering it requires a manual recovery effort by data recovery specialists. When Patroni is allowed to rewind with ``use_pg_rewind`` the forked timeline will be automatically erased to rejoin the failed primary with the cluster. However, for ``use_pg_rewind`` to function properly, either the cluster must be initialized with ``data page checksums`` (``--data-checksums`` option for ``initdb``) and/or ``wal_log_hints`` must be set to ``on``.


@@ -45,6 +45,10 @@ For all health check ``GET`` requests Patroni returns a JSON document with the s
- ``GET /read-only-sync``: like the above endpoint, but also includes the primary.
- ``GET /quorum``: returns HTTP status code **200** only when this Patroni node is listed as a quorum node in ``synchronous_standby_names`` on the primary.
- ``GET /read-only-quorum``: like the above endpoint, but also includes the primary.
- ``GET /asynchronous`` or ``GET /async``: returns HTTP status code **200** only when the Patroni node is running as an asynchronous standby.
@@ -308,6 +312,9 @@ Retrieve the Patroni metrics in Prometheus format through the ``GET /metrics`` e
# HELP patroni_sync_standby Value is 1 if this node is a sync standby replica, 0 otherwise.
# TYPE patroni_sync_standby gauge
patroni_sync_standby{scope="batman",name="patroni1"} 0
# HELP patroni_quorum_standby Value is 1 if this node is a quorum standby replica, 0 otherwise.
# TYPE patroni_quorum_standby gauge
patroni_quorum_standby{scope="batman",name="patroni1"} 0
# HELP patroni_xlog_received_location Current location of the received Postgres transaction log, 0 if this node is not a replica.
# TYPE patroni_xlog_received_location counter
patroni_xlog_received_location{scope="batman",name="patroni1"} 0
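
A short sketch of probing the new ``/quorum`` health-check endpoint (it returns HTTP 200 only when the node is listed as a quorum node in ``synchronous_standby_names`` on the primary) from a monitoring script or load-balancer check; the host and port are assumptions for a default local setup:

```python
# Sketch of probing the new /quorum endpoint; host/port are assumptions
# for a default local Patroni REST API.
import urllib.error
import urllib.request

def is_quorum_standby(base_url: str = 'http://127.0.0.1:8008') -> bool:
    """True if this node is currently listed as a quorum node on the primary (HTTP 200)."""
    try:
        with urllib.request.urlopen(base_url + '/quorum', timeout=2) as resp:
            return resp.status == 200
    except (urllib.error.HTTPError, urllib.error.URLError):
        return False  # 503 means not a quorum standby; URLError means unreachable

if __name__ == '__main__':
    print(is_quorum_standby())
```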


@@ -0,0 +1,68 @@
Feature: quorum commit
Check basic workflows when quorum commit is enabled
Scenario: check enable quorum commit and that the only leader promotes after restart
Given I start postgres0
Then postgres0 is a leader after 10 seconds
And there is a non empty initialize key in DCS after 15 seconds
When I issue a PATCH request to http://127.0.0.1:8008/config with {"ttl": 20, "synchronous_mode": "quorum"}
Then I receive a response code 200
And sync key in DCS has leader=postgres0 after 20 seconds
And sync key in DCS has quorum=0 after 2 seconds
And synchronous_standby_names on postgres0 is set to "_empty_str_" after 2 seconds
When I shut down postgres0
And sync key in DCS has leader=postgres0 after 2 seconds
When I start postgres0
Then postgres0 role is the primary after 10 seconds
When I issue a PATCH request to http://127.0.0.1:8008/config with {"synchronous_mode_strict": true}
Then synchronous_standby_names on postgres0 is set to "ANY 1 (*)" after 10 seconds
Scenario: check failover with one quorum standby
Given I start postgres1
Then sync key in DCS has sync_standby=postgres1 after 10 seconds
And synchronous_standby_names on postgres0 is set to "ANY 1 (postgres1)" after 2 seconds
When I shut down postgres0
Then postgres1 role is the primary after 10 seconds
And sync key in DCS has quorum=0 after 10 seconds
Then synchronous_standby_names on postgres1 is set to "ANY 1 (*)" after 10 seconds
When I start postgres0
Then sync key in DCS has leader=postgres1 after 10 seconds
Then sync key in DCS has sync_standby=postgres0 after 10 seconds
And synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0)" after 2 seconds
Scenario: check behavior with three nodes and different replication factor
Given I start postgres2
Then sync key in DCS has sync_standby=postgres0,postgres2 after 10 seconds
And sync key in DCS has quorum=1 after 2 seconds
And synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0,postgres2)" after 2 seconds
When I issue a PATCH request to http://127.0.0.1:8009/config with {"synchronous_node_count": 2}
Then sync key in DCS has quorum=0 after 10 seconds
And synchronous_standby_names on postgres1 is set to "ANY 2 (postgres0,postgres2)" after 2 seconds
Scenario: switch from quorum replication to good old multisync and back
Given I issue a PATCH request to http://127.0.0.1:8009/config with {"synchronous_mode": true, "synchronous_node_count": 1}
And I shut down postgres0
Then synchronous_standby_names on postgres1 is set to "postgres2" after 10 seconds
And sync key in DCS has sync_standby=postgres2 after 10 seconds
Then sync key in DCS has quorum=0 after 2 seconds
When I issue a PATCH request to http://127.0.0.1:8009/config with {"synchronous_mode": "quorum"}
And I start postgres0
Then synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0,postgres2)" after 10 seconds
And sync key in DCS has sync_standby=postgres0,postgres2 after 10 seconds
Then sync key in DCS has quorum=1 after 2 seconds
Scenario: REST API and patronictl
Given I run patronictl.py list batman
Then I receive a response returncode 0
And I receive a response output "Quorum Standby"
And Status code on GET http://127.0.0.1:8008/quorum is 200 after 3 seconds
And Status code on GET http://127.0.0.1:8010/quorum is 200 after 3 seconds
Scenario: nosync node is removed from voters and synchronous_standby_names
Given I add tag nosync true to postgres2 config
When I issue an empty POST request to http://127.0.0.1:8010/reload
Then I receive a response code 202
And sync key in DCS has quorum=0 after 10 seconds
And sync key in DCS has sync_standby=postgres0 after 10 seconds
And synchronous_standby_names on postgres1 is set to "ANY 1 (postgres0)" after 2 seconds
And Status code on GET http://127.0.0.1:8010/quorum is 503 after 10 seconds


@@ -0,0 +1,60 @@
import json
import re
import time
from behave import step, then
@step('sync key in DCS has {key:w}={value} after {time_limit:d} seconds')
def check_sync(context, key, value, time_limit):
time_limit *= context.timeout_multiplier
max_time = time.time() + int(time_limit)
dcs_value = None
while time.time() < max_time:
try:
response = json.loads(context.dcs_ctl.query('sync'))
dcs_value = response.get(key)
if key == 'sync_standby' and set((dcs_value or '').split(',')) == set(value.split(',')):
return
elif str(dcs_value) == value:
return
except Exception:
pass
time.sleep(1)
assert False, "sync does not have {0}={1} (found {2}) in dcs after {3} seconds".format(key, value,
dcs_value, time_limit)
def _parse_synchronous_standby_names(value):
if '(' in value:
m = re.match(r'.*(\d+) \(([^)]+)\)', value)
expected_value = set(m.group(2).split(','))
expected_num = m.group(1)
else:
expected_value = set([value])
expected_num = '1'
return expected_num, expected_value
@then('synchronous_standby_names on {name:w} is set to "{value}" after {time_limit:d} seconds')
def check_synchronous_standby_names(context, name, value, time_limit):
time_limit *= context.timeout_multiplier
max_time = time.time() + int(time_limit)
if value == '_empty_str_':
value = ''
expected_num, expected_value = _parse_synchronous_standby_names(value)
ssn = None
while time.time() < max_time:
try:
ssn = context.pctl.query(name, "SHOW synchronous_standby_names").fetchone()[0]
db_num, db_value = _parse_synchronous_standby_names(ssn)
if expected_value == db_value and expected_num == db_num:
return
except Exception:
pass
time.sleep(1)
assert False, "synchronous_standby_names is not set to '{0}' (found '{1}') after {2} seconds".format(value, ssn,
time_limit)


@@ -266,6 +266,14 @@ class RestApiHandler(BaseHTTPRequestHandler):
* HTTP status ``200``: if up and running and without ``noloadbalance`` tag.
* ``/quorum``:
* HTTP status ``200``: if up and running as a quorum synchronous standby.
* ``/read-only-quorum``:
* HTTP status ``200``: if up and running as a quorum synchronous standby or primary.
* ``/synchronous`` or ``/sync``:
* HTTP status ``200``: if up and running as a synchronous standby.
@@ -346,16 +354,24 @@ class RestApiHandler(BaseHTTPRequestHandler):
ignore_tags = True
elif 'replica' in path:
status_code = replica_status_code
elif 'read-only' in path and 'sync' not in path:
elif 'read-only' in path and 'sync' not in path and 'quorum' not in path:
status_code = 200 if 200 in (primary_status_code, standby_leader_status_code) else replica_status_code
elif 'health' in path:
status_code = 200 if response.get('state') == 'running' else 503
elif cluster: # dcs is available
is_quorum = response.get('quorum_standby')
is_synchronous = response.get('sync_standby')
if path in ('/sync', '/synchronous') and is_synchronous:
status_code = replica_status_code
elif path in ('/async', '/asynchronous') and not is_synchronous:
elif path == '/quorum' and is_quorum:
status_code = replica_status_code
elif path in ('/async', '/asynchronous') and not is_synchronous and not is_quorum:
status_code = replica_status_code
elif path == '/read-only-quorum':
if 200 in (primary_status_code, standby_leader_status_code):
status_code = 200
elif is_quorum:
status_code = replica_status_code
elif path in ('/read-only-sync', '/read-only-synchronous'):
if 200 in (primary_status_code, standby_leader_status_code):
status_code = 200
@@ -522,6 +538,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
* ``patroni_standby_leader``: ``1`` if standby leader node, else ``0``;
* ``patroni_replica``: ``1`` if a replica, else ``0``;
* ``patroni_sync_standby``: ``1`` if a sync replica, else ``0``;
* ``patroni_quorum_standby``: ``1`` if a quorum sync replica, else ``0``;
* ``patroni_xlog_received_location``: ``pg_wal_lsn_diff(pg_last_wal_receive_lsn(), '0/0')``;
* ``patroni_xlog_replayed_location``: ``pg_wal_lsn_diff(pg_last_wal_replay_lsn(), '0/0')``;
* ``patroni_xlog_replayed_timestamp``: ``pg_last_xact_replay_timestamp``;
@@ -584,10 +601,14 @@ class RestApiHandler(BaseHTTPRequestHandler):
metrics.append("# TYPE patroni_replica gauge")
metrics.append("patroni_replica{0} {1}".format(labels, int(postgres['role'] == 'replica')))
metrics.append("# HELP patroni_sync_standby Value is 1 if this node is a sync standby replica, 0 otherwise.")
metrics.append("# HELP patroni_sync_standby Value is 1 if this node is a sync standby, 0 otherwise.")
metrics.append("# TYPE patroni_sync_standby gauge")
metrics.append("patroni_sync_standby{0} {1}".format(labels, int(postgres.get('sync_standby', False))))
metrics.append("# HELP patroni_quorum_standby Value is 1 if this node is a quorum standby, 0 otherwise.")
metrics.append("# TYPE patroni_quorum_standby gauge")
metrics.append("patroni_quorum_standby{0} {1}".format(labels, int(postgres.get('quorum_standby', False))))
metrics.append("# HELP patroni_xlog_received_location Current location of the received"
" Postgres transaction log, 0 if this node is not a replica.")
metrics.append("# TYPE patroni_xlog_received_location counter")
@@ -1049,16 +1070,17 @@ class RestApiHandler(BaseHTTPRequestHandler):
:returns: a string with the error message or ``None`` if good nodes are found.
"""
is_synchronous_mode = global_config.from_cluster(cluster).is_synchronous_mode
config = global_config.from_cluster(cluster)
if leader and (not cluster.leader or cluster.leader.name != leader):
return 'leader name does not match'
if candidate:
if action == 'switchover' and is_synchronous_mode and not cluster.sync.matches(candidate):
if action == 'switchover' and config.is_synchronous_mode\
and not config.is_quorum_commit_mode and not cluster.sync.matches(candidate):
return 'candidate name does not match with sync_standby'
members = [m for m in cluster.members if m.name == candidate]
if not members:
return 'candidate does not exists'
elif is_synchronous_mode:
elif config.is_synchronous_mode and not config.is_quorum_commit_mode:
members = [m for m in cluster.members if cluster.sync.matches(m.name)]
if not members:
return action + ' is not possible: can not find sync_standby'
@@ -1265,6 +1287,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
* ``paused``: ``pg_is_wal_replay_paused()``;
* ``sync_standby``: ``True`` if replication mode is synchronous and this is a sync standby;
* ``quorum_standby``: ``True`` if replication mode is quorum and this is a quorum standby;
* ``timeline``: PostgreSQL primary node timeline;
* ``replication``: :class:`list` of :class:`dict` entries, one for each replication connection. Each entry
contains the following keys:
@@ -1320,7 +1343,7 @@ class RestApiHandler(BaseHTTPRequestHandler):
if result['role'] == 'replica' and config.is_synchronous_mode\
and cluster and cluster.sync.matches(postgresql.name):
result['sync_standby'] = True
result['quorum_standby' if global_config.is_quorum_commit_mode else 'sync_standby'] = True
if row[1] > 0:
result['timeline'] = row[1]


@@ -756,7 +756,7 @@ class Config(object):
if 'citus' in config:
bootstrap = config.setdefault('bootstrap', {})
dcs = bootstrap.setdefault('dcs', {})
dcs.setdefault('synchronous_mode', True)
dcs.setdefault('synchronous_mode', 'quorum')
updated_fields = (
'name',


@@ -548,11 +548,15 @@ class SyncState(NamedTuple):
:ivar version: modification version of a synchronization key in a Configuration Store.
:ivar leader: reference to member that was leader.
:ivar sync_standby: synchronous standby list (comma delimited) which are last synchronized to leader.
:ivar quorum: if the node from :attr:`~SyncState.sync_standby` list is doing a leader race it should
see at least :attr:`~SyncState.quorum` other nodes from the
:attr:`~SyncState.sync_standby` + :attr:`~SyncState.leader` list.
"""
version: Optional[_Version]
leader: Optional[str]
sync_standby: Optional[str]
quorum: int
@staticmethod
def from_node(version: Optional[_Version], value: Union[str, Dict[str, Any], None]) -> 'SyncState':
@@ -587,7 +591,9 @@ class SyncState(NamedTuple):
if value and isinstance(value, str):
value = json.loads(value)
assert isinstance(value, dict)
return SyncState(version, value.get('leader'), value.get('sync_standby'))
leader = value.get('leader')
quorum = value.get('quorum')
return SyncState(version, leader, value.get('sync_standby'), int(quorum) if leader and quorum else 0)
except (AssertionError, TypeError, ValueError):
return SyncState.empty(version)
@@ -599,7 +605,7 @@ class SyncState(NamedTuple):
:returns: empty synchronisation state object.
"""
return SyncState(version, None, None)
return SyncState(version, None, None, 0)
@property
def is_empty(self) -> bool:
@@ -617,10 +623,17 @@ class SyncState(NamedTuple):
return list(filter(lambda a: a, [s.strip() for s in value.split(',')]))
@property
def members(self) -> List[str]:
def voters(self) -> List[str]:
""":attr:`~SyncState.sync_standby` as list or an empty list if undefined or object considered ``empty``."""
return self._str_to_list(self.sync_standby) if not self.is_empty and self.sync_standby else []
@property
def members(self) -> List[str]:
""":attr:`~SyncState.sync_standby` and :attr:`~SyncState.leader` as list
or an empty list if object considered ``empty``.
"""
return [] if not self.leader else [self.leader] + self.voters
def matches(self, name: Optional[str], check_leader: bool = False) -> bool:
"""Checks if node is presented in the /sync state.
@@ -634,7 +647,7 @@ class SyncState(NamedTuple):
the sync state.
:Example:
>>> s = SyncState(1, 'foo', 'bar,zoo')
>>> s = SyncState(1, 'foo', 'bar,zoo', 0)
>>> s.matches('foo')
False
@@ -1885,18 +1898,23 @@ class AbstractDCS(abc.ABC):
"""
@staticmethod
def sync_state(leader: Optional[str], sync_standby: Optional[Collection[str]]) -> Dict[str, Any]:
def sync_state(leader: Optional[str], sync_standby: Optional[Collection[str]],
quorum: Optional[int]) -> Dict[str, Any]:
"""Build ``sync_state`` dictionary.
:param leader: name of the leader node that manages ``/sync`` key.
:param sync_standby: collection of currently known synchronous standby node names.
:param quorum: if the node from :attr:`~SyncState.sync_standby` list is doing a leader race it should
see at least :attr:`~SyncState.quorum` other nodes from the
:attr:`~SyncState.sync_standby` + :attr:`~SyncState.leader` list
:returns: dictionary that later could be serialized to JSON or saved directly to DCS.
"""
return {'leader': leader, 'sync_standby': ','.join(sorted(sync_standby)) if sync_standby else None}
return {'leader': leader, 'quorum': quorum,
'sync_standby': ','.join(sorted(sync_standby)) if sync_standby else None}
def write_sync_state(self, leader: Optional[str], sync_standby: Optional[Collection[str]],
version: Optional[Any] = None) -> Optional[SyncState]:
quorum: Optional[int], version: Optional[Any] = None) -> Optional[SyncState]:
"""Write the new synchronous state to DCS.
Calls :meth:`~AbstractDCS.sync_state` to build a dictionary and then calls DCS specific
@@ -1905,10 +1923,13 @@ class AbstractDCS(abc.ABC):
:param leader: name of the leader node that manages ``/sync`` key.
:param sync_standby: collection of currently known synchronous standby node names.
:param version: for conditional update of the key/object.
:param quorum: if the node from :attr:`~SyncState.sync_standby` list is doing a leader race it should
see at least :attr:`~SyncState.quorum` other nodes from the
:attr:`~SyncState.sync_standby` + :attr:`~SyncState.leader` list
:returns: the new :class:`SyncState` object or ``None``.
"""
sync_value = self.sync_state(leader, sync_standby)
sync_value = self.sync_state(leader, sync_standby, quorum)
ret = self.set_sync_state_value(json.dumps(sync_value, separators=(',', ':')), version)
if not isinstance(ret, bool):
return SyncState.from_node(ret, sync_value)


@@ -1373,15 +1373,18 @@ class Kubernetes(AbstractDCS):
raise NotImplementedError # pragma: no cover
def write_sync_state(self, leader: Optional[str], sync_standby: Optional[Collection[str]],
version: Optional[str] = None) -> Optional[SyncState]:
quorum: Optional[int], version: Optional[str] = None) -> Optional[SyncState]:
"""Prepare and write annotations to $SCOPE-sync Endpoint or ConfigMap.
:param leader: name of the leader node that manages /sync key
:param sync_standby: collection of currently known synchronous standby node names
:param quorum: if the node from sync_standby list is doing a leader race it should
see at least quorum other nodes from the sync_standby + leader list
:param version: last known `resource_version` for conditional update of the object
:returns: the new :class:`SyncState` object or None
"""
sync_state = self.sync_state(leader, sync_standby)
sync_state = self.sync_state(leader, sync_standby, quorum)
sync_state['quorum'] = str(sync_state['quorum']) if sync_state['quorum'] is not None else None
ret = self.patch_or_create(self.sync_path, sync_state, version, False)
if not isinstance(ret, bool):
return SyncState.from_node(ret.metadata.resource_version, sync_state)
@@ -1393,7 +1396,7 @@ class Kubernetes(AbstractDCS):
:param version: last known `resource_version` for conditional update of the object
:returns: `True` if "delete" was successful
"""
return self.write_sync_state(None, None, version=version) is not None
return self.write_sync_state(None, None, None, version=version) is not None
def watch(self, leader_version: Optional[str], timeout: float) -> bool:
if self.__do_not_watch:


@@ -105,10 +105,16 @@ class GlobalConfig(types.ModuleType):
"""``True`` if cluster is in maintenance mode."""
return self.check_mode('pause')
@property
def is_quorum_commit_mode(self) -> bool:
""":returns: ``True`` if quorum commit replication is requested"""
return str(self.get('synchronous_mode')).lower() == 'quorum'
@property
def is_synchronous_mode(self) -> bool:
"""``True`` if synchronous replication is requested and it is not a standby cluster config."""
return self.check_mode('synchronous_mode') and not self.is_standby_cluster
return (self.check_mode('synchronous_mode') is True or self.is_quorum_commit_mode) \
and not self.is_standby_cluster
@property
def is_synchronous_mode_strict(self) -> bool:


@@ -14,12 +14,13 @@ from . import global_config, psycopg
from .__main__ import Patroni
from .async_executor import AsyncExecutor, CriticalTask
from .collections import CaseInsensitiveSet
from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, slot_name_from_member_name
from .dcs import AbstractDCS, Cluster, Leader, Member, RemoteMember, Status, SyncState, slot_name_from_member_name
from .exceptions import DCSError, PostgresConnectionException, PatroniFatalException
from .postgresql.callback_executor import CallbackAction
from .postgresql.misc import postgres_version_to_int
from .postgresql.postmaster import PostmasterProcess
from .postgresql.rewind import Rewind
from .quorum import QuorumStateResolver
from .tags import Tags
from .utils import parse_int, polling_loop, tzutc
@@ -231,6 +232,7 @@ class Ha(object):
self._leader_expiry_lock = RLock()
self._failsafe = Failsafe(patroni.dcs)
self._was_paused = False
self._promote_timestamp = 0
self._leader_timeline = None
self.recovering = False
self._async_response = CriticalTask()
@@ -296,6 +298,8 @@ class Ha(object):
"""
with self._leader_expiry_lock:
self._leader_expiry = time.time() + self.dcs.ttl if value else 0
if not value:
self._promote_timestamp = 0
def sync_mode_is_active(self) -> bool:
"""Check whether synchronous replication is requested and already active.
@@ -304,6 +308,13 @@ class Ha(object):
"""
return self.is_synchronous_mode() and not self.cluster.sync.is_empty
def quorum_commit_mode_is_active(self) -> bool:
"""Checks whether quorum replication is requested and already active.
:returns: ``True`` if the primary already put its name into the ``/sync`` in DCS.
"""
return self.is_quorum_commit_mode() and not self.cluster.sync.is_empty
def _get_failover_action_name(self) -> str:
"""Return the currently requested manual failover action name or the default ``failover``.
@@ -771,81 +782,221 @@ class Ha(object):
""":returns: `True` if synchronous replication is requested."""
return global_config.is_synchronous_mode
def is_quorum_commit_mode(self) -> bool:
"""``True`` if quorum commit replication is requested and "supported"."""
return global_config.is_quorum_commit_mode and self.state_handler.supports_multiple_sync
def is_failsafe_mode(self) -> bool:
""":returns: `True` if failsafe_mode is enabled in global configuration."""
return global_config.check_mode('failsafe_mode')
def process_sync_replication(self) -> None:
"""Process synchronous standby beahvior.
def _maybe_enable_synchronous_mode(self) -> Optional[SyncState]:
"""Explicitly enable synchronous mode if not yet enabled.
We are trying to solve a corner case: synchronous mode needs to be explicitly enabled
by updating the ``/sync`` key with the current leader name and empty members. In opposite
case it will never be automatically enabled if there are no eligible candidates.
:returns: the latest version of :class:`~patroni.dcs.SyncState` object.
"""
sync = self.cluster.sync
if sync.is_empty:
sync = self.dcs.write_sync_state(self.state_handler.name, None, 0, version=sync.version)
if sync:
logger.info("Enabled synchronous replication")
else:
logger.warning("Updating sync state failed")
return sync
def disable_synchronous_replication(self) -> None:
"""Cleans up ``/sync`` key in DCS and updates ``synchronous_standby_names``.
.. note::
We fall back to using the value configured by the user for ``synchronous_standby_names``, if any.
"""
# If synchronous_mode was turned off, we need to update synchronous_standby_names in Postgres
if not self.cluster.sync.is_empty and self.dcs.delete_sync_state(version=self.cluster.sync.version):
logger.info("Disabled synchronous replication")
self.state_handler.sync_handler.set_synchronous_standby_names(CaseInsensitiveSet())
# As synchronous_mode is off, check if the user configured Postgres synchronous replication instead
ssn = self.state_handler.config.synchronous_standby_names
self.state_handler.config.set_synchronous_standby_names(ssn)
def _process_quorum_replication(self) -> None:
"""Process synchronous replication state when quorum commit is requested.
Synchronous standbys are registered in two places: ``postgresql.conf`` and DCS. The order of updating them must
keep the invariant that ``quorum + sync >= len(set(quorum pool)|set(sync pool))``. This is done using
:class:`QuorumStateResolver` that given a current state and set of desired synchronous nodes and replication
level outputs changes to DCS and synchronous replication in correct order to reach the desired state.
In case any of those steps causes an error we can just bail out and let next iteration rediscover the state
and retry necessary transitions.
"""
start_time = time.time()
min_sync = global_config.min_synchronous_nodes
sync_wanted = global_config.synchronous_node_count
sync = self._maybe_enable_synchronous_mode()
if not sync or not sync.leader:
return
leader = sync.leader
def _check_timeout(offset: float = 0) -> bool:
return time.time() - start_time + offset >= self.dcs.loop_wait
while True:
transition = 'break'  # we need to define the transition value in case `QuorumStateResolver` produces no changes
sync_state = self.state_handler.sync_handler.current_state(self.cluster)
for transition, leader, num, nodes in QuorumStateResolver(leader=leader,
quorum=sync.quorum,
voters=sync.voters,
numsync=sync_state.numsync,
sync=sync_state.sync,
numsync_confirmed=sync_state.numsync_confirmed,
active=sync_state.active,
sync_wanted=sync_wanted,
leader_wanted=self.state_handler.name):
if _check_timeout():
return
if transition == 'quorum':
logger.info("Setting leader to %s, quorum to %d of %d (%s)",
leader, num, len(nodes), ", ".join(sorted(nodes)))
sync = self.dcs.write_sync_state(leader, nodes, num, version=sync.version)
if not sync:
return logger.info('Synchronous replication key updated by someone else.')
elif transition == 'sync':
logger.info("Setting synchronous replication to %d of %d (%s)",
num, len(nodes), ", ".join(sorted(nodes)))
# Bump up the number of sync nodes to meet the minimum replication factor. Commits will have to
# wait until we have enough nodes to meet the replication target.
if num < min_sync:
logger.warning("Replication factor %d requested, but %d synchronous standbys available."
" Commits will be delayed.", min_sync + 1, num)
num = min_sync
self.state_handler.sync_handler.set_synchronous_standby_names(nodes, num)
if transition != 'restart' or _check_timeout(1):
return
# synchronous_standby_names was transitioned from empty to non-empty and it may take
# some time for nodes to become synchronous. In this case we want to restart state machine
# hoping that we can update /sync key earlier than in loop_wait seconds.
time.sleep(1)
self.state_handler.reset_cluster_info_state(None)
def _process_multisync_replication(self) -> None:
"""Process synchronous replication state with one or more sync standbys.
Synchronous standbys are registered in two places postgresql.conf and DCS. The order of updating them must
be right. The invariant that should be kept is that if a node is primary and sync_standby is set in DCS,
then that node must have synchronous_standby set to that value. Or more simple, first set in postgresql.conf
and then in DCS. When removing, first remove in DCS, then in postgresql.conf. This is so we only consider
promoting standbys that were guaranteed to be replicating synchronously.
.. note::
If ``synchronous_mode`` is disabled, we fall back to using the value configured by the user for
``synchronous_standby_names``, if any.
"""
if self.is_synchronous_mode():
sync = self.cluster.sync
if sync.is_empty:
# corner case: we need to explicitly enable synchronous mode by updating the
# ``/sync`` key with the current leader name and empty members. In opposite case
# it will never be automatically enabled if there are not eligible candidates.
sync = self.dcs.write_sync_state(self.state_handler.name, None, version=sync.version)
if not sync:
return logger.warning("Updating sync state failed")
logger.info("Enabled synchronous replication")
sync = self._maybe_enable_synchronous_mode()
if not sync:
return
current = CaseInsensitiveSet(sync.members)
picked, allow_promote = self.state_handler.sync_handler.current_state(self.cluster)
current_state = self.state_handler.sync_handler.current_state(self.cluster)
picked = current_state.active
allow_promote = current_state.sync
voters = CaseInsensitiveSet(sync.voters)
if picked == current and current != allow_promote:
logger.warning('Inconsistent state between synchronous_standby_names = %s and /sync = %s key '
'detected, updating synchronous replication key...', list(allow_promote), list(current))
sync = self.dcs.write_sync_state(self.state_handler.name, allow_promote, version=sync.version)
if not sync:
return logger.warning("Updating sync state failed")
current = CaseInsensitiveSet(sync.members)
if picked == voters and voters != allow_promote:
logger.warning('Inconsistent state between synchronous_standby_names = %s and /sync = %s key '
'detected, updating synchronous replication key...', list(allow_promote), list(voters))
sync = self.dcs.write_sync_state(self.state_handler.name, allow_promote, 0, version=sync.version)
if not sync:
return logger.warning("Updating sync state failed")
voters = CaseInsensitiveSet(sync.voters)
if picked != current:
# update synchronous standby list in dcs temporarily to point to common nodes in current and picked
sync_common = current & allow_promote
if sync_common != current:
logger.info("Updating synchronous privilege temporarily from %s to %s",
list(current), list(sync_common))
sync = self.dcs.write_sync_state(self.state_handler.name, sync_common, version=sync.version)
if not sync:
return logger.info('Synchronous replication key updated by someone else.')
if picked == voters:
return
# When strict mode and no suitable replication connections put "*" to synchronous_standby_names
if global_config.is_synchronous_mode_strict and not picked:
picked = CaseInsensitiveSet('*')
logger.warning("No standbys available!")
# update synchronous standby list in dcs temporarily to point to common nodes in current and picked
sync_common = voters & allow_promote
if sync_common != voters:
logger.info("Updating synchronous privilege temporarily from %s to %s",
list(voters), list(sync_common))
sync = self.dcs.write_sync_state(self.state_handler.name, sync_common, 0, version=sync.version)
if not sync:
return logger.info('Synchronous replication key updated by someone else.')
# Update postgresql.conf and wait 2 secs for changes to become active
logger.info("Assigning synchronous standby status to %s", list(picked))
self.state_handler.sync_handler.set_synchronous_standby_names(picked)
# When strict mode and no suitable replication connections put "*" to synchronous_standby_names
if global_config.is_synchronous_mode_strict and not picked:
picked = CaseInsensitiveSet('*')
logger.warning("No standbys available!")
if picked and picked != CaseInsensitiveSet('*') and allow_promote != picked:
# Wait for PostgreSQL to enable synchronous mode and see if we can immediately set sync_standby
time.sleep(2)
_, allow_promote = self.state_handler.sync_handler.current_state(self.cluster)
if allow_promote and allow_promote != sync_common:
if not self.dcs.write_sync_state(self.state_handler.name, allow_promote, version=sync.version):
return logger.info("Synchronous replication key updated by someone else")
logger.info("Synchronous standby status assigned to %s", list(allow_promote))
# Update postgresql.conf and wait 2 secs for changes to become active
logger.info("Assigning synchronous standby status to %s", list(picked))
self.state_handler.sync_handler.set_synchronous_standby_names(picked)
if picked and picked != CaseInsensitiveSet('*') and allow_promote != picked:
# Wait for PostgreSQL to enable synchronous mode and see if we can immediately set sync_standby
time.sleep(2)
allow_promote = self.state_handler.sync_handler.current_state(self.cluster).sync
if allow_promote and allow_promote != sync_common:
if self.dcs.write_sync_state(self.state_handler.name, allow_promote, 0, version=sync.version):
logger.info("Synchronous standby status assigned to %s", list(allow_promote))
else:
logger.info("Synchronous replication key updated by someone else")
def process_sync_replication(self) -> None:
"""Process synchronous replication behavior on the primary."""
if self.is_quorum_commit_mode():
# The synchronous_standby_names was adjusted right before promote.
# After that, when postgres has become a primary, we need to reflect this change
# in the /sync key. Further changes of synchronous_standby_names and /sync key should
# be postponed for `loop_wait` seconds, to give a chance to some replicas to start streaming.
# In opposite case the /sync key will end up without synchronous nodes.
if self.state_handler.is_primary():
if self._promote_timestamp == 0 or time.time() - self._promote_timestamp > self.dcs.loop_wait:
self._process_quorum_replication()
if self._promote_timestamp == 0:
self._promote_timestamp = time.time()
elif self.is_synchronous_mode():
self._process_multisync_replication()
else:
# If synchronous_mode was turned off, we need to update synchronous_standby_names in Postgres
if not self.cluster.sync.is_empty and self.dcs.delete_sync_state(version=self.cluster.sync.version):
logger.info("Disabled synchronous replication")
self.state_handler.sync_handler.set_synchronous_standby_names(CaseInsensitiveSet())
self.disable_synchronous_replication()
# As synchronous_mode is off, check if the user configured Postgres synchronous replication instead
ssn = self.state_handler.config.synchronous_standby_names
self.state_handler.config.set_synchronous_standby_names(ssn)
def process_sync_replication_prepromote(self) -> bool:
"""Handle sync replication state before promote.
If quorum replication is requested, and we can keep syncing to enough nodes to satisfy the quorum invariant,
we can promote immediately and let the normal quorum resolver process handle any membership changes later.
Otherwise, we will just reset DCS state to ourselves and add replicas as they connect.
:returns: ``True`` if on success or ``False`` if failed to update /sync key in DCS.
"""
if not self.is_synchronous_mode():
self.disable_synchronous_replication()
return True
if self.quorum_commit_mode_is_active():
sync = CaseInsensitiveSet(self.cluster.sync.members)
numsync = len(sync) - self.cluster.sync.quorum - 1
if self.state_handler.name not in sync: # Node outside voters achieved quorum and got leader
numsync += 1
else:
sync.discard(self.state_handler.name)
else:
sync = CaseInsensitiveSet()
numsync = global_config.min_synchronous_nodes
if not self.is_quorum_commit_mode() or not self.state_handler.supports_multiple_sync and numsync > 1:
sync = CaseInsensitiveSet()
numsync = global_config.min_synchronous_nodes
# Just set ourselves as the authoritative source of truth for now. We don't want to wait for standbys
# to connect. We will try finding a synchronous standby in the next cycle.
if not self.dcs.write_sync_state(self.state_handler.name, None, 0, version=self.cluster.sync.version):
return False
self.state_handler.sync_handler.set_synchronous_standby_names(sync, numsync)
return True
def is_sync_standby(self, cluster: Cluster) -> bool:
""":returns: `True` if the current node is a synchronous standby."""
@@ -956,15 +1107,10 @@ class Ha(object):
self.process_sync_replication()
return message
else:
if self.is_synchronous_mode():
# Just set ourselves as the authoritative source of truth for now. We don't want to wait for standbys
# to connect. We will try finding a synchronous standby in the next cycle.
if not self.dcs.write_sync_state(self.state_handler.name, None, version=self.cluster.sync.version):
# Somebody else updated sync state, it may be due to us losing the lock. To be safe, postpone
# promotion until next cycle. TODO: trigger immediate retry of run_cycle
return 'Postponing promotion because synchronous replication state was updated by somebody else'
self.state_handler.sync_handler.set_synchronous_standby_names(
CaseInsensitiveSet('*') if global_config.is_synchronous_mode_strict else CaseInsensitiveSet())
if not self.process_sync_replication_prepromote():
# Somebody else updated sync state, it may be due to us losing the lock. To be safe,
# postpone promotion until next cycle. TODO: trigger immediate retry of run_cycle.
return 'Postponing promotion because synchronous replication state was updated by somebody else'
if self.state_handler.role not in ('master', 'promoted', 'primary'):
# reset failsafe state when promote
self._failsafe.set_is_active(0)
@@ -980,10 +1126,14 @@ class Ha(object):
return promote_message
def fetch_node_status(self, member: Member) -> _MemberStatus:
"""This function perform http get request on member.api_url and fetches its status
:returns: `_MemberStatus` object
"""
"""Perform http get request on member.api_url to fetch its status.
Usually this happens during the leader race and we can't afford to wait an indefinite time
for a response, therefore the request timeout is hardcoded to 2 seconds, which seems to be a
good compromise. The node which is slow to respond is most likely unhealthy.
:returns: :class:`_MemberStatus` object
"""
try:
response = self.patroni.request(member, timeout=2, retries=0)
data = response.data.decode('utf-8')
@@ -1092,18 +1242,26 @@ class Ha(object):
return ret
def is_lagging(self, wal_position: int) -> bool:
"""Returns if instance with an wal should consider itself unhealthy to be promoted due to replication lag.
"""Check if node should consider itself unhealthy to be promoted due to replication lag.
:param wal_position: Current wal position.
:returns True when node is lagging
:returns: ``True`` when node is lagging
"""
lag = self.cluster.status.last_lsn - wal_position
return lag > global_config.maximum_lag_on_failover
def _is_healthiest_node(self, members: Collection[Member], check_replication_lag: bool = True) -> bool:
"""This method tries to determine whether I am healthy enough to became a new leader candidate or not."""
"""Determine whether the current node is healthy enough to become a new leader candidate.
:param members: the list of nodes to check against
:param check_replication_lag: whether to take the replication lag into account.
If the lag exceeds configured threshold the node disqualifies itself.
:returns: ``True`` if the node is eligible to become the new leader. Since this method is executed
on multiple nodes independently it is possible that multiple nodes could count
themselves as the healthiest because they received/replayed up to the same LSN,
but this is totally fine.
"""
my_wal_position = self.state_handler.last_operation()
if check_replication_lag and self.is_lagging(my_wal_position):
logger.info('My wal position exceeds maximum replication lag')
@@ -1119,8 +1277,26 @@ class Ha(object):
logger.info('My timeline %s is behind last known cluster timeline %s', my_timeline, cluster_timeline)
return False
# Prepare list of nodes to run check against
members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url]
if self.quorum_commit_mode_is_active():
quorum = self.cluster.sync.quorum
voting_set = CaseInsensitiveSet(self.cluster.sync.members)
else:
quorum = 0
voting_set = CaseInsensitiveSet()
# Prepare list of nodes to run check against. If quorum commit is enabled
# we also include members with nofailover tag if they are listed in voters.
members = [m for m in members if m.name != self.state_handler.name
and m.api_url and (not m.nofailover or m.name in voting_set)]
# If there is a quorum active then at least one of the quorum nodes contains the latest commit. A quorum member
# saying its WAL position is not ahead of ours counts as a vote that we may become the new leader. Note that a node
# doesn't have to be a member of the voting set to gather the necessary votes.
# Regardless of voting, if we observe a node that can become a leader and is ahead, we defer to that node.
# This can lead to failure to act on quorum if there is asymmetric connectivity.
quorum_votes = 0 if self.state_handler.name in voting_set else -1
nodes_ahead = 0
for st in self.fetch_nodes_statuses(members):
if st.failover_limitation() is None:
@@ -1128,22 +1304,34 @@ class Ha(object):
logger.warning('Primary (%s) is still alive', st.member.name)
return False
if my_wal_position < st.wal_position:
nodes_ahead += 1
logger.info('Wal position of %s is ahead of my wal position', st.member.name)
# In synchronous mode the former leader might still be accessible and even be ahead of us.
# We should not disqualify ourselves from the leader race in such a situation.
if not self.sync_mode_is_active() or not self.cluster.sync.leader_matches(st.member.name):
return False
logger.info('Ignoring the former leader being ahead of us')
if my_wal_position == st.wal_position and self.patroni.failover_priority < st.failover_priority:
# There's a higher priority non-lagging replica
logger.info(
'%s has equally tolerable WAL position and priority %s, while this node has priority %s',
st.member.name,
st.failover_priority,
self.patroni.failover_priority,
)
return False
return True
elif st.wal_position > 0: # we want to count votes only from nodes with postgres up and running!
quorum_vote = st.member.name in voting_set
low_priority = my_wal_position == st.wal_position \
and self.patroni.failover_priority < st.failover_priority
if low_priority and (not self.sync_mode_is_active() or quorum_vote):
# There's a higher priority non-lagging replica
logger.info(
'%s has equally tolerable WAL position and priority %s, while this node has priority %s',
st.member.name, st.failover_priority, self.patroni.failover_priority)
return False
if quorum_vote:
logger.info('Got quorum vote from %s', st.member.name)
quorum_votes += 1
# When not in quorum commit we just want to return `True`.
# In quorum commit the former leader is special and counted healthy even when there are no other nodes.
# Otherwise check that the number of votes exceeds the quorum field from the /sync key.
return not self.quorum_commit_mode_is_active() or quorum_votes >= quorum\
or nodes_ahead == 0 and self.cluster.sync.leader == self.state_handler.name
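The vote-counting rule above can be condensed into a small standalone sketch. This is illustrative only (node names and numbers are invented, and the real method also handles priorities, the failsafe, and the former-leader shortcut shown above): with ``quorum: 1`` and voters ``node2, node3`` in the ``/sync`` key, a candidate must collect at least one confirmation from a voter that is not ahead of it, on top of its own implicit vote if it is itself a voter.

```python
# Illustrative sketch (not from the commit) of the quorum vote counting above.
def has_quorum_votes(my_name: str, quorum: int, voters: set, not_ahead: set) -> bool:
    """Return True if enough voters confirmed that we are not behind.

    :param not_ahead: responding nodes whose WAL position is not ahead of ours.
    """
    # A node that is itself listed as a voter starts with its own implicit vote.
    quorum_votes = 0 if my_name in voters else -1
    quorum_votes += len(not_ahead & voters)
    return quorum_votes >= quorum

voters = {'node2', 'node3'}
print(has_quorum_votes('node2', 1, voters, {'node3'}))           # True: own vote + node3
print(has_quorum_votes('node4', 1, voters, {'node2'}))           # False: a non-voter needs quorum + 1 votes
print(has_quorum_votes('node4', 1, voters, {'node2', 'node3'}))  # True
```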
def is_failover_possible(self, *, cluster_lsn: int = 0, exclude_failover_candidate: bool = False) -> bool:
"""Checks whether any of the cluster members is allowed to promote and is healthy enough for that.
@@ -1203,9 +1391,10 @@ class Ha(object):
return None
return False
# in synchronous mode when our name is not in the /sync key
# we shouldn't take any action even if the candidate is unhealthy
if self.is_synchronous_mode() and not self.cluster.sync.matches(self.state_handler.name, True):
# in synchronous mode (except quorum commit!) when our name is not in the
# /sync key we shouldn't take any action even if the candidate is unhealthy
if self.is_synchronous_mode() and not self.is_quorum_commit_mode()\
and not self.cluster.sync.matches(self.state_handler.name, True):
return False
# find specific node and check that it is healthy
@@ -1303,9 +1492,11 @@ class Ha(object):
all_known_members += [RemoteMember(name, {'api_url': url}) for name, url in failsafe_members.items()]
all_known_members += self.cluster.members
# When in sync mode, only last known primary and sync standby are allowed to promote automatically.
# Special handling if synchronous mode was requested and activated (the leader in /sync is not empty)
if self.sync_mode_is_active():
if not self.cluster.sync.matches(self.state_handler.name, True):
# In quorum commit mode we allow nodes outside of "voters" to take part in
# the leader race. They just need to get enough votes to reach ``quorum + 1``.
if not self.is_quorum_commit_mode() and not self.cluster.sync.matches(self.state_handler.name, True):
return False
# pick between synchronous candidates so we minimize unnecessary failovers/demotions
members = {m.name: m for m in all_known_members if self.cluster.sync.matches(m.name, True)}
@@ -2205,8 +2396,11 @@ class Ha(object):
exclude = [self.state_handler.name] + ([failover.candidate] if failover and exclude_failover_candidate else [])
def is_eligible(node: Member) -> bool:
# If quorum commit is requested we want to check all nodes (even not voters),
# because they could get enough votes and reach the necessary quorum + 1.
# in synchronous mode we allow failover (not switchover!) to async node
if self.sync_mode_is_active() and not self.cluster.sync.matches(node.name)\
if self.sync_mode_is_active()\
and not (self.is_quorum_commit_mode() or self.cluster.sync.matches(node.name))\
and not (failover and not failover.leader):
return False
# Don't spend time on "nofailover" nodes checking.


@@ -181,6 +181,11 @@ class Postgresql(object):
def lsn_name(self) -> str:
return 'lsn' if self._major_version >= 100000 else 'location'
@property
def supports_quorum_commit(self) -> bool:
"""``True`` if quorum commit is supported by Postgres."""
return self._major_version >= 100000
@property
def supports_multiple_sync(self) -> bool:
""":returns: `True` if Postgres version supports more than one synchronous node."""


@@ -3,7 +3,7 @@ import re
import time
from copy import deepcopy
from typing import Collection, List, NamedTuple, Tuple, TYPE_CHECKING
from typing import Collection, List, NamedTuple, Optional, TYPE_CHECKING
from .. import global_config
from ..collections import CaseInsensitiveDict, CaseInsensitiveSet
@@ -138,7 +138,7 @@ def parse_sync_standby_names(value: str) -> _SSN:
if len(synclist) == i + 1: # except the last token
raise ValueError("Unparseable synchronous_standby_names value %r: Unexpected token %s %r at %d" %
(value, a_type, a_value, a_pos))
elif a_type != 'comma':
if a_type != 'comma':
raise ValueError("Unparseable synchronous_standby_names value %r: ""Got token %s %r while"
" expecting comma at %d" % (value, a_type, a_value, a_pos))
elif a_type in {'ident', 'first', 'any'}:
@@ -154,6 +154,26 @@ def parse_sync_standby_names(value: str) -> _SSN:
return _SSN(sync_type, has_star, num, members)
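For orientation, PostgreSQL accepts three `synchronous_standby_names` syntaxes, which is what the parser above has to distinguish; the meanings below follow the PostgreSQL documentation (values are illustrative):

```python
# Illustrative only: the three synchronous_standby_names syntaxes and their meaning.
examples = {
    "ANY 2 (node1, node2, node3)":   "quorum-based: any 2 of the listed standbys must confirm a commit",
    "FIRST 2 (node1, node2, node3)": "priority-based: the first 2 connected standbys are synchronous",
    "node1, node2":                  "bare list, shorthand for FIRST 1 (node1, node2)",
}
for value, meaning in examples.items():
    print(f"{value!r}: {meaning}")
```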
class _SyncState(NamedTuple):
"""Class representing the current synchronous state.
:ivar sync_type: possible values: ``off``, ``priority``, ``quorum``
:ivar numsync: how many nodes are required to be synchronous (according to ``synchronous_standby_names``).
Is ``0`` if ``synchronous_standby_names`` value is invalid or contains ``*``.
:ivar numsync_confirmed: how many nodes are known to be synchronous according to the ``pg_stat_replication`` view.
Only nodes that caught up with the :attr:`SyncHandler._primary_flush_lsn` are counted.
:ivar sync: collection of synchronous node names. In case of quorum commit all nodes listed
in ``synchronous_standby_names``, otherwise nodes that are confirmed to be synchronous according
to the ``pg_stat_replication`` view.
:ivar active: collection of node names that are streaming and have no restrictions to become synchronous.
"""
sync_type: str
numsync: int
numsync_confirmed: int
sync: CaseInsensitiveSet
active: CaseInsensitiveSet
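As a hypothetical illustration of the fields documented above (using the named tuple and the :class:`CaseInsensitiveSet` import shown in this module): a primary running with `synchronous_standby_names = 'ANY 2 (node2,node3,node4)'`, where all three standbys are streaming and two of them have reached the primary flush LSN, would be summarized roughly like this. Values are invented, not taken from the commit.

```python
# Hypothetical instance of the named tuple documented above (illustrative values).
state = _SyncState(
    sync_type='quorum',        # synchronous_standby_names uses the ANY syntax
    numsync=2,                 # ANY 2 (...) requires two acknowledgements
    numsync_confirmed=2,       # two standbys reached the primary flush LSN
    sync=CaseInsensitiveSet(['node2', 'node3', 'node4']),    # everything listed in ssn
    active=CaseInsensitiveSet(['node2', 'node3', 'node4']),  # all three are streaming
)
```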
class _Replica(NamedTuple):
"""Class representing a single replica that is eligible to be synchronous.
@@ -217,6 +237,8 @@ class _ReplicaList(List[_Replica]):
# Prefer replicas that are in state ``sync`` and with higher values of ``write``/``flush``/``replay`` LSN.
self.sort(key=lambda r: (r.sync_state, r.lsn), reverse=True)
# When checking ``maximum_lag_on_syncnode`` we want to compare with the most
# up-to-date replica, otherwise with the cluster LSN if there is only one replica.
self.max_lsn = max(self, key=lambda x: x.lsn).lsn if len(self) > 1 else postgresql.last_operation()
@@ -278,12 +300,22 @@ END;$$""")
# if standby name is listed in the /sync key we can count it as synchronous, otherwise
# it becomes really synchronous when sync_state = 'sync' and it is known that it managed to catch up
if replica.application_name not in self._ready_replicas\
and replica.application_name in self._ssn_data.members\
and (cluster.sync.matches(replica.application_name)
or replica.sync_state == 'sync' and replica.lsn >= self._primary_flush_lsn):
self._ready_replicas[replica.application_name] = replica.pid
and replica.application_name in self._ssn_data.members:
if global_config.is_quorum_commit_mode:
# When quorum commit is enabled we can't check against cluster.sync because nodes
# are written there when at least one of them caught up with _primary_flush_lsn.
if replica.lsn >= self._primary_flush_lsn\
and (replica.sync_state == 'quorum'
or (not self._postgresql.supports_quorum_commit
and replica.sync_state in ('sync', 'potential'))):
self._ready_replicas[replica.application_name] = replica.pid
elif cluster.sync.matches(replica.application_name)\
or replica.sync_state == 'sync' and replica.lsn >= self._primary_flush_lsn:
# if standby name is listed in the /sync key we can count it as synchronous, otherwise it becomes
# "really" synchronous when sync_state = 'sync' and we known that it managed to catch up
self._ready_replicas[replica.application_name] = replica.pid
def current_state(self, cluster: Cluster) -> Tuple[CaseInsensitiveSet, CaseInsensitiveSet]:
def current_state(self, cluster: Cluster) -> _SyncState:
"""Find the best candidates to be the synchronous standbys.
Current synchronous standby is always preferred, unless it has disconnected or does not want to be a
@@ -291,51 +323,86 @@ END;$$""")
Standbys are selected based on values from the global configuration:
- `maximum_lag_on_syncnode`: would help swapping unhealthy sync replica in case if it stops
responding (or hung). Please set the value high enough so it won't unncessarily swap sync
standbys during high loads. Any value less or equal of 0 keeps the behavior backward compatible.
Please note that it will not also swap sync standbys in case where all replicas are hung.
- `synchronous_node_count`: controlls how many nodes should be set as synchronous.
- ``maximum_lag_on_syncnode``: would help swapping unhealthy sync replica in case it stops
responding (or hung). Please set the value high enough, so it won't unnecessarily swap sync
standbys during high loads. Any value less or equal to ``0`` keeps the behavior backwards compatible.
Please note that it will also not swap sync standbys when all replicas are hung.
:returns: tuple of candidates :class:`CaseInsensitiveSet` and synchronous standbys :class:`CaseInsensitiveSet`.
- ``synchronous_node_count``: controls how many nodes should be set as synchronous.
:param cluster: current cluster topology from DCS
:returns: current synchronous replication state as a :class:`_SyncState` object
"""
self._handle_synchronous_standby_names_change()
replica_list = _ReplicaList(self._postgresql, cluster)
self._process_replica_readiness(cluster, replica_list)
active = CaseInsensitiveSet()
sync_nodes = CaseInsensitiveSet()
numsync_confirmed = 0
sync_node_count = global_config.synchronous_node_count if self._postgresql.supports_multiple_sync else 1
sync_node_maxlag = global_config.maximum_lag_on_syncnode
candidates = CaseInsensitiveSet()
sync_nodes = CaseInsensitiveSet()
# Prefer members without nofailover tag. We are relying on the fact that sorts are guaranteed to be stable.
for replica in sorted(replica_list, key=lambda x: x.nofailover):
if sync_node_maxlag <= 0 or replica_list.max_lsn - replica.lsn <= sync_node_maxlag:
candidates.add(replica.application_name)
if replica.sync_state == 'sync' and replica.application_name in self._ready_replicas:
sync_nodes.add(replica.application_name)
if len(candidates) >= sync_node_count:
break
if global_config.is_quorum_commit_mode:
# We do not add nodes with `nofailover` enabled because that reduces availability.
# We need to check LSN quorum only among nodes that are promotable because
# there is a chance that a non-promotable node is ahead of a promotable one.
if not replica.nofailover or len(active) < sync_node_count:
if replica.application_name in self._ready_replicas:
numsync_confirmed += 1
active.add(replica.application_name)
else:
active.add(replica.application_name)
if replica.sync_state == 'sync' and replica.application_name in self._ready_replicas:
sync_nodes.add(replica.application_name)
numsync_confirmed += 1
if len(active) >= sync_node_count:
break
return candidates, sync_nodes
if global_config.is_quorum_commit_mode:
sync_nodes = CaseInsensitiveSet() if self._ssn_data.has_star else self._ssn_data.members
def set_synchronous_standby_names(self, sync: Collection[str]) -> None:
"""Constructs and sets "synchronous_standby_names" GUC value.
return _SyncState(
self._ssn_data.sync_type,
0 if self._ssn_data.has_star else self._ssn_data.num,
numsync_confirmed,
sync_nodes,
active)
def set_synchronous_standby_names(self, sync: Collection[str], num: Optional[int] = None) -> None:
"""Constructs and sets ``synchronous_standby_names`` GUC value.
.. note::
standbys in ``synchronous_standby_names`` will be sorted by name.
:param sync: set of nodes to sync to
:param num: specifies the number of nodes to sync to. The *num* is only set when quorum commit is enabled
"""
has_asterisk = '*' in sync
# Special case. If sync nodes set is empty but requested num of sync nodes >= 1
# we want to set synchronous_standby_names to '*'
has_asterisk = '*' in sync or num and num >= 1 and not sync
if has_asterisk:
sync = ['*']
else:
sync = [quote_ident(x) for x in sync]
sync = [quote_ident(x) for x in sorted(sync)]
if self._postgresql.supports_multiple_sync and len(sync) > 1:
sync_param = '{0} ({1})'.format(len(sync), ','.join(sync))
if num is None:
num = len(sync)
sync_param = ','.join(sync)
else:
sync_param = next(iter(sync), None)
if global_config.is_quorum_commit_mode and sync or self._postgresql.supports_multiple_sync and len(sync) > 1:
prefix = 'ANY ' if global_config.is_quorum_commit_mode and self._postgresql.supports_quorum_commit else ''
sync_param = f'{prefix}{num} ({sync_param})'
if not (self._postgresql.config.set_synchronous_standby_names(sync_param)
and self._postgresql.state == 'running' and self._postgresql.is_primary()) or has_asterisk:
return
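A minimal standalone sketch of how the GUC string is assembled, following the branching shown above. This is not the Patroni implementation: identifier quoting is omitted, and the real code only emits the ``ANY`` prefix when the PostgreSQL version supports quorum commit.

```python
# Minimal standalone sketch (not the Patroni implementation) of how the
# synchronous_standby_names value is assembled from a node set.
def build_ssn(sync, num=None, quorum_commit=False, supports_multiple_sync=True):
    names = sorted(sync)
    # Empty set with a requested num >= 1 falls back to '*'.
    if '*' in sync or (num and num >= 1 and not sync):
        names = ['*']
    if num is None:
        num = len(names)
    value = ','.join(names)
    if (quorum_commit and names) or (supports_multiple_sync and len(names) > 1):
        prefix = 'ANY ' if quorum_commit else ''
        value = f'{prefix}{num} ({value})'
    return value

assert build_ssn({'node3', 'node2'}, 2, quorum_commit=True) == 'ANY 2 (node2,node3)'
assert build_ssn(set(), 1, quorum_commit=True) == 'ANY 1 (*)'   # strict mode, no candidates
assert build_ssn({'node3', 'node2'}) == '2 (node2,node3)'       # priority (multisync) mode
assert build_ssn({'node2'}) == 'node2'                          # single synchronous standby
```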

patroni/quorum.py

@@ -0,0 +1,431 @@
"""Implement state machine to manage ``synchronous_standby_names`` GUC and ``/sync`` key in DCS."""
import logging
from typing import Collection, Iterator, NamedTuple, Optional
from .collections import CaseInsensitiveSet
from .exceptions import PatroniException
logger = logging.getLogger(__name__)
class Transition(NamedTuple):
"""Object describing transition of ``/sync`` or ``synchronous_standby_names`` to the new state.
.. note::
Object attributes represent the new state.
:ivar transition_type: possible values:
* ``sync`` - indicates that we needed to update ``synchronous_standby_names``.
* ``quorum`` - indicates that we need to update ``/sync`` key in DCS.
* ``restart`` - caller should stop iterating over transitions and restart :class:`QuorumStateResolver`.
:ivar leader: the new value of the ``leader`` field in the ``/sync`` key.
:ivar num: the new value of the synchronous nodes count in ``synchronous_standby_names`` or value of the ``quorum``
field in the ``/sync`` key for :attr:`transition_type` values ``sync`` and ``quorum`` respectively.
:ivar names: the new value of node names listed in ``synchronous_standby_names`` or value of ``voters``
field in the ``/sync`` key for :attr:`transition_type` values ``sync`` and ``quorum`` respectively.
"""
transition_type: str
leader: str
num: int
names: CaseInsensitiveSet
class QuorumError(PatroniException):
"""Exception indicating that the quorum state is broken."""
class QuorumStateResolver:
"""Calculates a list of state transitions and yields them as :class:`Transition` named tuples.
Synchronous replication state is set in two places:
* PostgreSQL configuration sets how many and which nodes are needed for a commit to succeed, abbreviated as
``numsync`` and ``sync`` set here;
* DCS contains information about how many and which nodes need to be interrogated to be sure to see a WAL position
containing the latest confirmed commit, abbreviated as ``quorum`` and ``voters`` set here.
.. note::
Both of above pairs have the meaning "ANY n OF set".
The number of nodes needed for commit to succeed, ``numsync``, is also called the replication factor.
To guarantee zero transaction loss on failover we need to keep the invariant that at all times any subset of
nodes that can acknowledge a commit overlaps with any subset of nodes that can achieve quorum to promote a new
leader. Given a desired replication factor and a set of nodes able to participate in sync replication there
is one optimal state satisfying this condition. Given the node set ``active``, the optimal state is::
sync = voters = active
numsync = min(sync_wanted, len(active))
quorum = len(active) - numsync
We need to be able to produce a series of state changes that take the system to this desired state from any
other arbitrary state given arbitrary changes in node availability, configuration and interrupted transitions.
To keep the invariant the rule to follow is that when increasing ``numsync`` or ``quorum``, we need to perform the
increasing operation first. When decreasing either, the decreasing operation needs to be performed later. In other
words:
* If a user increases ``synchronous_node_count`` configuration, first we increase ``synchronous_standby_names``
(``numsync``), then we decrease ``quorum`` field in the ``/sync`` key;
* If a user decreases ``synchronous_node_count`` configuration, first we increase ``quorum`` field in the ``/sync``
key, then we decrease ``synchronous_standby_names`` (``numsync``).
Order of adding or removing nodes from ``sync`` and ``voters`` depends on the state of
``synchronous_standby_names``.
When adding new nodes::
if ``sync`` (``synchronous_standby_names``) is empty:
add new nodes first to ``sync`` and then to ``voters`` when ``numsync_confirmed`` > ``0``.
else:
add new nodes first to ``voters`` and then to ``sync``.
When removing nodes::
if ``sync`` (``synchronous_standby_names``) will become empty after removal:
first remove nodes from ``voters`` and then from ``sync``.
else:
first remove nodes from ``sync`` and then from ``voters``.
Make ``voters`` empty if ``numsync_confirmed`` == ``0``.
:ivar leader: name of the leader, according to the ``/sync`` key.
:ivar quorum: ``quorum`` value from the ``/sync`` key, the minimal number of nodes we need to see
when doing the leader race.
:ivar voters: ``sync_standby`` value from the ``/sync`` key, set of node names we will be
running the leader race against.
:ivar numsync: the number of synchronous nodes from the ``synchronous_standby_names``.
:ivar sync: set of node names listed in the ``synchronous_standby_names``.
:ivar numsync_confirmed: the number of nodes that are confirmed to reach "safe" LSN after they were added to the
``synchronous_standby_names``.
:ivar active: set of node names that are replicating from the primary (according to ``pg_stat_replication``)
and are eligible to be listed in ``synchronous_standby_names``.
:ivar sync_wanted: desired number of synchronous nodes (``synchronous_node_count`` from the global configuration).
:ivar leader_wanted: the desired leader (could be different from the :attr:`leader` right after a failover).
"""
def __init__(self, leader: str, quorum: int, voters: Collection[str],
numsync: int, sync: Collection[str], numsync_confirmed: int,
active: Collection[str], sync_wanted: int, leader_wanted: str) -> None:
"""Instantiate :class:``QuorumStateResolver`` based on input parameters.
:param leader: name of the leader, according to the ``/sync`` key.
:param quorum: ``quorum`` value from the ``/sync`` key, the minimal number of nodes we need to see
when doing the leader race.
:param voters: ``sync_standby`` value from the ``/sync`` key, set of node names we will be
running the leader race against.
:param numsync: the number of synchronous nodes from the ``synchronous_standby_names``.
:param sync: Set of node names listed in the ``synchronous_standby_names``.
:param numsync_confirmed: the number of nodes that are confirmed to reach "safe" LSN after
they were added to the ``synchronous_standby_names``.
:param active: set of node names that are replicating from the primary (according to ``pg_stat_replication``)
and are eligible to be listed in ``synchronous_standby_names``.
:param sync_wanted: desired number of synchronous nodes
(``synchronous_node_count`` from the global configuration).
:param leader_wanted: the desired leader (could be different from the *leader* right after a failover).
"""
self.leader = leader
self.quorum = quorum
self.voters = CaseInsensitiveSet(voters)
self.numsync = min(numsync, len(sync)) # numsync can't be bigger than number of listed synchronous nodes.
self.sync = CaseInsensitiveSet(sync)
self.numsync_confirmed = numsync_confirmed
self.active = CaseInsensitiveSet(active)
self.sync_wanted = sync_wanted
self.leader_wanted = leader_wanted
def check_invariants(self) -> None:
"""Checks invariant of ``synchronous_standby_names`` and ``/sync`` key in DCS.
.. seealso::
Check :class:`QuorumStateResolver`'s docstring for more information.
:raises:
:exc:`QuorumError`: in case of broken state"""
voters = CaseInsensitiveSet(self.voters | CaseInsensitiveSet([self.leader]))
sync = CaseInsensitiveSet(self.sync | CaseInsensitiveSet([self.leader_wanted]))
# We need to verify that subset of nodes that can acknowledge a commit overlaps
# with any subset of nodes that can achieve quorum to promote a new leader.
# ``+ 1`` is required because the leader is included in the set.
if self.voters and not (len(voters | sync) <= self.quorum + self.numsync + 1):
len_nodes = len(voters | sync)
raise QuorumError("Quorum and sync not guaranteed to overlap: "
f"nodes {len_nodes} >= quorum {self.quorum} + sync {self.sync} + 1")
# unstable cases, we are changing synchronous_standby_names and /sync key
# one after another, hence one set is allowed to be a subset of another
if not (voters.issubset(sync) or sync.issubset(voters)):
voters_only = voters - sync
sync_only = sync - voters
raise QuorumError(f"Mismatched sets: voter only={voters_only} sync only={sync_only}")
def quorum_update(self, quorum: int, voters: CaseInsensitiveSet, leader: Optional[str] = None,
adjust_quorum: Optional[bool] = True) -> Iterator[Transition]:
"""Updates :attr:`quorum`, :attr:`voters` and optionally :attr:`leader` fields.
:param quorum: the new value for :attr:`quorum`, could be adjusted depending
on values of :attr:`numsync_confirmed` and *adjust_quorum*.
:param voters: the new value for :attr:`voters`, could be adjusted if :attr:`numsync_confirmed` == ``0``.
:param leader: the new value for :attr:`leader`, optional.
:param adjust_quorum: if set to ``True`` the quorum requirement will be increased by the
difference between :attr:`numsync` and :attr:`numsync_confirmed`.
:yields: the new state of the ``/sync`` key as a :class:`Transition` object.
:raises:
:exc:`QuorumError` in case of invalid data or if the invariant after transition could not be satisfied.
"""
if quorum < 0:
raise QuorumError(f'Quorum {quorum} < 0 of ({voters})')
if quorum > 0 and quorum >= len(voters):
raise QuorumError(f'Quorum {quorum} >= N of ({voters})')
old_leader = self.leader
if leader is not None: # Change of leader was requested
self.leader = leader
elif self.numsync_confirmed == 0:
# If there are no nodes known to have caught up with the primary we want to reset quorum/voters in the /sync key
quorum = 0
voters = CaseInsensitiveSet()
elif adjust_quorum:
# It could be that the number of nodes known to have caught up with the primary is below the desired numsync.
# We want to increase quorum to guarantee that the sync node will be found during the leader race.
quorum += max(self.numsync - self.numsync_confirmed, 0)
if (self.leader, quorum, voters) == (old_leader, self.quorum, self.voters):
if self.voters:
return
# If transition produces no change of leader/quorum/voters we want to give a hint to
# the caller to fetch the new state from the database and restart QuorumStateResolver.
yield Transition('restart', self.leader, self.quorum, self.voters)
self.quorum = quorum
self.voters = voters
self.check_invariants()
logger.debug('quorum %s %s %s', self.leader, self.quorum, self.voters)
yield Transition('quorum', self.leader, self.quorum, self.voters)
def sync_update(self, numsync: int, sync: CaseInsensitiveSet) -> Iterator[Transition]:
"""Updates :attr:`numsync` and :attr:`sync` fields.
:param numsync: the new value for :attr:`numsync`.
:param sync: the new value for :attr:`sync`.
:yields: the new state of ``synchronous_standby_names`` as a :class:`Transition` object.
:raises:
:exc:`QuorumError` in case of invalid data or if invariant after transition could not be satisfied
"""
if numsync < 0:
raise QuorumError(f'Sync {numsync} < 0 of ({sync})')
if numsync > len(sync):
raise QuorumError(f'Sync {numsync} > N of ({sync})')
self.numsync = numsync
self.sync = sync
self.check_invariants()
logger.debug('sync %s %s %s', self.leader, self.numsync, self.sync)
yield Transition('sync', self.leader, self.numsync, self.sync)
def __iter__(self) -> Iterator[Transition]:
"""Iterate over the transitions produced by :meth:`_generate_transitions`.
.. note::
Merge two transitions of the same type to a single one.
This is always safe because skipping the first transition is equivalent
to no one observing the intermediate state.
:yields: transitions as :class:`Transition` objects.
"""
transitions = list(self._generate_transitions())
for cur_transition, next_transition in zip(transitions, transitions[1:] + [None]):
if isinstance(next_transition, Transition) \
and cur_transition.transition_type == next_transition.transition_type:
continue
yield cur_transition
if cur_transition.transition_type == 'restart':
break
def __handle_non_steady_cases(self) -> Iterator[Transition]:
"""Handle cases when set of transitions produced on previous run was interrupted.
:yields: transitions as :class:`Transition` objects.
"""
if self.sync < self.voters:
logger.debug("Case 1: synchronous_standby_names %s is a subset of DCS state %s", self.sync, self.voters)
# Case 1: voters is superset of sync nodes. In the middle of changing voters (quorum).
# Evict dead nodes from voters that are not being synced.
remove_from_voters = self.voters - (self.sync | self.active)
if remove_from_voters:
yield from self.quorum_update(
quorum=len(self.voters) - len(remove_from_voters) - self.numsync,
voters=CaseInsensitiveSet(self.voters - remove_from_voters),
adjust_quorum=not (self.sync - self.active))
# Start syncing to nodes that are in voters and alive
add_to_sync = (self.voters & self.active) - self.sync
if add_to_sync:
yield from self.sync_update(self.numsync, CaseInsensitiveSet(self.sync | add_to_sync))
elif self.sync > self.voters:
logger.debug("Case 2: synchronous_standby_names %s is a superset of DCS state %s", self.sync, self.voters)
# Case 2: sync is superset of voters nodes. In the middle of changing replication factor (sync).
# Add to voters nodes that are already synced and active
add_to_voters = (self.sync - self.voters) & self.active
if add_to_voters:
voters = CaseInsensitiveSet(self.voters | add_to_voters)
yield from self.quorum_update(len(voters) - self.numsync, voters)
# Remove from sync nodes that are dead
remove_from_sync = self.sync - self.voters
if remove_from_sync:
yield from self.sync_update(
numsync=min(self.numsync, len(self.sync) - len(remove_from_sync)),
sync=CaseInsensitiveSet(self.sync - remove_from_sync))
# After handling these two cases voters and sync must match.
assert self.voters == self.sync
safety_margin = self.quorum + min(self.numsync, self.numsync_confirmed) - len(self.voters | self.sync)
if safety_margin > 0: # In the middle of changing replication factor.
if self.numsync > self.sync_wanted:
numsync = max(self.sync_wanted, len(self.voters) - self.quorum)
logger.debug('Case 3: replication factor %d is bigger than needed %d', self.numsync, numsync)
yield from self.sync_update(numsync, self.sync)
else:
quorum = len(self.sync) - self.numsync
logger.debug('Case 4: quorum %d is bigger than needed %d', self.quorum, quorum)
yield from self.quorum_update(quorum, self.voters)
else:
safety_margin = self.quorum + self.numsync - len(self.voters | self.sync)
if self.numsync == self.sync_wanted and safety_margin > 0 and self.numsync > self.numsync_confirmed:
yield from self.quorum_update(len(self.sync) - self.numsync, self.voters)
def __remove_gone_nodes(self) -> Iterator[Transition]:
"""Remove inactive nodes from ``synchronous_standby_names`` and from ``/sync`` key.
:yields: transitions as :class:`Transition` objects.
"""
to_remove = self.sync - self.active
if to_remove and self.sync == to_remove:
logger.debug("Removing nodes: %s", to_remove)
yield from self.quorum_update(0, CaseInsensitiveSet(), adjust_quorum=False)
yield from self.sync_update(0, CaseInsensitiveSet())
elif to_remove:
logger.debug("Removing nodes: %s", to_remove)
can_reduce_quorum_by = self.quorum
# If we can reduce quorum size try to do so first
if can_reduce_quorum_by:
# Pick nodes to remove by sorted order to provide deterministic behavior for tests
remove = CaseInsensitiveSet(sorted(to_remove, reverse=True)[:can_reduce_quorum_by])
sync = CaseInsensitiveSet(self.sync - remove)
# when removing nodes from sync we can safely increase numsync if requested
numsync = min(self.sync_wanted, len(sync)) if self.sync_wanted > self.numsync else self.numsync
yield from self.sync_update(numsync, sync)
voters = CaseInsensitiveSet(self.voters - remove)
to_remove &= self.sync
yield from self.quorum_update(len(voters) - self.numsync, voters,
adjust_quorum=not to_remove)
if to_remove:
assert self.quorum == 0
numsync = self.numsync - len(to_remove)
sync = CaseInsensitiveSet(self.sync - to_remove)
voters = CaseInsensitiveSet(self.voters - to_remove)
sync_decrease = numsync - min(self.sync_wanted, len(sync))
quorum = min(sync_decrease, len(voters) - 1) if sync_decrease else 0
yield from self.quorum_update(quorum, voters, adjust_quorum=False)
yield from self.sync_update(numsync, sync)
def __add_new_nodes(self) -> Iterator[Transition]:
"""Add new active nodes to ``synchronous_standby_names`` and to ``/sync`` key.
:yields: transitions as :class:`Transition` objects.
"""
to_add = self.active - self.sync
if to_add:
# First get to requested replication factor
logger.debug("Adding nodes: %s", to_add)
sync_wanted = min(self.sync_wanted, len(self.sync | to_add))
increase_numsync_by = sync_wanted - self.numsync
if increase_numsync_by > 0:
if self.sync:
add = CaseInsensitiveSet(sorted(to_add)[:increase_numsync_by])
increase_numsync_by = len(add)
else: # there is only the leader
add = to_add # and it is safe to add all nodes at once if sync is empty
yield from self.sync_update(self.numsync + increase_numsync_by, CaseInsensitiveSet(self.sync | add))
voters = CaseInsensitiveSet(self.voters | add)
yield from self.quorum_update(len(voters) - sync_wanted, voters)
to_add -= self.sync
if to_add:
voters = CaseInsensitiveSet(self.voters | to_add)
yield from self.quorum_update(len(voters) - sync_wanted, voters,
adjust_quorum=sync_wanted > self.numsync_confirmed)
yield from self.sync_update(sync_wanted, CaseInsensitiveSet(self.sync | to_add))
def __handle_replication_factor_change(self) -> Iterator[Transition]:
"""Handle change of the replication factor (:attr:`sync_wanted`, aka ``synchronous_node_count``).
:yields: transitions as :class:`Transition` objects.
"""
# Apply requested replication factor change
sync_increase = min(self.sync_wanted, len(self.sync)) - self.numsync
if sync_increase > 0:
# Increase replication factor
logger.debug("Increasing replication factor to %s", self.numsync + sync_increase)
yield from self.sync_update(self.numsync + sync_increase, self.sync)
yield from self.quorum_update(len(self.voters) - self.numsync, self.voters)
elif sync_increase < 0:
# Reduce replication factor
logger.debug("Reducing replication factor to %s", self.numsync + sync_increase)
if self.quorum - sync_increase < len(self.voters):
yield from self.quorum_update(len(self.voters) - self.numsync - sync_increase, self.voters,
adjust_quorum=self.sync_wanted > self.numsync_confirmed)
yield from self.sync_update(self.numsync + sync_increase, self.sync)
def _generate_transitions(self) -> Iterator[Transition]:
"""Produce a set of changes to safely transition from the current state to the desired.
:yields: transitions as :class:`Transition` objects.
"""
logger.debug("Quorum state: leader %s quorum %s, voters %s, numsync %s, sync %s, "
"numsync_confirmed %s, active %s, sync_wanted %s leader_wanted %s",
self.leader, self.quorum, self.voters, self.numsync, self.sync,
self.numsync_confirmed, self.active, self.sync_wanted, self.leader_wanted)
try:
if self.leader_wanted != self.leader: # failover
voters = (self.voters - CaseInsensitiveSet([self.leader_wanted])) | CaseInsensitiveSet([self.leader])
if not self.sync:
# If sync is empty we need to update synchronous_standby_names first
numsync = len(voters) - self.quorum
yield from self.sync_update(numsync, CaseInsensitiveSet(voters))
# If leader changed we need to add the old leader to quorum (voters)
yield from self.quorum_update(self.quorum, CaseInsensitiveSet(voters), self.leader_wanted)
# right after promote there could be no replication connections yet
if not self.sync & self.active:
return # give another loop_wait seconds for replicas to reconnect before removing them from quorum
else:
self.check_invariants()
except QuorumError as e:
logger.warning('%s', e)
yield from self.quorum_update(len(self.sync) - self.numsync, self.sync)
assert self.leader == self.leader_wanted
# numsync_confirmed could be 0 after restart/failover, we will calculate it from quorum
if self.numsync_confirmed == 0 and self.sync & self.active:
self.numsync_confirmed = min(len(self.sync & self.active), len(self.voters) - self.quorum)
logger.debug('numsync_confirmed=0, adjusting it to %d', self.numsync_confirmed)
yield from self.__handle_non_steady_cases()
# We are in a steady state point. Find if desired state is different and act accordingly.
yield from self.__remove_gone_nodes()
yield from self.__add_new_nodes()
yield from self.__handle_replication_factor_change()
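The optimal steady state described in the class docstring can be computed directly from its formulas. The following standalone sketch (hypothetical node names, not part of the commit) does that for a leader with three streaming replicas and `synchronous_node_count: 2`, and checks the overlap invariant enforced by `check_invariants`:

```python
# Standalone sketch of the steady-state formulas from the docstring above:
#   sync = voters = active
#   numsync = min(sync_wanted, len(active))
#   quorum  = len(active) - numsync
def optimal_state(active, sync_wanted):
    numsync = min(sync_wanted, len(active))
    quorum = len(active) - numsync
    return numsync, quorum, set(active)

# Hypothetical cluster: leader 'a', three streaming replicas, synchronous_node_count = 2.
numsync, quorum, members = optimal_state({'b', 'c', 'd'}, sync_wanted=2)
print(numsync, quorum, sorted(members))  # -> 2 1 ['b', 'c', 'd']

# Overlap invariant: any group that can acknowledge a commit intersects any group
# that can win the leader race (the + 1 accounts for the leader itself).
assert len(members | {'a'}) <= quorum + numsync + 1
```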


@@ -922,7 +922,7 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
* ``members``: list of members in the cluster. Each value is a :class:`dict` that may have the following keys:
* ``name``: the name of the host (unique in the cluster). The ``members`` list is sorted by this key;
* ``role``: ``leader``, ``standby_leader``, ``sync_standby``, or ``replica``;
* ``role``: ``leader``, ``standby_leader``, ``sync_standby``, ``quorum_standby``, or ``replica``;
* ``state``: ``stopping``, ``stopped``, ``stop failed``, ``crashed``, ``running``, ``starting``,
``start failed``, ``restarting``, ``restart failed``, ``initializing new cluster``, ``initdb failed``,
``running custom bootstrap script``, ``custom bootstrap failed``, or ``creating replica``;
@@ -949,11 +949,12 @@ def cluster_as_json(cluster: 'Cluster') -> Dict[str, Any]:
cluster_lsn = cluster.status.last_lsn
ret: Dict[str, Any] = {'members': []}
sync_role = 'quorum_standby' if config.is_quorum_commit_mode else 'sync_standby'
for m in cluster.members:
if m.name == leader_name:
role = 'standby_leader' if config.is_standby_cluster else 'leader'
elif config.is_synchronous_mode and cluster.sync.matches(m.name):
role = 'sync_standby'
role = sync_role
else:
role = 'replica'
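In quorum mode the ``members`` list therefore reports synchronous nodes with the ``quorum_standby`` role. An abridged, illustrative entry (only the keys documented above, with invented values) might look like:

```python
# Abridged, illustrative member entries for a quorum-commit cluster
# (only the documented name/role/state keys are shown; values are invented).
members = [
    {'name': 'node1', 'role': 'leader',         'state': 'running'},
    {'name': 'node2', 'role': 'quorum_standby', 'state': 'running'},
    {'name': 'node3', 'role': 'quorum_standby', 'state': 'running'},
]
```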


@@ -205,7 +205,6 @@ class TestRestApiHandler(unittest.TestCase):
def test_do_GET(self):
MockPostgresql.pending_restart_reason = {'max_connections': get_param_diff('200', '100')}
MockPatroni.dcs.cluster.status.last_lsn = 20
MockPatroni.dcs.cluster.sync.members = [MockPostgresql.name]
with patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /replica')
MockRestApiServer(RestApiHandler, 'GET /replica?lag=1M')
@@ -223,12 +222,16 @@ class TestRestApiHandler(unittest.TestCase):
Mock(return_value={'role': 'replica', 'sync_standby': True})):
MockRestApiServer(RestApiHandler, 'GET /synchronous')
MockRestApiServer(RestApiHandler, 'GET /read-only-sync')
with patch.object(RestApiHandler, 'get_postgresql_status',
Mock(return_value={'role': 'replica', 'quorum_standby': True})):
MockRestApiServer(RestApiHandler, 'GET /quorum')
MockRestApiServer(RestApiHandler, 'GET /read-only-quorum')
with patch.object(RestApiHandler, 'get_postgresql_status', Mock(return_value={'role': 'replica'})):
MockPatroni.dcs.cluster.sync.members = []
MockRestApiServer(RestApiHandler, 'GET /asynchronous')
with patch.object(MockHa, 'is_leader', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /replica')
MockRestApiServer(RestApiHandler, 'GET /read-only-sync')
MockRestApiServer(RestApiHandler, 'GET /read-only-quorum')
with patch.object(global_config.__class__, 'is_standby_cluster', Mock(return_value=True)):
MockRestApiServer(RestApiHandler, 'GET /standby_leader')
MockPatroni.dcs.cluster = None


@@ -344,7 +344,7 @@ class TestEtcd(unittest.TestCase):
self.assertTrue(self.etcd.watch(None, 1))
def test_sync_state(self):
self.assertIsNone(self.etcd.write_sync_state('leader', None))
self.assertIsNone(self.etcd.write_sync_state('leader', None, 0))
self.assertFalse(self.etcd.delete_sync_state())
def test_set_history_value(self):


@@ -18,6 +18,7 @@ from patroni.postgresql.config import ConfigHandler
from patroni.postgresql.postmaster import PostmasterProcess
from patroni.postgresql.rewind import Rewind
from patroni.postgresql.slots import SlotsHandler
from patroni.postgresql.sync import _SyncState
from patroni.utils import tzutc
from patroni.watchdog import Watchdog
@@ -63,7 +64,7 @@ def get_cluster_initialized_without_leader(leader=False, failover=None, sync=Non
'tags': {'clonefrom': True},
'scheduled_restart': {'schedule': "2100-01-01 10:53:07.560445+00:00",
'postgres_version': '99.0.0'}})
syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1])
syncstate = SyncState(0 if sync else None, sync and sync[0], sync and sync[1], 0)
failsafe = {m.name: m.api_url for m in (m1, m2)} if failsafe else None
return get_cluster(SYSID, leader, [m1, m2], failover, syncstate, cluster_config, failsafe)
@@ -207,6 +208,7 @@ class TestHa(PostgresInit):
@patch('patroni.dcs.dcs_modules', Mock(return_value=['patroni.dcs.etcd']))
@patch.object(etcd.Client, 'read', etcd_read)
@patch.object(AbstractEtcdClientWithFailover, '_get_machines_list', Mock(return_value=['http://remotehost:2379']))
@patch.object(Config, '_load_cache', Mock())
def setUp(self):
super(TestHa, self).setUp()
self.p.set_state('running')
@@ -1309,7 +1311,7 @@ class TestHa(PostgresInit):
self.ha.demote('immediate')
follow.assert_called_once_with(None)
def test_process_sync_replication(self):
def test__process_multisync_replication(self):
self.ha.has_lock = true
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
mock_cfg_set_sync = self.p.config.set_synchronous_standby_names = Mock()
@@ -1339,8 +1341,9 @@ class TestHa(PostgresInit):
self.ha.is_synchronous_mode = true
# Test sync standby not touched when picking the same node
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.ha.run_cycle()
mock_set_sync.assert_not_called()
@@ -1349,15 +1352,17 @@ class TestHa(PostgresInit):
mock_cfg_set_sync.reset_mock()
# Test sync standby is replaced when switching standbys
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2']), CaseInsensitiveSet()))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet(['other2'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(['other2']))
mock_cfg_set_sync.assert_not_called()
# Test sync standby is replaced when new standby is joined
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2', 'other3']),
CaseInsensitiveSet(['other2'])))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2', 'other3'])))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.run_cycle()
self.assertEqual(mock_set_sync.call_args_list[0][0], (CaseInsensitiveSet(['other2']),))
@@ -1378,8 +1383,9 @@ class TestHa(PostgresInit):
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
self.ha.dcs.get_cluster = Mock(return_value=get_cluster_initialized_with_leader(sync=('leader', 'other')))
# self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'other'))
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2'])))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 1, 1,
CaseInsensitiveSet(['other2']),
CaseInsensitiveSet(['other2'])))
self.ha.run_cycle()
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 2)
@@ -1402,9 +1408,10 @@ class TestHa(PostgresInit):
# Test sync set to '*' when synchronous_mode_strict is enabled
mock_set_sync.reset_mock()
mock_cfg_set_sync.reset_mock()
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.cluster.config.data['synchronous_mode_strict'] = True
self.ha.run_cycle()
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet()))
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
mock_set_sync.assert_called_once_with(CaseInsensitiveSet('*'))
mock_cfg_set_sync.assert_not_called()
@@ -1432,8 +1439,8 @@ class TestHa(PostgresInit):
# When we just became primary nobody is sync
self.assertEqual(self.ha.enforce_primary_role('msg', 'promote msg'), 'promote msg')
mock_set_sync.assert_called_once_with(CaseInsensitiveSet())
mock_write_sync.assert_called_once_with('leader', None, version=0)
mock_set_sync.assert_called_once_with(CaseInsensitiveSet(), 0)
mock_write_sync.assert_called_once_with('leader', None, 0, version=0)
mock_set_sync.reset_mock()
@@ -1471,7 +1478,7 @@ class TestHa(PostgresInit):
mock_acquire.assert_called_once()
mock_follow.assert_not_called()
mock_promote.assert_called_once()
mock_write_sync.assert_called_once_with('other', None, version=0)
mock_write_sync.assert_called_once_with('other', None, 0, version=0)
def test_disable_sync_when_restarting(self):
self.ha.is_synchronous_mode = true
@@ -1513,7 +1520,8 @@ class TestHa(PostgresInit):
self.ha.is_synchronous_mode = true
self.ha.has_lock = true
self.p.name = 'leader'
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet(), CaseInsensitiveSet()))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
CaseInsensitiveSet(), CaseInsensitiveSet()))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
with patch('patroni.ha.logger.info') as mock_logger:
self.ha.run_cycle()
@@ -1529,7 +1537,8 @@ class TestHa(PostgresInit):
self.ha.has_lock = true
self.p.name = 'leader'
self.ha.cluster = get_cluster_initialized_without_leader(sync=('leader', 'a'))
self.p.sync_handler.current_state = Mock(return_value=(CaseInsensitiveSet('a'), CaseInsensitiveSet()))
self.p.sync_handler.current_state = Mock(return_value=_SyncState('priority', 0, 0,
CaseInsensitiveSet(), CaseInsensitiveSet('a')))
self.ha.dcs.write_sync_state = Mock(return_value=SyncState.empty())
mock_set_sync = self.p.sync_handler.set_synchronous_standby_names = Mock()
with patch('patroni.ha.logger.warning') as mock_logger:
@@ -1699,3 +1708,113 @@ class TestHa(PostgresInit):
mock_logger.assert_called()
self.assertTrue(mock_logger.call_args[0][0].startswith('Request to %s coordinator leader'))
self.assertEqual(mock_logger.call_args[0][1], 'Citus')
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
@patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
def test_process_sync_replication_prepromote(self):
self.p._major_version = 90500
self.ha.cluster = get_cluster_initialized_without_leader(sync=('other', self.p.name + ',foo'))
self.p.is_primary = false
self.p.set_role('replica')
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
# Postgres 9.5, write_sync_state to DCS failed
self.assertEqual(self.ha.run_cycle(),
'Postponing promotion because synchronous replication state was updated by somebody else')
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=True)
# Postgres 9.5, our name is written to leader of the /sync key, while voters list and ssn is empty
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(self.ha.dcs.write_sync_state.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], (None,))
self.p._major_version = 90600
mock_set_sync.reset_mock()
mock_write_sync.reset_mock()
self.p.set_role('replica')
# Postgres 9.6, with quorum commit we avoid updating /sync key and put some nodes to ssn
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_write_sync.call_count, 0)
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('2 (foo,other)',))
self.p._major_version = 150000
mock_set_sync.reset_mock()
self.p.set_role('replica')
self.p.name = 'nonsync'
self.ha.fetch_node_status = get_node_status()
# Postgres 15, with quorum commit. Non-sync node promoted we avoid updating /sync key and put some nodes to ssn
self.assertEqual(self.ha.run_cycle(), 'promoted self to leader by acquiring session lock')
self.assertEqual(mock_write_sync.call_count, 0)
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 3 (foo,other,postgresql0)',))
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
@patch.object(global_config.__class__, 'is_quorum_commit_mode', PropertyMock(return_value=True))
def test__process_quorum_replication(self):
self.p._major_version = 150000
self.ha.has_lock = true
mock_set_sync = self.p.config.set_synchronous_standby_names = Mock()
self.p.name = 'leader'
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=None)
# Test /sync key is attempted to set and failed when missing or invalid
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 1, CaseInsensitiveSet(['other']),
CaseInsensitiveSet(['other'])))
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
self.assertEqual(mock_set_sync.call_count, 0)
self.ha._promote_timestamp = 1
mock_write_sync = self.ha.dcs.write_sync_state = Mock(side_effect=[SyncState(None, self.p.name, None, 0), None])
# Test /sync key is attempted to set and succeed when missing or invalid
with patch.object(SyncState, 'is_empty', Mock(side_effect=[True, False])):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 2)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, None, 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': None})
self.assertEqual(mock_write_sync.call_args_list[1][0], (self.p.name, CaseInsensitiveSet(['other']), 0))
self.assertEqual(mock_write_sync.call_args_list[1][1], {'version': None})
self.assertEqual(mock_set_sync.call_count, 0)
self.p.sync_handler.current_state = Mock(side_effect=[_SyncState('quorum', 1, 0, CaseInsensitiveSet(['foo']),
CaseInsensitiveSet(['other'])),
_SyncState('quorum', 1, 1, CaseInsensitiveSet(['foo']),
CaseInsensitiveSet(['foo']))])
mock_write_sync = self.ha.dcs.write_sync_state = Mock(return_value=SyncState(1, 'leader', 'foo', 0))
self.ha.cluster = get_cluster_initialized_with_leader(sync=('leader', 'foo'))
# Test the sync node is removed from voters, added to ssn
with patch.object(Postgresql, 'synchronous_standby_names', Mock(return_value='other')), \
patch('time.sleep', Mock()):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (other)',))
# Test ANY 1 (*) when synchronous_mode_strict and no nodes available
self.p.sync_handler.current_state = Mock(return_value=_SyncState('quorum', 1, 0,
CaseInsensitiveSet(['other', 'foo']),
CaseInsensitiveSet()))
mock_write_sync.reset_mock()
mock_set_sync.reset_mock()
with patch.object(global_config.__class__, 'is_synchronous_mode_strict', PropertyMock(return_value=True)):
self.ha.run_cycle()
self.assertEqual(mock_write_sync.call_count, 1)
self.assertEqual(mock_write_sync.call_args_list[0][0], (self.p.name, CaseInsensitiveSet(), 0))
self.assertEqual(mock_write_sync.call_args_list[0][1], {'version': 0})
self.assertEqual(mock_set_sync.call_count, 1)
self.assertEqual(mock_set_sync.call_args_list[0][0], ('ANY 1 (*)',))
# Test that _process_quorum_replication doesn't take longer than loop_wait
with patch('time.time', Mock(side_effect=[30, 60, 90, 120])):
self.ha.process_sync_replication()


@@ -439,7 +439,7 @@ class TestKubernetesEndpoints(BaseTestKubernetes):
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_endpoints', mock_namespaced_kind, create=True)
def test_write_sync_state(self):
self.assertIsNotNone(self.k.write_sync_state('a', ['b'], 1))
self.assertIsNotNone(self.k.write_sync_state('a', ['b'], 0, 1))
@patch.object(k8s_client.CoreV1Api, 'patch_namespaced_pod', mock_namespaced_kind, create=True)
@patch.object(k8s_client.CoreV1Api, 'create_namespaced_endpoints', mock_namespaced_kind, create=True)

tests/test_quorum.py (new file, 473 lines)

@@ -0,0 +1,473 @@
import unittest
from typing import List, Set, Tuple
from patroni.quorum import QuorumStateResolver, QuorumError
class QuorumTest(unittest.TestCase):
def check_state_transitions(self, leader: str, quorum: int, voters: Set[str], numsync: int, sync: Set[str],
numsync_confirmed: int, active: Set[str], sync_wanted: int, leader_wanted: str,
expected: List[Tuple[str, str, int, Set[str]]]) -> None:
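"""Assert that QuorumStateResolver yields exactly the ``expected`` transitions.

Judging by the assertions in this module, each transition is a tuple of
(action, leader, number, nodes), where action is 'quorum' (write the new
quorum value and voters to the /sync key), 'sync' (write the new numsync
value and sync set to synchronous_standby_names) or 'restart', and number
and nodes carry the corresponding new values. If the first transition was
produced by the current leader and is not a 'restart', it is applied to the
input state and the check is repeated with the remaining transitions to
cover transitions that were interrupted half-way.
"""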
kwargs = {
'leader': leader, 'quorum': quorum, 'voters': voters,
'numsync': numsync, 'sync': sync, 'numsync_confirmed': numsync_confirmed,
'active': active, 'sync_wanted': sync_wanted, 'leader_wanted': leader_wanted
}
result = list(QuorumStateResolver(**kwargs))
self.assertEqual(result, expected)
# also check interrupted transitions
if len(result) > 0 and result[0][0] != 'restart' and kwargs['leader'] == result[0][1]:
if result[0][0] == 'sync':
kwargs.update(numsync=result[0][2], sync=result[0][3])
else:
kwargs.update(leader=result[0][1], quorum=result[0][2], voters=result[0][3])
kwargs['expected'] = expected[1:]
self.check_state_transitions(**kwargs)
def test_1111(self):
leader = 'a'
# Add node
self.check_state_transitions(leader=leader, quorum=0, voters=set(),
numsync=0, sync=set(), numsync_confirmed=0, active=set('b'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 1, set('b')),
('restart', leader, 0, set()),
])
self.check_state_transitions(leader=leader, quorum=0, voters=set(),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('b'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 0, set('b'))
])
self.check_state_transitions(leader=leader, quorum=0, voters=set(),
numsync=0, sync=set(), numsync_confirmed=0, active=set('bcde'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 2, set('bcde')),
('restart', leader, 0, set()),
])
self.check_state_transitions(leader=leader, quorum=0, voters=set(),
numsync=2, sync=set('bcde'), numsync_confirmed=1, active=set('bcde'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 3, set('bcde')),
])
def test_1222(self):
"""2 node cluster"""
leader = 'a'
# Active set matches state
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('b'),
sync_wanted=2, leader_wanted=leader, expected=[])
# Add node by increasing quorum
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('BC'),
sync_wanted=1, leader_wanted=leader, expected=[
('quorum', leader, 1, set('bC')),
('sync', leader, 1, set('bC')),
])
# Add node by increasing sync
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bc'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 2, set('bc')),
('quorum', leader, 1, set('bc')),
])
# Reduce quorum after added node caught up
self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
numsync=2, sync=set('bc'), numsync_confirmed=2, active=set('bc'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 0, set('bc')),
])
# Add multiple nodes by increasing both sync and quorum
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('BCdE'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 2, set('bC')),
('quorum', leader, 3, set('bCdE')),
('sync', leader, 2, set('bCdE')),
])
# Reduce quorum after added nodes caught up
self.check_state_transitions(leader=leader, quorum=3, voters=set('bcde'),
numsync=2, sync=set('bcde'), numsync_confirmed=3, active=set('bcde'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 2, set('bcde')),
])
# Primary is alone
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=1, sync=set('b'), numsync_confirmed=0, active=set(),
sync_wanted=1, leader_wanted=leader, expected=[
('quorum', leader, 0, set()),
('sync', leader, 0, set()),
])
# Swap out sync replica
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=1, sync=set('b'), numsync_confirmed=0, active=set('c'),
sync_wanted=1, leader_wanted=leader, expected=[
('quorum', leader, 0, set()),
('sync', leader, 1, set('c')),
('restart', leader, 0, set()),
])
# Update quorum when added node caught up
self.check_state_transitions(leader=leader, quorum=0, voters=set(),
numsync=1, sync=set('c'), numsync_confirmed=1, active=set('c'),
sync_wanted=1, leader_wanted=leader, expected=[
('quorum', leader, 0, set('c')),
])
def test_1233(self):
"""Interrupted transition from 2 node cluster to 3 node fully sync cluster"""
leader = 'a'
# Node c went away, transition back to 2 node cluster
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('b'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 1, set('b')),
])
# Node c is available; transition to a larger quorum set, although it has not caught up yet.
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bc'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 1, set('bc')),
])
# Add in a new node at the same time, while node c still hasn't caught up
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bcd'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 2, set('bcd')),
('sync', leader, 2, set('bcd')),
])
# All sync nodes caught up, reduce quorum
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcd'),
numsync=2, sync=set('bcd'), numsync_confirmed=3, active=set('bcd'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 1, set('bcd')),
])
# Change replication factor at the same time
self.check_state_transitions(leader=leader, quorum=0, voters=set('b'),
numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bc'),
sync_wanted=1, leader_wanted=leader, expected=[
('quorum', leader, 1, set('bc')),
('sync', leader, 1, set('bc')),
])
def test_2322(self):
"""Interrupted transition from 2 node cluster to 3 node cluster with replication factor 2"""
leader = 'a'
# Node c went away, transition back to 2 node cluster
self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('b'),
sync_wanted=1, leader_wanted=leader, expected=[
('quorum', leader, 0, set('b')),
])
# Node c is available; transition to a larger quorum set.
self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bc'),
sync_wanted=1, leader_wanted=leader, expected=[
('sync', leader, 1, set('bc')),
])
# Add in a new node at the same time
self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bcd'),
sync_wanted=1, leader_wanted=leader, expected=[
('sync', leader, 1, set('bc')),
('quorum', leader, 2, set('bcd')),
('sync', leader, 1, set('bcd')),
])
# Convert to a fully synced cluster
self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
numsync=1, sync=set('b'), numsync_confirmed=1, active=set('bc'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 2, set('bc')),
])
# Reduce quorum after all nodes caught up
self.check_state_transitions(leader=leader, quorum=1, voters=set('bc'),
numsync=2, sync=set('bc'), numsync_confirmed=2, active=set('bc'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 0, set('bc')),
])
def test_3535(self):
leader = 'a'
# remove nodes
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bc'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 2, set('bc')),
('quorum', leader, 0, set('bc')),
])
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
numsync=2, sync=set('bcde'), numsync_confirmed=3, active=set('bcd'),
sync_wanted=2, leader_wanted=leader, expected=[
('sync', leader, 2, set('bcd')),
('quorum', leader, 1, set('bcd')),
])
# remove nodes and decrease sync
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bc'),
sync_wanted=1, leader_wanted=leader, expected=[
('sync', leader, 2, set('bc')),
('quorum', leader, 1, set('bc')),
('sync', leader, 1, set('bc')),
])
self.check_state_transitions(leader=leader, quorum=1, voters=set('bcde'),
numsync=3, sync=set('bcde'), numsync_confirmed=2, active=set('bc'),
sync_wanted=1, leader_wanted=leader, expected=[
('sync', leader, 3, set('bcd')),
('quorum', leader, 1, set('bc')),
('sync', leader, 1, set('bc')),
])
# Increase replication factor and decrease quorum
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bcde'),
sync_wanted=3, leader_wanted=leader, expected=[
('sync', leader, 3, set('bcde')),
])
# decrease quorum after more nodes caught up
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
numsync=3, sync=set('bcde'), numsync_confirmed=3, active=set('bcde'),
sync_wanted=3, leader_wanted=leader, expected=[
('quorum', leader, 1, set('bcde')),
])
# Add node with decreasing sync and increasing quorum
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bcdef'),
sync_wanted=1, leader_wanted=leader, expected=[
# increase quorum by 2, 1 for added node and another for reduced sync
('quorum', leader, 4, set('bcdef')),
# now reduce replication factor to requested value
('sync', leader, 1, set('bcdef')),
])
# Remove node with increasing sync and decreasing quorum
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcde'),
numsync=2, sync=set('bcde'), numsync_confirmed=2, active=set('bcd'),
sync_wanted=3, leader_wanted=leader, expected=[
# node e removed from sync with replication factor increase
('sync', leader, 3, set('bcd')),
# node e removed from voters with quorum decrease
('quorum', leader, 1, set('bcd')),
])
def test_remove_nosync_node(self):
leader = 'a'
self.check_state_transitions(leader=leader, quorum=0, voters=set('bc'),
numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('b'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 0, set('b')),
('sync', leader, 1, set('b'))
])
def test_swap_sync_node(self):
leader = 'a'
self.check_state_transitions(leader=leader, quorum=0, voters=set('bc'),
numsync=2, sync=set('bc'), numsync_confirmed=1, active=set('bd'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 0, set('b')),
('sync', leader, 2, set('bd')),
('quorum', leader, 1, set('bd'))
])
def test_promotion(self):
# Beginning state: 'a' is the primary, 1 of bcd in sync
# a fails, c gets quorum votes and promotes
self.check_state_transitions(leader='a', quorum=2, voters=set('bcd'),
numsync=0, sync=set(), numsync_confirmed=0, active=set(),
sync_wanted=1, leader_wanted='c', expected=[
('sync', 'a', 1, set('abd')), # set a, b, and d to sync
('quorum', 'c', 2, set('abd')), # set c as a leader and move a to voters
# and stop because there are no active nodes
])
# next loop, b managed to reconnect
self.check_state_transitions(leader='c', quorum=2, voters=set('abd'),
numsync=1, sync=set('abd'), numsync_confirmed=0, active=set('b'),
sync_wanted=1, leader_wanted='c', expected=[
('sync', 'c', 1, set('b')), # remove a from sync as inactive
('quorum', 'c', 0, set('b')), # remove a from voters and reduce quorum
])
# alternative reality: next loop, no one reconnected
self.check_state_transitions(leader='c', quorum=2, voters=set('abd'),
numsync=1, sync=set('abd'), numsync_confirmed=0, active=set(),
sync_wanted=1, leader_wanted='c', expected=[
('quorum', 'c', 0, set()),
('sync', 'c', 0, set()),
])
def test_nonsync_promotion(self):
# Beginning state: 1 of bc in sync, e.g. a is the primary with ssn = ANY 1 (b c).
# a fails; d sees b and c, learns that it is in sync, and decides to promote.
# We include the former primary in the sync state, increasing the replication factor,
# and let the situation resolve. Node d keeps ssn = ANY 1 (b c)
leader = 'd'
self.check_state_transitions(leader='a', quorum=1, voters=set('bc'),
numsync=0, sync=set(), numsync_confirmed=0, active=set(),
sync_wanted=1, leader_wanted=leader, expected=[
# Set a, b, and c to sync and increase replication factor
('sync', 'a', 2, set('abc')),
# Set ourselves as the leader and move the old leader to voters
('quorum', leader, 1, set('abc')),
# and stop because there are no active nodes
])
# next loop, b and c managed to reconnect
self.check_state_transitions(leader=leader, quorum=1, voters=set('abc'),
numsync=2, sync=set('abc'), numsync_confirmed=0, active=set('bc'),
sync_wanted=1, leader_wanted=leader, expected=[
('sync', leader, 2, set('bc')), # Remove a from being synced to.
('quorum', leader, 1, set('bc')), # Remove a from quorum
('sync', leader, 1, set('bc')), # Can now reduce replication factor back
])
# alternative reality: next loop, no one reconnected
self.check_state_transitions(leader=leader, quorum=1, voters=set('abc'),
numsync=2, sync=set('abc'), numsync_confirmed=0, active=set(),
sync_wanted=1, leader_wanted=leader, expected=[
('quorum', leader, 0, set()),
('sync', leader, 0, set()),
])
def test_invalid_states(self):
leader = 'a'
# Main invariant is not satisfied, system is in an unsafe state
resolver = QuorumStateResolver(leader=leader, quorum=0, voters=set('bc'),
numsync=1, sync=set('bc'), numsync_confirmed=1,
active=set('bc'), sync_wanted=1, leader_wanted=leader)
self.assertRaises(QuorumError, resolver.check_invariants)
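# The main safety invariant is, roughly, that any failover quorum of voters must overlap
# with any set of numsync synchronous standbys; with quorum=0, two voters and numsync=1
# this is presumably violated, and the resolver below repairs it by raising quorum to 1.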
self.assertEqual(list(resolver), [
('quorum', leader, 1, set('bc'))
])
# Quorum and sync states are mismatched: somebody other than Patroni modified the system state
resolver = QuorumStateResolver(leader=leader, quorum=1, voters=set('bc'),
numsync=2, sync=set('bd'), numsync_confirmed=1,
active=set('bd'), sync_wanted=1, leader_wanted=leader)
self.assertRaises(QuorumError, resolver.check_invariants)
self.assertEqual(list(resolver), [
('quorum', leader, 1, set('bd')),
('sync', leader, 1, set('bd')),
])
self.assertTrue(repr(resolver.sync).startswith('<CaseInsensitiveSet'))
def test_sync_high_quorum_low_safety_margin_high(self):
leader = 'a'
self.check_state_transitions(leader=leader, quorum=2, voters=set('bcdef'),
numsync=4, sync=set('bcdef'), numsync_confirmed=3, active=set('bcdef'),
sync_wanted=2, leader_wanted=leader, expected=[
('quorum', leader, 3, set('bcdef')), # Adjust quorum requirements
('sync', leader, 2, set('bcdef')), # Reduce synchronization
])
def test_quorum_update(self):
resolver = QuorumStateResolver(leader='a', quorum=1, voters=set('bc'), numsync=1, sync=set('bc'),
numsync_confirmed=1, active=set('bc'), sync_wanted=1, leader_wanted='a')
self.assertRaises(QuorumError, list, resolver.quorum_update(-1, set()))
self.assertRaises(QuorumError, list, resolver.quorum_update(1, set()))
def test_sync_update(self):
resolver = QuorumStateResolver(leader='a', quorum=1, voters=set('bc'), numsync=1, sync=set('bc'),
numsync_confirmed=1, active=set('bc'), sync_wanted=1, leader_wanted='a')
self.assertRaises(QuorumError, list, resolver.sync_update(-1, set()))
self.assertRaises(QuorumError, list, resolver.sync_update(1, set()))
def test_remove_nodes_with_decreasing_sync(self):
leader = 'a'
# Remove node with decreasing sync
self.check_state_transitions(leader=leader, quorum=1, voters=set('bcdef'),
numsync=4, sync=set('bcdef'), numsync_confirmed=2, active=set('bcd'),
sync_wanted=2, leader_wanted=leader, expected=[
# node f removed from sync
('sync', leader, 4, set('bcde')),
# nodes e and f removed from voters
('quorum', leader, 1, set('bcd')),
# node e removed from sync with replication factor decrease
('sync', leader, 2, set('bcd')),
])
# Interrupted state, and node g joined
self.check_state_transitions(leader=leader, quorum=1, voters=set('bcdef'),
numsync=4, sync=set('bcde'), numsync_confirmed=2, active=set('bcdg'),
sync_wanted=2, leader_wanted=leader, expected=[
# remove nodes e and f from voters
('quorum', leader, 1, set('bcd')),
# remove node e from sync and reduce replication factor
('sync', leader, 3, set('bcd')),
# add node g to voters with quorum increase
('quorum', leader, 2, set('bcdg')),
# add node g to sync and reduce replication factor
('sync', leader, 2, set('bcdg')),
])
# node f returned
self.check_state_transitions(leader=leader, quorum=1, voters=set('bcdef'),
numsync=4, sync=set('bcde'), numsync_confirmed=2, active=set('bcdf'),
sync_wanted=2, leader_wanted=leader, expected=[
# replace node e with f in sync
('sync', leader, 4, set('bcdf')),
# remove node e from voters with quorum increase
('quorum', leader, 2, set('bcdf')),
# reduce replication factor as it was requested
('sync', leader, 2, set('bcdf')),
])
# node e returned
self.check_state_transitions(leader=leader, quorum=1, voters=set('bcdef'),
numsync=4, sync=set('bcde'), numsync_confirmed=2, active=set('bcde'),
sync_wanted=2, leader_wanted=leader, expected=[
# remove node f from voters with quorum increase
('quorum', leader, 2, set('bcde')),
# reduce replication factor as it was requested
('sync', leader, 2, set('bcde')),
])
# node b is also lost
self.check_state_transitions(leader=leader, quorum=1, voters=set('bcdef'),
numsync=4, sync=set('bcde'), numsync_confirmed=2, active=set('cd'),
sync_wanted=1, leader_wanted=leader, expected=[
# remove nodes b, e, and f from voters
('quorum', leader, 1, set('cd')),
# remove nodes b and e from sync with replication factor decrease
('sync', leader, 1, set('cd')),
])
def test_empty_ssn(self):
# Beginning state: 'a' is the primary, 1 of bc in sync
# a fails, c gets quorum votes and promotes
self.check_state_transitions(leader='a', quorum=1, voters=set('bc'),
numsync=1, sync=set(), numsync_confirmed=0, active=set(),
sync_wanted=1, leader_wanted='c', expected=[
('sync', 'a', 1, set('ab')), # set a and b to sync
('quorum', 'c', 1, set('ab')), # set c as a leader and move a to voters
# and stop because there are no active nodes
])
# next loop, b managed to reconnect
self.check_state_transitions(leader='c', quorum=1, voters=set('ab'),
numsync=1, sync=set('ab'), numsync_confirmed=0, active=set('b'),
sync_wanted=1, leader_wanted='c', expected=[
('sync', 'c', 1, set('b')), # remove a from sync as inactive
('quorum', 'c', 0, set('b')), # remove a from voters and reduce quorum
])


@@ -140,8 +140,8 @@ class TestRaft(unittest.TestCase):
self.assertTrue(raft.initialize())
self.assertTrue(raft.cancel_initialization())
self.assertTrue(raft.set_config_value('{}'))
self.assertTrue(raft.write_sync_state('foo', 'bar'))
self.assertFalse(raft.write_sync_state('foo', 'bar', 1))
self.assertTrue(raft.write_sync_state('foo', 'bar', 0))
self.assertFalse(raft.write_sync_state('foo', 'bar', 0, 1))
raft._mpp = get_mpp({'citus': {'group': 1, 'database': 'postgres'}})
self.assertTrue(raft.manual_failover('foo', 'bar'))
raft._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})


@@ -1,9 +1,10 @@
import os
from unittest.mock import Mock, patch, PropertyMock
from unittest.mock import Mock, patch
from patroni import global_config
from patroni.collections import CaseInsensitiveSet
from patroni.dcs import Cluster, SyncState
from patroni.dcs import Cluster, ClusterConfig, Status, SyncState
from patroni.postgresql import Postgresql
from . import BaseTestPostgresql, psycopg_connect, mock_available_gucs
@@ -12,7 +13,6 @@ from . import BaseTestPostgresql, psycopg_connect, mock_available_gucs
@patch('subprocess.call', Mock(return_value=0))
@patch('patroni.psycopg.connect', psycopg_connect)
@patch.object(Postgresql, 'available_gucs', mock_available_gucs)
@patch.object(global_config.__class__, 'is_synchronous_mode', PropertyMock(return_value=True))
class TestSync(BaseTestPostgresql):
@patch('subprocess.call', Mock(return_value=0))
@@ -25,12 +25,13 @@ class TestSync(BaseTestPostgresql):
super(TestSync, self).setUp()
self.p.config.write_postgresql_conf()
self.s = self.p.sync_handler
config = ClusterConfig(1, {'synchronous_mode': True}, 1)
self.cluster = Cluster(True, config, self.leader, Status.empty(), [self.me, self.other, self.leadermem],
None, SyncState(0, self.me.name, self.leadermem.name, 0), None, None, None)
global_config.update(self.cluster)
@patch.object(Postgresql, 'last_operation', Mock(return_value=1))
def test_pick_sync_standby(self):
cluster = Cluster(True, None, self.leader, 0, [self.me, self.other, self.leadermem], None,
SyncState(0, self.me.name, self.leadermem.name), None, None, None)
pg_stat_replication = [
{'pid': 100, 'application_name': self.leadermem.name, 'sync_state': 'sync', 'flush_lsn': 1},
{'pid': 101, 'application_name': self.me.name, 'sync_state': 'async', 'flush_lsn': 2},
@@ -39,34 +40,55 @@ class TestSync(BaseTestPostgresql):
# the sync node is a bit behind the async one, but we prefer it anyway
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=[self.leadermem.name,
'on', pg_stat_replication]):
self.assertEqual(self.s.current_state(cluster), (CaseInsensitiveSet([self.leadermem.name]),
CaseInsensitiveSet([self.leadermem.name])))
self.assertEqual(self.s.current_state(self.cluster), ('priority', 1, 1,
CaseInsensitiveSet([self.leadermem.name]),
CaseInsensitiveSet([self.leadermem.name])))
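# current_state() now appears to return a 5-tuple (sync_type, numsync, numsync_confirmed,
# sync, active) instead of the former (sync, active) pair asserted above.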
# prefer the node with sync_state='potential', even if it is slightly behind the async one
pg_stat_replication[0]['sync_state'] = 'potential'
for r in pg_stat_replication:
r['write_lsn'] = r.pop('flush_lsn')
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_write', pg_stat_replication]):
self.assertEqual(self.s.current_state(cluster), (CaseInsensitiveSet([self.leadermem.name]),
CaseInsensitiveSet()))
self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet([self.leadermem.name])))
# when there are no sync or potential candidates we pick the async node with the minimal replication lag
for i, r in enumerate(pg_stat_replication):
r.update(replay_lsn=3 - i, application_name=r['application_name'].upper())
missing = pg_stat_replication.pop(0)
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_apply', pg_stat_replication]):
self.assertEqual(self.s.current_state(cluster), (CaseInsensitiveSet([self.me.name]), CaseInsensitiveSet()))
self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet([self.me.name])))
# unknown sync node is ignored
missing.update(application_name='missing', sync_state='sync')
pg_stat_replication.insert(0, missing)
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['', 'remote_apply', pg_stat_replication]):
self.assertEqual(self.s.current_state(cluster), (CaseInsensitiveSet([self.me.name]), CaseInsensitiveSet()))
self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet([self.me.name])))
# invalid synchronous_standby_names and empty pg_stat_replication
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=['a b', 'remote_apply', None]):
self.p._major_version = 90400
self.assertEqual(self.s.current_state(cluster), (CaseInsensitiveSet(), CaseInsensitiveSet()))
self.assertEqual(self.s.current_state(self.cluster), ('off', 0, 0, CaseInsensitiveSet(),
CaseInsensitiveSet()))
@patch.object(Postgresql, 'last_operation', Mock(return_value=1))
def test_current_state_quorum(self):
self.cluster.config.data['synchronous_mode'] = 'quorum'
global_config.update(self.cluster)
pg_stat_replication = [
{'pid': 100, 'application_name': self.leadermem.name, 'sync_state': 'quorum', 'flush_lsn': 1},
{'pid': 101, 'application_name': self.other.name, 'sync_state': 'quorum', 'flush_lsn': 2}]
# the sync node is a bit behind the async one, but we prefer it anyway
with patch.object(Postgresql, "_cluster_info_state_get",
side_effect=['ANY 1 ({0},"{1}")'.format(self.leadermem.name, self.other.name),
'on', pg_stat_replication]):
self.assertEqual(self.s.current_state(self.cluster),
('quorum', 1, 2, CaseInsensitiveSet([self.other.name, self.leadermem.name]),
CaseInsensitiveSet([self.leadermem.name, self.other.name])))
def test_set_sync_standby(self):
def value_in_conf():
@@ -85,6 +107,7 @@ class TestSync(BaseTestPostgresql):
mock_reload.assert_not_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = 'n1'")
mock_reload.reset_mock()
self.s.set_synchronous_standby_names(CaseInsensitiveSet(['n1', 'n2']))
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = '2 (n1,n2)'")
@@ -99,11 +122,44 @@ class TestSync(BaseTestPostgresql):
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = '*'")
self.cluster.config.data['synchronous_mode'] = 'quorum'
global_config.update(self.cluster)
mock_reload.reset_mock()
self.s.set_synchronous_standby_names([], 1)
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = 'ANY 1 (*)'")
mock_reload.reset_mock()
self.s.set_synchronous_standby_names(['a', 'b'], 1)
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = 'ANY 1 (a,b)'")
mock_reload.reset_mock()
self.s.set_synchronous_standby_names(['a', 'b'], 3)
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = 'ANY 3 (a,b)'")
self.p._major_version = 90601
mock_reload.reset_mock()
self.s.set_synchronous_standby_names([], 1)
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = '1 (*)'")
mock_reload.reset_mock()
self.s.set_synchronous_standby_names(['a', 'b'], 1)
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = '1 (a,b)'")
mock_reload.reset_mock()
self.s.set_synchronous_standby_names(['a', 'b'], 3)
mock_reload.assert_called()
self.assertEqual(value_in_conf(), "synchronous_standby_names = '3 (a,b)'")
@patch.object(Postgresql, 'last_operation', Mock(return_value=1))
def test_do_not_pick_yourself(self):
self.p.name = self.leadermem.name
cluster = Cluster(True, None, self.leader, 0, [self.me, self.other, self.leadermem], None,
SyncState(0, self.me.name, self.leadermem.name), None, None, None)
SyncState(0, self.me.name, self.leadermem.name, 0), None, None, None)
pg_stat_replication = [
{'pid': 100, 'application_name': self.leadermem.name, 'sync_state': 'sync', 'flush_lsn': 1},
@@ -114,4 +170,5 @@ class TestSync(BaseTestPostgresql):
# the pg_stat_replication. We need to check that the primary is not selected as the synchronous node.
with patch.object(Postgresql, "_cluster_info_state_get", side_effect=[self.leadermem.name,
'on', pg_stat_replication]):
self.assertEqual(self.s.current_state(cluster), (CaseInsensitiveSet([self.me.name]), CaseInsensitiveSet()))
self.assertEqual(self.s.current_state(cluster), ('priority', 1, 0, CaseInsensitiveSet(),
CaseInsensitiveSet([self.me.name])))