fix(portal): more robust replication pid discovery (#9960)

While debugging why we receive "Failed to start replication
connection" errors on deploy, we discovered a bug in the process
discovery mechanism that new nodes use to link to the existing
replication connection. When restarting an existing `domain` container
that isn't the one doing replication, we see this:

```
{"message":"Elixir.Domain.Events.ReplicationConnection: Publication tables are up to date","time":"2025-07-22T07:18:45.948Z","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Events.ReplicationConnection.handle_publication_tables_diff/2","line":2,"file":"lib/domain/events/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.764.0>"}}
{"message":"notifier only receiving messages from its own node, functionality may be degraded","time":"2025-07-22T07:18:45.942Z","domain":["elixir"],"application":"oban","source":"oban","severity":"DEBUG","event":"notifier:switch","connectivity_status":"solitary","logging.googleapis.com/sourceLocation":{"function":"Elixir.Oban.Telemetry.log/2","line":624,"file":"lib/oban/telemetry.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.756.0>"}}
{"message":"Elixir.Domain.ChangeLogs.ReplicationConnection: Publication tables are up to date","time":"2025-07-22T07:18:45.952Z","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.ChangeLogs.ReplicationConnection.handle_publication_tables_diff/2","line":2,"file":"lib/domain/change_logs/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.763.0>"}}
{"message":"Elixir.Domain.ChangeLogs.ReplicationConnection: Starting replication slot change_logs_slot","time":"2025-07-22T07:18:45.966Z","state":"[REDACTED]","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.ChangeLogs.ReplicationConnection.handle_result/2","line":2,"file":"lib/domain/change_logs/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.763.0>"}}
{"message":"Elixir.Domain.Events.ReplicationConnection: Starting replication slot events_slot","time":"2025-07-22T07:18:45.966Z","state":"[REDACTED]","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Events.ReplicationConnection.handle_result/2","line":2,"file":"lib/domain/events/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.764.0>"}}
{"message":"Elixir.Domain.ChangeLogs.ReplicationConnection: Replication connection disconnected","time":"2025-07-22T07:18:45.977Z","domain":["elixir"],"application":"domain","counter":0,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.ChangeLogs.ReplicationConnection.handle_disconnect/1","line":2,"file":"lib/domain/change_logs/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.763.0>"}}
{"message":"Elixir.Domain.Events.ReplicationConnection: Replication connection disconnected","time":"2025-07-22T07:18:45.977Z","domain":["elixir"],"application":"domain","counter":0,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Events.ReplicationConnection.handle_disconnect/1","line":2,"file":"lib/domain/events/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.764.0>"}}
{"message":"Failed to start replication connection Elixir.Domain.Events.ReplicationConnection","reason":"%Postgrex.Error{message: nil, postgres: %{code: :object_in_use, line: \"607\", message: \"replication slot \\\"events_slot\\\" is active for PID 135123\", file: \"slot.c\", unknown: \"ERROR\", severity: \"ERROR\", pg_code: \"55006\", routine: \"ReplicationSlotAcquire\"}, connection_id: 136400, query: nil}","time":"2025-07-22T07:18:45.978Z","domain":["elixir"],"application":"domain","max_retries":10,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Replication.Manager.handle_info/2","line":41,"file":"lib/domain/replication/manager.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.761.0>"},"retries":0}
{"message":"Failed to start replication connection Elixir.Domain.ChangeLogs.ReplicationConnection","reason":"%Postgrex.Error{message: nil, postgres: %{code: :object_in_use, line: \"607\", message: \"replication slot \\\"change_logs_slot\\\" is active for PID 135124\", file: \"slot.c\", unknown: \"ERROR\", severity: \"ERROR\", pg_code: \"55006\", routine: \"ReplicationSlotAcquire\"}, connection_id: 136401, query: nil}","time":"2025-07-22T07:18:45.978Z","domain":["elixir"],"application":"domain","max_retries":10,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Replication.Manager.handle_info/2","line":41,"file":"lib/domain/replication/manager.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.760.0>"},"retries":0}
```

Before, we relied on `start_link` returning
`{:error, {:already_started, pid}}` to tell us that there was an
existing pid running in the cluster. However, from the output above,
that does not appear to be reliable.

Instead, we first look up the registered process explicitly (via
`:global.whereis_name/1`) and, if it's alive, link to it. If it isn't,
we try starting the connection ourselves.

Once linked to the process, we also react to it being torn down,
giving us a first-one-wins scenario where all nodes attempt to start
replication, minimizing downtime during deploys.
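
To illustrate the first-one-wins part: assuming the replication
connection registers itself under a `:global` name (which is what the
`:global.whereis_name/1` lookup in the diff below implies), starting it
from every node means exactly one `start_link` wins and the others get
the winner's pid back. A minimal sketch, with an illustrative `Racer`
module standing in for the real connection:

```elixir
defmodule Racer do
  use GenServer

  def start_link do
    # Only one process in the whole cluster can hold this :global name.
    GenServer.start_link(__MODULE__, :ok, name: {:global, __MODULE__})
  end

  @impl true
  def init(:ok), do: {:ok, %{}}
end

# Every node runs the same code; exactly one start succeeds, the rest
# learn the winner's pid and can link to it.
case Racer.start_link() do
  {:ok, pid} -> IO.puts("won the race: #{inspect(pid)}")
  {:error, {:already_started, pid}} -> IO.puts("lost, linking to #{inspect(pid)}")
end
```

Run the same snippet on several connected nodes and only one of them
should win; the rest land in the `:already_started` branch, which is
exactly where the manager links.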

Now that https://github.com/firezone/infra/pull/94 is in place, I
verified that we handle SIGTERM properly in the BEAM, so a deployment
now goes like this:

1. GCP brings up the new nodes; they all find the existing pid and link
to it
2. GCP sends SIGTERM to the old nodes
3. The _actual_ pid receives SIGTERM and exits
4. This exit propagates to all other nodes due to the link (see the
sketch after this list)
5. Some node will "win", and the others will end up linking to it
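
Step 4 relies on standard link/exit semantics: because the manager
traps exits and is linked to the connection pid, the connection's
termination arrives as an `{:EXIT, pid, reason}` message, even across
nodes. A tiny standalone illustration (not project code; the 200 ms
sleep stands in for the old node's connection shutting down):

```elixir
# Trap exits so a linked process dying becomes a message instead of
# taking this process down with it.
Process.flag(:trap_exit, true)

# Stand-in for the replication connection running elsewhere.
conn = spawn_link(fn -> Process.sleep(200) end)

receive do
  {:EXIT, ^conn, reason} ->
    # This is the cue for every linked manager to race for the slot again.
    IO.puts("connection #{inspect(conn)} exited (#{inspect(reason)}), reconnecting")
end
```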

Fixes #9911

lib/domain/replication/manager.ex

```diff
@@ -6,10 +6,11 @@ defmodule Domain.Replication.Manager do
   use GenServer
   require Logger
 
-  @retry_interval :timer.seconds(30)
-  # Should be enough to gracefully handle transient network issues and DB restarts,
-  # but not too long to avoid consuming WAL data.
-  @max_retries 10
+  # These should be enough to gracefully handle transient network issues and DB restarts,
+  # but not too long to avoid consuming WAL data. When this limit is hit, the Supervisor will restart
+  # us, so we can be a bit aggressive here.
+  @retry_interval :timer.seconds(5)
+  @max_retries 12
 
   def start_link(connection_module, opts) do
     GenServer.start_link(__MODULE__, connection_module, opts)
@@ -18,40 +19,34 @@ defmodule Domain.Replication.Manager do
   @impl true
   def init(connection_module) do
     Process.flag(:trap_exit, true)
-    send(self(), {:connect, connection_module})
+    send(self(), :connect)
     {:ok, %{retries: 0, connection_pid: nil, connection_module: connection_module}}
   end
 
+  # Try to find an existing connection process or start a new one, with edge cases handled
+  # to minimize false starts. During deploys, the new nodes will merge into the existing
+  # cluster state and we want to minimize the window of time where we're not processing
+  # messages.
   @impl true
-  def handle_info({:connect, connection_module}, %{retries: retries} = state) do
-    Process.send_after(self(), {:connect, connection_module}, @retry_interval)
+  def handle_info(:connect, %{connection_module: connection_module, connection_pid: nil} = state) do
+    Process.send_after(self(), :connect, @retry_interval)
 
-    case connection_module.start_link(replication_child_spec(connection_module)) do
-      {:ok, pid} ->
-        {:noreply, %{state | retries: 0, connection_pid: pid}}
+    # First, try to link to an existing connection process
+    case :global.whereis_name(connection_module) do
+      :undefined ->
+        # No existing process found, attempt to start one
+        start_connection(state)
 
-      {:error, {:already_started, pid}} ->
-        # This will allow our current node to attempt connections whenever any node's
-        # replication connection dies.
-        Process.link(pid)
-        {:noreply, %{state | retries: 0, connection_pid: pid}}
+      pid when is_pid(pid) ->
+        case Process.alive?(pid) do
+          true ->
+            # Found existing alive process, link to it
+            link_existing_pid(pid, state)
 
-      {:error, reason} ->
-        if retries < @max_retries do
-          Logger.info("Failed to start replication connection #{connection_module}",
-            retries: retries,
-            max_retries: @max_retries,
-            reason: inspect(reason)
-          )
-
-          {:noreply, %{state | retries: retries + 1, connection_pid: nil}}
-        else
-          Logger.error(
-            "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!",
-            reason: inspect(reason)
-          )
-
-          {:noreply, %{state | retries: -1, connection_pid: nil}}
+          false ->
+            # Process is dead but still registered, try to start a new one
+            start_connection(state)
         end
     end
   end
@@ -60,19 +55,69 @@ defmodule Domain.Replication.Manager do
         {:EXIT, pid, _reason},
         %{connection_module: connection_module, connection_pid: pid} = state
       ) do
-    Logger.info("#{connection_module}: Replication connection died, restarting immediately",
+    Logger.info(
+      "#{connection_module}: Replication connection died unexpectedly, restarting immediately",
       died_pid: inspect(pid),
       died_node: node(pid)
     )
 
-    send(self(), {:connect, state.connection_module})
+    send(self(), :connect)
     {:noreply, %{state | connection_pid: nil, retries: 0}}
   end
 
+  # Ignore exits from other unrelated processes we may be linked to
   def handle_info({:EXIT, _other_pid, _reason}, state) do
     {:noreply, state}
   end
 
+  # Process was found, stop the retry timer
+  def handle_info(:connect, state) do
+    {:noreply, state}
+  end
+
+  defp start_connection(%{connection_module: connection_module} = state) do
+    case connection_module.start_link(replication_child_spec(connection_module)) do
+      {:ok, pid} ->
+        link_existing_pid(pid, state)
+
+      {:error, {:already_started, pid}} ->
+        link_existing_pid(pid, state)
+
+      {:error, reason} ->
+        handle_start_error(reason, state)
+    end
+  end
+
+  defp link_existing_pid(pid, state) do
+    Process.link(pid)
+    {:noreply, %{state | retries: 0, connection_pid: pid}}
+  rescue
+    ArgumentError ->
+      handle_start_error(:link_failed, state)
+  end
+
+  defp handle_start_error(
+         reason,
+         %{retries: retries, connection_module: connection_module} = state
+       ) do
+    if retries < @max_retries do
+      Logger.info("Failed to start replication connection #{connection_module}, retrying...",
+        retries: retries,
+        max_retries: @max_retries,
+        reason: inspect(reason)
+      )
+
+      {:noreply, %{state | retries: retries + 1, connection_pid: nil}}
+    else
+      Logger.error(
+        "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!",
+        reason: inspect(reason)
+      )
+
+      {:noreply, %{state | retries: -1, connection_pid: nil}}
+    end
+  end
+
   def replication_child_spec(connection_module) do
     {connection_opts, config} =
       Application.fetch_env!(:domain, connection_module)
```