feat(portal): add batch-insert to change logs (#9733)

Inserting a single change log incurs minor overhead for sending the query
over the network and reacting to its response. In many cases, this
overhead makes up the bulk of the time it takes to run the insert.

To reduce this overhead and avoid any processing delay in the WAL
consumers, we introduce batch-insert functionality with a buffer size of
`500` and a timeout of `30` seconds. When either threshold is hit, we
flush the batch using `insert_all`.
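
Condensed from the `Domain.Replication.Connection` changes below, the two
triggers look roughly like this (a sketch of the shape, not the complete
module):

```elixir
# Size trigger: checked after every buffered write.
defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state)
     when map_size(buffer) >= size do
  on_flush(state)
end

defp maybe_flush(state), do: state

# Time trigger: a recurring :flush message, re-armed on every tick.
def handle_info(:flush, state) do
  Process.send_after(self(), :flush, state.flush_interval)
  {:noreply, on_flush(state)}
end
```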

`insert_all` does not use `Ecto.Changeset`, so we need to be more careful
about the data we insert, and we check the inserted LSNs to determine
where to advance the acknowledged LSN pointer.
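
Concretely, `Repo.insert_all/3` with `on_conflict: :nothing` returns
`{inserted_count, nil}` and silently skips rows whose `lsn` already
exists, so the pointer can still advance past duplicates; a minimal sketch
of that flow:

```elixir
{successful_count, _returned} =
  Repo.insert_all(ChangeLog, list_of_attrs,
    on_conflict: :nothing,
    conflict_target: [:lsn]
  )

# An LSN conflict just means the row was inserted on an earlier,
# not-yet-acknowledged pass, so it is safe to advance past it. Other
# failures, such as a missing account_id, raise instead of being dropped.
last_lsn = list_of_attrs |> Enum.map(& &1.lsn) |> Enum.max()
```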

The logic that determines when to call the new `on_flush/1` callback
lives in the shared `Domain.Replication.Connection` module, but the
actual behavior of `on_flush/1` is left to the consuming modules to
implement. The `Events.ReplicationConnection` module does not need
batching, so it keeps the defaults, which disable the flush mechanism.
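
For illustration, a batching consumer overrides both callbacks while a
pass-through consumer keeps the injected defaults; `MyApp.BatchingConsumer`
below is hypothetical, loosely mirroring the `ChangeLogs.ReplicationConnection`
diff:

```elixir
defmodule MyApp.BatchingConsumer do
  use Domain.Replication.Connection

  # Buffer each write keyed by LSN; the shared module decides when to flush.
  def on_write(state, lsn, op, table, old_data, data) do
    entry = %{lsn: lsn, op: op, table: table, old_data: old_data, data: data}
    %{state | flush_buffer: Map.put_new(state.flush_buffer, lsn, entry)}
  end

  # Nothing buffered: nothing to do.
  def on_flush(%{flush_buffer: buffer} = state) when map_size(buffer) == 0, do: state

  # Persist the buffer, then reset it and record how far we got.
  def on_flush(state) do
    # ... bulk-insert Map.values(state.flush_buffer) here ...
    last_lsn = state.flush_buffer |> Map.keys() |> Enum.max()
    %{state | flush_buffer: %{}, last_flushed_lsn: last_lsn}
  end
end
```

Flushing is enabled per consumer via config: a positive `flush_interval`
arms the timer, and `flush_buffer_size` caps the batch (see the `config`
diff at the end).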

Related: #949
Author: Jamil
Date: 2025-07-05 12:03:28 -07:00 (committed via GitHub)
Parent: c48ed2a1a0
Commit: b20c141759
14 changed files with 1669 additions and 1465 deletions


@@ -2,9 +2,11 @@ defmodule Domain.ChangeLogs do
alias Domain.ChangeLogs.ChangeLog
alias Domain.Repo
def create_change_log(attrs) do
attrs
|> ChangeLog.Changeset.changeset()
|> Repo.insert()
def bulk_insert(list_of_attrs) do
ChangeLog
|> Repo.insert_all(list_of_attrs,
on_conflict: :nothing,
conflict_target: [:lsn]
)
end
end


@@ -1,81 +0,0 @@
defmodule Domain.ChangeLogs.ChangeLog.Changeset do
use Domain, :changeset
@fields ~w[account_id lsn table op old_data data vsn]a
def changeset(attrs) do
%Domain.ChangeLogs.ChangeLog{}
|> cast(attrs, @fields)
|> validate_inclusion(:op, [:insert, :update, :delete])
|> validate_correct_data_present()
|> validate_same_account()
|> put_account_id()
|> validate_required([:account_id, :lsn, :table, :op, :vsn])
|> unique_constraint(:lsn)
|> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey)
end
# :insert requires old_data = nil and data != nil
# :update requires old_data != nil and data != nil
# :delete requires old_data != nil and data = nil
def validate_correct_data_present(changeset) do
op = get_field(changeset, :op)
old_data = get_field(changeset, :old_data)
data = get_field(changeset, :data)
case {op, old_data, data} do
{:insert, nil, %{} = _data} ->
changeset
{:update, %{} = _old_data, %{} = _data} ->
changeset
{:delete, %{} = _old_data, nil} ->
changeset
_ ->
add_error(changeset, :base, "Invalid combination of operation and data")
end
end
# Add an error if data["account_id"] != old_data["account_id"]
defp validate_same_account(changeset) do
old_data = get_field(changeset, :old_data)
data = get_field(changeset, :data)
account_id_key = account_id_field(changeset)
if old_data && data && old_data[account_id_key] != data[account_id_key] do
add_error(changeset, :base, "Account ID cannot be changed")
else
changeset
end
end
# Populate account_id from one of data, old_data
defp put_account_id(changeset) do
old_data = get_field(changeset, :old_data)
data = get_field(changeset, :data)
account_id_key = account_id_field(changeset)
account_id =
case {old_data, data} do
{nil, nil} -> nil
{_, %{^account_id_key => id}} -> id
{%{^account_id_key => id}, _} -> id
_ -> nil
end
put_change(changeset, :account_id, account_id)
end
# For accounts table updates, the account_id is in the "id" field
# For other tables, it is in the "account_id" field
defp account_id_field(changeset) do
case get_field(changeset, :table) do
"accounts" -> "id"
_ -> "account_id"
end
end
end


@@ -1,90 +1,83 @@
defmodule Domain.ChangeLogs.ReplicationConnection do
use Domain.Replication.Connection
alias Domain.ChangeLogs
use Domain.Replication.Connection,
# Allow up to 5 minutes of processing lag before alerting. This needs to be able to survive
# deploys without alerting.
warning_threshold_ms: 5 * 60 * 1_000,
# 1 month in ms - we never want to bypass changelog inserts
error_threshold_ms: 30 * 24 * 60 * 60 * 1_000
# Bump this to signify a change in the audit log schema. Use with care.
@vsn 0
def on_insert(lsn, table, data) do
log(:insert, lsn, table, nil, data)
# Avoid overwhelming the change log with soft-deleted records getting hard-deleted en masse.
# Can be removed after https://github.com/firezone/firezone/issues/8187 is shipped.
def on_write(state, _lsn, :delete, _table, %{"deleted_at" => deleted_at}, _data)
when not is_nil(deleted_at) do
state
end
def on_update(lsn, table, old_data, data) do
log(:update, lsn, table, old_data, data)
# Ignore token writes for relay_groups since these are not expected to have an account_id
def on_write(state, _lsn, _op, "tokens", %{"type" => "relay_group"}, _data), do: state
def on_write(state, _lsn, _op, "tokens", _old_data, %{"type" => "relay_group"}), do: state
# Handle accounts specially
def on_write(state, lsn, op, "accounts", old_data, %{"id" => account_id} = data) do
buffer(state, lsn, op, "accounts", account_id, old_data, data)
end
def on_delete(lsn, table, old_data) do
if is_nil(old_data["deleted_at"]) do
log(:delete, lsn, table, old_data, nil)
else
# Avoid overwhelming the change log with soft-deleted records getting hard-deleted en masse.
# Can be removed after https://github.com/firezone/firezone/issues/8187 is shipped.
:ok
end
# Handle other writes where an account_id is present
def on_write(state, lsn, op, table, old_data, %{"account_id" => account_id} = data)
when not is_nil(account_id) do
buffer(state, lsn, op, table, account_id, old_data, data)
end
# Relay group tokens don't have account_ids
defp log(_op, _lsn, "tokens", %{"type" => "relay_group"}, _data) do
:ok
def on_write(state, lsn, op, table, %{"account_id" => account_id} = old_data, data)
when not is_nil(account_id) do
buffer(state, lsn, op, table, account_id, old_data, data)
end
defp log(_op, _lsn, "tokens", _old_data, %{"type" => "relay_group"}) do
:ok
end
defp log(op, lsn, table, old_data, data) do
attrs = %{
op: op,
# If we get here, raise the alarm as it means we encountered a change we didn't expect.
def on_write(state, lsn, op, table, _old_data, _data) do
Logger.error(
"Unexpected write operation!",
lsn: lsn,
table: table,
old_data: old_data,
data: data,
vsn: @vsn
}
op: op,
table: table
)
case ChangeLogs.create_change_log(attrs) do
{:ok, _change_log} ->
:ok
{:error, %Ecto.Changeset{errors: errors} = changeset} ->
if Enum.any?(errors, &should_skip_change_log?/1) do
# Expected under normal operation when an account is deleted or we are catching up on
# already-processed but not acknowledged WAL data.
:ok
else
Logger.warning("Failed to create change log",
errors: inspect(changeset.errors),
table: table,
op: op,
lsn: lsn,
vsn: @vsn
)
# TODO: WAL
# Don't ignore failures to insert change logs. Improve this after we have some
# operational experience with the data flowing in here.
:ok
end
end
state
end
defp should_skip_change_log?({:account_id, {"does not exist", _violations}}) do
true
def on_flush(%{flush_buffer: flush_buffer} = state) when map_size(flush_buffer) == 0, do: state
def on_flush(state) do
to_insert = Map.values(state.flush_buffer)
attempted_count = Enum.count(state.flush_buffer)
{successful_count, _change_logs} = ChangeLogs.bulk_insert(to_insert)
Logger.info("Flushed #{successful_count}/#{attempted_count} change logs")
# We always advance the LSN to the highest LSN in the flush buffer because
# LSN conflicts just mean the data is already inserted, and other insert_all
# issues like a missing account_id will raise an exception.
last_lsn =
state.flush_buffer
|> Map.keys()
|> Enum.max()
%{state | flush_buffer: %{}, last_flushed_lsn: last_lsn}
end
defp should_skip_change_log?({:lsn, {"has already been taken", _violations}}) do
true
end
defp buffer(state, lsn, op, table, account_id, old_data, data) do
flush_buffer =
state.flush_buffer
|> Map.put_new(lsn, %{
lsn: lsn,
op: op,
table: table,
account_id: account_id,
old_data: old_data,
data: data,
vsn: @vsn
})
defp should_skip_change_log?(_error) do
false
%{state | flush_buffer: flush_buffer}
end
end


@@ -1,15 +1,7 @@
defmodule Domain.Events.ReplicationConnection do
use Domain.Replication.Connection
alias Domain.Events.Hooks
use Domain.Replication.Connection,
# Allow up to 60 seconds of lag before alerting
warning_threshold_ms: 60 * 1_000,
# Allow up to 30 minutes of lag before bypassing hooks
error_threshold_ms: 30 * 60 * 1_000
require Logger
@tables_to_hooks %{
"accounts" => Hooks.Accounts,
"actor_group_memberships" => Hooks.ActorGroupMemberships,
@@ -26,37 +18,18 @@ defmodule Domain.Events.ReplicationConnection do
"tokens" => Hooks.Tokens
}
def on_insert(_lsn, table, data) do
hook = Map.get(@tables_to_hooks, table)
if hook do
hook.on_insert(data)
def on_write(state, _lsn, op, table, old_data, data) do
if hook = Map.get(@tables_to_hooks, table) do
case op do
:insert -> hook.on_insert(data)
:update -> hook.on_update(old_data, data)
:delete -> hook.on_delete(old_data)
end
else
log_warning(:insert, table)
:ok
log_warning(op, table)
end
end
def on_update(_lsn, table, old_data, data) do
hook = Map.get(@tables_to_hooks, table)
if hook do
hook.on_update(old_data, data)
else
log_warning(:update, table)
:ok
end
end
def on_delete(_lsn, table, old_data) do
hook = Map.get(@tables_to_hooks, table)
if hook do
hook.on_delete(old_data)
else
log_warning(:delete, table)
:ok
end
state
end
defp log_warning(op, table) do


@@ -15,17 +15,8 @@ defmodule Domain.Replication.Connection do
## Usage
defmodule MyApp.ReplicationConnection do
use Domain.Replication.Connection,
status_log_interval_s: 60,
warning_threshold_ms: 30_000,
error_threshold_ms: 60 * 1_000
use Domain.Replication.Connection
end
## Options
* `:warning_threshold_ms`: How long to allow the WAL stream to lag before logging a warning
* `:error_threshold_ms`: How long to allow the WAL stream to lag before logging an error and
bypassing side effect handlers.
"""
defmacro __using__(opts \\ []) do
@@ -65,10 +56,6 @@ defmodule Domain.Replication.Connection do
# Extract struct definition and constants
defp struct_and_constants(opts) do
quote bind_quoted: [opts: opts] do
@warning_threshold_ms Keyword.fetch!(opts, :warning_threshold_ms)
@error_threshold_ms Keyword.fetch!(opts, :error_threshold_ms)
@status_log_interval :timer.seconds(Keyword.get(opts, :status_log_interval_s, 60))
# Everything else uses defaults
@schema "public"
@output_plugin "pgoutput"
@@ -93,19 +80,56 @@ defmodule Domain.Replication.Connection do
table_subscriptions: list(),
relations: map(),
counter: integer(),
tables_to_remove: MapSet.t()
tables_to_remove: MapSet.t(),
flush_interval: integer(),
flush_buffer: map(),
last_flushed_lsn: integer(),
warning_threshold_exceeded?: boolean(),
error_threshold_exceeded?: boolean(),
flush_buffer_size: integer(),
status_log_interval: integer(),
warning_threshold: integer(),
error_threshold: integer()
}
defstruct schema: @schema,
step: :disconnected,
publication_name: nil,
replication_slot_name: nil,
output_plugin: @output_plugin,
proto_version: @proto_version,
table_subscriptions: [],
relations: %{},
counter: 0,
tables_to_remove: MapSet.new()
defstruct(
# schema to use for the publication
schema: @schema,
# starting step
step: :disconnected,
# publication name to check/create
publication_name: nil,
# replication slot name to check/create
replication_slot_name: nil,
# output plugin to use for logical replication
output_plugin: @output_plugin,
# protocol version to use for logical replication
proto_version: @proto_version,
# tables we want to subscribe to in the publication
table_subscriptions: [],
# relations we have seen so far
relations: %{},
# counter for the number of messages processed
counter: 0,
# calculated tables to remove from the publication
tables_to_remove: MapSet.new(),
# flush interval in milliseconds, set to 0 to use immediate processing
flush_interval: 0,
# buffer for data to flush
flush_buffer: %{},
# last flushed LSN, used to track progress while flushing
last_flushed_lsn: 0,
# flags to track if we have exceeded warning/error thresholds
warning_threshold_exceeded?: false,
error_threshold_exceeded?: false,
# size of the flush buffer, used to determine when to flush
flush_buffer_size: 0,
# interval for logging status updates
status_log_interval: :timer.minutes(1),
# thresholds for warning and error logging
warning_threshold: :timer.seconds(30),
error_threshold: :timer.seconds(60)
)
end
end
@@ -119,10 +143,9 @@ defmodule Domain.Replication.Connection do
@impl true
def init(state) do
state =
state
|> Map.put(:warning_threshold_exceeded?, false)
|> Map.put(:error_threshold_exceeded?, false)
if state.flush_interval > 0 do
Process.send_after(self(), :flush, state.flush_interval)
end
{:ok, state}
end
@@ -365,7 +388,14 @@ defmodule Domain.Replication.Connection do
def handle_data(data, state) when is_keep_alive(data) do
%KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
wal_end = wal_end + 1
wal_end =
if state.flush_interval > 0 do
# If we are flushing, we use the tracked last_flushed_lsn
state.last_flushed_lsn + 1
else
# Otherwise, we assume to be current with the server
wal_end + 1
end
message =
case reply do
@@ -382,7 +412,7 @@ defmodule Domain.Replication.Connection do
message
|> decode_message()
|> handle_message(server_wal_end, %{state | counter: state.counter + 1})
|> handle_write(server_wal_end, %{state | counter: state.counter + 1})
end
end
@@ -392,7 +422,7 @@ defmodule Domain.Replication.Connection do
state: inspect(state)
)
{:noreply, [], state}
{:noreply, state}
end
end
end
@@ -402,14 +432,15 @@ defmodule Domain.Replication.Connection do
quote do
# Handles messages received:
#
# 1. Insert/Update/Delete/Begin/Commit - send to appropriate hook
# 2. Relation messages - store the relation data in our state so we can use it later
# 1. Insert/Update/Delete - send to on_write/5
# 2. Begin - check how far we are lagging behind
# 3. Relation messages - store the relation data in our state so we can use it later
# to associate column names etc with the data we receive. In practice, we'll always
# see a Relation message before we see any data for that relation.
# 3. Origin/Truncate/Type - we ignore these messages for now
# 4. Graceful shutdown - we respond with {:disconnect, :normal} to
# 4. Origin/Truncate/Type - we ignore these messages for now
# 5. Graceful shutdown - we respond with {:disconnect, :normal} to
# indicate that we are shutting down gracefully and prevent auto reconnecting.
defp handle_message(
defp handle_write(
%Decoder.Messages.Relation{
id: id,
namespace: namespace,
@@ -425,67 +456,57 @@ defmodule Domain.Replication.Connection do
columns: columns
}
{:noreply, ack(server_wal_end),
%{state | relations: Map.put(state.relations, id, relation)}}
{:noreply, %{state | relations: Map.put(state.relations, id, relation)}}
end
defp handle_message(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do
unless state.error_threshold_exceeded? do
{op, table, _old_data, data} = transform(msg, state.relations)
:ok = on_insert(server_wal_end, table, data)
end
{:noreply, ack(server_wal_end), state}
defp handle_write(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do
process_write(msg, server_wal_end, state)
end
defp handle_message(%Decoder.Messages.Update{} = msg, server_wal_end, state) do
unless state.error_threshold_exceeded? do
{op, table, old_data, data} = transform(msg, state.relations)
:ok = on_update(server_wal_end, table, old_data, data)
end
{:noreply, ack(server_wal_end), state}
defp handle_write(%Decoder.Messages.Update{} = msg, server_wal_end, state) do
process_write(msg, server_wal_end, state)
end
defp handle_message(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do
unless state.error_threshold_exceeded? do
{op, table, old_data, _data} = transform(msg, state.relations)
:ok = on_delete(server_wal_end, table, old_data)
end
{:noreply, ack(server_wal_end), state}
defp handle_write(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do
process_write(msg, server_wal_end, state)
end
defp ack(server_wal_end) do
wal_end = server_wal_end + 1
standby_status(wal_end, wal_end, wal_end, :now)
defp process_write(_msg, _server_wal_end, %{error_threshold_exceeded?: true} = state) do
{:noreply, state}
end
defp process_write(msg, server_wal_end, state) do
{op, table, old_data, data} = transform(msg, state.relations)
state
|> on_write(server_wal_end, op, table, old_data, data)
|> maybe_flush()
|> then(&{:noreply, &1})
end
defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state)
when map_size(buffer) >= size do
on_flush(state)
end
defp maybe_flush(state), do: state
end
end
# Extract transaction and ignored message handlers
defp transaction_handlers do
quote do
defp handle_message(
defp handle_write(
%Decoder.Messages.Begin{commit_timestamp: commit_timestamp} = msg,
server_wal_end,
state
) do
# Since we receive a commit for each operation and we process each operation
# one-by-one, we can use the commit timestamp to check if we are lagging behind.
# We can use the commit timestamp to check how far we are lagging behind
lag_ms = DateTime.diff(DateTime.utc_now(), commit_timestamp, :millisecond)
send(self(), {:check_warning_threshold, lag_ms})
send(self(), {:check_error_threshold, lag_ms})
{:noreply, ack(server_wal_end), state}
end
defp handle_message(
%Decoder.Messages.Commit{commit_timestamp: commit_timestamp},
server_wal_end,
state
) do
{:noreply, ack(server_wal_end), state}
{:noreply, state}
end
end
end
@@ -494,25 +515,30 @@ defmodule Domain.Replication.Connection do
defp ignored_message_handlers do
quote do
# These messages are not relevant for our use case, so we ignore them.
defp handle_message(%Decoder.Messages.Origin{}, server_wal_end, state) do
{:noreply, ack(server_wal_end), state}
defp handle_write(%Decoder.Messages.Commit{}, _server_wal_end, state) do
{:noreply, state}
end
defp handle_message(%Decoder.Messages.Truncate{}, server_wal_end, state) do
{:noreply, ack(server_wal_end), state}
defp handle_write(%Decoder.Messages.Origin{}, _server_wal_end, state) do
{:noreply, state}
end
defp handle_message(%Decoder.Messages.Type{}, server_wal_end, state) do
{:noreply, ack(server_wal_end), state}
defp handle_write(%Decoder.Messages.Truncate{}, _server_wal_end, state) do
{:noreply, state}
end
defp handle_message(%Decoder.Messages.Unsupported{data: data}, server_wal_end, state) do
defp handle_write(%Decoder.Messages.Type{}, _server_wal_end, state) do
{:noreply, state}
end
defp handle_write(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do
Logger.warning("#{__MODULE__}: Unsupported message received",
data: inspect(data),
counter: state.counter
)
{:noreply, ack(server_wal_end), state}
{:noreply, state}
end
end
end
@@ -559,25 +585,24 @@ defmodule Domain.Replication.Connection do
end
end
# Extract info handlers
defp info_handlers(opts) do
quote bind_quoted: [opts: opts] do
@impl true
def handle_info(
{:check_warning_threshold, lag_ms},
%{warning_threshold_exceeded?: false} = state
%{warning_threshold_exceeded?: false, warning_threshold: warning_threshold} = state
)
when lag_ms >= @warning_threshold_ms do
when lag_ms >= warning_threshold do
Logger.warning("#{__MODULE__}: Processing lag exceeds warning threshold", lag_ms: lag_ms)
{:noreply, %{state | warning_threshold_exceeded?: true}}
end
def handle_info(
{:check_warning_threshold, lag_ms},
%{warning_threshold_exceeded?: true} = state
%{warning_threshold_exceeded?: true, warning_threshold: warning_threshold} = state
)
when lag_ms < @warning_threshold_ms do
when lag_ms < warning_threshold do
Logger.info("#{__MODULE__}: Processing lag is back below warning threshold",
lag_ms: lag_ms
)
@@ -587,9 +612,9 @@ defmodule Domain.Replication.Connection do
def handle_info(
{:check_error_threshold, lag_ms},
%{error_threshold_exceeded?: false} = state
%{error_threshold_exceeded?: false, error_threshold: error_threshold} = state
)
when lag_ms >= @error_threshold_ms do
when lag_ms >= error_threshold do
Logger.error(
"#{__MODULE__}: Processing lag exceeds error threshold; skipping side effects!",
lag_ms: lag_ms
@@ -600,9 +625,9 @@ defmodule Domain.Replication.Connection do
def handle_info(
{:check_error_threshold, lag_ms},
%{error_threshold_exceeded?: true} = state
%{error_threshold_exceeded?: true, error_threshold: error_threshold} = state
)
when lag_ms < @error_threshold_ms do
when lag_ms < error_threshold do
Logger.info("#{__MODULE__}: Processing lag is back below error threshold", lag_ms: lag_ms)
{:noreply, %{state | error_threshold_exceeded?: false}}
end
@@ -612,11 +637,17 @@ defmodule Domain.Replication.Connection do
"#{__MODULE__}: Processed #{state.counter} write messages from the WAL stream"
)
Process.send_after(self(), :interval_logger, @status_log_interval)
Process.send_after(self(), :interval_logger, state.status_log_interval)
{:noreply, state}
end
def handle_info(:flush, state) do
Process.send_after(self(), :flush, state.flush_interval)
{:noreply, on_flush(state)}
end
def handle_info(:shutdown, _), do: {:disconnect, :normal}
def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
@@ -628,11 +659,10 @@ defmodule Domain.Replication.Connection do
defp default_callbacks do
quote do
# Default implementations for required callbacks - modules using this should implement these
def on_insert(_lsn, _table, _data), do: :ok
def on_update(_lsn, _table, _old_data, _data), do: :ok
def on_delete(_lsn, _table, _old_data), do: :ok
def on_write(state, _lsn, _op, _table, _old_data, _data), do: state
def on_flush(state), do: state
defoverridable on_insert: 3, on_update: 4, on_delete: 3
defoverridable on_write: 6, on_flush: 1
end
end
end


@@ -0,0 +1,17 @@
defmodule Domain.Repo.Migrations.SetDefaultsOnChangeLogs do
use Ecto.Migration
def up do
alter table(:change_logs) do
modify(:inserted_at, :utc_datetime_usec, default: fragment("now()"))
modify(:id, :binary_id, default: fragment("gen_random_uuid()"))
end
end
def down do
alter table(:change_logs) do
modify(:inserted_at, :utc_datetime_usec, default: nil)
modify(:id, :binary_id, default: nil)
end
end
end


@@ -0,0 +1,13 @@
defmodule Domain.Repo.Migrations.DropForeignKeyConstraintOnChangeLogs do
use Ecto.Migration
def up do
drop(constraint(:change_logs, :change_logs_account_id_fkey))
end
def down do
alter table(:change_logs) do
modify(:account_id, references(:accounts, type: :binary_id, on_delete: :delete_all))
end
end
end


@@ -0,0 +1,18 @@
defmodule Domain.Repo.Migrations.AddChangeLogsDataConstraint do
use Ecto.Migration
def change do
create(
constraint(:change_logs, :valid_data_for_operation,
check: """
CASE op
WHEN 'insert' THEN data IS NOT NULL AND old_data IS NULL
WHEN 'update' THEN data IS NOT NULL AND old_data IS NOT NULL
WHEN 'delete' THEN data IS NULL AND old_data IS NOT NULL
ELSE false
END
"""
)
)
end
end


@@ -2,15 +2,15 @@ defmodule Domain.ChangeLogsTest do
use Domain.DataCase, async: true
import Domain.ChangeLogs
describe "create/1" do
describe "bulk_insert/1" do
setup do
account = Fixtures.Accounts.create_account()
%{account: account}
end
test "inserts a change_log for an account", %{account: account} do
test "skips duplicate lsn", %{account: account} do
attrs = %{
account_id: account.id,
lsn: 1,
table: "resources",
op: :insert,
@@ -19,152 +19,16 @@ defmodule Domain.ChangeLogsTest do
vsn: 1
}
assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs)
assert change_log.account_id == account.id
assert change_log.op == :insert
assert change_log.old_data == nil
assert change_log.data == %{"account_id" => account.id, "key" => "value"}
end
test "uses the 'id' field accounts table updates", %{account: account} do
attrs = %{
lsn: 1,
table: "accounts",
op: :update,
old_data: %{"id" => account.id, "name" => "Old Name"},
data: %{"id" => account.id, "name" => "New Name"},
vsn: 1
}
assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs)
assert change_log.account_id == account.id
assert change_log.op == :update
assert change_log.old_data == %{"id" => account.id, "name" => "Old Name"}
assert change_log.data == %{"id" => account.id, "name" => "New Name"}
end
test "requires vsn field", %{account: account} do
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"}
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:vsn] == {"can't be blank", [validation: :required]}
end
test "requires table field", %{account: account} do
attrs = %{
lsn: 1,
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:table] == {"can't be blank", [validation: :required]}
end
test "prevents inserting duplicate lsn", %{account: account} do
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"},
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
assert {1, nil} = bulk_insert([attrs])
# Try to insert with same LSN but different data
dupe_lsn_attrs = Map.put(attrs, :data, %{"account_id" => account.id, "key" => "new_value"})
assert {:error, changeset} = create_change_log(dupe_lsn_attrs)
assert changeset.valid? == false
assert changeset.errors[:lsn] ==
{"has already been taken",
[constraint: :unique, constraint_name: "change_logs_lsn_index"]}
assert {0, nil} = bulk_insert([dupe_lsn_attrs])
end
test "requires op field to be one of :insert, :update, :delete", %{account: account} do
attrs = %{
lsn: 1,
table: "resources",
op: :invalid_op,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert {"is invalid", errors} = changeset.errors[:op]
assert {:validation, :inclusion} in errors
end
test "requires correct combination of operation and data", %{account: account} do
# Invalid combination: :insert with old_data present
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: %{"account_id" => account.id, "key" => "new_value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:base] == {"Invalid combination of operation and data", []}
# Valid combination: :insert with old_data nil and data present
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "new_value"},
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
# Valid combination: :update with both old_data and data present
attrs = %{
lsn: 2,
table: "resources",
op: :update,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: %{"account_id" => account.id, "key" => "new_value"},
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
# Valid combination: :delete with old_data present and data nil
attrs = %{
lsn: 3,
table: "resources",
op: :delete,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: nil,
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
end
test "requires account_id to be populated from old_data or data" do
test "raises not null constraint when account_id is missing" do
attrs = %{
# account_id is missing
lsn: 1,
table: "resources",
op: :insert,
@@ -173,26 +37,97 @@ defmodule Domain.ChangeLogsTest do
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:account_id] == {"can't be blank", [validation: :required]}
assert_raise Postgrex.Error,
~r/null value in column "account_id".*violates not-null constraint/,
fn ->
bulk_insert([attrs])
end
end
test "requires old_data[\"account_id\"] and data[\"account_id\"] to match", %{
account: account
} do
test "raises not null constraint when table is missing", %{account: account} do
attrs = %{
account_id: account.id,
lsn: 1,
table: "resources",
op: :update,
old_data: %{"account_id" => account.id, "key" => "old_value"},
data: %{"account_id" => "different_account_id", "key" => "new_value"},
# table is missing
op: :insert,
old_data: nil,
data: %{"key" => "value"},
vsn: 1
}
assert {:error, changeset} = create_change_log(attrs)
assert changeset.valid? == false
assert changeset.errors[:base] == {"Account ID cannot be changed", []}
assert_raise Postgrex.Error,
~r/null value in column "table".*violates not-null constraint/,
fn ->
bulk_insert([attrs])
end
end
test "raises not null constraint when op is missing", %{account: account} do
attrs = %{
account_id: account.id,
lsn: 1,
table: "resources",
# op is missing
old_data: nil,
data: %{"key" => "value"},
vsn: 1
}
assert_raise Postgrex.Error,
~r/null value in column "op".*violates not-null constraint/,
fn ->
bulk_insert([attrs])
end
end
test "enforces data constraints based on operation type", %{account: account} do
# Invalid insert (has old_data)
assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn ->
bulk_insert([
%{
account_id: account.id,
lsn: 1,
table: "resources",
op: :insert,
# Should be null for insert
old_data: %{"id" => "123"},
data: %{"id" => "123"},
vsn: 1
}
])
end
# Invalid update (missing old_data)
assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn ->
bulk_insert([
%{
account_id: account.id,
lsn: 2,
table: "resources",
op: :update,
# Should not be null for update
old_data: nil,
data: %{"id" => "123"},
vsn: 1
}
])
end
# Invalid delete (has data)
assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn ->
bulk_insert([
%{
account_id: account.id,
lsn: 3,
table: "resources",
op: :delete,
old_data: %{"id" => "123"},
# Should be null for delete
data: %{"id" => "123"},
vsn: 1
}
])
end
end
end
end


@@ -2,7 +2,7 @@ defmodule Domain.Events.ReplicationConnectionTest do
use ExUnit.Case, async: true
import ExUnit.CaptureLog
import Domain.Events.ReplicationConnection
alias Domain.Events.ReplicationConnection
setup do
tables =
@@ -12,14 +12,15 @@ defmodule Domain.Events.ReplicationConnectionTest do
%{tables: tables}
end
describe "on_insert/2" do
describe "on_write/6 for inserts" do
test "logs warning for unknown table" do
table = "unknown_table"
data = %{"id" => Ecto.UUID.generate(), "name" => "test"}
log_output =
capture_log(fn ->
assert :ok = on_insert(0, table, data)
result = ReplicationConnection.on_write(%{}, 0, :insert, table, nil, data)
assert result == %{}
end)
assert log_output =~ "No hook defined for insert on table unknown_table"
@@ -33,9 +34,8 @@ defmodule Domain.Events.ReplicationConnectionTest do
# The actual hook call might fail if the hook modules aren't available,
# but we can test that our routing logic works
try do
result = on_insert(0, table, data)
# Should either succeed or fail gracefully
assert result in [:ok, :error] or match?({:error, _}, result)
result = ReplicationConnection.on_write(%{}, 0, :insert, table, nil, data)
assert result == %{}
rescue
# Depending on the shape of the data we might get a function clause error. This is ok,
# as we are testing the routing logic, not the actual hook implementations.
@@ -50,7 +50,9 @@ defmodule Domain.Events.ReplicationConnectionTest do
log_output =
capture_log(fn ->
try do
on_insert(0, table, %{"id" => Ecto.UUID.generate()})
ReplicationConnection.on_write(%{}, 0, :insert, table, nil, %{
"id" => Ecto.UUID.generate()
})
rescue
FunctionClauseError ->
# Shape of the data might not match the expected one, which is fine
@@ -63,15 +65,16 @@ defmodule Domain.Events.ReplicationConnectionTest do
end
end
describe "on_update/3" do
describe "on_write/6 for updates" do
test "logs warning for unknown table" do
table = "unknown_table"
old_data = %{"id" => Ecto.UUID.generate(), "name" => "old"}
data = %{"id" => Ecto.UUID.generate(), "name" => "new"}
data = %{"id" => old_data["id"], "name" => "new"}
log_output =
capture_log(fn ->
assert :ok = on_update(0, table, old_data, data)
result = ReplicationConnection.on_write(%{}, 0, :update, table, old_data, data)
assert result == %{}
end)
assert log_output =~ "No hook defined for update on table unknown_table"
@@ -80,12 +83,12 @@ defmodule Domain.Events.ReplicationConnectionTest do
test "handles known tables", %{tables: tables} do
old_data = %{"id" => Ecto.UUID.generate(), "name" => "old name"}
data = %{"id" => Ecto.UUID.generate(), "name" => "new name"}
data = %{"id" => old_data["id"], "name" => "new name"}
for table <- tables do
try do
result = on_update(0, table, old_data, data)
assert result in [:ok, :error] or match?({:error, _}, result)
result = ReplicationConnection.on_write(%{}, 0, :update, table, old_data, data)
assert result == %{}
rescue
FunctionClauseError ->
# Shape of the data might not match the expected one, which is fine
@@ -95,14 +98,15 @@ defmodule Domain.Events.ReplicationConnectionTest do
end
end
describe "on_delete/2" do
describe "on_write/6 for deletes" do
test "logs warning for unknown table" do
table = "unknown_table"
old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted"}
log_output =
capture_log(fn ->
assert :ok = on_delete(0, table, old_data)
result = ReplicationConnection.on_write(%{}, 0, :delete, table, old_data, nil)
assert result == %{}
end)
assert log_output =~ "No hook defined for delete on table unknown_table"
@@ -110,11 +114,12 @@ defmodule Domain.Events.ReplicationConnectionTest do
end
test "handles known tables", %{tables: tables} do
old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted gateway"}
old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted item"}
for table <- tables do
try do
assert :ok = on_delete(0, table, old_data)
result = ReplicationConnection.on_write(%{}, 0, :delete, table, old_data, nil)
assert result == %{}
rescue
# Shape of the data might not match the expected one, which is fine
FunctionClauseError -> :ok
@@ -123,31 +128,132 @@ defmodule Domain.Events.ReplicationConnectionTest do
end
end
describe "operation routing" do
test "routes to correct hook based on operation type" do
# Test that we dispatch to the right operation
# Since we can't directly test the hook calls without the actual hook modules,
# we can at least verify the routing logic doesn't crash
state = %{}
table = "accounts"
# Insert
try do
result = ReplicationConnection.on_write(state, 1, :insert, table, nil, %{"id" => "123"})
assert result == state
rescue
FunctionClauseError -> :ok
end
# Update
try do
result =
ReplicationConnection.on_write(state, 2, :update, table, %{"id" => "123"}, %{
"id" => "123",
"updated" => true
})
assert result == state
rescue
FunctionClauseError -> :ok
end
# Delete
try do
result = ReplicationConnection.on_write(state, 3, :delete, table, %{"id" => "123"}, nil)
assert result == state
rescue
FunctionClauseError -> :ok
end
end
end
describe "warning message formatting" do
test "log_warning generates correct message format" do
test "log_warning generates correct message format for each operation" do
# Test insert
log_output =
capture_log(fn ->
assert :ok = on_insert(0, "test_table_insert", %{})
ReplicationConnection.on_write(%{}, 0, :insert, "test_table_insert", nil, %{})
end)
assert log_output =~ "No hook defined for insert on table test_table_insert"
assert log_output =~ "Please implement Domain.Events.Hooks for this table"
# Test update
log_output =
capture_log(fn ->
assert :ok = on_update(0, "test_table_update", %{}, %{})
ReplicationConnection.on_write(%{}, 0, :update, "test_table_update", %{}, %{})
end)
assert log_output =~ "No hook defined for update on table test_table_update"
assert log_output =~ "Please implement Domain.Events.Hooks for this table"
# Test delete
log_output =
capture_log(fn ->
assert :ok = on_delete(0, "test_table_delete", %{})
ReplicationConnection.on_write(%{}, 0, :delete, "test_table_delete", %{}, nil)
end)
assert log_output =~ "No hook defined for delete on table test_table_delete"
assert log_output =~ "Please implement Domain.Events.Hooks for this table"
end
end
describe "state preservation" do
test "always returns the state unchanged" do
initial_state = %{some: "data", counter: 42}
# Unknown table - should log warning and return state unchanged
result1 = ReplicationConnection.on_write(initial_state, 1, :insert, "unknown", nil, %{})
assert result1 == initial_state
# Known table (might error in hook, but should still preserve state)
try do
result2 = ReplicationConnection.on_write(initial_state, 2, :insert, "accounts", nil, %{})
assert result2 == initial_state
rescue
FunctionClauseError -> :ok
end
end
end
describe "table_to_hooks mapping" do
test "all configured tables have hook modules" do
# This test ensures our tables_to_hooks map is properly configured
tables_to_hooks = %{
"accounts" => Domain.Events.Hooks.Accounts,
"actor_group_memberships" => Domain.Events.Hooks.ActorGroupMemberships,
"actor_groups" => Domain.Events.Hooks.ActorGroups,
"actors" => Domain.Events.Hooks.Actors,
"auth_identities" => Domain.Events.Hooks.AuthIdentities,
"auth_providers" => Domain.Events.Hooks.AuthProviders,
"clients" => Domain.Events.Hooks.Clients,
"gateway_groups" => Domain.Events.Hooks.GatewayGroups,
"gateways" => Domain.Events.Hooks.Gateways,
"policies" => Domain.Events.Hooks.Policies,
"resource_connections" => Domain.Events.Hooks.ResourceConnections,
"resources" => Domain.Events.Hooks.Resources,
"tokens" => Domain.Events.Hooks.Tokens
}
# Verify the mapping includes all expected tables
assert Map.keys(tables_to_hooks) |> Enum.sort() ==
[
"accounts",
"actor_group_memberships",
"actor_groups",
"actors",
"auth_identities",
"auth_providers",
"clients",
"gateway_groups",
"gateways",
"policies",
"resource_connections",
"resources",
"tokens"
]
|> Enum.sort()
end
end
end

(File diff suppressed because it is too large.)


@@ -64,7 +64,19 @@ config :domain, Domain.ChangeLogs.ReplicationConnection,
resource_connections
resources
tokens
]
],
# Allow up to 5 minutes of processing lag before alerting. This needs to be able to survive
# deploys without alerting.
warning_threshold: :timer.minutes(5),
# We almost never want to bypass changelog inserts
error_threshold: :timer.hours(30 * 24),
# Flush change logs data at least every 30 seconds
flush_interval: :timer.seconds(30),
# We want to flush at most 500 change logs at a time
flush_buffer_size: 500
config :domain, Domain.Events.ReplicationConnection,
replication_slot_name: "events_slot",
@@ -98,7 +110,16 @@ config :domain, Domain.Events.ReplicationConnection,
resource_connections
resources
tokens
]
],
# Allow up to 60 seconds of lag before alerting
warning_threshold: :timer.seconds(60),
# Allow up to 30 minutes of lag before bypassing hooks
error_threshold: :timer.minutes(30),
# Disable flush
flush_interval: 0,
flush_buffer_size: 0
config :domain, Domain.Tokens,
key_base: "5OVYJ83AcoQcPmdKNksuBhJFBhjHD1uUa9mDOHV/6EIdBQ6pXksIhkVeWIzFk5S2",


@@ -107,7 +107,8 @@ config :api,
###############################
config :domain, Domain.Mailer, adapter: Domain.Mailer.TestAdapter
config :logger, level: :warning
# Allow asserting on info logs and higher
config :logger, level: :info
config :argon2_elixir, t_cost: 1, m_cost: 8