fix(portal): Restrict WAL streaming to domain nodes only (#8956)

The `web` and `api` application use `domain` as a dependency in their
`mix.exs`. This means by default their Supervisor will start the
Domain's supervision tree as well.

The author did not realize this at the time of implementation, and so we
now leverage the convention in place for restricting tasks to `domain`
nodes, the `background_jobs_enabled` application configuration
parameter.

We also add an info log when the replication slot is being started so we
can verify the node it's starting on.
This commit is contained in:
Jamil
2025-05-01 06:28:40 -07:00
committed by GitHub
parent 12efba3cc2
commit 8e054f5c74
4 changed files with 34 additions and 22 deletions

View File

@@ -26,7 +26,6 @@ defmodule Domain.Application do
# Note: only one of platform adapters will be actually started.
Domain.GoogleCloudPlatform,
Domain.Cluster,
{Oban, Application.fetch_env!(:domain, Oban)},
# Application
Domain.Tokens,
@@ -41,11 +40,22 @@ defmodule Domain.Application do
Domain.ComponentVersions,
# Observability
Domain.Telemetry,
Domain.Telemetry
] ++ background_children()
end
# WAL replication
replication_child_spec()
]
defp background_children do
if background_jobs_enabled?() do
[
# Job system
{Oban, Application.fetch_env!(:domain, Oban)},
# WAL replication
replication_child_spec()
]
else
[]
end
end
defp replication_child_spec do
@@ -98,4 +108,13 @@ defmodule Domain.Application do
_ -> :info
end
end
defp background_jobs_enabled? do
# The web and api applications also start Domain's supervision tree so that they may
# use the same database connection pool and other shared resources. We want to restrict
# some children to only run in the domain node.
# For now, we use BACKGROUND_JOBS_ENABLED to determine if we are running in the domain node.
# We could also use the node's name, but this doesn't work as well in dev/test environments.
Application.get_env(:domain, :background_jobs_enabled, false)
end
end

View File

@@ -120,6 +120,8 @@ defmodule Domain.Events.ReplicationConnection do
end
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
Logger.info("Starting replication slot", state: inspect(state))
query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
@@ -288,20 +290,20 @@ defmodule Domain.Events.ReplicationConnection do
@doc """
Called when the connection is disconnected unexpectedly.
We log the error and set the state to :disconnected, which will cause the
ReplicationConnection to attempt to reconnect when auto_reconnect is enabled.
This will happen if:
1. Postgres is restarted such as during a maintenance window
2. The connection is closed by the server due to our failure to acknowledge
Keepalive messages in a timely manner
3. The connection is cut due to a network error
4. The ReplicationConnection process crashes or is killed abruptly for any reason
5. Potentially during a deploy if the connection is not closed gracefully.
Our Supervisor will restart this process automatically so this is not an error.
"""
@impl true
def handle_disconnect(state) do
Logger.warning("Replication connection disconnected",
Logger.info("Replication connection disconnected",
state: inspect(state)
)

View File

@@ -18,15 +18,13 @@ defmodule Domain.Events.ReplicationConnectionTest do
# Used to test live connection
setup_all do
{config, connection_opts} =
{connection_opts, config} =
Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
|> Keyword.pop(:connection_opts)
instance = struct(Domain.Events.ReplicationConnection, config)
init_state = %{
connection_opts: connection_opts,
instance: instance
instance: struct(Domain.Events.ReplicationConnection, config)
}
child_spec = %{
@@ -205,16 +203,11 @@ defmodule Domain.Events.ReplicationConnectionTest do
end
describe "handle_disconnect/1" do
test "handle_disconnect resets step to :disconnected and logs warning" do
test "handle_disconnect resets step to :disconnected" do
state = %{@mock_state | step: :streaming}
expected_state = %{state | step: :disconnected}
log_output =
ExUnit.CaptureLog.capture_log(fn ->
assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state)
end)
assert log_output =~ "Replication connection disconnected"
assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state)
end
end
end

View File

@@ -29,8 +29,6 @@ if config_env() == :prod do
config :domain, Domain.Events.ReplicationConnection,
connection_opts: [
# Automatically reconnect if we lose connection.
auto_reconnect: true,
hostname: compile_config!(:database_host),
port: compile_config!(:database_port),
ssl: compile_config!(:database_ssl_enabled),