diff --git a/elixir/apps/domain/lib/domain/replication/manager.ex b/elixir/apps/domain/lib/domain/replication/manager.ex
index 5c2989aca..20104de35 100644
--- a/elixir/apps/domain/lib/domain/replication/manager.ex
+++ b/elixir/apps/domain/lib/domain/replication/manager.ex
@@ -6,10 +6,11 @@ defmodule Domain.Replication.Manager do
   use GenServer
   require Logger
 
-  @retry_interval :timer.seconds(30)
-  # Should be enough to gracefully handle transient network issues and DB restarts,
-  # but not too long to avoid consuming WAL data.
-  @max_retries 10
+  # These should be enough to gracefully handle transient network issues and DB restarts,
+  # but not so long that unconsumed WAL data piles up. When this limit is hit, the
+  # Supervisor will restart us, so we can be a bit aggressive here.
+  @retry_interval :timer.seconds(5)
+  @max_retries 12
 
   def start_link(connection_module, opts) do
     GenServer.start_link(__MODULE__, connection_module, opts)
@@ -18,40 +19,34 @@ defmodule Domain.Replication.Manager do
   @impl true
   def init(connection_module) do
     Process.flag(:trap_exit, true)
-    send(self(), {:connect, connection_module})
+    send(self(), :connect)
     {:ok, %{retries: 0, connection_pid: nil, connection_module: connection_module}}
   end
 
+  # Try to find an existing connection process or start a new one, with edge cases handled
+  # to minimize false starts. During deploys, the new nodes will merge into the existing
+  # cluster state, and we want to minimize the window of time where we're not processing
+  # messages.
+
   @impl true
-  def handle_info({:connect, connection_module}, %{retries: retries} = state) do
-    Process.send_after(self(), {:connect, connection_module}, @retry_interval)
+  def handle_info(:connect, %{connection_module: connection_module, connection_pid: nil} = state) do
+    Process.send_after(self(), :connect, @retry_interval)
 
-    case connection_module.start_link(replication_child_spec(connection_module)) do
-      {:ok, pid} ->
-        {:noreply, %{state | retries: 0, connection_pid: pid}}
+    # First, try to link to an existing connection process
+    case :global.whereis_name(connection_module) do
+      :undefined ->
+        # No existing process found, attempt to start one
+        start_connection(state)
 
-      {:error, {:already_started, pid}} ->
-        # This will allow our current node to attempt connections whenever any node's
-        # replication connection dies.
-        Process.link(pid)
-        {:noreply, %{state | retries: 0, connection_pid: pid}}
+      pid when is_pid(pid) ->
+        case Process.alive?(pid) do
+          true ->
+            # Found existing alive process, link to it
+            link_existing_pid(pid, state)
 
-      {:error, reason} ->
-        if retries < @max_retries do
-          Logger.info("Failed to start replication connection #{connection_module}",
-            retries: retries,
-            max_retries: @max_retries,
-            reason: inspect(reason)
-          )
-
-          {:noreply, %{state | retries: retries + 1, connection_pid: nil}}
-        else
-          Logger.error(
-            "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!",
-            reason: inspect(reason)
-          )
-
-          {:noreply, %{state | retries: -1, connection_pid: nil}}
+          false ->
+            # Process is dead but still registered, try to start a new one
+            start_connection(state)
         end
     end
   end
@@ -60,19 +55,69 @@ defmodule Domain.Replication.Manager do
         {:EXIT, pid, _reason},
         %{connection_module: connection_module, connection_pid: pid} = state
       ) do
-    Logger.info("#{connection_module}: Replication connection died, restarting immediately",
+    Logger.info(
+      "#{connection_module}: Replication connection died unexpectedly, restarting immediately",
       died_pid: inspect(pid),
       died_node: node(pid)
     )
 
-    send(self(), {:connect, state.connection_module})
+    send(self(), :connect)
     {:noreply, %{state | connection_pid: nil, retries: 0}}
   end
 
+  # Ignore exits from unrelated processes we may be linked to
   def handle_info({:EXIT, _other_pid, _reason}, state) do
     {:noreply, state}
   end
 
+  # Already connected: swallow the scheduled retry without rescheduling, stopping the retry loop
+  def handle_info(:connect, state) do
+    {:noreply, state}
+  end
+
+  defp start_connection(%{connection_module: connection_module} = state) do
+    case connection_module.start_link(replication_child_spec(connection_module)) do
+      {:ok, pid} ->
+        link_existing_pid(pid, state)
+
+      {:error, {:already_started, pid}} ->
+        link_existing_pid(pid, state)
+
+      {:error, reason} ->
+        handle_start_error(reason, state)
+    end
+  end
+
+  defp link_existing_pid(pid, state) do
+    Process.link(pid)
+    {:noreply, %{state | retries: 0, connection_pid: pid}}
+  rescue
+    ArgumentError ->
+      handle_start_error(:link_failed, state)
+  end
+
+  defp handle_start_error(
+         reason,
+         %{retries: retries, connection_module: connection_module} = state
+       ) do
+    if retries < @max_retries do
+      Logger.info("Failed to start replication connection #{connection_module}, retrying...",
+        retries: retries,
+        max_retries: @max_retries,
+        reason: inspect(reason)
+      )
+
+      {:noreply, %{state | retries: retries + 1, connection_pid: nil}}
+    else
+      Logger.error(
+        "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!",
+        reason: inspect(reason)
+      )
+
+      {:noreply, %{state | retries: -1, connection_pid: nil}}
+    end
+  end
+
   def replication_child_spec(connection_module) do
     {connection_opts, config} = Application.fetch_env!(:domain, connection_module)
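
A note on the pattern in this patch: the {:error, {:already_started, pid}} clause and the
:global.whereis_name(connection_module) lookup both assume the connection module registers
itself under a :global name, so that at most one replication connection runs per cluster.
A minimal sketch of that registration, with MyApp.ReplicationConnection as an invented
stand-in for the real connection module:

# Hypothetical connection module; only the :global registration matters here.
defmodule MyApp.ReplicationConnection do
  use GenServer

  def start_link(opts) do
    # Registering as {:global, __MODULE__} makes the name cluster-wide: a second
    # start_link on any node returns {:error, {:already_started, pid}} instead
    # of starting a duplicate process.
    GenServer.start_link(__MODULE__, opts, name: {:global, __MODULE__})
  end

  @impl true
  def init(opts), do: {:ok, opts}
end

# From any connected node, :global.whereis_name(MyApp.ReplicationConnection)
# then returns that pid, or :undefined if nothing is registered.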
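The restart path relies on trap_exit plus links: because init/1 sets
Process.flag(:trap_exit, true), the death of the linked connection arrives as a
{:EXIT, pid, reason} message rather than crashing the Manager, and linking to an
already-dead local pid delivers {:EXIT, pid, :noproc} instead of raising. One caveat
worth knowing: Process.alive?/1 only accepts local pids and raises ArgumentError for
remote ones, so the alive check above assumes a local (or at least reachable) pid.
A standalone sketch of the trap_exit mechanic, separate from the patch:

# Demonstrates EXIT delivery under trap_exit; :db_connection_lost is an arbitrary reason.
Process.flag(:trap_exit, true)

pid = spawn_link(fn -> exit(:db_connection_lost) end)

receive do
  {:EXIT, ^pid, reason} ->
    # In the Manager, this is where handle_info({:EXIT, pid, _reason}, state)
    # fires and sends :connect to reconnect immediately.
    IO.inspect(reason, label: "linked process exited")
end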
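For context on the new constants: the Manager now gives up after
@max_retries × @retry_interval = 12 × 5 s = 60 s of consecutive failures, versus
10 × 30 s = 300 s under the old values, so the retry window is five times shorter but
attempts fire six times as often.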