diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex index 0a1b7082a..68a37dfa7 100644 --- a/elixir/apps/domain/lib/domain/application.ex +++ b/elixir/apps/domain/lib/domain/application.ex @@ -41,9 +41,7 @@ defmodule Domain.Application do # Observability Domain.Telemetry - ] ++ - oban() ++ - replication() + ] ++ oban() ++ replication() end defp configure_logger do @@ -86,31 +84,9 @@ defmodule Domain.Application do config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) if config[:enabled] do - [ - replication_child_spec() - ] + [Domain.Events.ReplicationConnectionManager] else [] end end - - defp replication_child_spec do - {connection_opts, config} = - Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) - |> Keyword.pop(:connection_opts) - - init_state = %{ - connection_opts: connection_opts, - instance: struct(Domain.Events.ReplicationConnection, config) - } - - %{ - id: Domain.Events.ReplicationConnection, - start: {Domain.Events.ReplicationConnection, :start_link, [init_state]}, - restart: :transient, - # Allow up to 240 restarts in 20 minutes - covers duration of a deploy - max_restarts: 240, - max_seconds: 1200 - } - end end diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex index 724f3edc3..afb869f88 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -26,6 +26,8 @@ defmodule Domain.Events.ReplicationConnection do alias Domain.Events.Decoder alias Domain.Events.Protocol.{KeepAlive, Write} + @status_log_interval :timer.minutes(5) + @type t :: %__MODULE__{ schema: String.t(), step: @@ -41,7 +43,8 @@ defmodule Domain.Events.ReplicationConnection do output_plugin: String.t(), proto_version: integer(), table_subscriptions: list(), - relations: map() + relations: map(), + counter: integer() } 
defstruct schema: "public", step: :disconnected, @@ -50,25 +53,14 @@ defmodule Domain.Events.ReplicationConnection do output_plugin: "pgoutput", proto_version: 1, table_subscriptions: [], - relations: %{} + relations: %{}, + counter: 0 def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do # Start only one ReplicationConnection in the cluster. opts = connection_opts ++ [name: {:global, __MODULE__}] - case(Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)) do - {:ok, pid} -> - {:ok, pid} - - {:error, {:already_started, pid}} -> - {:ok, pid} - - error -> - # This is expected in clustered environments. - Logger.info("Failed to start replication connection", error: inspect(error)) - - error - end + Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts) end @impl true @@ -121,6 +113,9 @@ defmodule Domain.Events.ReplicationConnection do def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do Logger.info("Starting replication slot", state: inspect(state)) + # Start logging regular status updates + send(self(), :interval_logger) + query = "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" @@ -193,10 +188,9 @@ defmodule Domain.Events.ReplicationConnection do def handle_data(data, state) when is_write(data) do %Write{message: message} = parse(data) - # TODO: Telemetry: Mark start message |> decode_message() - |> handle_message(state) + |> handle_message(%{state | counter: state.counter + 1}) end def handle_data(data, state) do @@ -273,7 +267,7 @@ defmodule Domain.Events.ReplicationConnection do defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do Logger.warning("Unsupported message received", data: inspect(data), - state: inspect(state) + counter: state.counter ) {:noreply, [], state} @@ -281,6 +275,14 @@ defmodule 
Domain.Events.ReplicationConnection do @impl true + def handle_info(:interval_logger, state) do + Logger.info("Processed #{state.counter} write messages from the WAL stream") + + Process.send_after(self(), :interval_logger, @status_log_interval) + + {:noreply, state} + end + def handle_info(:shutdown, _), do: {:disconnect, :normal} def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal} @@ -303,7 +305,7 @@ defmodule Domain.Events.ReplicationConnection do @impl true def handle_disconnect(state) do Logger.info("Replication connection disconnected", - state: inspect(state) + counter: state.counter ) {:noreply, %{state | step: :disconnected}} diff --git a/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex b/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex new file mode 100644 index 000000000..d1541f707 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex @@ -0,0 +1,70 @@ +defmodule Domain.Events.ReplicationConnectionManager do + @moduledoc """ + Manages the Postgrex.ReplicationConnection to ensure that we always have one running to prevent + unbounded growth of the WAL log and ensure we are processing events. + """ + use GenServer + require Logger + + @retry_interval :timer.seconds(30) + + # Should be enough to gracefully handle transient network issues and DB restarts, + # but not so long that needed events go unbroadcast for an extended period. 
+ @max_retries 10 + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_opts) do + send(self(), :connect) + + {:ok, %{retries: 0}} + end + + @impl true + def handle_info(:connect, %{retries: retries} = state) do + Process.send_after(self(), :connect, @retry_interval) + + case Domain.Events.ReplicationConnection.start_link(replication_child_spec()) do + {:ok, _pid} -> + # Our process won + {:noreply, %{state | retries: 0}} + + {:error, {:already_started, _pid}} -> + # Another process already started the connection + {:noreply, %{state | retries: 0}} + + {:error, reason} -> + if retries < @max_retries do + Logger.info("Failed to start replication connection", + retries: retries, + max_retries: @max_retries, + reason: inspect(reason) + ) + + {:noreply, %{state | retries: retries + 1}} + else + Logger.error( + "Failed to start replication connection after #{@max_retries} attempts, giving up!", + reason: inspect(reason) + ) + + # Let the supervisor restart us + {:stop, :normal, state} + end + end + end + + defp replication_child_spec do + {connection_opts, config} = + Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) + |> Keyword.pop(:connection_opts) + + %{ + connection_opts: connection_opts, + instance: struct(Domain.Events.ReplicationConnection, config) + } + end +end diff --git a/elixir/apps/domain/test/domain/events/replication_connection_test.exs b/elixir/apps/domain/test/domain/events/replication_connection_test.exs index 8b56241f4..f9a1b6ed5 100644 --- a/elixir/apps/domain/test/domain/events/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/events/replication_connection_test.exs @@ -13,11 +13,12 @@ defmodule Domain.Events.ReplicationConnectionTest do output_plugin: "pgoutput", proto_version: 1, table_subscriptions: ["accounts", "resources"], - relations: %{} + relations: %{}, + counter: 0 } # Used to test live connection - setup_all do + setup do 
{connection_opts, config} = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) |> Keyword.pop(:connection_opts) @@ -29,11 +30,17 @@ defmodule Domain.Events.ReplicationConnectionTest do child_spec = %{ id: Domain.Events.ReplicationConnection, - start: {Domain.Events.ReplicationConnection, :start_link, [init_state]}, - restart: :transient + start: {Domain.Events.ReplicationConnection, :start_link, [init_state]} } - {:ok, pid} = start_supervised(child_spec) + {:ok, pid} = + case start_supervised(child_spec) do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + {:ok, pid} + end {:ok, pid: pid} end @@ -169,7 +176,9 @@ defmodule Domain.Events.ReplicationConnectionTest do write_data = <> - assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state) + new_state = %{state | counter: state.counter + 1} + + assert {:noreply, [], ^new_state} = ReplicationConnection.handle_data(write_data, state) end test "handle_data handles unknown message" do