From f07aa8aa3a3418ddaad27187de25dd6c252fb79e Mon Sep 17 00:00:00 2001
From: Jamil
Date: Fri, 16 May 2025 12:54:27 -0700
Subject: [PATCH] fix(portal): Add ReplicationConnectionManager (#9158)

When deploying, new Elixir nodes are spun up before the old ones are
brought down. This ensures that the state in the cluster is merged into
the new nodes before the old ones go away.

This also means, however, that the existing WAL consumer is still up
when our new one tries to come online. Normally, this isn't an issue,
because we find the old pid and return it with `{:ok, existing_pid}`.
When that VM goes away, the Supervisor(s) of the new application notice
and restart it. However, if the cluster state diverges or is
inconsistent during this period, we may fail to find the existing pid
and try to start a new ReplicationConnection. If the old pid is still
active, this will fail because there's a mutex on the connection.

The original implementation was designed to handle this case using the
Supervisor with a `:transient` restart policy. What the author failed
to understand is that the restart policy applies only to _restarts_ and
not to initial starts, so if all of the new application servers fail to
find the old pid (which is still connected) and therefore fail to come
up, nothing consumes the WAL.

This is fixed with a `ReplicationConnectionManager` that always starts
successfully and then simply tries to start a `ReplicationConnection`
every 30s, giving up after 5 minutes if it can neither start one nor
find an existing one. Giving up stops the manager, causing the
Supervisor to restart it and notifying us via an error log.
---
 elixir/apps/domain/lib/domain/application.ex  | 28 +-------
 .../domain/events/replication_connection.ex   | 40 ++++++-----
 .../events/replication_connection_manager.ex  | 70 +++++++++++++++++++
 .../events/replication_connection_test.exs    | 21 ++++--
 4 files changed, 108 insertions(+), 51 deletions(-)
 create mode 100644 elixir/apps/domain/lib/domain/events/replication_connection_manager.ex

diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex
index 0a1b7082a..68a37dfa7 100644
--- a/elixir/apps/domain/lib/domain/application.ex
+++ b/elixir/apps/domain/lib/domain/application.ex
@@ -41,9 +41,7 @@ defmodule Domain.Application do

       # Observability
       Domain.Telemetry
-    ] ++
-      oban() ++
-      replication()
+    ] ++ oban() ++ replication()
   end

   defp configure_logger do
@@ -86,31 +84,9 @@ defmodule Domain.Application do
     config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)

     if config[:enabled] do
-      [
-        replication_child_spec()
-      ]
+      [Domain.Events.ReplicationConnectionManager]
     else
       []
     end
   end
-
-  defp replication_child_spec do
-    {connection_opts, config} =
-      Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
-      |> Keyword.pop(:connection_opts)
-
-    init_state = %{
-      connection_opts: connection_opts,
-      instance: struct(Domain.Events.ReplicationConnection, config)
-    }
-
-    %{
-      id: Domain.Events.ReplicationConnection,
-      start: {Domain.Events.ReplicationConnection, :start_link, [init_state]},
-      restart: :transient,
-      # Allow up to 240 restarts in 20 minutes - covers duration of a deploy
-      max_restarts: 240,
-      max_seconds: 1200
-    }
-  end
 end
diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex
index 724f3edc3..afb869f88 100644
--- a/elixir/apps/domain/lib/domain/events/replication_connection.ex
+++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex
@@ -26,6 +26,8 @@ defmodule Domain.Events.ReplicationConnection do
   alias Domain.Events.Decoder
   alias Domain.Events.Protocol.{KeepAlive, Write}

+  @status_log_interval :timer.minutes(5)
+
   @type t :: %__MODULE__{
           schema: String.t(),
           step:
@@ -41,7 +43,8 @@ defmodule Domain.Events.ReplicationConnection do
           output_plugin: String.t(),
           proto_version: integer(),
           table_subscriptions: list(),
-          relations: map()
+          relations: map(),
+          counter: integer()
         }
   defstruct schema: "public",
             step: :disconnected,
@@ -50,25 +53,14 @@ defmodule Domain.Events.ReplicationConnection do
             output_plugin: "pgoutput",
             proto_version: 1,
             table_subscriptions: [],
-            relations: %{}
+            relations: %{},
+            counter: 0

   def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do
     # Start only one ReplicationConnection in the cluster.
     opts = connection_opts ++ [name: {:global, __MODULE__}]

-    case(Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)) do
-      {:ok, pid} ->
-        {:ok, pid}
-
-      {:error, {:already_started, pid}} ->
-        {:ok, pid}
-
-      error ->
-        # This is expected in clustered environments.
-        Logger.info("Failed to start replication connection", error: inspect(error))
-
-        error
-    end
+    Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)
   end

   @impl true
@@ -121,6 +113,9 @@ defmodule Domain.Events.ReplicationConnection do
   def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
     Logger.info("Starting replication slot", state: inspect(state))

+    # Start logging regular status updates
+    send(self(), :interval_logger)
+
     query =
       "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
@@ -193,10 +188,9 @@ def handle_data(data, state) when is_write(data) do
     %Write{message: message} = parse(data)

-    # TODO: Telemetry: Mark start
     message
     |> decode_message()
-    |> handle_message(state)
+    |> handle_message(%{state | counter: state.counter + 1})
   end

   def handle_data(data, state) do
@@ -273,7 +267,7 @@ defmodule Domain.Events.ReplicationConnection do
   defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do
     Logger.warning("Unsupported message received",
       data: inspect(data),
-      state: inspect(state)
+      counter: state.counter
     )

     {:noreply, [], state}
@@ -281,6 +275,14 @@

   @impl true
+  def handle_info(:interval_logger, state) do
+    Logger.info("Processed #{state.counter} write messages from the WAL stream")
+
+    Process.send_after(self(), :interval_logger, @status_log_interval)
+
+    {:noreply, state}
+  end
+
   def handle_info(:shutdown, _), do: {:disconnect, :normal}

   def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
@@ -303,7 +305,7 @@
   @impl true
   def handle_disconnect(state) do
     Logger.info("Replication connection disconnected",
-      state: inspect(state)
+      counter: state.counter
     )

     {:noreply, %{state | step: :disconnected}}
diff --git a/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex b/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex
new file mode 100644
index 000000000..d1541f707
--- /dev/null
+++ b/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex
@@ -0,0 +1,70 @@
+defmodule Domain.Events.ReplicationConnectionManager do
+  @moduledoc """
+  Manages the Postgrex.ReplicationConnection to ensure that we always have one running to prevent
+  unbounded growth of the WAL log and ensure we are processing events.
+  """
+  use GenServer
+  require Logger
+
+  @retry_interval :timer.seconds(30)
+
+  # Long enough to ride out transient network issues and DB restarts, but short
+  # enough that we don't go too long without broadcasting needed events.
+  @max_retries 10
+
+  def start_link(opts) do
+    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
+  end
+
+  @impl true
+  def init(_opts) do
+    send(self(), :connect)
+
+    {:ok, %{retries: 0}}
+  end
+
+  @impl true
+  def handle_info(:connect, %{retries: retries} = state) do
+    Process.send_after(self(), :connect, @retry_interval)
+
+    case Domain.Events.ReplicationConnection.start_link(replication_child_spec()) do
+      {:ok, _pid} ->
+        # Our process won
+        {:noreply, %{state | retries: 0}}
+
+      {:error, {:already_started, _pid}} ->
+        # Another process already started the connection
+        {:noreply, %{state | retries: 0}}
+
+      {:error, reason} ->
+        if retries < @max_retries do
+          Logger.info("Failed to start replication connection",
+            retries: retries,
+            max_retries: @max_retries,
+            reason: inspect(reason)
+          )
+
+          {:noreply, %{state | retries: retries + 1}}
+        else
+          Logger.error(
+            "Failed to start replication connection after #{@max_retries} attempts, giving up!",
+            reason: inspect(reason)
+          )
+
+          # Let the supervisor restart us
+          {:stop, :normal, state}
+        end
+    end
+  end
+
+  defp replication_child_spec do
+    {connection_opts, config} =
+      Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
+      |> Keyword.pop(:connection_opts)
+
+    %{
+      connection_opts: connection_opts,
+      instance: struct(Domain.Events.ReplicationConnection, config)
+    }
+  end
+end
diff --git a/elixir/apps/domain/test/domain/events/replication_connection_test.exs b/elixir/apps/domain/test/domain/events/replication_connection_test.exs
index 8b56241f4..f9a1b6ed5 100644
--- a/elixir/apps/domain/test/domain/events/replication_connection_test.exs
+++ b/elixir/apps/domain/test/domain/events/replication_connection_test.exs
@@ -13,11 +13,12 @@ defmodule Domain.Events.ReplicationConnectionTest do
     output_plugin: "pgoutput",
     proto_version: 1,
     table_subscriptions: ["accounts", "resources"],
-    relations: %{}
+    relations: %{},
+    counter: 0
   }

   # Used to test live connection
-  setup_all do
+  setup do
     {connection_opts, config} =
       Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
       |> Keyword.pop(:connection_opts)
@@ -29,11 +30,17 @@ defmodule Domain.Events.ReplicationConnectionTest do

     child_spec = %{
       id: Domain.Events.ReplicationConnection,
-      start: {Domain.Events.ReplicationConnection, :start_link, [init_state]},
-      restart: :transient
+      start: {Domain.Events.ReplicationConnection, :start_link, [init_state]}
     }

-    {:ok, pid} = start_supervised(child_spec)
+    {:ok, pid} =
+      case start_supervised(child_spec) do
+        {:ok, pid} ->
+          {:ok, pid}
+
+        {:error, {:already_started, pid}} ->
+          {:ok, pid}
+      end

     {:ok, pid: pid}
   end
@@ -169,7 +176,9 @@ defmodule Domain.Events.ReplicationConnectionTest do
     write_data = <>

-    assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state)
+    new_state = %{state | counter: state.counter + 1}
+
+    assert {:noreply, [], ^new_state} = ReplicationConnection.handle_data(write_data, state)
   end

   test "handle_data handles unknown message" do
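
A note on the dedup mechanism above: the manager treats
`{:error, {:already_started, pid}}` as success, which works because OTP's
global name registration hands the winning pid back to the loser instead of
crashing the caller. A minimal standalone sketch of that behavior
(GlobalStartSketch is a made-up name, not part of this patch; run with
`elixir sketch.exs`):

    # sketch.exs -- illustrates the {:error, {:already_started, pid}} path
    # the manager treats as success. GlobalStartSketch is illustrative only.
    defmodule GlobalStartSketch do
      use GenServer

      def start_global do
        # Same pattern as the real connection: one name for the whole cluster.
        GenServer.start_link(__MODULE__, nil, name: {:global, __MODULE__})
      end

      @impl true
      def init(state), do: {:ok, state}
    end

    {:ok, pid} = GlobalStartSketch.start_global()

    # A second start attempt does not crash; it returns the pid that already
    # holds the name, which the manager maps to a retry-counter reset.
    {:error, {:already_started, ^pid}} = GlobalStartSketch.start_global()

    IO.puts("second start found existing pid #{inspect(pid)}")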
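
The give-up path relies on one more OTP detail: `{:stop, :normal, state}` is
a clean stop, not a crash, but the default GenServer child spec uses
`restart: :permanent`, which restarts the child regardless of exit reason.
A sketch of that, under the same caveat (StopNormalSketch is a made-up name):

    # restart_sketch.exs -- a :permanent child is restarted even after a
    # clean :normal stop, which is what brings the manager back after it
    # gives up. StopNormalSketch is illustrative only.
    defmodule StopNormalSketch do
      use GenServer

      def start_link(_), do: GenServer.start_link(__MODULE__, nil)

      @impl true
      def init(state), do: {:ok, state}

      @impl true
      def handle_call(:give_up, _from, state) do
        # Reply first, then stop cleanly -- the analogue of the manager's give-up.
        {:stop, :normal, :ok, state}
      end
    end

    {:ok, sup} = Supervisor.start_link([StopNormalSketch], strategy: :one_for_one)

    [{_, pid1, _, _}] = Supervisor.which_children(sup)
    :ok = GenServer.call(pid1, :give_up)

    # Give the supervisor a moment to process the exit and restart the child.
    Process.sleep(50)

    [{_, pid2, _, _}] = Supervisor.which_children(sup)
    IO.puts("restarted after :normal stop? #{pid1 != pid2}")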