feat(portal): add change_logs table and insert data (#9553)

Building on the WAL consumer that's been in development over the past
several weeks, we introduce a new `change_logs` table that stores very
lightly up-fitted data decoded from the WAL:

- `account_id` (indexed): a foreign key reference to an account.
- `inserted_at` (indexed): the timestamp of insert, used to truncate old rows
later (see the retention sketch after this list).
- `table`: the table where the op took place.
- `op`: the operation performed (insert/update/delete).
- `old_data`: a nullable map of the old row data (update/delete).
- `data`: a nullable map of the new row data (insert/update).
- `vsn`: an integer version field we can bump to signify schema changes in the
logged data, in case we need to apply operations only to new or only to old
rows.
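To make the retention story concrete, here is a minimal sketch of the kind of
truncation job the `inserted_at` index enables. The module name and the 30-day
window are hypothetical; nothing like this ships in this PR:

# Hypothetical retention sweep; relies on the inserted_at index added below.
defmodule Domain.ChangeLogs.Retention do
  import Ecto.Query
  alias Domain.Repo

  def truncate_older_than(days \\ 30) do
    cutoff = DateTime.add(DateTime.utc_now(), -days * 24 * 60 * 60, :second)

    from(cl in "change_logs", where: cl.inserted_at < ^cutoff)
    |> Repo.delete_all()
  end
end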

Judging from our prod metrics, we currently average about 1,000 write
operations per minute, which generates roughly 1-2 dozen change logs per
second. Doing the math, 30 days at our current volume yields about 50M rows
per month, which should be fine for some time, since this is an append-only
table that is rarely (if ever) read from.
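For reference, the back-of-the-envelope arithmetic (assuming a steady 1,000
writes/min):

writes_per_minute = 1_000
writes_per_second = writes_per_minute / 60           # ~16.7, i.e. 1-2 dozen/s
rows_per_30_days = writes_per_minute * 60 * 24 * 30  # 43_200_000, roughly 50M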

The one aspect of this we may need to handle sooner rather than later is
batch-inserting these rows. That raises an issue, though: currently, in this
PR, we process each WAL event serially, ending with the final acknowledgement
`:ok`, which signals to Postgres how far we have progressed in processing the
WAL.

If we do anything async here, this processing "cursor" becomes inaccurate, so
we need to think about what to track and which data we care about. One
possible shape is sketched below.
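Purely as a sketch (the `Buffer` module and its field names are hypothetical):
accumulate rows, flush them with a single `insert_all`, and only acknowledge
the LSN of the last row actually persisted:

defmodule Domain.ChangeLogs.Buffer do
  @moduledoc false
  alias Domain.ChangeLogs.ChangeLog
  alias Domain.Repo

  defstruct rows: [], last_lsn: nil

  # Accumulate one decoded WAL row; the caller decides when to flush
  # (e.g. every N rows or every few seconds).
  def push(%__MODULE__{} = buf, attrs, lsn) do
    %{buf | rows: [attrs | buf.rows], last_lsn: lsn}
  end

  # Flush everything in one INSERT; returns the LSN that is now safe to
  # acknowledge back to Postgres. Note: insert_all/2 does not autogenerate
  # inserted_at, so attrs would need to carry timestamps.
  def flush(%__MODULE__{rows: []} = buf), do: {buf.last_lsn, buf}

  def flush(%__MODULE__{rows: rows, last_lsn: lsn}) do
    {_count, _} = Repo.insert_all(ChangeLog, Enum.reverse(rows))
    {lsn, %__MODULE__{rows: [], last_lsn: lsn}}
  end
end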

Related: #7124
Jamil · 2025-06-24 19:06:20 -07:00 · committed by GitHub
parent 2b154d88bf · commit a9f49629ae
27 changed files with 1815 additions and 1388 deletions


@@ -81,12 +81,25 @@ defmodule Domain.Application do
   end
 
   defp replication do
-    config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
+    connection_modules = [
+      Domain.Events.ReplicationConnection,
+      Domain.ChangeLogs.ReplicationConnection
+    ]
 
-    if config[:enabled] do
-      [Domain.Events.ReplicationConnectionManager]
-    else
-      []
-    end
+    # Filter out disabled replication connections
+    Enum.reduce(connection_modules, [], fn module, enabled ->
+      config = Application.fetch_env!(:domain, module)
+
+      if config[:enabled] do
+        spec = %{
+          id: module,
+          start: {Domain.Replication.Manager, :start_link, [module, []]}
+        }
+
+        [spec | enabled]
+      else
+        enabled
+      end
+    end)
   end
 end
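For context, each connection module now has its own config entry with an
`:enabled` flag; a sketch of the shape (real entries also carry
`connection_opts` and the struct's other fields):

config :domain, Domain.Events.ReplicationConnection, enabled: true
config :domain, Domain.ChangeLogs.ReplicationConnection, enabled: true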


@@ -0,0 +1,10 @@
defmodule Domain.ChangeLogs do
  alias Domain.ChangeLogs.ChangeLog
  alias Domain.Repo

  def create_change_log(attrs) do
    attrs
    |> ChangeLog.Changeset.changeset()
    |> Repo.insert()
  end
end
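Usage is a single call; for example (with `account_id` standing in for a real
account's id):

{:ok, change_log} =
  Domain.ChangeLogs.create_change_log(%{
    lsn: 42,
    table: "resources",
    op: :insert,
    old_data: nil,
    data: %{"account_id" => account_id, "name" => "example"},
    vsn: 0
  })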


@@ -0,0 +1,16 @@
defmodule Domain.ChangeLogs.ChangeLog do
  use Domain, :schema

  schema "change_logs" do
    belongs_to :account, Domain.Accounts.Account

    field :lsn, :integer
    field :table, :string
    field :op, Ecto.Enum, values: [:insert, :update, :delete]
    field :old_data, :map
    field :data, :map
    field :vsn, :integer

    timestamps(type: :utc_datetime_usec, updated_at: false)
  end
end
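Since the table is append-only (`updated_at` is disabled), reads, rare as they
may be, are simple scans over the indexed columns. A hypothetical per-account
query, assuming `account_id` is bound:

import Ecto.Query

Domain.Repo.all(
  from cl in Domain.ChangeLogs.ChangeLog,
    where: cl.account_id == ^account_id,
    order_by: [desc: cl.inserted_at],
    limit: 20
)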


@@ -0,0 +1,80 @@
defmodule Domain.ChangeLogs.ChangeLog.Changeset do
  use Domain, :changeset

  @fields ~w[account_id lsn table op old_data data vsn]a

  def changeset(attrs) do
    %Domain.ChangeLogs.ChangeLog{}
    |> cast(attrs, @fields)
    |> validate_inclusion(:op, [:insert, :update, :delete])
    |> validate_correct_data_present()
    |> validate_same_account()
    |> put_account_id()
    |> validate_required([:account_id, :lsn, :table, :op, :vsn])
    |> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey)
  end

  # :insert requires old_data = nil and data != nil
  # :update requires old_data != nil and data != nil
  # :delete requires old_data != nil and data = nil
  def validate_correct_data_present(changeset) do
    op = get_field(changeset, :op)
    old_data = get_field(changeset, :old_data)
    data = get_field(changeset, :data)

    case {op, old_data, data} do
      {:insert, nil, %{} = _data} ->
        changeset

      {:update, %{} = _old_data, %{} = _data} ->
        changeset

      {:delete, %{} = _old_data, nil} ->
        changeset

      _ ->
        add_error(changeset, :base, "Invalid combination of operation and data")
    end
  end

  # Add an error if data["account_id"] != old_data["account_id"]
  defp validate_same_account(changeset) do
    old_data = get_field(changeset, :old_data)
    data = get_field(changeset, :data)
    account_id_key = account_id_field(changeset)

    if old_data && data && old_data[account_id_key] != data[account_id_key] do
      add_error(changeset, :base, "Account ID cannot be changed")
    else
      changeset
    end
  end

  # Populate account_id from one of data, old_data
  defp put_account_id(changeset) do
    old_data = get_field(changeset, :old_data)
    data = get_field(changeset, :data)
    account_id_key = account_id_field(changeset)

    account_id =
      case {old_data, data} do
        {nil, nil} -> nil
        {_, %{^account_id_key => id}} -> id
        {%{^account_id_key => id}, _} -> id
        _ -> nil
      end

    put_change(changeset, :account_id, account_id)
  end

  # For accounts table updates, the account_id is in the "id" field
  # For other tables, it is in the "account_id" field
  defp account_id_field(changeset) do
    case get_field(changeset, :table) do
      "accounts" -> "id"
      _ -> "account_id"
    end
  end
end
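To illustrate the extraction rule: for the `accounts` table the account id is
read from `data["id"]`, for every other table from `data["account_id"]`. A
quick (hypothetical) example:

changeset =
  Domain.ChangeLogs.ChangeLog.Changeset.changeset(%{
    lsn: 7,
    table: "accounts",
    op: :update,
    old_data: %{"id" => account_id, "name" => "Old"},
    data: %{"id" => account_id, "name" => "New"},
    vsn: 0
  })

changeset.valid?
#=> true; :account_id was taken from data["id"] because table == "accounts"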


@@ -0,0 +1,82 @@
defmodule Domain.ChangeLogs.ReplicationConnection do
  alias Domain.ChangeLogs

  use Domain.Replication.Connection,
    # Allow up to 30 seconds of processing lag before alerting - this should be less
    # than or equal to the wal_sender_timeout setting, which is typically 60s.
    alert_threshold_ms: 30_000,
    publication_name: "change_logs"

  # Bump this to signify a change in the audit log schema. Use with care.
  @vsn 0

  def on_insert(lsn, table, data) do
    log(:insert, lsn, table, nil, data)
  end

  def on_update(lsn, table, old_data, data) do
    log(:update, lsn, table, old_data, data)
  end

  def on_delete(lsn, table, old_data) do
    log(:delete, lsn, table, old_data, nil)
  end

  # Relay group tokens don't have account_ids
  defp log(_op, _lsn, "tokens", %{"type" => "relay_group"}, _data) do
    :ok
  end

  defp log(_op, _lsn, "tokens", _old_data, %{"type" => "relay_group"}) do
    :ok
  end

  defp log(_op, _lsn, "flows", _old_data, _data) do
    # TODO: WAL
    # Flows are not logged to the change log as they are used only to trigger side effects which
    # will be removed. Remove the flows table publication when that happens.
    :ok
  end

  defp log(op, lsn, table, old_data, data) do
    attrs = %{
      op: op,
      lsn: lsn,
      table: table,
      old_data: old_data,
      data: data,
      vsn: @vsn
    }

    case ChangeLogs.create_change_log(attrs) do
      {:ok, _change_log} ->
        :ok

      {:error, %Ecto.Changeset{errors: errors} = changeset} ->
        if foreign_key_error?(errors) do
          # Expected under normal operation when an account is deleted
          :ok
        else
          Logger.warning("Failed to create change log",
            errors: inspect(changeset.errors),
            table: table,
            op: op,
            lsn: lsn,
            vsn: @vsn
          )

          # TODO: WAL
          # Don't ignore failures to insert change logs. Improve this after we have some
          # operational experience with the data flowing in here.
          :ok
        end
    end
  end

  defp foreign_key_error?(errors) do
    Enum.any?(errors, fn {field, {message, _}} ->
      field == :account_id and message == "does not exist"
    end)
  end
end


@@ -1,329 +0,0 @@
defmodule Domain.Events.Event do
alias Domain.Events.Decoder
alias Domain.Events.Hooks
require Logger
@doc """
Ingests a WAL write message from Postgres, transforms it into an event, and sends
it to the appropriate hook module for processing.
"""
def ingest(msg, relations) do
{op, old_tuple_data, tuple_data} = extract_msg_data(msg)
{:ok, relation} = Map.fetch(relations, msg.relation_id)
table = relation.name
old_data = zip(old_tuple_data, relation.columns)
data = zip(tuple_data, relation.columns)
process(op, table, old_data, data)
# TODO: WAL
# This is only for load testing. Remove this.
Domain.PubSub.broadcast("events", {op, table, old_data, data})
end
############
# accounts #
############
defp process(:insert, "accounts", _old_data, data) do
Hooks.Accounts.on_insert(data)
end
defp process(:update, "accounts", old_data, data) do
Hooks.Accounts.on_update(old_data, data)
end
defp process(:delete, "accounts", old_data, _data) do
Hooks.Accounts.on_delete(old_data)
end
###########################
# actor_group_memberships #
###########################
defp process(:insert, "actor_group_memberships", _old_data, data) do
Hooks.ActorGroupMemberships.on_insert(data)
end
defp process(:update, "actor_group_memberships", old_data, data) do
Hooks.ActorGroupMemberships.on_update(old_data, data)
end
defp process(:delete, "actor_group_memberships", old_data, _data) do
Hooks.ActorGroupMemberships.on_delete(old_data)
end
################
# actor_groups #
################
defp process(:insert, "actor_groups", _old_data, data) do
Hooks.ActorGroups.on_insert(data)
end
defp process(:update, "actor_groups", old_data, data) do
Hooks.ActorGroups.on_update(old_data, data)
end
defp process(:delete, "actor_groups", old_data, _data) do
Hooks.ActorGroups.on_delete(old_data)
end
##########
# actors #
##########
defp process(:insert, "actors", _old_data, data) do
Hooks.Actors.on_insert(data)
end
defp process(:update, "actors", old_data, data) do
Hooks.Actors.on_update(old_data, data)
end
defp process(:delete, "actors", old_data, _data) do
Hooks.Actors.on_delete(old_data)
end
###################
# auth_identities #
###################
defp process(:insert, "auth_identities", _old_data, data) do
Hooks.AuthIdentities.on_insert(data)
end
defp process(:update, "auth_identities", old_data, data) do
Hooks.AuthIdentities.on_update(old_data, data)
end
defp process(:delete, "auth_identities", old_data, _data) do
Hooks.AuthIdentities.on_delete(old_data)
end
##################
# auth_providers #
##################
defp process(:insert, "auth_providers", _old_data, data) do
Hooks.AuthProviders.on_insert(data)
end
defp process(:update, "auth_providers", old_data, data) do
Hooks.AuthProviders.on_update(old_data, data)
end
defp process(:delete, "auth_providers", old_data, _data) do
Hooks.AuthProviders.on_delete(old_data)
end
###########
# clients #
###########
defp process(:insert, "clients", _old_data, data) do
Hooks.Clients.on_insert(data)
end
defp process(:update, "clients", old_data, data) do
Hooks.Clients.on_update(old_data, data)
end
defp process(:delete, "clients", old_data, _data) do
Hooks.Clients.on_delete(old_data)
end
###################
# flow_activities #
###################
defp process(:insert, "flow_activities", _old_data, data) do
Hooks.FlowActivities.on_insert(data)
end
defp process(:update, "flow_activities", old_data, data) do
Hooks.FlowActivities.on_update(old_data, data)
end
defp process(:delete, "flow_activities", old_data, _data) do
Hooks.FlowActivities.on_delete(old_data)
end
#########
# flows #
#########
defp process(:insert, "flows", _old_data, data) do
Hooks.Flows.on_insert(data)
end
defp process(:update, "flows", old_data, data) do
Hooks.Flows.on_update(old_data, data)
end
defp process(:delete, "flows", old_data, _data) do
Hooks.Flows.on_delete(old_data)
end
##################
# gateway_groups #
##################
defp process(:insert, "gateway_groups", _old_data, data) do
Hooks.GatewayGroups.on_insert(data)
end
defp process(:update, "gateway_groups", old_data, data) do
Hooks.GatewayGroups.on_update(old_data, data)
end
defp process(:delete, "gateway_groups", old_data, _data) do
Hooks.GatewayGroups.on_delete(old_data)
end
############
# gateways #
############
defp process(:insert, "gateways", _old_data, data) do
Hooks.Gateways.on_insert(data)
end
defp process(:update, "gateways", old_data, data) do
Hooks.Gateways.on_update(old_data, data)
end
defp process(:delete, "gateways", old_data, _data) do
Hooks.Gateways.on_delete(old_data)
end
############
# policies #
############
defp process(:insert, "policies", _old_data, data) do
Hooks.Policies.on_insert(data)
end
defp process(:update, "policies", old_data, data) do
Hooks.Policies.on_update(old_data, data)
end
defp process(:delete, "policies", old_data, _data) do
Hooks.Policies.on_delete(old_data)
end
################
# relay_groups #
################
defp process(:insert, "relay_groups", _old_data, data) do
Hooks.RelayGroups.on_insert(data)
end
defp process(:update, "relay_groups", old_data, data) do
Hooks.RelayGroups.on_update(old_data, data)
end
defp process(:delete, "relay_groups", old_data, _data) do
Hooks.RelayGroups.on_delete(old_data)
end
##########
# relays #
##########
defp process(:insert, "relays", _old_data, data) do
Hooks.Relays.on_insert(data)
end
defp process(:update, "relays", old_data, data) do
Hooks.Relays.on_update(old_data, data)
end
defp process(:delete, "relays", old_data, _data) do
Hooks.Relays.on_delete(old_data)
end
########################
# resource_connections #
########################
defp process(:insert, "resource_connections", _old_data, data) do
Hooks.ResourceConnections.on_insert(data)
end
defp process(:update, "resource_connections", old_data, data) do
Hooks.ResourceConnections.on_update(old_data, data)
end
defp process(:delete, "resource_connections", old_data, _data) do
Hooks.ResourceConnections.on_delete(old_data)
end
#############
# resources #
#############
defp process(:insert, "resources", _old_data, data) do
Hooks.Resources.on_insert(data)
end
defp process(:update, "resources", old_data, data) do
Hooks.Resources.on_update(old_data, data)
end
defp process(:delete, "resources", old_data, _data) do
Hooks.Resources.on_delete(old_data)
end
##########
# tokens #
##########
defp process(:insert, "tokens", _old_data, data) do
Hooks.Tokens.on_insert(data)
end
defp process(:update, "tokens", old_data, data) do
Hooks.Tokens.on_update(old_data, data)
end
defp process(:delete, "tokens", old_data, _data) do
Hooks.Tokens.on_delete(old_data)
end
#############
# CATCH-ALL #
#############
defp process(op, table, _old_data, _data) do
Logger.warning("Unhandled event type!", op: op, table: table)
:ok
end
defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do
{:insert, nil, data}
end
defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do
{:update, old, data}
end
defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do
{:delete, old, nil}
end
defp zip(nil, _), do: nil
defp zip(tuple_data, columns) do
tuple_data
|> Tuple.to_list()
|> Enum.zip(columns)
|> Map.new(fn {value, column} -> {column.name, value} end)
end
end


@@ -1,3 +1,5 @@
+# TODO: WAL
+# Move side-effects from flows to state table in clients and gateways
 defmodule Domain.Events.Hooks.Flows do
   @behaviour Domain.Events.Hooks
   alias Domain.PubSub


@@ -1,313 +1,67 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex
defmodule Domain.Events.ReplicationConnection do
@moduledoc """
Receives WAL events from PostgreSQL and broadcasts them where they need to go.
alias Domain.Events.Hooks
Generally, we only want to start one of these connections per cluster in order
to obtain a serial stream of the WAL. We can then fanout these events to the
appropriate consumers.
use Domain.Replication.Connection,
# Allow up to 5 seconds of lag before alerting
alert_threshold_ms: 5_000,
publication_name: "events"
The ReplicationConnection is started with a durable slot so that whatever data we
fail to acknowledge is retained in the slot on the server's disk. The server will
then send us the data when we reconnect. This is important because we want to
ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy.
The WAL data we receive is sent only once a COMMIT completes on the server. So even though
COMMIT is one of the message types we receive here, we can safely ignore it and process
insert/update/delete messages one-by-one in this module as we receive them.
"""
use Postgrex.ReplicationConnection
require Logger
import Domain.Events.Protocol
import Domain.Events.Decoder
@tables_to_hooks %{
"accounts" => Hooks.Accounts,
"actor_group_memberships" => Hooks.ActorGroupMemberships,
"actor_groups" => Hooks.ActorGroups,
"actors" => Hooks.Actors,
"auth_identities" => Hooks.AuthIdentities,
"auth_providers" => Hooks.AuthProviders,
"clients" => Hooks.Clients,
"flow_activities" => Hooks.FlowActivities,
"flows" => Hooks.Flows,
"gateway_groups" => Hooks.GatewayGroups,
"gateways" => Hooks.Gateways,
"policies" => Hooks.Policies,
"resource_connections" => Hooks.ResourceConnections,
"resources" => Hooks.Resources,
"tokens" => Hooks.Tokens
}
alias Domain.Events.Event
alias Domain.Events.Decoder
alias Domain.Events.Protocol.{KeepAlive, Write}
def on_insert(_lsn, table, data) do
hook = Map.get(@tables_to_hooks, table)
@status_log_interval :timer.minutes(5)
@type t :: %__MODULE__{
schema: String.t(),
step:
:disconnected
| :check_publication
| :create_publication
| :check_replication_slot
| :create_slot
| :start_replication_slot
| :streaming,
publication_name: String.t(),
replication_slot_name: String.t(),
output_plugin: String.t(),
proto_version: integer(),
table_subscriptions: list(),
relations: map(),
counter: integer()
}
defstruct schema: "public",
step: :disconnected,
publication_name: "events",
replication_slot_name: "events_slot",
output_plugin: "pgoutput",
proto_version: 1,
table_subscriptions: [],
relations: %{},
counter: 0
def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do
# Start only one ReplicationConnection in the cluster.
opts = connection_opts ++ [name: {:global, __MODULE__}]
Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)
if hook do
hook.on_insert(data)
else
log_warning(:insert, table)
:ok
end
end
@impl true
def init(state) do
{:ok, state}
def on_update(_lsn, table, old_data, data) do
hook = Map.get(@tables_to_hooks, table)
if hook do
hook.on_update(old_data, data)
else
log_warning(:update, table)
:ok
end
end
@doc """
Called when we make a successful connection to the PostgreSQL server.
"""
def on_delete(_lsn, table, old_data) do
hook = Map.get(@tables_to_hooks, table)
@impl true
def handle_connect(state) do
query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
{:query, query, %{state | step: :create_publication}}
if hook do
hook.on_delete(old_data)
else
log_warning(:delete, table)
:ok
end
end
@doc """
Generic callback that handles replies to the queries we send.
We use a simple state machine to issue queries one at a time to Postgres in order to:
1. Check if the publication exists
2. Check if the replication slot exists
3. Create the publication if it doesn't exist
4. Create the replication slot if it doesn't exist
5. Start the replication slot
6. Start streaming data from the replication slot
"""
@impl true
def handle_result(
[%Postgrex.Result{num_rows: 1}],
%__MODULE__{step: :create_publication} = state
) do
query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
{:query, query, %{state | step: :create_replication_slot}}
end
def handle_result(
[%Postgrex.Result{num_rows: 1}],
%__MODULE__{step: :create_replication_slot} = state
) do
{:query, "SELECT 1", %{state | step: :start_replication_slot}}
end
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
Logger.info("Starting replication slot", state: inspect(state))
# Start logging regular status updates
send(self(), :interval_logger)
query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
{:stream, query, [], %{state | step: :streaming}}
end
def handle_result(
[%Postgrex.Result{num_rows: 0}],
%__MODULE__{step: :create_publication} = state
) do
tables =
state.table_subscriptions
|> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end)
query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}"
{:query, query, %{state | step: :check_replication_slot}}
end
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do
query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
{:query, query, %{state | step: :create_replication_slot}}
end
def handle_result(
[%Postgrex.Result{num_rows: 0}],
%__MODULE__{step: :create_replication_slot} = state
) do
query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
{:query, query, %{state | step: :start_replication_slot}}
end
@doc """
Called when we receive a message from the PostgreSQL server.
We handle the following messages:
1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge
processed data by responding with the current WAL position.
2. Write: A message containing a WAL event - the actual data we are interested in.
3. Unknown: Any other message that we don't know how to handle - we log and ignore it.
For the KeepAlive message, we respond immediately with the current WAL position. Note: we may receive
far more of these messages than one might expect, because the rate at which the server sends them
scales proportionally to the number of Write messages it sends.
For the Write message, we send broadcast for each message one-by-one as we receive it. This is important
because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL position,
we should have already processed all the messages up to that point.
"""
@impl true
def handle_data(data, state) when is_keep_alive(data) do
%KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
wal_end = wal_end + 1
message =
case reply do
:now -> standby_status(wal_end, wal_end, wal_end, reply)
:later -> hold()
end
{:noreply, message, state}
end
def handle_data(data, state) when is_write(data) do
%Write{message: message} = parse(data)
message
|> decode_message()
|> handle_message(%{state | counter: state.counter + 1})
end
def handle_data(data, state) do
Logger.error("Unknown WAL message received!",
data: inspect(data),
state: inspect(state)
defp log_warning(op, table) do
Logger.warning(
"No hook defined for #{op} on table #{table}. Please implement Domain.Events.Hooks for this table."
)
{:noreply, [], state}
end
# Handles messages received:
#
# 1. Insert/Update/Delete - send to Event.ingest/2 for further processing
# 2. Relation messages - store the relation data in our state so we can use it later
# to associate column names etc with the data we receive. In practice, we'll always
# see a Relation message before we see any data for that relation.
# 3. Begin/Commit/Origin/Truncate/Type - we ignore these messages for now
# 4. Graceful shutdown - we respond with {:disconnect, :normal} to
# indicate that we are shutting down gracefully and prevent auto reconnecting.
defp handle_message(
%Decoder.Messages.Relation{
id: id,
namespace: namespace,
name: name,
columns: columns
},
state
) do
relation = %{
namespace: namespace,
name: name,
columns: columns
}
{:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}}
end
defp handle_message(%Decoder.Messages.Insert{} = msg, state) do
:ok = Event.ingest(msg, state.relations)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Update{} = msg, state) do
:ok = Event.ingest(msg, state.relations)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Delete{} = msg, state) do
:ok = Event.ingest(msg, state.relations)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Begin{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Commit{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Origin{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Truncate{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Type{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do
Logger.warning("Unsupported message received",
data: inspect(data),
counter: state.counter
)
{:noreply, [], state}
end
@impl true
def handle_info(:interval_logger, state) do
Logger.info("Processed #{state.counter} write messages from the WAL stream")
Process.send_after(self(), :interval_logger, @status_log_interval)
{:noreply, state}
end
def handle_info(:shutdown, _), do: {:disconnect, :normal}
def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
def handle_info(_, state), do: {:noreply, state}
@doc """
Called when the connection is disconnected unexpectedly.
This will happen if:
1. Postgres is restarted such as during a maintenance window
2. The connection is closed by the server due to our failure to acknowledge
Keepalive messages in a timely manner
3. The connection is cut due to a network error
4. The ReplicationConnection process crashes or is killed abruptly for any reason
5. Potentially during a deploy if the connection is not closed gracefully.
Our Supervisor will restart this process automatically so this is not an error.
"""
@impl true
def handle_disconnect(state) do
Logger.info("Replication connection disconnected",
counter: state.counter
)
{:noreply, %{state | step: :disconnected}}
end
end


@@ -0,0 +1,476 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex
defmodule Domain.Replication.Connection do
@moduledoc """
Receives WAL events from PostgreSQL and broadcasts them where they need to go.
The ReplicationConnection is started with a durable slot so that whatever data we
fail to acknowledge is retained in the slot on the server's disk. The server will
then send us the data when we reconnect. This is important because we want to
ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy.
The WAL data we receive is sent only once a COMMIT completes on the server. So even though
COMMIT is one of the message types we receive here, we can safely ignore it and process
insert/update/delete messages one-by-one in this module as we receive them.
## Usage
defmodule MyApp.ReplicationConnection do
use Domain.Replication.Connection,
alert_threshold_ms: 30_000,
publication_name: "my_events"
end
## Options
* `:alert_threshold_ms` - How long to allow the WAL stream to lag before logging a warning (default: 5000)
* `:publication_name` - Name of the PostgreSQL publication (default: "events")
"""
defmacro __using__(opts \\ []) do
# Compose all the quote blocks without nesting
[
basic_setup(),
struct_and_constants(opts),
connection_functions(),
query_handlers(),
data_handlers(),
message_handlers(),
transaction_handlers(),
ignored_message_handlers(),
utility_functions(),
info_handlers(opts),
default_callbacks()
]
end
# Extract basic imports and aliases
defp basic_setup do
quote do
use Postgrex.ReplicationConnection
require Logger
require OpenTelemetry.Tracer
import Domain.Replication.Protocol
import Domain.Replication.Decoder
alias Domain.Replication.Decoder
alias Domain.Replication.Protocol.{KeepAlive, Write}
end
end
# Extract struct definition and constants
defp struct_and_constants(opts) do
quote bind_quoted: [opts: opts] do
# Only these two are configurable
@alert_threshold_ms Keyword.fetch!(opts, :alert_threshold_ms)
@publication_name Keyword.fetch!(opts, :publication_name)
# Everything else uses defaults
@status_log_interval :timer.minutes(5)
@replication_slot_name "#{@publication_name}_slot"
@schema "public"
@output_plugin "pgoutput"
@proto_version 1
@type t :: %__MODULE__{
schema: String.t(),
step:
:disconnected
| :check_publication
| :create_publication
| :check_replication_slot
| :create_slot
| :start_replication_slot
| :streaming,
publication_name: String.t(),
replication_slot_name: String.t(),
output_plugin: String.t(),
proto_version: integer(),
table_subscriptions: list(),
relations: map(),
counter: integer()
}
defstruct schema: @schema,
step: :disconnected,
publication_name: @publication_name,
replication_slot_name: @replication_slot_name,
output_plugin: @output_plugin,
proto_version: @proto_version,
table_subscriptions: [],
relations: %{},
counter: 0
end
end
# Extract connection setup functions
defp connection_functions do
quote do
def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do
opts = connection_opts ++ [name: {:global, __MODULE__}]
Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)
end
@impl true
def init(state) do
{:ok, Map.put(state, :lag_threshold_exceeded, false)}
end
@doc """
Called when we make a successful connection to the PostgreSQL server.
"""
@impl true
def handle_connect(state) do
query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
{:query, query, %{state | step: :create_publication}}
end
@doc """
Called when the connection is disconnected unexpectedly.
This will happen if:
1. Postgres is restarted such as during a maintenance window
2. The connection is closed by the server due to our failure to acknowledge
Keepalive messages in a timely manner
3. The connection is cut due to a network error
4. The ReplicationConnection process crashes or is killed abruptly for any reason
5. Potentially during a deploy if the connection is not closed gracefully.
Our Supervisor will restart this process automatically so this is not an error.
"""
@impl true
def handle_disconnect(state) do
Logger.info("#{__MODULE__}: Replication connection disconnected",
counter: state.counter
)
{:noreply, %{state | step: :disconnected}}
end
end
end
# Extract query result handlers
defp query_handlers do
quote do
@doc """
Generic callback that handles replies to the queries we send.
We use a simple state machine to issue queries one at a time to Postgres in order to:
1. Check if the publication exists
2. Check if the replication slot exists
3. Create the publication if it doesn't exist
4. Create the replication slot if it doesn't exist
5. Start the replication slot
6. Start streaming data from the replication slot
"""
@impl true
def handle_result(
[%Postgrex.Result{num_rows: 1}],
%__MODULE__{step: :create_publication} = state
) do
query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
{:query, query, %{state | step: :create_replication_slot}}
end
def handle_result(
[%Postgrex.Result{num_rows: 1}],
%__MODULE__{step: :create_replication_slot} = state
) do
{:query, "SELECT 1", %{state | step: :start_replication_slot}}
end
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
Logger.info("Starting replication slot #{state.replication_slot_name}",
state: inspect(state)
)
# Start logging regular status updates
send(self(), :interval_logger)
query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
{:stream, query, [], %{state | step: :streaming}}
end
def handle_result(
[%Postgrex.Result{num_rows: 0}],
%__MODULE__{step: :create_publication} = state
) do
tables =
state.table_subscriptions
|> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end)
query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}"
{:query, query, %{state | step: :check_replication_slot}}
end
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do
query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
{:query, query, %{state | step: :create_replication_slot}}
end
def handle_result(
[%Postgrex.Result{num_rows: 0}],
%__MODULE__{step: :create_replication_slot} = state
) do
query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
{:query, query, %{state | step: :start_replication_slot}}
end
end
end
# Extract data handling functions
defp data_handlers do
quote do
@doc """
Called when we receive a message from the PostgreSQL server.
We handle the following messages:
1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge
processed data by responding with the current WAL position.
2. Write: A message containing a WAL event - the actual data we are interested in.
3. Unknown: Any other message that we don't know how to handle - we log and ignore it.
For the KeepAlive message, we respond immediately with the current WAL position. Note: we may receive
far more of these messages than one might expect, because the rate at which the server sends them
scales proportionally to the number of Write messages it sends.
For the Write message, we send broadcast for each message one-by-one as we receive it. This is important
because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL position,
we should have already processed all the messages up to that point.
"""
@impl true
def handle_data(data, state) when is_keep_alive(data) do
%KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
wal_end = wal_end + 1
message =
case reply do
:now -> standby_status(wal_end, wal_end, wal_end, reply)
:later -> hold()
end
{:noreply, message, state}
end
def handle_data(data, state) when is_write(data) do
OpenTelemetry.Tracer.with_span "#{__MODULE__}.handle_data/2" do
%Write{server_wal_end: server_wal_end, message: message} = parse(data)
message
|> decode_message()
|> handle_message(server_wal_end, %{state | counter: state.counter + 1})
end
end
def handle_data(data, state) do
Logger.error("#{__MODULE__}: Unknown WAL message received!",
data: inspect(data),
state: inspect(state)
)
{:noreply, [], state}
end
end
end
# Extract core message handling functions
defp message_handlers do
quote do
# Handles messages received:
#
# 1. Insert/Update/Delete/Begin/Commit - send to appropriate hook
# 2. Relation messages - store the relation data in our state so we can use it later
# to associate column names etc with the data we receive. In practice, we'll always
# see a Relation message before we see any data for that relation.
# 3. Origin/Truncate/Type - we ignore these messages for now
# 4. Graceful shutdown - we respond with {:disconnect, :normal} to
# indicate that we are shutting down gracefully and prevent auto reconnecting.
defp handle_message(
%Decoder.Messages.Relation{
id: id,
namespace: namespace,
name: name,
columns: columns
},
_server_wal_end,
state
) do
relation = %{
namespace: namespace,
name: name,
columns: columns
}
{:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}}
end
defp handle_message(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do
{_op, table, _old_data, data} = transform(msg, state.relations)
:ok = on_insert(server_wal_end, table, data)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Update{} = msg, server_wal_end, state) do
{_op, table, old_data, data} = transform(msg, state.relations)
:ok = on_update(server_wal_end, table, old_data, data)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do
{_op, table, old_data, _data} = transform(msg, state.relations)
:ok = on_delete(server_wal_end, table, old_data)
{:noreply, [], state}
end
end
end
# Extract transaction and ignored message handlers
defp transaction_handlers do
quote do
defp handle_message(%Decoder.Messages.Begin{}, _server_wal_end, state) do
{:noreply, [], state}
end
defp handle_message(
%Decoder.Messages.Commit{commit_timestamp: commit_timestamp},
_server_wal_end,
state
) do
# Since we receive a commit for each operation and we process each operation
# one-by-one, we can use the commit timestamp to check if we are lagging behind.
lag_ms = DateTime.diff(DateTime.utc_now(), commit_timestamp, :millisecond)
send(self(), {:check_alert, lag_ms})
{:noreply, [], state}
end
end
end
# Extract handlers for ignored message types
defp ignored_message_handlers do
quote do
# These messages are not relevant for our use case, so we ignore them.
defp handle_message(%Decoder.Messages.Origin{}, _server_wal_end, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Truncate{}, _server_wal_end, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Type{}, _server_wal_end, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do
Logger.warning("#{__MODULE__}: Unsupported message received",
data: inspect(data),
counter: state.counter
)
{:noreply, [], state}
end
end
end
# Extract data transformation utilities
defp utility_functions do
quote do
defp transform(msg, relations) do
{op, old_tuple_data, tuple_data} = extract_msg_data(msg)
{:ok, relation} = Map.fetch(relations, msg.relation_id)
table = relation.name
old_data = zip(old_tuple_data, relation.columns)
data = zip(tuple_data, relation.columns)
{
op,
table,
old_data,
data
}
end
defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do
{:insert, nil, data}
end
defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do
{:update, old, data}
end
defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do
{:delete, old, nil}
end
defp zip(nil, _), do: nil
defp zip(tuple_data, columns) do
tuple_data
|> Tuple.to_list()
|> Enum.zip(columns)
|> Map.new(fn {value, column} -> {column.name, value} end)
end
end
end
# Extract info handlers
defp info_handlers(opts) do
quote bind_quoted: [opts: opts] do
@alert_threshold_ms Keyword.fetch!(opts, :alert_threshold_ms)
@status_log_interval :timer.minutes(5)
@impl true
# Log only once when crossing the threshold
def handle_info({:check_alert, lag_ms}, %{lag_threshold_exceeded: false} = state)
when lag_ms >= @alert_threshold_ms do
Logger.warning("#{__MODULE__}: Processing lag exceeds threshold", lag_ms: lag_ms)
{:noreply, %{state | lag_threshold_exceeded: true}}
end
def handle_info({:check_alert, lag_ms}, %{lag_threshold_exceeded: true} = state)
when lag_ms < @alert_threshold_ms do
Logger.info("#{__MODULE__}: Processing lag is back below threshold", lag_ms: lag_ms)
{:noreply, %{state | lag_threshold_exceeded: false}}
end
def handle_info(:interval_logger, state) do
Logger.info(
"#{__MODULE__}: Processed #{state.counter} write messages from the WAL stream"
)
Process.send_after(self(), :interval_logger, @status_log_interval)
{:noreply, state}
end
def handle_info(:shutdown, _), do: {:disconnect, :normal}
def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
def handle_info(_, state), do: {:noreply, state}
end
end
# Extract default callback implementations
defp default_callbacks do
quote do
# Default implementations for required callbacks - modules using this should implement these
def on_insert(_lsn, _table, _data), do: :ok
def on_update(_lsn, _table, _old_data, _data), do: :ok
def on_delete(_lsn, _table, _old_data), do: :ok
defoverridable on_insert: 3, on_update: 4, on_delete: 3
end
end
end
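For a sense of the resulting surface area, a minimal consumer of this macro
(module name hypothetical) supplies the two options and overrides only the
callbacks it cares about; unimplemented callbacks default to `:ok`:

defmodule MyApp.AuditReplication do
  use Domain.Replication.Connection,
    alert_threshold_ms: 10_000,
    publication_name: "audit_events"

  # Override only what you need; the other callbacks default to :ok.
  def on_insert(lsn, table, data) do
    Logger.info("audit insert", lsn: lsn, table: table, keys: Map.keys(data))
    :ok
  end
end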


@@ -1,5 +1,5 @@
 # CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/decoder.ex
-defmodule Domain.Events.Decoder do
+defmodule Domain.Replication.Decoder do
   @moduledoc """
   Functions for decoding different types of logical replication messages.
   """
@@ -148,7 +148,7 @@ defmodule Domain.Events.Decoder do
     Unsupported
   }
 
-  alias Domain.Events.OidDatabase
+  alias Domain.Replication.OidDatabase
 
   @doc """
   Parses logical replication messages from Postgres
@@ -156,7 +156,7 @@ defmodule Domain.Events.Decoder do
   ## Examples
 
       iex> decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>)
-      %Domain.Events.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}}
+      %Domain.Replication.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}}
   """
   def decode_message(message) when is_binary(message) do


@@ -1,4 +1,4 @@
-defmodule Domain.Events.ReplicationConnectionManager do
+defmodule Domain.Replication.Manager do
   @moduledoc """
   Manages the Postgrex.ReplicationConnection to ensure that we always have one running to prevent
   unbounded growth of the WAL log and ensure we are processing events.
@@ -12,22 +12,22 @@ defmodule Domain.Events.ReplicationConnectionManager do
   # but not too long to avoid broadcasting needed events.
   @max_retries 10
 
-  def start_link(opts) do
-    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
+  def start_link(connection_module, opts) do
+    GenServer.start_link(__MODULE__, connection_module, opts)
   end
 
   @impl true
-  def init(_opts) do
-    send(self(), :connect)
+  def init(connection_module) do
+    send(self(), {:connect, connection_module})
     {:ok, %{retries: 0}}
   end
 
   @impl true
-  def handle_info(:connect, %{retries: retries} = state) do
-    Process.send_after(self(), :connect, @retry_interval)
+  def handle_info({:connect, connection_module}, %{retries: retries} = state) do
+    Process.send_after(self(), {:connect, connection_module}, @retry_interval)
 
-    case Domain.Events.ReplicationConnection.start_link(replication_child_spec()) do
+    case connection_module.start_link(replication_child_spec(connection_module)) do
       {:ok, _pid} ->
         # Our process won
         {:noreply, %{state | retries: 0}}
@@ -38,7 +38,7 @@ defmodule Domain.Events.ReplicationConnectionManager do
       {:error, reason} ->
         if retries < @max_retries do
-          Logger.info("Failed to start replication connection",
+          Logger.info("Failed to start replication connection #{connection_module}",
             retries: retries,
             max_retries: @max_retries,
             reason: inspect(reason)
@@ -47,7 +47,7 @@ defmodule Domain.Events.ReplicationConnectionManager do
           {:noreply, %{state | retries: retries + 1}}
         else
           Logger.error(
-            "Failed to start replication connection after #{@max_retries} attempts, giving up!",
+            "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!",
             reason: inspect(reason)
           )
@@ -57,14 +57,14 @@ defmodule Domain.Events.ReplicationConnectionManager do
     end
   end
 
-  defp replication_child_spec do
+  def replication_child_spec(connection_module) do
    {connection_opts, config} =
-      Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
+      Application.fetch_env!(:domain, connection_module)
       |> Keyword.pop(:connection_opts)
 
     %{
       connection_opts: connection_opts,
-      instance: struct(Domain.Events.ReplicationConnection, config)
+      instance: struct(connection_module, config)
     }
   end
 end
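Tying this back to the `Domain.Application` change above, the supervisor now
starts one `Domain.Replication.Manager` per connection module with a child
spec like:

%{
  id: Domain.Events.ReplicationConnection,
  start: {Domain.Replication.Manager, :start_link, [Domain.Events.ReplicationConnection, []]}
}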


@@ -1,5 +1,5 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/oid_database.ex
-defmodule Domain.Events.OidDatabase do
+defmodule Domain.Replication.OidDatabase do
   @moduledoc "This module maps a numeric PostgreSQL type ID to a descriptive string."
 
   @doc """


@@ -1,10 +1,10 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/protocol.ex
-defmodule Domain.Events.Protocol do
+defmodule Domain.Replication.Protocol do
   @moduledoc """
   This module is responsible for parsing the Postgres WAL messages.
   """
 
-  alias Domain.Events.Protocol.Write
-  alias Domain.Events.Protocol.KeepAlive
+  alias Domain.Replication.Protocol.Write
+  alias Domain.Replication.Protocol.KeepAlive
 
   defguard is_write(value) when binary_part(value, 0, 1) == <<?w>>
   defguard is_keep_alive(value) when binary_part(value, 0, 1) == <<?k>>


@@ -1,4 +1,4 @@
-defmodule Domain.Events.Protocol.KeepAlive do
+defmodule Domain.Replication.Protocol.KeepAlive do
   @moduledoc """
   Primary keepalive message (B)
   Byte1('k')


@@ -1,4 +1,4 @@
-defmodule Domain.Events.Protocol.Write do
+defmodule Domain.Replication.Protocol.Write do
   @moduledoc """
   XLogData (B)
   Byte1('w')


@@ -0,0 +1,32 @@
defmodule Domain.Repo.Migrations.CreateChangeLogs do
  use Ecto.Migration

  def up do
    create table(:change_logs, primary_key: false) do
      add(:id, :binary_id, primary_key: true)

      add(:account_id, references(:accounts, type: :binary_id, on_delete: :delete_all),
        null: false
      )

      add(:lsn, :bigint, null: false)
      add(:table, :string, null: false)
      add(:op, :string, null: false)
      add(:old_data, :map)
      add(:data, :map)
      add(:vsn, :integer, null: false)

      timestamps(type: :utc_datetime_usec, updated_at: false)
    end

    # For pulling logs for a particular customer
    create(index(:change_logs, [:account_id]))

    # For truncating logs by date
    create(index(:change_logs, [:inserted_at]))
  end

  def down do
    drop(table(:change_logs))
  end
end


@@ -0,0 +1,20 @@
defmodule Domain.Repo.Migrations.RemoveReplicaFromRelays do
  use Ecto.Migration

  @relations ~w[
    relay_groups
    relays
  ]

  def up do
    for relation <- @relations do
      execute("ALTER TABLE #{relation} REPLICA IDENTITY DEFAULT")
    end
  end

  def down do
    for relation <- @relations do
      execute("ALTER TABLE #{relation} REPLICA IDENTITY FULL")
    end
  end
end


@@ -0,0 +1,349 @@
defmodule Domain.ChangeLogs.ReplicationConnectionTest do
use Domain.DataCase, async: true
import ExUnit.CaptureLog
import Ecto.Query
import Domain.ChangeLogs.ReplicationConnection
alias Domain.ChangeLogs.ChangeLog
alias Domain.Repo
setup do
account = Fixtures.Accounts.create_account()
%{account: account}
end
describe "on_insert/2" do
test "ignores flows table - no record created" do
table = "flows"
data = %{"id" => 1, "name" => "test flow"}
initial_count = Repo.aggregate(ChangeLog, :count, :id)
assert :ok = on_insert(0, table, data)
# No record should be created for flows
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count
end
test "creates change log record for non-flows tables", %{account: account} do
table = "accounts"
data = %{"id" => account.id, "name" => "test account"}
assert :ok = on_insert(0, table, data)
# Verify the record was created
change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1)
assert change_log.op == :insert
assert change_log.table == table
assert change_log.old_data == nil
assert change_log.data == data
assert change_log.vsn == 0
assert change_log.account_id == account.id
end
test "creates records for different table types", %{account: account} do
test_cases = [
{"accounts", %{"id" => account.id, "name" => "test account"}},
{"resources",
%{"id" => Ecto.UUID.generate(), "name" => "test resource", "account_id" => account.id}},
{"policies",
%{"id" => Ecto.UUID.generate(), "name" => "test policy", "account_id" => account.id}},
{"actors",
%{"id" => Ecto.UUID.generate(), "name" => "test actor", "account_id" => account.id}}
]
for {table, data} <- test_cases do
initial_count = Repo.aggregate(ChangeLog, :count, :id)
assert :ok = on_insert(0, table, data)
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count + 1
change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1)
assert change_log.op == :insert
assert change_log.table == table
assert change_log.old_data == nil
assert change_log.data == data
assert change_log.vsn == 0
assert change_log.account_id == account.id
end
end
end
describe "on_update/3" do
test "ignores flows table - no record created" do
table = "flows"
old_data = %{"id" => 1, "name" => "old flow"}
data = %{"id" => 1, "name" => "new flow"}
initial_count = Repo.aggregate(ChangeLog, :count, :id)
assert :ok = on_update(0, table, old_data, data)
# No record should be created for flows
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count
end
test "creates change log record for non-flows tables", %{account: account} do
table = "accounts"
old_data = %{"id" => account.id, "name" => "old name"}
data = %{"id" => account.id, "name" => "new name"}
assert :ok = on_update(0, table, old_data, data)
# Verify the record was created
change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1)
assert change_log.op == :update
assert change_log.table == table
assert change_log.old_data == old_data
assert change_log.data == data
assert change_log.vsn == 0
assert change_log.account_id == account.id
end
test "handles complex data structures", %{account: account} do
table = "resources"
resource_id = Ecto.UUID.generate()
old_data = %{
"id" => resource_id,
"name" => "old name",
"account_id" => account.id,
"settings" => %{"theme" => "dark", "notifications" => true}
}
data = %{
"id" => resource_id,
"name" => "new name",
"account_id" => account.id,
"settings" => %{"theme" => "light", "notifications" => false},
"tags" => ["updated", "important"]
}
assert :ok = on_update(0, table, old_data, data)
change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1)
assert change_log.op == :update
assert change_log.table == table
assert change_log.old_data == old_data
assert change_log.data == data
assert change_log.vsn == 0
assert change_log.account_id == account.id
end
end
describe "on_delete/2" do
test "ignores flows table - no record created" do
table = "flows"
old_data = %{"id" => 1, "name" => "deleted flow"}
initial_count = Repo.aggregate(ChangeLog, :count, :id)
assert :ok = on_delete(0, table, old_data)
# No record should be created for flows
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count
end
test "creates change log record for non-flows tables", %{account: account} do
table = "accounts"
old_data = %{"id" => account.id, "name" => "deleted account"}
assert :ok = on_delete(0, table, old_data)
# Verify the record was created
change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1)
assert change_log.op == :delete
assert change_log.table == table
assert change_log.old_data == old_data
assert change_log.data == nil
assert change_log.vsn == 0
assert change_log.account_id == account.id
end
test "handles various data types in old_data", %{account: account} do
table = "resources"
resource_id = Ecto.UUID.generate()
old_data = %{
"id" => resource_id,
"name" => "complex resource",
"account_id" => account.id,
"metadata" => %{
"created_by" => "system",
"permissions" => ["read", "write"],
"config" => %{"timeout" => 30, "retries" => 3}
},
"active" => true,
"count" => 42
}
assert :ok = on_delete(0, table, old_data)
change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1)
assert change_log.op == :delete
assert change_log.table == table
assert change_log.old_data == old_data
assert change_log.data == nil
assert change_log.vsn == 0
assert change_log.account_id == account.id
end
end
describe "error handling" do
test "handles foreign key errors gracefully" do
# Create a change log entry that references a non-existent account
table = "resources"
# Non-existent account_id
data = %{"id" => Ecto.UUID.generate(), "account_id" => Ecto.UUID.generate()}
initial_count = Repo.aggregate(ChangeLog, :count, :id)
# Should return :ok even if foreign key constraint fails
assert :ok = on_insert(0, table, data)
# No record should be created due to foreign key error
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count
end
test "logs and handles non-foreign-key validation errors gracefully", %{account: account} do
# Test with invalid data that would cause validation errors (not foreign key)
table = "accounts"
# Missing required fields but valid FK
data = %{"account_id" => account.id}
initial_count = Repo.aggregate(ChangeLog, :count, :id)
log_output =
capture_log(fn ->
assert :ok = on_insert(0, table, data)
end)
# Should log the error
assert log_output =~ "Failed to create change log"
# No record should be created
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count
end
end
describe "data integrity" do
test "preserves exact data structures", %{account: account} do
table = "policies"
policy_id = Ecto.UUID.generate()
# Test with various data types
complex_data = %{
"id" => policy_id,
"account_id" => account.id,
"string_field" => "test string",
"integer_field" => 42,
"boolean_field" => true,
"null_field" => nil,
"array_field" => [1, "two", %{"three" => 3}],
"nested_object" => %{
"level1" => %{
"level2" => %{
"deep_value" => "preserved"
}
}
}
}
assert :ok = on_insert(0, table, complex_data)
change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1)
# Data should be preserved exactly as provided
assert change_log.data == complex_data
assert change_log.op == :insert
assert change_log.table == table
assert change_log.account_id == account.id
end
test "tracks operation sequence correctly", %{account: account} do
table = "accounts"
initial_data = %{"id" => account.id, "name" => "initial"}
updated_data = %{"id" => account.id, "name" => "updated"}
# Insert
assert :ok = on_insert(0, table, initial_data)
# Update
assert :ok = on_update(0, table, initial_data, updated_data)
# Delete
assert :ok = on_delete(0, table, updated_data)
# Get the three most recent records in reverse chronological order
logs =
Repo.all(
from cl in ChangeLog,
where: cl.account_id == ^account.id,
order_by: [desc: cl.inserted_at],
limit: 3
)
# Should have 3 records (delete, update, insert in that order)
assert length(logs) >= 3
[delete_log, update_log, insert_log] = Enum.take(logs, 3)
# Verify sequence (most recent first)
assert delete_log.op == :delete
assert delete_log.old_data == updated_data
assert delete_log.data == nil
assert delete_log.account_id == account.id
assert update_log.op == :update
assert update_log.old_data == initial_data
assert update_log.data == updated_data
assert update_log.account_id == account.id
assert insert_log.op == :insert
assert insert_log.old_data == nil
assert insert_log.data == initial_data
assert insert_log.account_id == account.id
# All should have same version
assert insert_log.vsn == 0
assert update_log.vsn == 0
assert delete_log.vsn == 0
end
end
describe "flows table comprehensive test" do
test "flows table never creates records regardless of operation or data" do
initial_count = Repo.aggregate(ChangeLog, :count, :id)
# Test various data shapes and operations
test_data_sets = [
%{},
%{"id" => 1},
%{"complex" => %{"nested" => ["data", 1, true, nil]}},
nil
]
for data <- test_data_sets do
assert :ok = on_insert(0, "flows", data)
assert :ok = on_update(0, "flows", data, data)
assert :ok = on_delete(0, "flows", data)
end
# No records should have been created
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count
end
end
end


@@ -0,0 +1,176 @@
defmodule Domain.ChangeLogsTest do
use Domain.DataCase, async: true
import Domain.ChangeLogs
describe "create/1" do
setup do
account = Fixtures.Accounts.create_account()
%{account: account}
end
test "inserts a change_log for an account", %{account: account} do
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"},
vsn: 1
}
assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs)
assert change_log.account_id == account.id
assert change_log.op == :insert
assert change_log.old_data == nil
assert change_log.data == %{"account_id" => account.id, "key" => "value"}
end
test "uses the 'id' field accounts table updates", %{account: account} do
attrs = %{
lsn: 1,
table: "accounts",
op: :update,
old_data: %{"id" => account.id, "name" => "Old Name"},
data: %{"id" => account.id, "name" => "New Name"},
vsn: 1
}
assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs)
assert change_log.account_id == account.id
assert change_log.op == :update
assert change_log.old_data == %{"id" => account.id, "name" => "Old Name"}
assert change_log.data == %{"id" => account.id, "name" => "New Name"}
end
test "requires vsn field", %{account: account} do
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"}
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:vsn] == {"can't be blank", [validation: :required]}
end
test "requires table field", %{account: account} do
attrs = %{
lsn: 1,
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:table] == {"can't be blank", [validation: :required]}
end
test "requires op field to be one of :insert, :update, :delete", %{account: account} do
attrs = %{
lsn: 1,
table: "resources",
op: :invalid_op,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert {"is invalid", errors} = changeset.errors[:op]
assert {:validation, :inclusion} in errors
end
test "requires correct combination of operation and data", %{account: account} do
# Invalid combination: :insert with old_data present
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: %{"account_id" => account.id, "key" => "new_value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:base] == {"Invalid combination of operation and data", []}
# Valid combination: :insert with old_data nil and data present
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "new_value"},
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
# Valid combination: :update with both old_data and data present
attrs = %{
lsn: 1,
table: "resources",
op: :update,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: %{"account_id" => account.id, "key" => "new_value"},
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
# Valid combination: :delete with old_data present and data nil
attrs = %{
lsn: 1,
table: "resources",
op: :delete,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: nil,
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
end
test "requires account_id to be populated from old_data or data" do
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"key" => "value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:account_id] == {"can't be blank", [validation: :required]}
end
test "requires old_data[\"account_id\"] and data[\"account_id\"] to match", %{
account: account
} do
attrs = %{
lsn: 1,
table: "resources",
op: :update,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: %{"account_id" => "different_account_id", "key" => "new_value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:base] == {"Account ID cannot be changed", []}
end
end
end
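
These tests pin down where account_id comes from: the "account_id" key of data/old_data for most tables, and the "id" key for the accounts table itself. A hedged sketch of the backfill step the changeset might use (the function name and body here are assumptions for illustration):

  defp put_account_id(changeset) do
    data = get_field(changeset, :data) || %{}
    old_data = get_field(changeset, :old_data) || %{}

    account_id =
      case get_field(changeset, :table) do
        # The accounts table carries its own id rather than an account_id column
        "accounts" -> data["id"] || old_data["id"]
        _other -> data["account_id"] || old_data["account_id"]
      end

    # A nil account_id is caught later by validate_required/2
    put_change(changeset, :account_id, account_id)
  end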

View File

@@ -1,28 +0,0 @@
defmodule Domain.Events.EventTest do
use ExUnit.Case, async: true
# import Domain.Events.Event
# alias Domain.Events.Decoder
setup do
config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
table_subscriptions = config[:table_subscriptions]
%{table_subscriptions: table_subscriptions}
end
# TODO: WAL
# Refactor this to test ingest of all table subscriptions as structs with stringified
# keys in order to assert on the shape of the data.
# describe "ingest/2" do
# test "returns :ok for insert on all configured table subscriptions", %{
# table_subscriptions: table_subscriptions
# } do
# for table <- table_subscriptions do
# relations = %{"1" => %{name: table, columns: []}}
# msg = %Decoder.Messages.Insert{tuple_data: {}, relation_id: "1"}
#
# assert :ok == ingest(msg, relations)
# end
# end
# end
end

View File

@@ -1,222 +1,153 @@
defmodule Domain.Events.ReplicationConnectionTest do
  use ExUnit.Case, async: true

  import ExUnit.CaptureLog
  import Domain.Events.ReplicationConnection

  setup do
    tables =
      Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
      |> Keyword.fetch!(:table_subscriptions)

    %{tables: tables}
  end

  describe "on_insert/3" do
    test "logs warning for unknown table" do
      table = "unknown_table"
      data = %{"id" => Ecto.UUID.generate(), "name" => "test"}

      log_output =
        capture_log(fn ->
          assert :ok = on_insert(0, table, data)
        end)

      assert log_output =~ "No hook defined for insert on table unknown_table"
      assert log_output =~ "Please implement Domain.Events.Hooks for this table"
    end

    test "handles known tables without errors", %{tables: tables} do
      for table <- tables do
        data = %{"id" => Ecto.UUID.generate(), "table" => table}

        # The actual hook call might fail if the hook modules aren't available,
        # but we can test that our routing logic works
        try do
          result = on_insert(0, table, data)

          # Should either succeed or fail gracefully
          assert result in [:ok, :error] or match?({:error, _}, result)
        rescue
          # Depending on the shape of the data we might get a function clause
          # error. This is ok, as we are testing the routing logic, not the
          # actual hook implementations.
          FunctionClauseError -> :ok
        end
      end
    end

    test "handles all configured tables", %{tables: tables} do
      for table <- tables do
        # Should not log warnings for configured tables
        log_output =
          capture_log(fn ->
            try do
              on_insert(0, table, %{"id" => Ecto.UUID.generate()})
            rescue
              FunctionClauseError ->
                # Shape of the data might not match the expected one, which is fine
                :ok
            end
          end)

        refute log_output =~ "No hook defined for insert"
      end
    end
  end

  describe "on_update/4" do
    test "logs warning for unknown table" do
      table = "unknown_table"
      old_data = %{"id" => Ecto.UUID.generate(), "name" => "old"}
      data = %{"id" => Ecto.UUID.generate(), "name" => "new"}

      log_output =
        capture_log(fn ->
          assert :ok = on_update(0, table, old_data, data)
        end)

      assert log_output =~ "No hook defined for update on table unknown_table"
      assert log_output =~ "Please implement Domain.Events.Hooks for this table"
    end

    test "handles known tables", %{tables: tables} do
      old_data = %{"id" => Ecto.UUID.generate(), "name" => "old name"}
      data = %{"id" => Ecto.UUID.generate(), "name" => "new name"}

      for table <- tables do
        try do
          result = on_update(0, table, old_data, data)
          assert result in [:ok, :error] or match?({:error, _}, result)
        rescue
          FunctionClauseError ->
            # Shape of the data might not match the expected one, which is fine
            :ok
        end
      end
    end
  end

  describe "on_delete/3" do
    test "logs warning for unknown table" do
      table = "unknown_table"
      old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted"}

      log_output =
        capture_log(fn ->
          assert :ok = on_delete(0, table, old_data)
        end)

      assert log_output =~ "No hook defined for delete on table unknown_table"
      assert log_output =~ "Please implement Domain.Events.Hooks for this table"
    end

    test "handles known tables", %{tables: tables} do
      old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted gateway"}

      for table <- tables do
        try do
          assert :ok = on_delete(0, table, old_data)
        rescue
          # Shape of the data might not match the expected one, which is fine
          FunctionClauseError -> :ok
        end
      end
    end
  end

  describe "warning message formatting" do
    test "log_warning generates correct message format" do
      log_output =
        capture_log(fn ->
          assert :ok = on_insert(0, "test_table_insert", %{})
        end)

      assert log_output =~ "No hook defined for insert on table test_table_insert"
      assert log_output =~ "Please implement Domain.Events.Hooks for this table"

      log_output =
        capture_log(fn ->
          assert :ok = on_update(0, "test_table_update", %{}, %{})
        end)

      assert log_output =~ "No hook defined for update on table test_table_update"
      assert log_output =~ "Please implement Domain.Events.Hooks for this table"

      log_output =
        capture_log(fn ->
          assert :ok = on_delete(0, "test_table_delete", %{})
        end)

      assert log_output =~ "No hook defined for delete on table test_table_delete"
      assert log_output =~ "Please implement Domain.Events.Hooks for this table"
    end
  end
end
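
For reference, a hedged sketch of the dispatch behavior these tests lock in. The @table_hooks lookup, the hook callback arity, and the log_warning/2 helper are assumptions inferred from the asserted log output, not the actual implementation:

  # Assumes `require Logger` and a hypothetical @table_hooks map of
  # table name => hook module.
  def on_insert(lsn, table, data) do
    case Map.fetch(@table_hooks, table) do
      {:ok, hook} -> hook.on_insert(data)
      :error -> log_warning(:insert, table)
    end
  end

  defp log_warning(op, table) do
    Logger.warning(
      "No hook defined for #{op} on table #{table}. " <>
        "Please implement Domain.Events.Hooks for this table."
    )

    :ok
  end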

View File

@@ -0,0 +1,308 @@
defmodule Domain.Replication.ConnectionTest do
# Only one ReplicationConnection should be started in the cluster
use ExUnit.Case, async: false
# Create a test module that uses the macro
defmodule TestReplicationConnection do
use Domain.Replication.Connection,
alert_threshold_ms: 5_000,
publication_name: "test_events"
end
alias TestReplicationConnection
# Used to test callbacks, not used for live connection
def mock_state,
do: %TestReplicationConnection{
schema: "test_schema",
step: :disconnected,
publication_name: "test_pub",
replication_slot_name: "test_slot",
output_plugin: "pgoutput",
proto_version: 1,
table_subscriptions: ["accounts", "resources"],
relations: %{},
counter: 0
}
# Used to test live connection
setup do
{connection_opts, config} =
Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
|> Keyword.pop(:connection_opts)
init_state = %{
connection_opts: connection_opts,
instance: struct(TestReplicationConnection, config)
}
child_spec = %{
id: TestReplicationConnection,
start: {TestReplicationConnection, :start_link, [init_state]}
}
{:ok, pid} =
case start_supervised(child_spec) do
{:ok, pid} ->
{:ok, pid}
{:error, {:already_started, pid}} ->
{:ok, pid}
end
{:ok, pid: pid}
end
describe "handle_connect/1 callback" do
test "handle_connect initiates publication check" do
state = mock_state()
expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
expected_next_state = %{state | step: :create_publication}
assert {:query, ^expected_query, ^expected_next_state} =
TestReplicationConnection.handle_connect(state)
end
end
describe "handle_result/2 callback" do
test "handle_result transitions from create_publication to create_replication_slot when publication exists" do
state = %{mock_state() | step: :create_publication}
result = [%Postgrex.Result{num_rows: 1}]
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
TestReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do
state = %{mock_state() | step: :create_replication_slot}
result = [%Postgrex.Result{num_rows: 1}]
expected_query = "SELECT 1"
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
TestReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from start_replication_slot to streaming" do
state = %{mock_state() | step: :start_replication_slot}
result = [%Postgrex.Result{num_rows: 1}]
expected_stream_query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
expected_next_state = %{state | step: :streaming}
assert {:stream, ^expected_stream_query, [], ^expected_next_state} =
TestReplicationConnection.handle_result(result, state)
end
test "handle_result creates publication if it doesn't exist" do
state = %{mock_state() | step: :create_publication}
result = [%Postgrex.Result{num_rows: 0}]
expected_tables = "test_schema.accounts,test_schema.resources"
expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}"
expected_next_state = %{state | step: :check_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
TestReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do
state = %{mock_state() | step: :check_replication_slot}
result = [%Postgrex.Result{num_rows: 0}]
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
TestReplicationConnection.handle_result(result, state)
end
test "handle_result creates replication slot if it doesn't exist" do
state = %{mock_state() | step: :create_replication_slot}
result = [%Postgrex.Result{num_rows: 0}]
expected_query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
TestReplicationConnection.handle_result(result, state)
end
end
describe "handle_data/2" do
test "handle_data handles KeepAlive with reply :now" do
state = %{mock_state() | step: :streaming}
wal_end = 12345
now =
System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
# 100 milliseconds
grace_period = 100_000
keepalive_data = <<?k, wal_end::64, 0::64, 1>>
assert {:noreply, reply, ^state} =
TestReplicationConnection.handle_data(keepalive_data, state)
assert [<<?r, 12346::64, 12346::64, 12346::64, clock::64, 1::8>>] = reply
assert now <= clock
assert clock < now + grace_period
end
test "handle_data handles KeepAlive with reply :later" do
state = %{mock_state() | step: :streaming}
wal_end = 54321
keepalive_data = <<?k, wal_end::64, 0::64, 0>>
expected_reply_message = []
assert {:noreply, ^expected_reply_message, ^state} =
TestReplicationConnection.handle_data(keepalive_data, state)
end
test "handle_data handles Write message and increments counter" do
state = %{mock_state() | step: :streaming}
server_wal_start = 123_456_789
server_wal_end = 987_654_321
server_system_clock = 1_234_567_890
message = "Hello, world!"
write_data =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message::binary>>
new_state = %{state | counter: state.counter + 1}
assert {:noreply, [], ^new_state} = TestReplicationConnection.handle_data(write_data, state)
end
test "handle_data handles unknown message" do
state = %{mock_state() | step: :streaming}
unknown_data = <<?q, 1, 2, 3>>
assert {:noreply, [], ^state} = TestReplicationConnection.handle_data(unknown_data, state)
end
test "sends {:check_alert, lag_ms} > 5_000 ms" do
state =
%{mock_state() | step: :streaming}
|> Map.put(:lag_threshold_exceeded, false)
server_wal_start = 123_456_789
server_wal_end = 987_654_321
server_system_clock = 1_234_567_890
flags = <<0>>
lsn = <<0::32, 100::32>>
end_lsn = <<0::32, 200::32>>
# Simulate a commit timestamp that exceeds the threshold
timestamp =
DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) + 10_000_000
commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>>
write_message =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
commit_data::binary>>
assert {:noreply, [], _state} =
TestReplicationConnection.handle_data(write_message, state)
assert_receive({:check_alert, lag_ms})
assert lag_ms > 5_000
end
test "sends {:check_alert, lag_ms} < 5_000 ms" do
state =
%{mock_state() | step: :streaming}
|> Map.put(:lag_threshold_exceeded, true)
server_wal_start = 123_456_789
server_wal_end = 987_654_321
server_system_clock = 1_234_567_890
flags = <<0>>
lsn = <<0::32, 100::32>>
end_lsn = <<0::32, 200::32>>
# Simulate a commit timestamp that is within the threshold
timestamp =
DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) + 1_000_000
commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>>
write_message =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
commit_data::binary>>
assert {:noreply, [], _state} =
TestReplicationConnection.handle_data(write_message, state)
assert_receive({:check_alert, lag_ms})
assert lag_ms < 5_000
end
end
describe "handle_info/2" do
test "handle_info handles :shutdown message" do
state = mock_state()
assert {:disconnect, :normal} = TestReplicationConnection.handle_info(:shutdown, state)
end
test "handle_info handles :DOWN message from monitored process" do
state = mock_state()
monitor_ref = make_ref()
down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown}
assert {:disconnect, :normal} = TestReplicationConnection.handle_info(down_msg, state)
end
test "handle_info ignores other messages" do
state = mock_state()
random_msg = {:some_other_info, "data"}
assert {:noreply, ^state} = TestReplicationConnection.handle_info(random_msg, state)
end
test "handle_info processes lag alerts" do
state = Map.put(mock_state(), :lag_threshold_exceeded, false)
# Test crossing threshold
assert {:noreply, %{lag_threshold_exceeded: true}} =
TestReplicationConnection.handle_info({:check_alert, 6_000}, state)
# Test going back below threshold
state_above = %{state | lag_threshold_exceeded: true}
assert {:noreply, %{lag_threshold_exceeded: false}} =
TestReplicationConnection.handle_info({:check_alert, 3_000}, state_above)
# Test staying below threshold
assert {:noreply, %{lag_threshold_exceeded: false}} =
TestReplicationConnection.handle_info({:check_alert, 2_000}, state)
# Test staying above threshold
assert {:noreply, %{lag_threshold_exceeded: true}} =
TestReplicationConnection.handle_info({:check_alert, 7_000}, state_above)
end
end
describe "handle_disconnect/1" do
test "handle_disconnect resets step to :disconnected" do
state = %{mock_state() | step: :streaming}
expected_state = %{state | step: :disconnected}
assert {:noreply, ^expected_state} = TestReplicationConnection.handle_disconnect(state)
end
end
end
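
The lag-alert tests above only pin down the return shapes. One plausible implementation of the transition logic, assuming the alert_threshold_ms option from `use Domain.Replication.Connection` is stored in a module attribute (the body is a sketch, not taken from the source):

  def handle_info({:check_alert, lag_ms}, state) do
    exceeded? = lag_ms > @alert_threshold_ms

    cond do
      # Crossing above the threshold flips the flag (and would fire an alert)
      exceeded? and not state.lag_threshold_exceeded ->
        {:noreply, %{state | lag_threshold_exceeded: true}}

      # Recovering below the threshold clears the flag
      not exceeded? and state.lag_threshold_exceeded ->
        {:noreply, %{state | lag_threshold_exceeded: false}}

      # Otherwise the state is unchanged
      true ->
        {:noreply, state}
    end
  end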

View File

@@ -1,8 +1,8 @@
defmodule Domain.Events.DecoderTest do
defmodule Domain.Replication.DecoderTest do
use ExUnit.Case, async: true
alias Domain.Events.Decoder
alias Domain.Events.Decoder.Messages
alias Domain.Replication.Decoder
alias Domain.Replication.Decoder.Messages
@lsn_binary <<0::integer-32, 23_785_280::integer-32>>
@lsn_decoded {0, 23_785_280}
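
The test constants above encode a log sequence number as two 32-bit integers. A small illustrative decoder (the helper name is hypothetical):

  # <<0::integer-32, 23_785_280::integer-32>> decodes to {0, 23_785_280}
  def decode_lsn(<<xlog_file::integer-32, xlog_offset::integer-32>>),
    do: {xlog_file, xlog_offset}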

View File

@@ -33,6 +33,39 @@ config :domain, Domain.Repo,
migration_lock: :pg_advisory_lock,
start_apps_before_migration: [:ssl, :logger_json]
config :domain, Domain.ChangeLogs.ReplicationConnection,
enabled: true,
connection_opts: [
hostname: "localhost",
port: 5432,
ssl: false,
ssl_opts: [],
parameters: [],
username: "postgres",
database: "firezone_dev",
password: "postgres"
],
# When changing these, make sure to also:
# 1. Make appropriate changes to `Domain.Events.Event`
# 2. Add an appropriate `Domain.Events.Hooks` module
table_subscriptions: ~w[
accounts
actor_group_memberships
actor_groups
actors
auth_identities
auth_providers
clients
flow_activities
flows
gateway_groups
gateways
policies
resource_connections
resources
tokens
]
config :domain, Domain.Events.ReplicationConnection,
enabled: true,
connection_opts: [
@@ -61,8 +94,6 @@ config :domain, Domain.Events.ReplicationConnection,
gateway_groups
gateways
policies
relay_groups
relays
resource_connections
resources
tokens
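
Per the comment accompanying table_subscriptions, each subscribed table needs a corresponding hooks module. A skeletal example for one table — the callback names and arities here are assumptions mirroring the routing tests above, not the actual behaviour contract:

  defmodule Domain.Events.Hooks.Gateways do
    # Each callback receives the decoded row data and must return :ok
    def on_insert(_data), do: :ok
    def on_update(_old_data, _data), do: :ok
    def on_delete(_old_data), do: :ok
  end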

View File

@@ -27,6 +27,21 @@ if config_env() == :prod do
else: [{:hostname, env_var_to_config!(:database_host)}]
)
config :domain, Domain.ChangeLogs.ReplicationConnection,
enabled: env_var_to_config!(:background_jobs_enabled),
replication_slot_name: env_var_to_config!(:database_replication_slot_name),
publication_name: env_var_to_config!(:database_publication_name),
connection_opts: [
hostname: env_var_to_config!(:database_host),
port: env_var_to_config!(:database_port),
ssl: env_var_to_config!(:database_ssl_enabled),
ssl_opts: env_var_to_config!(:database_ssl_opts),
parameters: env_var_to_config!(:database_parameters),
username: env_var_to_config!(:database_user),
password: env_var_to_config!(:database_password),
database: env_var_to_config!(:database_name)
]
config :domain, Domain.Events.ReplicationConnection,
enabled: env_var_to_config!(:background_jobs_enabled),
replication_slot_name: env_var_to_config!(:database_replication_slot_name),

View File

@@ -20,11 +20,15 @@ config :domain, Domain.Repo,
pool: Ecto.Adapters.SQL.Sandbox,
queue_target: 1000
config :domain, Domain.ChangeLogs.ReplicationConnection,
  enabled: false,
  connection_opts: [
    database: "firezone_test#{partition_suffix}"
  ]

config :domain, Domain.Events.ReplicationConnection,
  enabled: false,
  connection_opts: [
    auto_reconnect: false,
    database: "firezone_test#{partition_suffix}"
  ]

View File

@@ -1,515 +0,0 @@
defmodule Domain.Events.ReplicationConnectionTest do
# Only one ReplicationConnection should be started in the cluster
use ExUnit.Case, async: false
alias Domain.Events.Decoder.Messages
alias Domain.Events.ReplicationConnection
# Used to test callbacks, not used for live connection
@mock_state %ReplicationConnection{
schema: "test_schema",
connection_opts: [],
step: :disconnected,
publication_name: "test_pub",
replication_slot_name: "test_slot",
output_plugin: "pgoutput",
proto_version: 1,
# Example, adjust if needed
table_subscriptions: ["accounts", "resources"],
relations: %{}
}
# Used to test live connection (Setup remains unchanged)
setup do
# Ensure Postgrex is started if your tests rely on it implicitly
{:ok, pid} = start_supervised(Domain.Events.ReplicationConnection)
{:ok, pid: pid}
end
describe "handle_connect/1 callback" do
test "handle_connect initiates publication check" do
state = @mock_state
expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
expected_next_state = %{state | step: :create_publication}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_connect(state)
end
end
describe "handle_result/2 callback" do
test "handle_result transitions from create_publication to create_replication_slot when publication exists" do
state = %{@mock_state | step: :create_publication}
# Mock a successful result for the SELECT query
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 1,
rows: [[1]]
}
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do
state = %{@mock_state | step: :create_replication_slot}
# Mock a successful result for the SELECT query
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 1,
rows: [[1]]
}
expected_query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
expected_next_state = %{state | step: :start_replication_slot}
expected_stream_query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
# Should be :streaming directly? Check impl.
expected_next_state_direct = %{state | step: :start_replication_slot}
# Let's assume it first goes to :start_replication_slot step, then handle_result for *that* step triggers START_REPLICATION
assert {:query, _query, ^expected_next_state_direct} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from start_replication_slot to streaming" do
state = %{@mock_state | step: :start_replication_slot}
# Mock a successful result for the CREATE_REPLICATION_SLOT or preceding step
result = %Postgrex.Result{
# Or whatever command led here
command: :create_replication_slot,
columns: nil,
num_rows: 0,
rows: nil
}
expected_stream_query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
expected_next_state = %{state | step: :streaming}
assert {:stream, ^expected_stream_query, [], ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates publication if it doesn't exist" do
state = %{@mock_state | step: :create_publication}
# Mock result indicating publication doesn't exist
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 0,
rows: []
}
# Combine schema and table names correctly
expected_tables =
state.table_subscriptions
|> Enum.map(fn table -> "#{state.schema}.#{table}" end)
|> Enum.join(",")
expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}"
# The original test expected the next step to be :check_replication_slot, let's keep that
expected_next_state = %{state | step: :check_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do
state = %{@mock_state | step: :check_replication_slot}
# Mock a successful result from the CREATE PUBLICATION command
result = %Postgrex.Result{
command: :create_publication,
columns: nil,
num_rows: 0,
rows: nil
}
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates replication slot if it doesn't exist" do
state = %{@mock_state | step: :create_replication_slot}
# Mock result indicating slot doesn't exist
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 0,
rows: []
}
expected_query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
end
# --- handle_data tests remain unchanged ---
# In-depth decoding tests are handled in Domain.Events.DecoderTest
describe "handle_data/2" do
test "handle_data handles KeepAlive with reply :now" do
state = %{@mock_state | step: :streaming}
wal_end = 12345
# Keepalive doesn't use this field meaningfully here
server_wal_start = 0
# Reply requested
reply_requested = 1
now_microseconds =
System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
# 100 milliseconds tolerance for clock check
grace_period_microseconds = 100_000
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
# Expected reply format: 'r', confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8, clock::64
# The actual implementation might construct the reply differently.
# This assertion needs to match the exact binary structure returned by handle_data.
# Let's assume the implementation sends back the received wal_end as confirmed LSNs,
# and the current time. The no_reply and high_priority flags might be 0.
assert {:reply, reply_binary, ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
# Deconstruct the reply to verify its parts
assert <<?r, confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8,
clock::64>> = reply_binary
assert confirmed_lsn == wal_end
# Or potentially server_wal_start? Check impl.
assert confirmed_lsn_commit == wal_end
assert no_reply == 0
assert high_priority == 0
assert now_microseconds <= clock < now_microseconds + grace_period_microseconds
end
test "handle_data handles KeepAlive with reply :later" do
state = %{@mock_state | step: :streaming}
wal_end = 54321
server_wal_start = 0
# No reply requested
reply_requested = 0
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
# When no reply is requested, it should return :noreply with no binary message
assert {:noreply, [], ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
end
test "handle_data handles Write message (XLogData)" do
state = %{@mock_state | step: :streaming}
server_wal_start = 123_456_789
# This is the LSN of the end of the WAL data in this message
server_wal_end = 987_654_321
# Timestamp in microseconds since PG epoch
server_system_clock = 1_234_567_890
# Example decoded message data (e.g., a BEGIN message binary)
# This data should be passed to handle_info via send(self(), decoded_msg)
message_binary =
<<"B", @lsn_binary || <<0::64>>::binary-8, @timestamp_int || 0::integer-64,
@xid || 0::integer-32>>
write_data =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
message_binary::binary>>
# handle_data for 'w' should decode the message_binary and send it to self()
# It returns {:noreply, [], state} because the reply/acknowledgement happens
# via the KeepAlive ('k') mechanism.
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state)
# Assert that the decoded message was sent to self()
# Note: This requires the test process to receive the message.
# You might need `allow_receive` or similar testing patterns if handle_data
# directly uses `send`. If it calls another function that sends, test that function.
# Let's assume handle_data directly sends for this example.
# Need some sample data defined earlier for the assertion
@lsn_binary <<0::integer-32, 23_785_280::integer-32>>
@timestamp_int 704_521_200_000
@xid 1234
@timestamp_decoded ~U[2022-04-29 12:20:00.000000Z]
@lsn_decoded {0, 23_785_280}
expected_decoded_msg = %Messages.Begin{
final_lsn: @lsn_decoded,
commit_timestamp: @timestamp_decoded,
xid: @xid
}
assert_receive(^expected_decoded_msg)
end
test "handle_data handles unknown message type" do
state = %{@mock_state | step: :streaming}
# Using 'q' as an example unknown type
unknown_data = <<?q, 1, 2, 3>>
# Expect it to ignore unknown types and return noreply
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state)
# Optionally, assert that a warning was logged if applicable
end
end
# --- handle_info tests are CORRECTED below ---
describe "handle_info/2" do
test "handle_info updates relations on Relation message" do
state = @mock_state
# Use the correct fields from Messages.Relation struct
relation_msg = %Messages.Relation{
id: 101,
namespace: "public",
name: "accounts",
# Added replica_identity
replica_identity: :default,
columns: [
%Messages.Relation.Column{
flags: [:key],
name: "id",
type: "int4",
type_modifier: -1
},
%Messages.Relation.Column{
flags: [],
name: "name",
type: "text",
type_modifier: -1
}
]
}
# The state should store the relevant parts of the relation message, keyed by ID
expected_relation_data = %{
namespace: "public",
name: "accounts",
replica_identity: :default,
columns: [
%Messages.Relation.Column{
flags: [:key],
name: "id",
type: "int4",
type_modifier: -1
},
%Messages.Relation.Column{
flags: [],
name: "name",
type: "text",
type_modifier: -1
}
]
}
expected_relations = %{101 => expected_relation_data}
expected_state = %{state | relations: expected_relations}
assert {:noreply, ^expected_state} = ReplicationConnection.handle_info(relation_msg, state)
end
test "handle_info returns noreply for Insert message" do
# Pre-populate state with relation info if the handler needs it
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct field: tuple_data (which is a tuple)
insert_msg = %Messages.Insert{relation_id: 101, tuple_data: {1, "Alice"}}
# handle_info likely broadcasts or processes the insert, but returns noreply
assert {:noreply, ^state} = ReplicationConnection.handle_info(insert_msg, state)
# Add assertions here if handle_info is expected to send messages or call other funcs
end
test "handle_info returns noreply for Update message" do
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct fields: relation_id, old_tuple_data, tuple_data, changed_key_tuple_data
update_msg = %Messages.Update{
relation_id: 101,
# Example: only old data provided
old_tuple_data: {1, "Alice"},
# Example: new data
tuple_data: {1, "Bob"},
# Example: key didn't change or wasn't provided
changed_key_tuple_data: nil
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(update_msg, state)
# Add assertions for side effects (broadcasts etc.) if needed
end
test "handle_info returns noreply for Delete message" do
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct fields: relation_id, old_tuple_data, changed_key_tuple_data
delete_msg = %Messages.Delete{
relation_id: 101,
# Example: old data provided
old_tuple_data: {1, "Bob"},
# Example: key data not provided
changed_key_tuple_data: nil
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(delete_msg, state)
# Add assertions for side effects if needed
end
test "handle_info ignores Begin message" do
state = @mock_state
# Use correct fields: final_lsn, commit_timestamp, xid
begin_msg = %Messages.Begin{
final_lsn: {0, 123},
commit_timestamp: ~U[2023-01-01 10:00:00Z],
xid: 789
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(begin_msg, state)
end
test "handle_info ignores Commit message" do
state = @mock_state
# Use correct fields: flags, lsn, end_lsn, commit_timestamp
commit_msg = %Messages.Commit{
flags: [],
lsn: {0, 123},
end_lsn: {0, 456},
commit_timestamp: ~U[2023-01-01 10:00:01Z]
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(commit_msg, state)
end
test "handle_info ignores Origin message" do
state = @mock_state
# Use correct fields: origin_commit_lsn, name
origin_msg = %Messages.Origin{origin_commit_lsn: {0, 1}, name: "origin_name"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(origin_msg, state)
end
test "handle_info ignores Truncate message" do
state = @mock_state
# Use correct fields: number_of_relations, options, truncated_relations
truncate_msg = %Messages.Truncate{
number_of_relations: 2,
options: [:cascade],
truncated_relations: [101, 102]
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(truncate_msg, state)
end
test "handle_info ignores Type message" do
state = @mock_state
# Use correct fields: id, namespace, name
type_msg = %Messages.Type{id: 23, namespace: "pg_catalog", name: "int4"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(type_msg, state)
end
test "handle_info returns noreply for Unsupported message" do
state = @mock_state
unsupported_msg = %Messages.Unsupported{data: <<1, 2, 3>>}
# We cannot easily verify Logger.warning was called without mocks/capture.
assert {:noreply, ^state} = ReplicationConnection.handle_info(unsupported_msg, state)
end
test "handle_info handles :shutdown message" do
state = @mock_state
# Expect :disconnect tuple based on common GenServer patterns for shutdown
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(:shutdown, state)
# Note: The original test asserted {:disconnect, :normal}. {:stop, :normal, state} is
# the standard GenServer return for a clean stop triggered by handle_info. Adjust
# if your implementation specifically returns :disconnect.
end
test "handle_info handles :DOWN message from monitored process" do
state = @mock_state
monitor_ref = make_ref()
# Example DOWN message structure
down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown}
# Expect the server to stop itself upon receiving DOWN for a critical process
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(down_msg, state)
# Again, adjust the expected return (:disconnect vs :stop) based on implementation.
end
test "handle_info ignores other messages" do
state = @mock_state
random_msg = {:some_other_info, "data"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state)
end
end
# --- Moved handle_disconnect test to its own describe block ---
describe "handle_disconnect/1" do
test "handle_disconnect resets step to :disconnected and logs warning" do
state = %{@mock_state | step: :streaming}
expected_state = %{state | step: :disconnected}
# Capture log to verify warning (requires ExUnit config)
log_output =
ExUnit.CaptureLog.capture_log(fn ->
assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state)
end)
assert log_output =~ "Replication connection disconnected."
# Or match the exact log message if needed
end
end
end