From 096ee8f36f7ae40e5e89cccc6ce115f7438fca5b Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Tue, 26 Sep 2023 10:40:51 +0200 Subject: [PATCH] Read GUC's values when joining running Postgres (#2876) If restarted in pause, Patroni was discarding `synchronous_standby_names` from `postgresql.conf` because in the internal cache this value was set to `None`. As a result, synchronous replication transitioned to a broken state, with no synchronous replicas according to `synchronous_standby_names` and Patroni not selecting/setting the new synchronous replicas (another bug). To solve the problem of the broken initial state and to avoid similar issues with other GUCs, we will read the GUC values if Patroni is joining a running Postgres. --- patroni/postgresql/__init__.py | 25 ++++++++++++++++++------- patroni/postgresql/config.py | 20 ++++++++++++++------ tests/__init__.py | 21 +++++++++++++++++++++ tests/test_postgresql.py | 2 ++ 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index 244496b6..64a86f46 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -118,17 +118,28 @@ class Postgresql(object): # Last known running process self._postmaster_proc = None - if self.is_running(): # we are "joining" already running postgres - self.set_state('running') + if self.is_running(): + # If we found postmaster process we need to figure out whether postgres is accepting connections + self.set_state('starting') + self.check_startup_state_changed() + + if self.state == 'running': # we are "joining" already running postgres + # we know that PostgreSQL is accepting connections and can read some GUC's from pg_settings + self.config.load_current_server_parameters() + self.set_role('master' if self.is_leader() else 'replica') - # postpone writing postgresql.conf for 12+ because recovery parameters are not yet known - if self.major_version < 120000 or self.is_leader(): - 
self.config.write_postgresql_conf() + hba_saved = self.config.replace_pg_hba() ident_saved = self.config.replace_pg_ident() - if hba_saved or ident_saved: + + if self.major_version < 120000 or self.role in ('master', 'primary'): + # If PostgreSQL is running as a primary or we run PostgreSQL that is older than 12 we can + # call reload_config() once again (the first call happened in the ConfigHandler constructor), + # so that it can figure out if config files should be updated and pg_ctl reload executed. + self.config.reload_config(config, sighup=bool(hba_saved or ident_saved)) + elif hba_saved or ident_saved: self.reload() - elif self.role in ('master', 'primary'): + elif not self.is_running() and self.role in ('master', 'primary'): self.set_role('demoted') @property diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index 80f5be38..dc58c079 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -326,14 +326,22 @@ class ConfigHandler(object): .format(self._pgpass)) self._passfile = None self._passfile_mtime = None - self._synchronous_standby_names = None self._postmaster_ctime = None self._current_recovery_params: Optional[CaseInsensitiveDict] = None self._config = {} self._recovery_params = CaseInsensitiveDict() - self._server_parameters: CaseInsensitiveDict + self._server_parameters: CaseInsensitiveDict = CaseInsensitiveDict() self.reload_config(config) + def load_current_server_parameters(self) -> None: + """Read GUC's values from ``pg_settings`` when Patroni is joining the the postgres that is already running.""" + exclude = [name.lower() for name, value in self.CMDLINE_OPTIONS.items() if value[1] == _false_validator] \ + + [name.lower() for name in self._RECOVERY_PARAMETERS] + self._server_parameters = CaseInsensitiveDict({r[0]: r[1] for r in self._postgresql.query( + "SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings" + " WHERE (source IN ('command line', 'environment variable') OR 
sourcefile = %s)" + " AND pg_catalog.lower(name) != ALL(%s)", self._postgresql_conf, exclude)}) + def setup_server_parameters(self) -> None: self._server_parameters = self.get_server_parameters(self._config) self._adjust_recovery_parameters() @@ -922,14 +930,15 @@ class ConfigHandler(object): listen_addresses, port = split_host_port(config['listen'], 5432) parameters.update(cluster_name=self._postgresql.scope, listen_addresses=listen_addresses, port=str(port)) if not self._postgresql.global_config or self._postgresql.global_config.is_synchronous_mode: - if self._synchronous_standby_names is None: + synchronous_standby_names = self._server_parameters.get('synchronous_standby_names') + if synchronous_standby_names is None: if self._postgresql.global_config and self._postgresql.global_config.is_synchronous_mode_strict\ and self._postgresql.role in ('master', 'primary', 'promoted'): parameters['synchronous_standby_names'] = '*' else: parameters.pop('synchronous_standby_names', None) else: - parameters['synchronous_standby_names'] = self._synchronous_standby_names + parameters['synchronous_standby_names'] = synchronous_standby_names # Handle hot_standby <-> replica rename if parameters.get('wal_level') == ('hot_standby' if self._postgresql.major_version >= 90600 else 'replica'): @@ -1129,12 +1138,11 @@ class ConfigHandler(object): def set_synchronous_standby_names(self, value: Optional[str]) -> Optional[bool]: """Updates synchronous_standby_names and reloads if necessary. 
:returns: True if value was updated.""" - if value != self._synchronous_standby_names: + if value != self._server_parameters.get('synchronous_standby_names'): if value is None: self._server_parameters.pop('synchronous_standby_names', None) else: self._server_parameters['synchronous_standby_names'] = value - self._synchronous_standby_names = value if self._postgresql.state == 'running': self.write_postgresql_conf() self._postgresql.reload() diff --git a/tests/__init__.py b/tests/__init__.py index dcf9ed9e..063f94e7 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -118,11 +118,31 @@ class MockCursor(object): '"state":"streaming","sync_state":"async","sync_priority":0}]' now = datetime.datetime.now(tzutc) self.results = [(now, 0, '', 0, '', False, now, 'streaming', None, replication_info)] + elif sql.startswith('SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings'): + self.results = [('data_directory', 'data'), + ('hba_file', os.path.join('data', 'pg_hba.conf')), + ('ident_file', os.path.join('data', 'pg_ident.conf')), + ('max_connections', 42), + ('max_locks_per_transaction', 73), + ('max_prepared_transactions', 0), + ('max_replication_slots', 21), + ('max_wal_senders', 37), + ('track_commit_timestamp', 'off'), + ('wal_level', 'replica'), + ('listen_addresses', '6.6.6.6'), + ('port', 1984), + ('archive_command', 'my archive command'), + ('cluster_name', 'my_cluster')] elif sql.startswith('SELECT name, setting'): self.results = [('wal_segment_size', '2048', '8kB', 'integer', 'internal'), ('wal_block_size', '8192', None, 'integer', 'internal'), ('shared_buffers', '16384', '8kB', 'integer', 'postmaster'), ('wal_buffers', '-1', '8kB', 'integer', 'postmaster'), + ('max_connections', '100', None, 'integer', 'postmaster'), + ('max_prepared_transactions', '0', None, 'integer', 'postmaster'), + ('max_worker_processes', '8', None, 'integer', 'postmaster'), + ('max_locks_per_transaction', '64', None, 'integer', 'postmaster'), + ('max_wal_senders', 
'5', None, 'integer', 'postmaster'), ('search_path', 'public', None, 'string', 'user'), ('port', '5433', None, 'integer', 'postmaster'), ('listen_addresses', '*', None, 'string', 'postmaster'), @@ -222,6 +242,7 @@ class PostgresInit(unittest.TestCase): class BaseTestPostgresql(PostgresInit): + @patch('time.sleep', Mock()) def setUp(self): super(BaseTestPostgresql, self).setUp() diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index 00b90b94..16cc9d1d 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -715,6 +715,7 @@ class TestPostgresql(BaseTestPostgresql): self.assertEqual(self.p.get_primary_timeline(), 1) @patch.object(Postgresql, 'get_postgres_role_from_data_directory', Mock(return_value='replica')) + @patch.object(Postgresql, 'is_running', Mock(return_value=False)) @patch.object(Bootstrap, 'running_custom_bootstrap', PropertyMock(return_value=True)) @patch.object(Postgresql, 'controldata', Mock(return_value={'max_connections setting': '200', 'max_worker_processes setting': '20', @@ -958,6 +959,7 @@ class TestPostgresql2(BaseTestPostgresql): @patch('patroni.postgresql.CallbackExecutor', Mock()) @patch.object(Postgresql, 'get_major_version', Mock(return_value=140000)) @patch.object(Postgresql, 'is_running', Mock(return_value=True)) + @patch.object(Postgresql, 'is_leader', Mock(return_value=False)) def setUp(self): super(TestPostgresql2, self).setUp()