fix(portal): Add ReplicationConnectionManager (#9158)

When deploying, new Elixir nodes are spun up before the old ones are
brought down. This ensures that the state in the cluster is merge into
the new nodes between the old ones go away.

This also means, however, that the existing WAL consumer is still up
when our new one tries to come online.

Normally, this isn't an issue, because we find the old pid and return it
with `{:ok, existing_pid}`. When that VM goes away, the Supervisor(s) of
the new application notice and restart it,

However, if the cluster state diverges or is inconsistent during this
period, we may fail to find the existing pid, and try to start a new
ReplicationConnection. If the old pid is still active, this will fail
because there's a mutex on the connection. The original implementation
was designed to handle this case using the Supervisor with a
`:transient` restart policy.

What the author failed to understand is that the restart policy applies
only to _restarts_ and not initial starts, so if all of the new
application servers fail to find the old pid which is still connected,
and they all fail to come up, we won't consume the WAL.

This is fixed with a `ReplicationConnectionManager` that always comes up
fine, and then simply tries to start a `ReplicationConnection` every
30s, giving up after 5 minutes if it can't start one or find an existing
one. This will crash, causing the Supervisor to restart us, and then
notify us.
This commit is contained in:
Jamil
2025-05-16 12:54:27 -07:00
committed by GitHub
parent 65c58ee254
commit f07aa8aa3a
4 changed files with 108 additions and 51 deletions

View File

@@ -41,9 +41,7 @@ defmodule Domain.Application do
# Observability
Domain.Telemetry
] ++
oban() ++
replication()
] ++ oban() ++ replication()
end
defp configure_logger do
@@ -86,31 +84,9 @@ defmodule Domain.Application do
config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
if config[:enabled] do
[
replication_child_spec()
]
[Domain.Events.ReplicationConnectionManager]
else
[]
end
end
defp replication_child_spec do
{connection_opts, config} =
Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
|> Keyword.pop(:connection_opts)
init_state = %{
connection_opts: connection_opts,
instance: struct(Domain.Events.ReplicationConnection, config)
}
%{
id: Domain.Events.ReplicationConnection,
start: {Domain.Events.ReplicationConnection, :start_link, [init_state]},
restart: :transient,
# Allow up to 240 restarts in 20 minutes - covers duration of a deploy
max_restarts: 240,
max_seconds: 1200
}
end
end

View File

@@ -26,6 +26,8 @@ defmodule Domain.Events.ReplicationConnection do
alias Domain.Events.Decoder
alias Domain.Events.Protocol.{KeepAlive, Write}
@status_log_interval :timer.minutes(5)
@type t :: %__MODULE__{
schema: String.t(),
step:
@@ -41,7 +43,8 @@ defmodule Domain.Events.ReplicationConnection do
output_plugin: String.t(),
proto_version: integer(),
table_subscriptions: list(),
relations: map()
relations: map(),
counter: integer()
}
defstruct schema: "public",
step: :disconnected,
@@ -50,25 +53,14 @@ defmodule Domain.Events.ReplicationConnection do
output_plugin: "pgoutput",
proto_version: 1,
table_subscriptions: [],
relations: %{}
relations: %{},
counter: 0
def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do
# Start only one ReplicationConnection in the cluster.
opts = connection_opts ++ [name: {:global, __MODULE__}]
case(Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)) do
{:ok, pid} ->
{:ok, pid}
{:error, {:already_started, pid}} ->
{:ok, pid}
error ->
# This is expected in clustered environments.
Logger.info("Failed to start replication connection", error: inspect(error))
error
end
Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)
end
@impl true
@@ -121,6 +113,9 @@ defmodule Domain.Events.ReplicationConnection do
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
Logger.info("Starting replication slot", state: inspect(state))
# Start logging regular status updates
send(self(), :interval_logger)
query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
@@ -193,10 +188,9 @@ defmodule Domain.Events.ReplicationConnection do
def handle_data(data, state) when is_write(data) do
%Write{message: message} = parse(data)
# TODO: Telemetry: Mark start
message
|> decode_message()
|> handle_message(state)
|> handle_message(%{state | counter: state.counter + 1})
end
def handle_data(data, state) do
@@ -273,7 +267,7 @@ defmodule Domain.Events.ReplicationConnection do
defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do
Logger.warning("Unsupported message received",
data: inspect(data),
state: inspect(state)
counter: state.counter
)
{:noreply, [], state}
@@ -281,6 +275,14 @@ defmodule Domain.Events.ReplicationConnection do
@impl true
def handle_info(:interval_logger, state) do
Logger.info("Processed #{state.counter} write messages from the WAL stream")
Process.send_after(self(), :interval_logger, @status_log_interval)
{:noreply, state}
end
def handle_info(:shutdown, _), do: {:disconnect, :normal}
def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
@@ -303,7 +305,7 @@ defmodule Domain.Events.ReplicationConnection do
@impl true
def handle_disconnect(state) do
Logger.info("Replication connection disconnected",
state: inspect(state)
counter: state.counter
)
{:noreply, %{state | step: :disconnected}}

View File

@@ -0,0 +1,70 @@
defmodule Domain.Events.ReplicationConnectionManager do
@moduledoc """
Manages the Postgrex.ReplicationConnection to ensure that we always have one running to prevent
unbounded growth of the WAL log and ensure we are processing events.
"""
use GenServer
require Logger
@retry_interval :timer.seconds(30)
# Should be enough to gracefully handle transient network issues and DB restarts,
# but not too long to avoid broadcasting needed events.
@max_retries 10
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_opts) do
send(self(), :connect)
{:ok, %{retries: 0}}
end
@impl true
def handle_info(:connect, %{retries: retries} = state) do
Process.send_after(self(), :connect, @retry_interval)
case Domain.Events.ReplicationConnection.start_link(replication_child_spec()) do
{:ok, _pid} ->
# Our process won
{:noreply, %{state | retries: 0}}
{:error, {:already_started, _pid}} ->
# Another process already started the connection
{:noreply, %{state | retries: 0}}
{:error, reason} ->
if retries < @max_retries do
Logger.info("Failed to start replication connection",
retries: retries,
max_retries: @max_retries,
reason: inspect(reason)
)
{:noreply, %{state | retries: retries + 1}}
else
Logger.error(
"Failed to start replication connection after #{@max_retries} attempts, giving up!",
reason: inspect(reason)
)
# Let the supervisor restart us
{:stop, :normal, state}
end
end
end
defp replication_child_spec do
{connection_opts, config} =
Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
|> Keyword.pop(:connection_opts)
%{
connection_opts: connection_opts,
instance: struct(Domain.Events.ReplicationConnection, config)
}
end
end

View File

@@ -13,11 +13,12 @@ defmodule Domain.Events.ReplicationConnectionTest do
output_plugin: "pgoutput",
proto_version: 1,
table_subscriptions: ["accounts", "resources"],
relations: %{}
relations: %{},
counter: 0
}
# Used to test live connection
setup_all do
setup do
{connection_opts, config} =
Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
|> Keyword.pop(:connection_opts)
@@ -29,11 +30,17 @@ defmodule Domain.Events.ReplicationConnectionTest do
child_spec = %{
id: Domain.Events.ReplicationConnection,
start: {Domain.Events.ReplicationConnection, :start_link, [init_state]},
restart: :transient
start: {Domain.Events.ReplicationConnection, :start_link, [init_state]}
}
{:ok, pid} = start_supervised(child_spec)
{:ok, pid} =
case start_supervised(child_spec) do
{:ok, pid} ->
{:ok, pid}
{:error, {:already_started, pid}} ->
{:ok, pid}
end
{:ok, pid: pid}
end
@@ -169,7 +176,9 @@ defmodule Domain.Events.ReplicationConnectionTest do
write_data =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message::binary>>
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state)
new_state = %{state | counter: state.counter + 1}
assert {:noreply, [], ^new_state} = ReplicationConnection.handle_data(write_data, state)
end
test "handle_data handles unknown message" do