diff --git a/patroni/postgresql/__init__.py b/patroni/postgresql/__init__.py index 244496b6..64a86f46 100644 --- a/patroni/postgresql/__init__.py +++ b/patroni/postgresql/__init__.py @@ -118,17 +118,28 @@ class Postgresql(object): # Last known running process self._postmaster_proc = None - if self.is_running(): # we are "joining" already running postgres - self.set_state('running') + if self.is_running(): + # If we found postmaster process we need to figure out whether postgres is accepting connections + self.set_state('starting') + self.check_startup_state_changed() + + if self.state == 'running': # we are "joining" already running postgres + # we know that PostgreSQL is accepting connections and can read some GUC's from pg_settings + self.config.load_current_server_parameters() + self.set_role('master' if self.is_leader() else 'replica') - # postpone writing postgresql.conf for 12+ because recovery parameters are not yet known - if self.major_version < 120000 or self.is_leader(): - self.config.write_postgresql_conf() + hba_saved = self.config.replace_pg_hba() ident_saved = self.config.replace_pg_ident() - if hba_saved or ident_saved: + + if self.major_version < 120000 or self.role in ('master', 'primary'): + # If PostgreSQL is running as a primary or we run PostgreSQL that is older than 12 we can + # call reload_config() once again (the first call happened in the ConfigHandler constructor), + # so that it can figure out if config files should be updated and pg_ctl reload executed. + self.config.reload_config(config, sighup=bool(hba_saved or ident_saved)) + elif hba_saved or ident_saved: self.reload() - elif self.role in ('master', 'primary'): + elif not self.is_running() and self.role in ('master', 'primary'): self.set_role('demoted') @property diff --git a/patroni/postgresql/config.py b/patroni/postgresql/config.py index 80f5be38..dc58c079 100644 --- a/patroni/postgresql/config.py +++ b/patroni/postgresql/config.py @@ -326,14 +326,22 @@ class ConfigHandler(object): .format(self._pgpass)) self._passfile = None self._passfile_mtime = None - self._synchronous_standby_names = None self._postmaster_ctime = None self._current_recovery_params: Optional[CaseInsensitiveDict] = None self._config = {} self._recovery_params = CaseInsensitiveDict() - self._server_parameters: CaseInsensitiveDict + self._server_parameters: CaseInsensitiveDict = CaseInsensitiveDict() self.reload_config(config) + def load_current_server_parameters(self) -> None: + """Read GUC's values from ``pg_settings`` when Patroni is joining the the postgres that is already running.""" + exclude = [name.lower() for name, value in self.CMDLINE_OPTIONS.items() if value[1] == _false_validator] \ + + [name.lower() for name in self._RECOVERY_PARAMETERS] + self._server_parameters = CaseInsensitiveDict({r[0]: r[1] for r in self._postgresql.query( + "SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings" + " WHERE (source IN ('command line', 'environment variable') OR sourcefile = %s)" + " AND pg_catalog.lower(name) != ALL(%s)", self._postgresql_conf, exclude)}) + def setup_server_parameters(self) -> None: self._server_parameters = self.get_server_parameters(self._config) self._adjust_recovery_parameters() @@ -922,14 +930,15 @@ class ConfigHandler(object): listen_addresses, port = split_host_port(config['listen'], 5432) parameters.update(cluster_name=self._postgresql.scope, listen_addresses=listen_addresses, port=str(port)) if not self._postgresql.global_config or self._postgresql.global_config.is_synchronous_mode: - if self._synchronous_standby_names is None: + synchronous_standby_names = self._server_parameters.get('synchronous_standby_names') + if synchronous_standby_names is None: if self._postgresql.global_config and self._postgresql.global_config.is_synchronous_mode_strict\ and self._postgresql.role in ('master', 'primary', 'promoted'): parameters['synchronous_standby_names'] = '*' else: parameters.pop('synchronous_standby_names', None) else: - parameters['synchronous_standby_names'] = self._synchronous_standby_names + parameters['synchronous_standby_names'] = synchronous_standby_names # Handle hot_standby <-> replica rename if parameters.get('wal_level') == ('hot_standby' if self._postgresql.major_version >= 90600 else 'replica'): @@ -1129,12 +1138,11 @@ class ConfigHandler(object): def set_synchronous_standby_names(self, value: Optional[str]) -> Optional[bool]: """Updates synchronous_standby_names and reloads if necessary. :returns: True if value was updated.""" - if value != self._synchronous_standby_names: + if value != self._server_parameters.get('synchronous_standby_names'): if value is None: self._server_parameters.pop('synchronous_standby_names', None) else: self._server_parameters['synchronous_standby_names'] = value - self._synchronous_standby_names = value if self._postgresql.state == 'running': self.write_postgresql_conf() self._postgresql.reload() diff --git a/tests/__init__.py b/tests/__init__.py index dcf9ed9e..063f94e7 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -118,11 +118,31 @@ class MockCursor(object): '"state":"streaming","sync_state":"async","sync_priority":0}]' now = datetime.datetime.now(tzutc) self.results = [(now, 0, '', 0, '', False, now, 'streaming', None, replication_info)] + elif sql.startswith('SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings'): + self.results = [('data_directory', 'data'), + ('hba_file', os.path.join('data', 'pg_hba.conf')), + ('ident_file', os.path.join('data', 'pg_ident.conf')), + ('max_connections', 42), + ('max_locks_per_transaction', 73), + ('max_prepared_transactions', 0), + ('max_replication_slots', 21), + ('max_wal_senders', 37), + ('track_commit_timestamp', 'off'), + ('wal_level', 'replica'), + ('listen_addresses', '6.6.6.6'), + ('port', 1984), + ('archive_command', 'my archive command'), + ('cluster_name', 'my_cluster')] elif sql.startswith('SELECT name, setting'): self.results = [('wal_segment_size', '2048', '8kB', 'integer', 'internal'), ('wal_block_size', '8192', None, 'integer', 'internal'), ('shared_buffers', '16384', '8kB', 'integer', 'postmaster'), ('wal_buffers', '-1', '8kB', 'integer', 'postmaster'), + ('max_connections', '100', None, 'integer', 'postmaster'), + ('max_prepared_transactions', '0', None, 'integer', 'postmaster'), + ('max_worker_processes', '8', None, 'integer', 'postmaster'), + ('max_locks_per_transaction', '64', None, 'integer', 'postmaster'), + ('max_wal_senders', '5', None, 'integer', 'postmaster'), ('search_path', 'public', None, 'string', 'user'), ('port', '5433', None, 'integer', 'postmaster'), ('listen_addresses', '*', None, 'string', 'postmaster'), @@ -222,6 +242,7 @@ class PostgresInit(unittest.TestCase): class BaseTestPostgresql(PostgresInit): + @patch('time.sleep', Mock()) def setUp(self): super(BaseTestPostgresql, self).setUp() diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py index 00b90b94..16cc9d1d 100644 --- a/tests/test_postgresql.py +++ b/tests/test_postgresql.py @@ -715,6 +715,7 @@ class TestPostgresql(BaseTestPostgresql): self.assertEqual(self.p.get_primary_timeline(), 1) @patch.object(Postgresql, 'get_postgres_role_from_data_directory', Mock(return_value='replica')) + @patch.object(Postgresql, 'is_running', Mock(return_value=False)) @patch.object(Bootstrap, 'running_custom_bootstrap', PropertyMock(return_value=True)) @patch.object(Postgresql, 'controldata', Mock(return_value={'max_connections setting': '200', 'max_worker_processes setting': '20', @@ -958,6 +959,7 @@ class TestPostgresql2(BaseTestPostgresql): @patch('patroni.postgresql.CallbackExecutor', Mock()) @patch.object(Postgresql, 'get_major_version', Mock(return_value=140000)) @patch.object(Postgresql, 'is_running', Mock(return_value=True)) + @patch.object(Postgresql, 'is_leader', Mock(return_value=False)) def setUp(self): super(TestPostgresql2, self).setUp()