From c650dc092e4e46909baf8cb64d3071884e049cbe Mon Sep 17 00:00:00 2001
From: Oleksii Kliukin
Date: Wed, 30 Dec 2015 18:33:23 +0100
Subject: [PATCH] Follow the node specified in the replicatefrom tag, if
 present.

Rename follow_the_leader to just follow, since the node to be followed
is not necessarily the leader anymore.

Extend the code that manages replication slots to non-master nodes: a
node now keeps slots for every member that names it in a replicatefrom
tag.

Add a third sample configuration in order to be able to run cascading
replicas.
---
 patroni/ha.py            |  44 ++++++++++-------
 patroni/postgresql.py    |   8 +++-
 postgres0.yml            |   5 +-
 postgres1.yml            |   5 +-
 postgres2.yml            | 101 +++++++++++++++++++++++++++++++++++++++
 tests/test_ha.py         |   9 ++--
 tests/test_postgresql.py |  16 +++---
 7 files changed, 152 insertions(+), 36 deletions(-)
 create mode 100644 postgres2.yml

diff --git a/patroni/ha.py b/patroni/ha.py
index af1b62b1..3856366d 100644
--- a/patroni/ha.py
+++ b/patroni/ha.py
@@ -102,19 +102,25 @@ class Ha:
                 pg_controldata.get('Database cluster state', '') == 'in production':  # crashed master
             self.state_handler.require_rewind()
         self.recovering = True
-        return self.follow_the_leader("started as readonly because i had the session lock",
-                                      "started as a secondary",
-                                      refresh=True, recovery=True)
+        return self.follow("started as readonly because i had the session lock",
+                           "started as a secondary",
+                           refresh=True, recovery=True)
 
-    def follow_the_leader(self, demote_reason, follow_reason, refresh=True, recovery=False):
+    def follow(self, demote_reason, follow_reason, refresh=True, recovery=False):
         refresh and self.load_cluster_from_dcs()
         ret = demote_reason if (not recovery and self.state_handler.is_leader() or
                                 recovery and self.state_handler.role == 'master') else follow_reason
-        leader = self.cluster.leader
-        leader = None if (leader and leader.name) == self.state_handler.name else leader
-        if not self.state_handler.check_recovery_conf(leader) or recovery:
+        # determine the node to follow: if the replicatefrom tag is set,
+        # try to follow the node mentioned there, otherwise follow the leader.
+        if self.patroni.replicatefrom:
+            node_to_follow = [m for m in self.cluster.members if m.name == self.patroni.replicatefrom]
+            node_to_follow = node_to_follow[0] if node_to_follow else self.cluster.leader
+        else:
+            node_to_follow = self.cluster.leader
+        node_to_follow = None if (node_to_follow and node_to_follow.name) == self.state_handler.name else node_to_follow
+        if not self.state_handler.check_recovery_conf(node_to_follow) or recovery:
             self._async_executor.schedule('changing primary_conninfo and restarting')
-            self._async_executor.run_async(self.state_handler.follow_the_leader, (leader, recovery))
+            self._async_executor.run_async(self.state_handler.follow, (node_to_follow, recovery))
         return ret
 
     def enforce_master_role(self, message, promote_message):
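The selection logic in the hunk above boils down to three rules: prefer the member named by the replicatefrom tag, fall back to the cluster leader, and never follow yourself. A minimal, self-contained sketch of just that rule; Member here is a simplified stand-in for Patroni's real member objects, not the actual class:

# Minimal sketch of the follow-target selection above (illustrative only).
from collections import namedtuple

Member = namedtuple('Member', 'name')

def pick_node_to_follow(members, leader, my_name, replicatefrom=None):
    # prefer the member named by the replicatefrom tag, if it exists
    tagged = [m for m in members if m.name == replicatefrom]
    node = tagged[0] if tagged else leader
    # a node must never follow itself
    return None if node and node.name == my_name else node

members = [Member('postgresql0'), Member('postgresql1'), Member('postgresql2')]
leader = members[0]
assert pick_node_to_follow(members, leader, 'postgresql2', 'postgresql1').name == 'postgresql1'
assert pick_node_to_follow(members, leader, 'postgresql1') is leader   # no tag: follow the leader
assert pick_node_to_follow(members, leader, 'postgresql0') is None     # the leader follows nobody

Note that a dangling replicatefrom (naming a member that is not in the cluster) degrades gracefully to following the leader rather than failing.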
@@ -287,14 +293,14 @@ class Ha:
                 return self.enforce_master_role('acquired session lock as a leader',
                                                 'promoted self to leader by acquiring session lock')
             else:
-                return self.follow_the_leader('demoted self due after trying and failing to obtain lock',
-                                              'following new leader after trying and failing to obtain lock')
+                return self.follow('demoted self after trying and failing to obtain lock',
+                                   'following new leader after trying and failing to obtain lock')
         else:
             if self.patroni.nofailover:
-                return self.follow_the_leader('demoting self because I am not allowed to become master',
-                                              'following a different leader because I am not allowed to promote')
-            return self.follow_the_leader('demoting self because i am not the healthiest node',
-                                          'following a different leader because i am not the healthiest node')
+                return self.follow('demoting self because I am not allowed to become master',
+                                   'following a different leader because I am not allowed to promote')
+            return self.follow('demoting self because i am not the healthiest node',
+                               'following a different leader because i am not the healthiest node')
 
     def process_healthy_cluster(self):
         if self.has_lock():
@@ -312,8 +318,8 @@ class Ha:
                 self.load_cluster_from_dcs()
         else:
             logger.info('does not have lock')
-        return self.follow_the_leader('demoting self because i do not have the lock and i was a leader',
-                                      'no action. i am a secondary and i am following a leader', False)
+        return self.follow('demoting self because i do not have the lock and i was a leader',
+                           'no action. i am a secondary and i am following a leader', False)
 
     def schedule(self, action):
         with self._async_executor:
@@ -430,7 +436,11 @@ class Ha:
                 else:
                     return self.process_healthy_cluster()
             finally:
-                self.state_handler.sync_replication_slots(self.cluster)
+                # we might not have a valid PostgreSQL connection here if another thread
+                # stops PostgreSQL, therefore we only reload replication slots if no
+                # asynchronous processes are running (should always be the case for the master)
+                if not self._async_executor.busy:
+                    self.state_handler.sync_replication_slots(self.cluster)
         except DCSError:
             logger.error('Error communicating with DCS')
             if self.state_handler.is_running() and self.state_handler.is_leader():
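The guard added to run_cycle() above is subtle: follow() hands the actual restart to the async executor, so by the time the finally clause runs, a background thread may already be stopping PostgreSQL and the connection needed for slot maintenance may be gone. The toy below illustrates only the invariant being relied on; ToyExecutor is an illustrative stand-in, not Patroni's real AsyncExecutor API:

# Toy illustration of the run_cycle() guard: skip slot maintenance while
# a background action is in flight (ToyExecutor is not the real API).
import threading

class ToyExecutor:
    def __init__(self):
        self._running = threading.Event()

    @property
    def busy(self):
        return self._running.is_set()

    def run_async(self, func, args=()):
        self._running.set()

        def wrapper():
            try:
                func(*args)
            finally:
                self._running.clear()

        threading.Thread(target=wrapper).start()

executor = ToyExecutor()
if not executor.busy:  # mirrors: if not self._async_executor.busy:
    pass               # only now is it safe to touch replication slots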
diff --git a/patroni/postgresql.py b/patroni/postgresql.py
index 84223675..db47cbff 100644
--- a/patroni/postgresql.py
+++ b/patroni/postgresql.py
@@ -524,7 +524,7 @@ recovery_target_timeline = 'latest'
             except:
                 logger.exception("Unable to remove {}".format(path))
 
-    def follow_the_leader(self, leader, recovery=False):
+    def follow(self, leader, recovery=False):
         if not self.check_recovery_conf(leader) or recovery:
             change_role = (self.role == 'master')
 
@@ -634,7 +634,11 @@ $$""".format(name, options), name, password, password)
         if self.use_slots:
             try:
                 self.load_replication_slots()
-                slots = [m.name for m in cluster.members if m.name != self.name] if self.role == 'master' else []
+                if self.role == 'master':
+                    slots = [m.name for m in cluster.members if m.name != self.name]
+                else:
+                    # only manage slots for replicas that want to replicate from this one
+                    slots = [m.name for m in cluster.members if m.replicatefrom == self.name]
                 # drop unused slots
                 for slot in set(self.replication_slots) - set(slots):
                     self.query("""SELECT pg_drop_replication_slot(%s)
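The slot bookkeeping above follows one rule per role: a master keeps a slot for every other member, while a replica keeps slots only for members whose replicatefrom tag points at it. A small sketch of that rule, with plain dicts standing in for Patroni's member objects:

# Sketch of the slot selection in sync_replication_slots(); plain dicts
# stand in for Patroni's member objects (illustrative only).
def wanted_slots(members, my_name, my_role):
    if my_role == 'master':
        return {m['name'] for m in members if m['name'] != my_name}
    # replica: keep slots only for members cascading from this node
    return {m['name'] for m in members if m.get('replicatefrom') == my_name}

members = [{'name': 'postgresql0'},
           {'name': 'postgresql1'},
           {'name': 'postgresql2', 'replicatefrom': 'postgresql1'}]
assert wanted_slots(members, 'postgresql0', 'master') == {'postgresql1', 'postgresql2'}
assert wanted_slots(members, 'postgresql1', 'replica') == {'postgresql2'}
assert wanted_slots(members, 'postgresql2', 'replica') == set()

Everything in the difference between the existing slots and this wanted set is then dropped via pg_drop_replication_slot(), as the hunk shows.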
diff --git a/postgres0.yml b/postgres0.yml
index 36018ad5..f29e1fee 100644
--- a/postgres0.yml
+++ b/postgres0.yml
@@ -88,14 +88,13 @@ postgresql:
     archive_mode: "on"
     wal_level: hot_standby
     archive_command: mkdir -p ../wal_archive && test ! -f ../wal_archive/%f && cp %p ../wal_archive/%f
-    max_wal_senders: 5
+    max_wal_senders: 10
     wal_keep_segments: 8
     archive_timeout: 1800s
-    max_replication_slots: 5
+    max_replication_slots: 10
     hot_standby: "on"
     wal_log_hints: "on"
 tags:
     nofailover: False
     noloadbalance: False
     clonefrom: False
-    replicatefrom: 127.0.0.1
diff --git a/postgres1.yml b/postgres1.yml
index e1b61b3b..1e7a7045 100644
--- a/postgres1.yml
+++ b/postgres1.yml
@@ -88,14 +88,13 @@ postgresql:
     archive_mode: "on"
     wal_level: hot_standby
     archive_command: mkdir -p ../wal_archive && test ! -f ../wal_archive/%f && cp %p ../wal_archive/%f
-    max_wal_senders: 5
+    max_wal_senders: 10
     wal_keep_segments: 8
     archive_timeout: 1800s
-    max_replication_slots: 5
+    max_replication_slots: 10
     hot_standby: "on"
     wal_log_hints: "on"
 tags:
     nofailover: False
     noloadbalance: False
     clonefrom: False
-    replicatefrom: 127.0.0.1
diff --git a/postgres2.yml b/postgres2.yml
new file mode 100644
index 00000000..99e8b47e
--- /dev/null
+++ b/postgres2.yml
@@ -0,0 +1,101 @@
+ttl: &ttl 30
+loop_wait: &loop_wait 10
+scope: &scope batman
+restapi:
+  listen: 127.0.0.1:8010
+  connect_address: 127.0.0.1:8010
+  auth: 'username:password'
+#  certfile: /etc/ssl/certs/ssl-cert-snakeoil.pem
+#  keyfile: /etc/ssl/private/ssl-cert-snakeoil.key
+etcd:
+  scope: *scope
+  ttl: *ttl
+  host: 127.0.0.1:4001
+  #discovery_srv: my-etcd.domain
+#zookeeper:
+#  scope: *scope
+#  session_timeout: *ttl
+#  reconnect_timeout: *loop_wait
+#  hosts:
+#    - 127.0.0.1:2181
+#    - 127.0.0.2:2181
+#  exhibitor:
+#    poll_interval: 300
+#    port: 8181
+#    hosts:
+#      - host1
+#      - host2
+#      - host3
+postgresql:
+  name: postgresql2
+  scope: *scope
+  listen: 127.0.0.1:5434
+  connect_address: 127.0.0.1:5434
+  data_dir: data/postgresql2
+  maximum_lag_on_failover: 1048576 # 1 megabyte in bytes
+  use_slots: True
+  pgpass: /tmp/pgpass2
+  initdb: ## We allow the following options to be passed on to initdb
+#    - auth: authmethod
+#    - auth-host: authmethod
+#    - auth-local: authmethod
+    - encoding: UTF8
+#    - data-checksums # When pg_rewind is needed on 9.3, this needs to be enabled
+#    - locale: locale
+#    - lc-collate: locale
+#    - lc-ctype: locale
+#    - lc-messages: locale
+#    - lc-monetary: locale
+#    - lc-numeric: locale
+#    - lc-time: locale
+#    - text-search-config: CFG
+#    - xlogdir: directory
+#    - debug
+#    - noclean
+  pg_rewind:
+    username: postgres
+    password: zalando
+  pg_hba:
+    - host all all 0.0.0.0/0 md5
+    - hostssl all all 0.0.0.0/0 md5
+  replication:
+    username: replicator
+    password: rep-pass
+    network: 127.0.0.1/32
+  superuser:
+    user: postgres
+    password: zalando
+  admin:
+    username: admin
+    password: admin
+# commented-out example for wal-e provisioning
+  create_replica_method:
+    - basebackup
+#    - wal_e
+# commented-out example for wal-e provisioning
+#  wal_e:
+#    command: /patroni/scripts/wale_restore.py
+#    env_dir: /home/postgres/etc/wal-e.d/env
+#    threshold_megabytes: 10240
+#    threshold_backup_size_percentage: 30
+#    retries: 2
+#    use_iam: 1
+#  recovery_conf:
+#    restore_command: envdir /etc/wal-e.d/env wal-e wal-fetch "%f" "%p" -p 1
+  recovery_conf:
+    restore_command: cp ../wal_archive/%f %p
+  parameters:
+    archive_mode: "on"
+    wal_level: hot_standby
+    archive_command: mkdir -p ../wal_archive && test ! -f ../wal_archive/%f && cp %p ../wal_archive/%f
+    max_wal_senders: 10
+    wal_keep_segments: 8
+    archive_timeout: 1800s
+    max_replication_slots: 10
+    hot_standby: "on"
+    wal_log_hints: "on"
+tags:
+    nofailover: False
+    noloadbalance: False
+    clonefrom: False
+    replicatefrom: postgresql1
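Taken together, the three sample configurations describe a chain: postgresql2 names postgresql1 in its replicatefrom tag, while postgresql0 and postgresql1 compete for the leader lock. A quick way to visualize the resulting topology; this is a hypothetical helper with the tags hard-coded to match the sample files, assuming postgresql0 currently holds the lock:

# Hypothetical helper: print the replication chain implied by the three
# sample configs, assuming postgresql0 currently holds the leader lock.
tags = {
    'postgresql0': {},
    'postgresql1': {},
    'postgresql2': {'replicatefrom': 'postgresql1'},
}
leader = 'postgresql0'

for name in sorted(tags):
    if name == leader:
        print('{0} (leader)'.format(name))
    else:
        print('{0} -> {1}'.format(name, tags[name].get('replicatefrom', leader)))
# prints:
# postgresql0 (leader)
# postgresql1 -> postgresql0
# postgresql2 -> postgresql1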
diff --git a/tests/test_ha.py b/tests/test_ha.py
index bcc5c93d..336b78a3 100644
--- a/tests/test_ha.py
+++ b/tests/test_ha.py
@@ -88,6 +88,7 @@ class MockPatroni:
         self.api = Mock()
         self.tags = {}
         self.nofailover = None
+        self.replicatefrom = None
         self.api.connection_string = 'http://127.0.0.1:8008'
 
 
@@ -128,12 +129,12 @@ class TestHa(unittest.TestCase):
         self.p.controldata = lambda: {'Database cluster state': 'in production'}
         self.p.is_healthy = false
         self.p.is_running = false
-        self.p.follow_the_leader = false
+        self.p.follow = false
         self.assertEquals(self.ha.run_cycle(), 'started as a secondary')
         self.assertEquals(self.ha.run_cycle(), 'failed to start postgres')
 
     def test_recover_master_failed(self):
-        self.p.follow_the_leader = false
+        self.p.follow = false
         self.p.is_healthy = false
         self.p.is_running = false
         self.ha.has_lock = true
@@ -202,10 +203,12 @@ class TestHa(unittest.TestCase):
         self.ha.update_lock = false
         self.assertEquals(self.ha.run_cycle(), 'demoting self because i do not have the lock and i was a leader')
 
-    def test_follow_the_leader(self):
+    def test_follow(self):
         self.ha.cluster.is_unlocked = false
         self.p.is_leader = false
         self.assertEquals(self.ha.run_cycle(), 'no action. i am a secondary and i am following a leader')
+        self.ha.patroni.replicatefrom = "foo"
+        self.assertEquals(self.ha.run_cycle(), 'no action. i am a secondary and i am following a leader')
 
     def test_no_etcd_connection_master_demote(self):
         self.ha.load_cluster_from_dcs = Mock(side_effect=DCSError('Etcd is not responding properly'))
diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py
index a4dae962..8114272b 100644
--- a/tests/test_postgresql.py
+++ b/tests/test_postgresql.py
@@ -242,23 +242,23 @@ class TestPostgresql(unittest.TestCase):
     @patch('patroni.postgresql.Postgresql.remove_data_directory', MagicMock(return_value=True))
     @patch('patroni.postgresql.Postgresql.single_user_mode', MagicMock(return_value=1))
     @patch('patroni.postgresql.Postgresql.write_pgpass', MagicMock(return_value=dict()))
-    def test_follow_the_leader(self, mock_pg_rewind):
-        self.p.follow_the_leader(None)
-        self.p.follow_the_leader(self.leader)
-        self.p.follow_the_leader(Leader(-1, 28, self.other))
+    def test_follow(self, mock_pg_rewind):
+        self.p.follow(None)
+        self.p.follow(self.leader)
+        self.p.follow(Leader(-1, 28, self.other))
         self.p.rewind = mock_pg_rewind
-        self.p.follow_the_leader(self.leader)
+        self.p.follow(self.leader)
         self.p.require_rewind()
         with mock.patch('os.path.islink', MagicMock(return_value=True)):
             with mock.patch('patroni.postgresql.Postgresql.can_rewind', new_callable=PropertyMock(return_value=True)):
                 with mock.patch('os.unlink', MagicMock(return_value=True)):
-                    self.p.follow_the_leader(self.leader, recovery=True)
+                    self.p.follow(self.leader, recovery=True)
         self.p.require_rewind()
         with mock.patch('patroni.postgresql.Postgresql.can_rewind', new_callable=PropertyMock(return_value=True)):
             self.p.rewind.return_value = True
-            self.p.follow_the_leader(self.leader, recovery=True)
+            self.p.follow(self.leader, recovery=True)
             self.p.rewind.return_value = False
-            self.p.follow_the_leader(self.leader, recovery=True)
+            self.p.follow(self.leader, recovery=True)
 
     def test_can_rewind(self):
         tmp = self.p.pg_rewind
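Once all three nodes are up, the slot layout can be spot-checked from the outside. The snippet below is a hypothetical verification helper, not part of the patch; it assumes psycopg2 is installed, the superuser credentials from the sample configs, and ports 5432/5433 for postgresql0/postgresql1 (only postgresql2's port 5434 appears above).

# Hypothetical spot-check of the slot layout after starting the cascade;
# assumes psycopg2 and the superuser credentials from the sample configs.
import psycopg2

def slot_names(port):
    conn = psycopg2.connect(host='127.0.0.1', port=port, user='postgres',
                            password='zalando', dbname='postgres')
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT slot_name FROM pg_replication_slots')
            return {row[0] for row in cur.fetchall()}
    finally:
        conn.close()

# With the cascade postgresql0 <- postgresql1 <- postgresql2, the master
# keeps a slot per other member, while postgresql1 keeps one only for
# the replica cascading from it:
# slot_names(5432) == {'postgresql1', 'postgresql2'}
# slot_names(5433) == {'postgresql2'}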