mirror of
https://github.com/optim-enterprises-bv/patroni.git
synced 2026-01-12 19:05:12 +00:00
A current problem with Patroni that affects many users is that it removes the replication slot of a member whose key has expired from the DCS. As a result, when the replica comes back from scheduled maintenance, the required WAL segments may already be gone, and it cannot continue streaming without fetching files from the archive. With PostgreSQL 16 and newer there is another problem: a logical slot on a standby node could be invalidated if the physical replication slot on the primary was removed (and `pg_catalog` vacuumed). The most problematic environment is Kubernetes, where the slot is removed almost instantly when the member Pod is deleted. So far, one of the recommended solutions was to configure permanent physical slots with names that match member names, so that the replication slots are not removed. This works, but depending on the environment it might be non-trivial to implement (for example, when members may change their names). This PR implements support for the `member_slots_ttl` global configuration parameter, which controls how long member replication slots are kept after the member key disappears. The default value is `30min`. The feature is supported only on PostgreSQL 11 and newer, because slots must be retained not only on the leader node but on all nodes that could potentially become the new leader, and those slots have to be moved forward with the `pg_replication_slot_advance()` function. The feature can be disabled, restoring the old behavior, by setting `member_slots_ttl` to `0`.
340 lines
16 KiB
Python
340 lines
16 KiB
Python
import json
|
|
import unittest
|
|
|
|
from threading import Thread
|
|
from unittest.mock import Mock, patch, PropertyMock
|
|
|
|
import etcd
|
|
import urllib3
|
|
|
|
from patroni.dcs import get_dcs
|
|
from patroni.dcs.etcd import DnsCachingResolver
|
|
from patroni.dcs.etcd3 import AuthFailed, AuthOldRevision, base64_encode, Cluster, Etcd3, \
|
|
Etcd3Client, Etcd3ClientError, Etcd3Error, InvalidAuthToken, PatroniEtcd3Client, \
|
|
RetryFailedError, Unavailable, Unknown, UnsupportedEtcdVersion, UserEmpty
|
|
from patroni.postgresql.mpp import get_mpp
|
|
|
|
from . import MockResponse, SleepException
|
|
|
|
|
|
def mock_urlopen(self, method, url, **kwargs):
    """Stand-in for ``urllib3.PoolManager.urlopen`` that answers a fixed set of etcd v3 gateway calls.

    The tests patch it onto ``PoolManager``, so ``self`` is the pool manager and is not used.

    GET requests are accepted only for ``/version``. Any other request must be a POST to one of
    the endpoints handled below. Anything else raises a plain ``Exception``, so unexpected
    traffic makes the calling test fail.
    """
    response = MockResponse()

    if method == 'GET' and url.endswith('/version'):
        response.content = '{"etcdserver": "3.3.13", "etcdcluster": "3.3.0"}'
        return response
    if method != 'POST':
        raise Exception('Unexpected request method: {0} {1} {2}'.format(method, url, kwargs))

    # Endpoints that always reply with the same JSON document.
    canned_replies = (
        ('/cluster/member/list',
         '{"members":[{"clientURLs":["http://localhost:2379", "http://localhost:4001"]}]}'),
        ('/auth/authenticate', '{"token":"authtoken"}'),
        ('/lease/grant', '{"ID": "123"}'),
        ('/lease/keepalive', '{"result":{"TTL":30}}'),
    )
    for suffix, reply in canned_replies:
        if url.endswith(suffix):
            response.content = reply
            return response

    if url.endswith('/kv/range'):
        def entry(name, value, lease=None):
            # Key order (key, value, [lease], mod_revision) mirrors the serialized document.
            item = {"key": base64_encode('/patroni/test/' + name), "value": base64_encode(value)}
            if lease is not None:
                item["lease"] = lease
            item["mod_revision"] = '1'
            return item

        response.content = json.dumps({
            "header": {"revision": "1"},
            "kvs": [
                entry('1/initialize', '12345'),
                entry('leader', 'foo', 'bla'),
                entry('members/foo', '{}', '123'),
                entry('members/bar', '{"version":"1.6.5"}', '123'),
                entry('failover', '{}'),
                entry('failsafe', '{'),  # deliberately malformed JSON value
            ]
        })
        return response

    if url.endswith('/watch'):
        config_key = base64_encode('/patroni/test/config')
        events = [
            {'kv': {'key': config_key, 'value': base64_encode('bar'), 'mod_revision': '2'}},
            {'kv': {'key': config_key, 'value': base64_encode('buzz'), 'mod_revision': '3'}},
            {'type': 'DELETE', 'kv': {'key': config_key, 'mod_revision': '4'}},
            {'kv': {'key': base64_encode('/patroni/test/optime/leader'),
                    'value': base64_encode('1234567'), 'mod_revision': '5'}},
        ]
        # The first chunk lacks its closing brace, which arrives at the start of the second
        # chunk, followed by an error message (grpc_code 14). Presumably this exercises the
        # stream reader's handling of a document split across chunks.
        first_chunk = json.dumps({'result': {'events': events}})[:-1].encode('utf-8')
        trailer = b'}{"error":{"grpc_code":14,"message":"","http_code":503}}'
        response.read_chunked = Mock(return_value=[first_chunk, trailer])
        return response

    if url.endswith(('/kv/put', '/kv/txn')):
        # Only requests that mention the sync key succeed; every other write is rejected
        # with a "requested lease not found" error.
        if base64_encode('/patroni/test/sync') in kwargs['body']:
            response.content = '{"header":{"revision":"1"},"succeeded":true}'
        else:
            response.status_code = 400
            response.content = '{"code":5,"error":"etcdserver: requested lease not found"}'
        return response

    if url.endswith('/kv/deleterange'):
        return response
    raise Exception('Unexpected url: {0} {1} {2}'.format(method, url, kwargs))
|
|
|
|
|
|
class TestEtcd3Client(unittest.TestCase):
    """Construction of the low-level ``Etcd3Client`` against the mocked gateway."""

    @patch.object(Thread, 'start', Mock())
    @patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
    def test_authenticate(self):
        # Thread.start is stubbed, so any thread started during construction never runs.
        # All HTTP calls are answered by mock_urlopen.
        settings = {'host': '127.0.0.1', 'port': 2379, 'use_proxies': True, 'retry_timeout': 10}
        client = Etcd3Client(settings, DnsCachingResolver())
        self.assertIsNotNone(client._cluster_version)
|
|
|
|
|
|
class BaseTestEtcd3(unittest.TestCase):
    """Shared fixture: an ``Etcd3`` DCS created via ``get_dcs`` whose HTTP traffic goes to ``mock_urlopen``."""

    @patch.object(Thread, 'start', Mock())
    @patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
    def setUp(self):
        etcd3_section = {'host': 'localhost:2378', 'username': 'etcduser', 'password': 'etcdpassword'}
        dcs_config = {'namespace': '/patroni/', 'ttl': 30, 'retry_timeout': 10,
                      'name': 'foo', 'scope': 'test', 'etcd3': etcd3_section}
        self.etcd3 = get_dcs(dcs_config)
        self.assertIsInstance(self.etcd3, Etcd3)
        # Shortcuts used by the subclasses.
        self.client = self.etcd3._client
        self.kv_cache = self.client._kv_cache
|
|
|
|
|
|
class TestKVCache(BaseTestEtcd3):
    """Tests for the ``KVCache`` watcher attached to the etcd3 client."""

    @patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
    @patch.object(Etcd3Client, 'watchprefix', Mock(return_value=urllib3.response.HTTPResponse()))
    def test__build_cache(self):
        cache = self.kv_cache
        cache._build_cache()

    def test__do_watch(self):
        # A falsy return value from watchprefix is expected to surface as AttributeError.
        self.client.watchprefix = Mock(return_value=False)
        with self.assertRaises(AttributeError):
            self.kv_cache._do_watch('1')

    @patch('time.sleep', Mock(side_effect=SleepException))
    @patch('patroni.dcs.etcd3.KVCache._build_cache', Mock(side_effect=Exception))
    def test_run(self):
        # _build_cache always fails. The patched time.sleep raises SleepException,
        # which is the only way out of run() in this test.
        with self.assertRaises(SleepException):
            self.kv_cache.run()

    @patch.object(urllib3.response.HTTPResponse, 'read_chunked',
                  Mock(return_value=[b'{"error":{"grpc_code":14,"message":"","http_code":503}}']))
    @patch.object(Etcd3Client, 'watchprefix', Mock(return_value=urllib3.response.HTTPResponse()))
    def test_kill_stream(self):
        with self.assertRaises(Unavailable):
            self.kv_cache._do_watch('1')
        with patch.object(urllib3.response.HTTPResponse, 'connection') as conn:
            # kill_stream() must not raise in any of these cases: a normal socket,
            # sock.close() raising, and reading .sock itself raising.
            self.kv_cache.kill_stream()
            conn.sock.close.side_effect = Exception
            self.kv_cache.kill_stream()
            conn_type = type(conn)
            conn_type.sock = PropertyMock(side_effect=Exception)
            self.kv_cache.kill_stream()
|
|
|
|
|
|
class TestPatroniEtcd3Client(BaseTestEtcd3):
    """Tests for ``PatroniEtcd3Client``: RPC calls, auth error handling and server response parsing."""

    @patch('patroni.dcs.etcd3.Etcd3Client.authenticate', Mock(side_effect=AuthFailed))
    def test__init__(self):
        # setUp() builds a fresh client. When authenticate() fails with AuthFailed,
        # construction is expected to end with SystemExit.
        self.assertRaises(SystemExit, self.setUp)

    @patch.object(urllib3.PoolManager, 'urlopen')
    def test_call_rpc(self, urlopen_mock):
        request = {'key': base64_encode('/patroni/test/leader')}
        urlopen_mock.return_value = MockResponse()
        urlopen_mock.return_value.content = '{"succeeded":true,"header":{"revision":"1"}}'
        self.client.call_rpc('/kv/put', request)
        self.client.call_rpc('/kv/deleterange', request)

    @patch.object(urllib3.PoolManager, 'urlopen')
    def test_txn(self, urlopen_mock):
        urlopen_mock.return_value = MockResponse()
        urlopen_mock.return_value.content = '{"header":{"revision":"1"}}'
        self.client.txn({'target': 'MOD', 'mod_revision': '1'},
                        {'request_delete_range': {'key': base64_encode('/patroni/test/leader')}})

    @patch('time.time', Mock(side_effect=[1, 10.9, 100]))
    def test__wait_cache(self):
        # The cache condition is held here, and the patched clock jumps past the 10s timeout,
        # so waiting for the cache is expected to give up with RetryFailedError.
        with self.kv_cache.condition:
            self.assertRaises(RetryFailedError, self.client._wait_cache, 10)

    @patch.object(urllib3.PoolManager, 'urlopen')
    def test__restart_watcher(self, urlopen_mock):
        # An "authentication is not enabled" reply (code 9) must not make authenticate() raise.
        urlopen_mock.return_value = MockResponse()
        urlopen_mock.return_value.status_code = 400
        urlopen_mock.return_value.content = '{"code":9,"error":"etcdserver: authentication is not enabled"}'
        self.client.authenticate()

    @patch.object(urllib3.PoolManager, 'urlopen')
    def test__handle_auth_errors(self, urlopen_mock):
        urlopen_mock.return_value = MockResponse()
        urlopen_mock.return_value.content = '{"code":3,"error":"etcdserver: user name is empty"}'
        urlopen_mock.return_value.status_code = 403
        # The same "user name is empty" reply maps to different errors depending on cluster version.
        self.client._cluster_version = (3, 1, 5)
        self.assertRaises(UnsupportedEtcdVersion, self.client.deleteprefix, 'foo')
        self.client._cluster_version = (3, 3, 13)
        self.assertRaises(UserEmpty, self.client.deleteprefix, 'foo')
        urlopen_mock.return_value.content = '{"code":16,"error":"etcdserver: invalid auth token"}'
        self.assertRaises(InvalidAuthToken, self.client.deleteprefix, 'foo')
        with patch.object(PatroniEtcd3Client, 'authenticate', Mock(return_value=True)):
            retry = self.etcd3._retry.copy()
            # The patched clock limits how long the retry loop can keep re-authenticating.
            with patch('time.time', Mock(side_effect=[0, 10, 20, 30, 40])):
                self.assertRaises(InvalidAuthToken, retry, self.client.deleteprefix, 'foo', retry=retry)
            with patch('time.time', Mock(side_effect=[0, 10])):
                self.assertRaises(InvalidAuthToken, self.client.deleteprefix, 'foo')
        self.client.username = None
        self.client._reauthenticate = False
        retry = self.etcd3._retry.copy()
        self.assertRaises(InvalidAuthToken, retry, self.client.deleteprefix, 'foo', retry=retry)
        urlopen_mock.return_value.content = '{"code":3,"error":"etcdserver: revision of auth store is old"}'
        self.client._reauthenticate = False
        self.assertRaises(AuthOldRevision, retry, self.client.deleteprefix, 'foo', retry=retry)

    def test__handle_server_response(self):
        response = MockResponse()
        response.content = '{"code":0,"error":"'
        # Truncated JSON with MockResponse's default status is reported as EtcdException.
        self.assertRaises(etcd.EtcdException, self.client._handle_server_response, response)
        response.status_code = 400
        # The same truncated body with an error status maps to Unknown.
        self.assertRaises(Unknown, self.client._handle_server_response, response)
        response.content = '{"error":{"grpc_code":0,"message":"","http_code":400}}'
        # Use assertRaises as a context manager: the previous try/except passed silently
        # if no exception was raised, so the as_dict() check could be skipped.
        with self.assertRaises(Unknown) as ctx:
            self.client._handle_server_response(response)
        self.assertEqual(ctx.exception.as_dict(), {'code': 2, 'codeText': 'OK', 'error': '', 'status': 400})

    @patch.object(urllib3.PoolManager, 'urlopen')
    def test__ensure_version_prefix(self, urlopen_mock):
        # 3.0.3 is rejected. 3.0.4 selects the /v3alpha prefix and 3.4 selects /v3.
        self.client.version_prefix = None
        urlopen_mock.return_value = MockResponse()
        urlopen_mock.return_value.content = '{"etcdserver": "3.0.3", "etcdcluster": "3.0.0"}'
        self.assertRaises(UnsupportedEtcdVersion, self.client._ensure_version_prefix, '')
        urlopen_mock.return_value.content = '{"etcdserver": "3.0.4", "etcdcluster": "3.0.0"}'
        self.client._ensure_version_prefix('')
        self.assertEqual(self.client.version_prefix, '/v3alpha')
        urlopen_mock.return_value.content = '{"etcdserver": "3.4.4", "etcdcluster": "3.4.0"}'
        self.client._ensure_version_prefix('')
        self.assertEqual(self.client.version_prefix, '/v3')
|
|
|
|
|
|
@patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
class TestEtcd3(BaseTestEtcd3):
    """Tests for the high-level ``Etcd3`` DCS class.

    The class decorator sends HTTP requests made by the ``test*`` methods to ``mock_urlopen``.
    A class-level ``patch`` only wraps methods whose names start with ``patch.TEST_PREFIX``,
    so ``setUp`` carries its own patches.
    """

    @patch.object(Thread, 'start', Mock())
    @patch.object(urllib3.PoolManager, 'urlopen', mock_urlopen)
    def setUp(self):
        super(TestEtcd3, self).setUp()
        # Fill the KV cache from the mocked /kv/range reply and mark it ready
        # (presumably so get_cluster() is answered from the cache).
        self.kv_cache._build_cache()
        self.kv_cache._is_ready = True
        self.etcd3.get_cluster()

    def test_get_cluster(self):
        self.assertIsInstance(self.etcd3.get_cluster(), Cluster)
        # Without a KV cache the cluster is presumably read directly from etcd.
        self.client._kv_cache = None
        with patch.object(urllib3.PoolManager, 'urlopen') as urlopen_mock:
            urlopen_mock.return_value = MockResponse()
            urlopen_mock.return_value.content = json.dumps({
                "header": {"revision": "1"},
                "kvs": [
                    {"key": base64_encode('/patroni/test/status'),
                     "value": base64_encode('{"optime":1234567,"slots":{"ls":12345},"retain_slots": ["foo"]}'),
                     "mod_revision": '1'}
                ]
            })
            self.assertIsInstance(self.etcd3.get_cluster(), Cluster)
            # A malformed status value must not prevent building the Cluster.
            urlopen_mock.return_value.content = json.dumps({
                "header": {"revision": "1"},
                "kvs": [
                    {"key": base64_encode('/patroni/test/status'), "value": base64_encode('{'), "mod_revision": '1'}
                ]
            })
            self.assertIsInstance(self.etcd3.get_cluster(), Cluster)
            urlopen_mock.side_effect = UnsupportedEtcdVersion('')
            self.assertRaises(UnsupportedEtcdVersion, self.etcd3.get_cluster)
            # An unexpected exception from the transport is reported as Etcd3Error.
            urlopen_mock.side_effect = SleepException()
            self.assertRaises(Etcd3Error, self.etcd3.get_cluster)

    def test__get_citus_cluster(self):
        self.etcd3._mpp = get_mpp({'citus': {'group': 0, 'database': 'postgres'}})
        cluster = self.etcd3.get_cluster()
        self.assertIsInstance(cluster, Cluster)
        # Presumably group 1 comes from the '/patroni/test/1/initialize' key served by mock_urlopen.
        self.assertIsInstance(cluster.workers[1], Cluster)

    def test_touch_member(self):
        self.etcd3.touch_member({})
        self.etcd3._lease = 'bla'
        self.etcd3.touch_member({})
        # A failing lease_grant must not propagate out of touch_member().
        with patch.object(PatroniEtcd3Client, 'lease_grant', Mock(side_effect=Etcd3ClientError)):
            self.etcd3.touch_member({})

    def test__update_leader(self):
        cluster = self.etcd3.get_cluster()
        self.etcd3._lease = None
        with patch.object(Etcd3Client, 'txn', Mock(return_value={'succeeded': True})):
            self.etcd3.update_leader(cluster, '123', failsafe={'foo': 'bar'})
        self.etcd3._last_lease_refresh = 0
        self.etcd3.update_leader(cluster, '124')
        # The patched clock advances in large steps, so update_leader() runs out of time.
        with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(return_value=True)), \
                patch('time.time', Mock(side_effect=[0, 100, 200, 300])):
            self.assertRaises(Etcd3Error, self.etcd3.update_leader, cluster, '126')
        self.etcd3._lease = cluster.leader.session
        self.etcd3.update_leader(cluster, '124')
        self.etcd3._last_lease_refresh = 0
        with patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(side_effect=Unknown)):
            self.assertFalse(self.etcd3.update_leader(cluster, '125'))

    def test_take_leader(self):
        self.assertFalse(self.etcd3.take_leader())

    def test_attempt_to_acquire_leader(self):
        self.assertFalse(self.etcd3.attempt_to_acquire_leader())
        with patch('time.time', Mock(side_effect=[0, 0, 0, 0, 0, 100, 200, 300])):
            self.assertFalse(self.etcd3.attempt_to_acquire_leader())
        with patch('time.time', Mock(side_effect=[0, 100, 200, 300, 400])):
            self.assertRaises(Etcd3Error, self.etcd3.attempt_to_acquire_leader)
        with patch.object(PatroniEtcd3Client, 'put', Mock(return_value=False)):
            self.assertFalse(self.etcd3.attempt_to_acquire_leader())

    def test_set_ttl(self):
        self.etcd3.set_ttl(20)

    @patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(return_value=False))
    def test_refresh_lease(self):
        self.etcd3._last_lease_refresh = 0
        self.etcd3.refresh_lease()

    @patch('time.sleep', Mock(side_effect=SleepException))
    @patch.object(PatroniEtcd3Client, 'lease_keepalive', Mock(return_value=False))
    @patch.object(PatroniEtcd3Client, 'lease_grant', Mock(side_effect=Etcd3ClientError))
    def test_create_lease(self):
        # lease_grant always fails. The patched time.sleep raises SleepException,
        # which is presumably how the retry loop in create_lease() is exited.
        self.etcd3._lease = None
        self.etcd3._last_lease_refresh = 0
        self.assertRaises(SleepException, self.etcd3.create_lease)

    def test_set_failover_value(self):
        self.etcd3.set_failover_value('', 1)

    def test_set_config_value(self):
        self.etcd3.set_config_value('')

    def test_initialize(self):
        self.etcd3.initialize()

    def test_cancel_initialization(self):
        self.etcd3.cancel_initialization()

    def test_delete_leader(self):
        leader = self.etcd3.get_cluster().leader
        self.etcd3.delete_leader(leader)
        # Deleting the leader key while this node is not the leader must not raise either.
        self.etcd3._name = 'other'
        self.etcd3.delete_leader(leader)

    def test_delete_cluster(self):
        self.etcd3.delete_cluster()

    def test_set_history_value(self):
        self.etcd3.set_history_value('')

    def test_set_sync_state_value(self):
        self.etcd3.set_sync_state_value('', 1)

    def test_delete_sync_state(self):
        self.etcd3.delete_sync_state('1')

    def test_watch(self):
        self.etcd3.set_ttl(10)
        self.etcd3.watch(None, 0)
        self.etcd3.watch('5', 0)

    def test_set_socket_options(self):
        # create=True makes socket.SIO_KEEPALIVE_VALS exist even on platforms that lack it.
        with patch('socket.SIO_KEEPALIVE_VALS', 1, create=True):
            self.etcd3.set_socket_options(Mock(), None)
|