Follow the node in the replicatefrom tag if present.

Rename the follow_the_leader to just follow, since the node to
be followed is not necessarily a leader anymore. Extend the code
that manages replication slots to the non-master nodes if they
are mentioned in at least one replicatefrom tag.
Add the 3rd configuration in order to be able to run cascading
replicas.
This commit is contained in:
Oleksii Kliukin
2015-12-30 18:33:23 +01:00
parent bf52fa6f57
commit c650dc092e
7 changed files with 152 additions and 36 deletions

View File

@@ -102,19 +102,25 @@ class Ha:
pg_controldata.get('Database cluster state', '') == 'in production': # crashed master
self.state_handler.require_rewind()
self.recovering = True
return self.follow_the_leader("started as readonly because i had the session lock",
"started as a secondary",
refresh=True, recovery=True)
return self.follow("started as readonly because i had the session lock",
"started as a secondary",
refresh=True, recovery=True)
def follow_the_leader(self, demote_reason, follow_reason, refresh=True, recovery=False):
def follow(self, demote_reason, follow_reason, refresh=True, recovery=False):
refresh and self.load_cluster_from_dcs()
ret = demote_reason if (not recovery and self.state_handler.is_leader()
or recovery and self.state_handler.role == 'master') else follow_reason
leader = self.cluster.leader
leader = None if (leader and leader.name) == self.state_handler.name else leader
if not self.state_handler.check_recovery_conf(leader) or recovery:
# determine the node to follow. If replicatefrom tag is set,
# try to follow the node mentioned there, otherwise, follow the leader.
if self.patroni.replicatefrom:
node_to_follow = [m for m in self.cluster.members if m.name == self.patroni.replicatefrom]
node_to_follow = node_to_follow[0] if node_to_follow else self.cluster.leader
else:
node_to_follow = self.cluster.leader
node_to_follow = None if (node_to_follow and node_to_follow.name) == self.state_handler.name else node_to_follow
if not self.state_handler.check_recovery_conf(node_to_follow) or recovery:
self._async_executor.schedule('changing primary_conninfo and restarting')
self._async_executor.run_async(self.state_handler.follow_the_leader, (leader, recovery))
self._async_executor.run_async(self.state_handler.follow, (node_to_follow, recovery))
return ret
def enforce_master_role(self, message, promote_message):
@@ -287,14 +293,14 @@ class Ha:
return self.enforce_master_role('acquired session lock as a leader',
'promoted self to leader by acquiring session lock')
else:
return self.follow_the_leader('demoted self due after trying and failing to obtain lock',
'following new leader after trying and failing to obtain lock')
return self.follow('demoted self due after trying and failing to obtain lock',
'following new leader after trying and failing to obtain lock')
else:
if self.patroni.nofailover:
return self.follow_the_leader('demoting self because I am not allowed to become master',
'following a different leader because I am not allowed to promote')
return self.follow_the_leader('demoting self because i am not the healthiest node',
'following a different leader because i am not the healthiest node')
return self.follow('demoting self because I am not allowed to become master',
'following a different leader because I am not allowed to promote')
return self.follow('demoting self because i am not the healthiest node',
'following a different leader because i am not the healthiest node')
def process_healthy_cluster(self):
if self.has_lock():
@@ -312,8 +318,8 @@ class Ha:
self.load_cluster_from_dcs()
else:
logger.info('does not have lock')
return self.follow_the_leader('demoting self because i do not have the lock and i was a leader',
'no action. i am a secondary and i am following a leader', False)
return self.follow('demoting self because i do not have the lock and i was a leader',
'no action. i am a secondary and i am following a leader', False)
def schedule(self, action):
with self._async_executor:
@@ -430,7 +436,11 @@ class Ha:
else:
return self.process_healthy_cluster()
finally:
self.state_handler.sync_replication_slots(self.cluster)
# we might not have a valid PostgreSQL connection here if another thread
# stops PostgreSQL, therefore, we only reload replication slots if no
# asynchronous processes are running (should always be the case for the master)
if not self._async_executor.busy:
self.state_handler.sync_replication_slots(self.cluster)
except DCSError:
logger.error('Error communicating with DCS')
if self.state_handler.is_running() and self.state_handler.is_leader():

View File

@@ -524,7 +524,7 @@ recovery_target_timeline = 'latest'
except:
logger.exception("Unable to remove {}".format(path))
def follow_the_leader(self, leader, recovery=False):
def follow(self, leader, recovery=False):
if not self.check_recovery_conf(leader) or recovery:
change_role = (self.role == 'master')
@@ -634,7 +634,11 @@ $$""".format(name, options), name, password, password)
if self.use_slots:
try:
self.load_replication_slots()
slots = [m.name for m in cluster.members if m.name != self.name] if self.role == 'master' else []
if self.role == 'master':
slots = [m.name for m in cluster.members if m.name != self.name]
else:
# only manage slots for replicas that want to replicate from this one
slots = [m.name for m in cluster.members if m.replicatefrom == self.name]
# drop unused slots
for slot in set(self.replication_slots) - set(slots):
self.query("""SELECT pg_drop_replication_slot(%s)

View File

@@ -88,14 +88,13 @@ postgresql:
archive_mode: "on"
wal_level: hot_standby
archive_command: mkdir -p ../wal_archive && test ! -f ../wal_archive/%f && cp %p ../wal_archive/%f
max_wal_senders: 5
max_wal_senders: 10
wal_keep_segments: 8
archive_timeout: 1800s
max_replication_slots: 5
max_replication_slots: 10
hot_standby: "on"
wal_log_hints: "on"
tags:
nofailover: False
noloadbalance: False
clonefrom: False
replicatefrom: 127.0.0.1

View File

@@ -88,14 +88,13 @@ postgresql:
archive_mode: "on"
wal_level: hot_standby
archive_command: mkdir -p ../wal_archive && test ! -f ../wal_archive/%f && cp %p ../wal_archive/%f
max_wal_senders: 5
max_wal_senders: 10
wal_keep_segments: 8
archive_timeout: 1800s
max_replication_slots: 5
max_replication_slots: 10
hot_standby: "on"
wal_log_hints: "on"
tags:
nofailover: False
noloadbalance: False
clonefrom: False
replicatefrom: 127.0.0.1

101
postgres2.yml Normal file
View File

@@ -0,0 +1,101 @@
ttl: &ttl 30
loop_wait: &loop_wait 10
scope: &scope batman
restapi:
listen: 127.0.0.1:8010
connect_address: 127.0.0.1:8010
auth: 'username:password'
# certfile: /etc/ssl/certs/ssl-cert-snakeoil.pem
# keyfile: /etc/ssl/private/ssl-cert-snakeoil.key
etcd:
scope: *scope
ttl: *ttl
host: 127.0.0.1:4001
#discovery_srv: my-etcd.domain
#zookeeper:
# scope: *scope
# session_timeout: *ttl
# reconnect_timeout: *loop_wait
# hosts:
# - 127.0.0.1:2181
# - 127.0.0.2:2181
# exhibitor:
# poll_interval: 300
# port: 8181
# hosts:
# - host1
# - host2
# - host3
postgresql:
name: postgresql2
scope: *scope
listen: 127.0.0.1:5434
connect_address: 127.0.0.1:5434
data_dir: data/postgresql2
maximum_lag_on_failover: 1048576 # 1 megabyte in bytes
use_slots: True
pgpass: /tmp/pgpass2
initdb: ## We allow the following options to be passed on to initdb
# - auth: authmethod
# - auth-host: authmethod
# - auth-local: authmethod
- encoding: UTF8
# - data-checksums # When pg_rewind is needed on 9.3, this needs to be enabled
# - locale: locale
# - lc-collate: locale
# - lc-ctype: locale
# - lc-messages: locale
# - lc-monetary: locale
# - lc-numeric: locale
# - lc-time: locale
# - text-search-config: CFG
# - xlogdir: directory
# - debug
# - noclean
pg_rewind:
username: postgres
password: zalando
pg_hba:
- host all all 0.0.0.0/0 md5
- hostssl all all 0.0.0.0/0 md5
replication:
username: replicator
password: rep-pass
network: 127.0.0.1/32
superuser:
user: postgres
password: zalando
admin:
username: admin
password: admin
# commented-out example for wal-e provisioning
create_replica_method:
- basebackup
# - wal_e
# commented-out example for wal-e provisioning
#wal_e:
#command: /patroni/scripts/wale_restore.py
#env_dir: /home/postgres/etc/wal-e.d/env
#threshold_megabytes: 10240
#threshold_backup_size_percentage: 30
#retries: 2
#use_iam: 1
#recovery_conf:
#restore_command: envdir /etc/wal-e.d/env wal-e wal-fetch "%f" "%p" -p 1
recovery_conf:
restore_command: cp ../wal_archive/%f %p
parameters:
archive_mode: "on"
wal_level: hot_standby
archive_command: mkdir -p ../wal_archive && test ! -f ../wal_archive/%f && cp %p ../wal_archive/%f
max_wal_senders: 10
wal_keep_segments: 8
archive_timeout: 1800s
max_replication_slots: 10
hot_standby: "on"
wal_log_hints: "on"
tags:
nofailover: False
noloadbalance: False
clonefrom: False
replicatefrom: postgresql1

View File

@@ -88,6 +88,7 @@ class MockPatroni:
self.api = Mock()
self.tags = {}
self.nofailover = None
self.replicatefrom = None
self.api.connection_string = 'http://127.0.0.1:8008'
@@ -128,12 +129,12 @@ class TestHa(unittest.TestCase):
self.p.controldata = lambda: {'Database cluster state': 'in production'}
self.p.is_healthy = false
self.p.is_running = false
self.p.follow_the_leader = false
self.p.follow = false
self.assertEquals(self.ha.run_cycle(), 'started as a secondary')
self.assertEquals(self.ha.run_cycle(), 'failed to start postgres')
def test_recover_master_failed(self):
self.p.follow_the_leader = false
self.p.follow = false
self.p.is_healthy = false
self.p.is_running = false
self.ha.has_lock = true
@@ -202,10 +203,12 @@ class TestHa(unittest.TestCase):
self.ha.update_lock = false
self.assertEquals(self.ha.run_cycle(), 'demoting self because i do not have the lock and i was a leader')
def test_follow_the_leader(self):
def test_follow(self):
self.ha.cluster.is_unlocked = false
self.p.is_leader = false
self.assertEquals(self.ha.run_cycle(), 'no action. i am a secondary and i am following a leader')
self.ha.patroni.replicatefrom = "foo"
self.assertEquals(self.ha.run_cycle(), 'no action. i am a secondary and i am following a leader')
def test_no_etcd_connection_master_demote(self):
self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))

View File

@@ -242,23 +242,23 @@ class TestPostgresql(unittest.TestCase):
@patch('patroni.postgresql.Postgresql.remove_data_directory', MagicMock(return_value=True))
@patch('patroni.postgresql.Postgresql.single_user_mode', MagicMock(return_value=1))
@patch('patroni.postgresql.Postgresql.write_pgpass', MagicMock(return_value=dict()))
def test_follow_the_leader(self, mock_pg_rewind):
self.p.follow_the_leader(None)
self.p.follow_the_leader(self.leader)
self.p.follow_the_leader(Leader(-1, 28, self.other))
def test_follow(self, mock_pg_rewind):
self.p.follow(None)
self.p.follow(self.leader)
self.p.follow(Leader(-1, 28, self.other))
self.p.rewind = mock_pg_rewind
self.p.follow_the_leader(self.leader)
self.p.follow(self.leader)
self.p.require_rewind()
with mock.patch('os.path.islink', MagicMock(return_value=True)):
with mock.patch('patroni.postgresql.Postgresql.can_rewind', new_callable=PropertyMock(return_value=True)):
with mock.patch('os.unlink', MagicMock(return_value=True)):
self.p.follow_the_leader(self.leader, recovery=True)
self.p.follow(self.leader, recovery=True)
self.p.require_rewind()
with mock.patch('patroni.postgresql.Postgresql.can_rewind', new_callable=PropertyMock(return_value=True)):
self.p.rewind.return_value = True
self.p.follow_the_leader(self.leader, recovery=True)
self.p.follow(self.leader, recovery=True)
self.p.rewind.return_value = False
self.p.follow_the_leader(self.leader, recovery=True)
self.p.follow(self.leader, recovery=True)
def test_can_rewind(self):
tmp = self.p.pg_rewind