Read GUC's values when joining running Postgres (#2876)

If restarted in pause Patroni was discarding `synchronous_standby_names` from `postgresql.conf` because in the internal cache this values was set to `None`. As a result synchronous replication transitioned to a broken state, with no synchronous replicas according to the `synchronous_standby_names` and Patroni not selecting/setting the new synchronous replicas (another bug).

To solve the problem of broken initial state and to avoid similar issues with other GUC's we will read GUC's value if Patroni is joining running Postgres.
This commit is contained in:
Alexander Kukushkin
2023-09-26 10:40:51 +02:00
parent 91e2be092c
commit 096ee8f36f
4 changed files with 55 additions and 13 deletions

View File

@@ -118,17 +118,28 @@ class Postgresql(object):
# Last known running process
self._postmaster_proc = None
if self.is_running(): # we are "joining" already running postgres
self.set_state('running')
if self.is_running():
# If we found postmaster process we need to figure out whether postgres is accepting connections
self.set_state('starting')
self.check_startup_state_changed()
if self.state == 'running': # we are "joining" already running postgres
# we know that PostgreSQL is accepting connections and can read some GUC's from pg_settings
self.config.load_current_server_parameters()
self.set_role('master' if self.is_leader() else 'replica')
# postpone writing postgresql.conf for 12+ because recovery parameters are not yet known
if self.major_version < 120000 or self.is_leader():
self.config.write_postgresql_conf()
hba_saved = self.config.replace_pg_hba()
ident_saved = self.config.replace_pg_ident()
if hba_saved or ident_saved:
if self.major_version < 120000 or self.role in ('master', 'primary'):
# If PostgreSQL is running as a primary or we run PostgreSQL that is older than 12 we can
# call reload_config() once again (the first call happened in the ConfigHandler constructor),
# so that it can figure out if config files should be updated and pg_ctl reload executed.
self.config.reload_config(config, sighup=bool(hba_saved or ident_saved))
elif hba_saved or ident_saved:
self.reload()
elif self.role in ('master', 'primary'):
elif not self.is_running() and self.role in ('master', 'primary'):
self.set_role('demoted')
@property

View File

@@ -326,14 +326,22 @@ class ConfigHandler(object):
.format(self._pgpass))
self._passfile = None
self._passfile_mtime = None
self._synchronous_standby_names = None
self._postmaster_ctime = None
self._current_recovery_params: Optional[CaseInsensitiveDict] = None
self._config = {}
self._recovery_params = CaseInsensitiveDict()
self._server_parameters: CaseInsensitiveDict
self._server_parameters: CaseInsensitiveDict = CaseInsensitiveDict()
self.reload_config(config)
def load_current_server_parameters(self) -> None:
"""Read GUC's values from ``pg_settings`` when Patroni is joining the the postgres that is already running."""
exclude = [name.lower() for name, value in self.CMDLINE_OPTIONS.items() if value[1] == _false_validator] \
+ [name.lower() for name in self._RECOVERY_PARAMETERS]
self._server_parameters = CaseInsensitiveDict({r[0]: r[1] for r in self._postgresql.query(
"SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings"
" WHERE (source IN ('command line', 'environment variable') OR sourcefile = %s)"
" AND pg_catalog.lower(name) != ALL(%s)", self._postgresql_conf, exclude)})
def setup_server_parameters(self) -> None:
self._server_parameters = self.get_server_parameters(self._config)
self._adjust_recovery_parameters()
@@ -922,14 +930,15 @@ class ConfigHandler(object):
listen_addresses, port = split_host_port(config['listen'], 5432)
parameters.update(cluster_name=self._postgresql.scope, listen_addresses=listen_addresses, port=str(port))
if not self._postgresql.global_config or self._postgresql.global_config.is_synchronous_mode:
if self._synchronous_standby_names is None:
synchronous_standby_names = self._server_parameters.get('synchronous_standby_names')
if synchronous_standby_names is None:
if self._postgresql.global_config and self._postgresql.global_config.is_synchronous_mode_strict\
and self._postgresql.role in ('master', 'primary', 'promoted'):
parameters['synchronous_standby_names'] = '*'
else:
parameters.pop('synchronous_standby_names', None)
else:
parameters['synchronous_standby_names'] = self._synchronous_standby_names
parameters['synchronous_standby_names'] = synchronous_standby_names
# Handle hot_standby <-> replica rename
if parameters.get('wal_level') == ('hot_standby' if self._postgresql.major_version >= 90600 else 'replica'):
@@ -1129,12 +1138,11 @@ class ConfigHandler(object):
def set_synchronous_standby_names(self, value: Optional[str]) -> Optional[bool]:
"""Updates synchronous_standby_names and reloads if necessary.
:returns: True if value was updated."""
if value != self._synchronous_standby_names:
if value != self._server_parameters.get('synchronous_standby_names'):
if value is None:
self._server_parameters.pop('synchronous_standby_names', None)
else:
self._server_parameters['synchronous_standby_names'] = value
self._synchronous_standby_names = value
if self._postgresql.state == 'running':
self.write_postgresql_conf()
self._postgresql.reload()

View File

@@ -118,11 +118,31 @@ class MockCursor(object):
'"state":"streaming","sync_state":"async","sync_priority":0}]'
now = datetime.datetime.now(tzutc)
self.results = [(now, 0, '', 0, '', False, now, 'streaming', None, replication_info)]
elif sql.startswith('SELECT name, pg_catalog.current_setting(name) FROM pg_catalog.pg_settings'):
self.results = [('data_directory', 'data'),
('hba_file', os.path.join('data', 'pg_hba.conf')),
('ident_file', os.path.join('data', 'pg_ident.conf')),
('max_connections', 42),
('max_locks_per_transaction', 73),
('max_prepared_transactions', 0),
('max_replication_slots', 21),
('max_wal_senders', 37),
('track_commit_timestamp', 'off'),
('wal_level', 'replica'),
('listen_addresses', '6.6.6.6'),
('port', 1984),
('archive_command', 'my archive command'),
('cluster_name', 'my_cluster')]
elif sql.startswith('SELECT name, setting'):
self.results = [('wal_segment_size', '2048', '8kB', 'integer', 'internal'),
('wal_block_size', '8192', None, 'integer', 'internal'),
('shared_buffers', '16384', '8kB', 'integer', 'postmaster'),
('wal_buffers', '-1', '8kB', 'integer', 'postmaster'),
('max_connections', '100', None, 'integer', 'postmaster'),
('max_prepared_transactions', '0', None, 'integer', 'postmaster'),
('max_worker_processes', '8', None, 'integer', 'postmaster'),
('max_locks_per_transaction', '64', None, 'integer', 'postmaster'),
('max_wal_senders', '5', None, 'integer', 'postmaster'),
('search_path', 'public', None, 'string', 'user'),
('port', '5433', None, 'integer', 'postmaster'),
('listen_addresses', '*', None, 'string', 'postmaster'),
@@ -222,6 +242,7 @@ class PostgresInit(unittest.TestCase):
class BaseTestPostgresql(PostgresInit):
@patch('time.sleep', Mock())
def setUp(self):
super(BaseTestPostgresql, self).setUp()

View File

@@ -715,6 +715,7 @@ class TestPostgresql(BaseTestPostgresql):
self.assertEqual(self.p.get_primary_timeline(), 1)
@patch.object(Postgresql, 'get_postgres_role_from_data_directory', Mock(return_value='replica'))
@patch.object(Postgresql, 'is_running', Mock(return_value=False))
@patch.object(Bootstrap, 'running_custom_bootstrap', PropertyMock(return_value=True))
@patch.object(Postgresql, 'controldata', Mock(return_value={'max_connections setting': '200',
'max_worker_processes setting': '20',
@@ -958,6 +959,7 @@ class TestPostgresql2(BaseTestPostgresql):
@patch('patroni.postgresql.CallbackExecutor', Mock())
@patch.object(Postgresql, 'get_major_version', Mock(return_value=140000))
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
@patch.object(Postgresql, 'is_leader', Mock(return_value=False))
def setUp(self):
super(TestPostgresql2, self).setUp()