From 2c3692582bf35f7b3691c63866cdb93c59667816 Mon Sep 17 00:00:00 2001 From: Jamil Date: Tue, 22 Jul 2025 11:13:45 -0400 Subject: [PATCH] fix(portal): more robust replication pid discovery (#9960) When debugging why we're receiving "Failed to start replication connection" errors on deploy, it was discovered that there's a bug in the Process discovery mechanism that new nodes use to attempt to link to the existing replication connection. When restarting an existing `domain` container that's not doing replication, we see this: ``` {"message":"Elixir.Domain.Events.ReplicationConnection: Publication tables are up to date","time":"2025-07-22T07:18:45.948Z","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Events.ReplicationConnection.handle_publication_tables_diff/2","line":2,"file":"lib/domain/events/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.764.0>"}} {"message":"notifier only receiving messages from its own node, functionality may be degraded","time":"2025-07-22T07:18:45.942Z","domain":["elixir"],"application":"oban","source":"oban","severity":"DEBUG","event":"notifier:switch","connectivity_status":"solitary","logging.googleapis.com/sourceLocation":{"function":"Elixir.Oban.Telemetry.log/2","line":624,"file":"lib/oban/telemetry.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.756.0>"}} {"message":"Elixir.Domain.ChangeLogs.ReplicationConnection: Publication tables are up to date","time":"2025-07-22T07:18:45.952Z","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.ChangeLogs.ReplicationConnection.handle_publication_tables_diff/2","line":2,"file":"lib/domain/change_logs/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.763.0>"}} {"message":"Elixir.Domain.ChangeLogs.ReplicationConnection: Starting replication slot change_logs_slot","time":"2025-07-22T07:18:45.966Z","state":"[REDACTED]","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.ChangeLogs.ReplicationConnection.handle_result/2","line":2,"file":"lib/domain/change_logs/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.763.0>"}} {"message":"Elixir.Domain.Events.ReplicationConnection: Starting replication slot events_slot","time":"2025-07-22T07:18:45.966Z","state":"[REDACTED]","domain":["elixir"],"application":"domain","severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Events.ReplicationConnection.handle_result/2","line":2,"file":"lib/domain/events/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.764.0>"}} {"message":"Elixir.Domain.ChangeLogs.ReplicationConnection: Replication connection disconnected","time":"2025-07-22T07:18:45.977Z","domain":["elixir"],"application":"domain","counter":0,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.ChangeLogs.ReplicationConnection.handle_disconnect/1","line":2,"file":"lib/domain/change_logs/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.763.0>"}} {"message":"Elixir.Domain.Events.ReplicationConnection: Replication connection disconnected","time":"2025-07-22T07:18:45.977Z","domain":["elixir"],"application":"domain","counter":0,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Events.ReplicationConnection.handle_disconnect/1","line":2,"file":"lib/domain/events/replication_connection.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.764.0>"}} {"message":"Failed to start replication connection Elixir.Domain.Events.ReplicationConnection","reason":"%Postgrex.Error{message: nil, postgres: %{code: :object_in_use, line: \"607\", message: \"replication slot \\\"events_slot\\\" is active for PID 135123\", file: \"slot.c\", unknown: \"ERROR\", severity: \"ERROR\", pg_code: \"55006\", routine: \"ReplicationSlotAcquire\"}, connection_id: 136400, query: nil}","time":"2025-07-22T07:18:45.978Z","domain":["elixir"],"application":"domain","max_retries":10,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Replication.Manager.handle_info/2","line":41,"file":"lib/domain/replication/manager.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.761.0>"},"retries":0} {"message":"Failed to start replication connection Elixir.Domain.ChangeLogs.ReplicationConnection","reason":"%Postgrex.Error{message: nil, postgres: %{code: :object_in_use, line: \"607\", message: \"replication slot \\\"change_logs_slot\\\" is active for PID 135124\", file: \"slot.c\", unknown: \"ERROR\", severity: \"ERROR\", pg_code: \"55006\", routine: \"ReplicationSlotAcquire\"}, connection_id: 136401, query: nil}","time":"2025-07-22T07:18:45.978Z","domain":["elixir"],"application":"domain","max_retries":10,"severity":"INFO","logging.googleapis.com/sourceLocation":{"function":"Elixir.Domain.Replication.Manager.handle_info/2","line":41,"file":"lib/domain/replication/manager.ex"},"logging.googleapis.com/operation":{"producer":"#PID<0.760.0>"},"retries":0} ``` Before, we relied on `start_link` telling us that there was an existing pid running in the cluster. However, from the output above, it appears that may not always be reliable. Instead, we first check explicitly where the running process is and, if alive, we try linking to it. If not, we try starting the connection ourselves. Once linked to the process, we react to it being torn down as well, causing a first-one-wins scenario where all nodes will attempt to start replication, minimizing downtime during deploys. Now that https://github.com/firezone/infra/pull/94 is in place, I did verify we are properly handling SIGTERM in the BEAM, so the deployment would now go like this: 1. GCP brings up the new nodes, they all find the existing pid and link to it 2. GCP sends SIGTERM to the old nodes 3. The _actual_ pid receives SIGTERM and exits 4. This exit propagates to all other nodes due to the link 5. Some node will "win", and the others will end up linking to it Fixes #9911 --- .../domain/lib/domain/replication/manager.ex | 111 ++++++++++++------ 1 file changed, 78 insertions(+), 33 deletions(-) diff --git a/elixir/apps/domain/lib/domain/replication/manager.ex b/elixir/apps/domain/lib/domain/replication/manager.ex index 5c2989aca..20104de35 100644 --- a/elixir/apps/domain/lib/domain/replication/manager.ex +++ b/elixir/apps/domain/lib/domain/replication/manager.ex @@ -6,10 +6,11 @@ defmodule Domain.Replication.Manager do use GenServer require Logger - @retry_interval :timer.seconds(30) - # Should be enough to gracefully handle transient network issues and DB restarts, - # but not too long to avoid consuming WAL data. - @max_retries 10 + # These should be enough to gracefully handle transient network issues and DB restarts, + # but not too long to avoid consuming WAL data. When this limit is hit, the Supervisor will restart + # us, so we can be a bit aggressive here. + @retry_interval :timer.seconds(5) + @max_retries 12 def start_link(connection_module, opts) do GenServer.start_link(__MODULE__, connection_module, opts) @@ -18,40 +19,34 @@ defmodule Domain.Replication.Manager do @impl true def init(connection_module) do Process.flag(:trap_exit, true) - send(self(), {:connect, connection_module}) + send(self(), :connect) {:ok, %{retries: 0, connection_pid: nil, connection_module: connection_module}} end + # Try to find an existing connection process or start a new one, with edge cases handled + # to minimize false starts. During deploys, the new nodes will merge into the existing + # cluster state and we want to minimize the window of time where we're not processing + # messages. + @impl true - def handle_info({:connect, connection_module}, %{retries: retries} = state) do - Process.send_after(self(), {:connect, connection_module}, @retry_interval) + def handle_info(:connect, %{connection_module: connection_module, connection_pid: nil} = state) do + Process.send_after(self(), :connect, @retry_interval) - case connection_module.start_link(replication_child_spec(connection_module)) do - {:ok, pid} -> - {:noreply, %{state | retries: 0, connection_pid: pid}} + # First, try to link to an existing connection process + case :global.whereis_name(connection_module) do + :undefined -> + # No existing process found, attempt to start one + start_connection(state) - {:error, {:already_started, pid}} -> - # This will allow our current node to attempt connections whenever any node's - # replication connection dies. - Process.link(pid) - {:noreply, %{state | retries: 0, connection_pid: pid}} + pid when is_pid(pid) -> + case Process.alive?(pid) do + true -> + # Found existing alive process, link to it + link_existing_pid(pid, state) - {:error, reason} -> - if retries < @max_retries do - Logger.info("Failed to start replication connection #{connection_module}", - retries: retries, - max_retries: @max_retries, - reason: inspect(reason) - ) - - {:noreply, %{state | retries: retries + 1, connection_pid: nil}} - else - Logger.error( - "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!", - reason: inspect(reason) - ) - - {:noreply, %{state | retries: -1, connection_pid: nil}} + false -> + # Process is dead but still registered, try to start a new one + start_connection(state) end end end @@ -60,19 +55,69 @@ defmodule Domain.Replication.Manager do {:EXIT, pid, _reason}, %{connection_module: connection_module, connection_pid: pid} = state ) do - Logger.info("#{connection_module}: Replication connection died, restarting immediately", + Logger.info( + "#{connection_module}: Replication connection died unexpectedly, restarting immediately", died_pid: inspect(pid), died_node: node(pid) ) - send(self(), {:connect, state.connection_module}) + send(self(), :connect) {:noreply, %{state | connection_pid: nil, retries: 0}} end + # Ignore exits from other unrelated processes we may be linked to def handle_info({:EXIT, _other_pid, _reason}, state) do {:noreply, state} end + # Process was found, stop the retry timer + def handle_info(:connect, state) do + {:noreply, state} + end + + defp start_connection(%{connection_module: connection_module} = state) do + case connection_module.start_link(replication_child_spec(connection_module)) do + {:ok, pid} -> + link_existing_pid(pid, state) + + {:error, {:already_started, pid}} -> + link_existing_pid(pid, state) + + {:error, reason} -> + handle_start_error(reason, state) + end + end + + defp link_existing_pid(pid, state) do + Process.link(pid) + {:noreply, %{state | retries: 0, connection_pid: pid}} + rescue + ArgumentError -> + handle_start_error(:link_failed, state) + end + + defp handle_start_error( + reason, + %{retries: retries, connection_module: connection_module} = state + ) do + if retries < @max_retries do + Logger.info("Failed to start replication connection #{connection_module}, retrying...", + retries: retries, + max_retries: @max_retries, + reason: inspect(reason) + ) + + {:noreply, %{state | retries: retries + 1, connection_pid: nil}} + else + Logger.error( + "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!", + reason: inspect(reason) + ) + + {:noreply, %{state | retries: -1, connection_pid: nil}} + end + end + def replication_child_spec(connection_module) do {connection_opts, config} = Application.fetch_env!(:domain, connection_module)