From b20c141759ba4d4e423ef6179ea9c617cc341fef Mon Sep 17 00:00:00 2001
From: Jamil
Date: Sat, 5 Jul 2025 12:03:28 -0700
Subject: [PATCH] feat(portal): add batch-insert to change logs (#9733)

Inserting a change log incurs some minor overhead for sending the query over
the network and reacting to its response. In many cases, this overhead makes
up the bulk of the time it takes to run the change log insert. To reduce it
and avoid processing delays in the WAL consumers, we introduce batch-insert
functionality with a batch size of `500` and a timeout of `30` seconds. If
either threshold is hit, we flush the batch using `insert_all`.

`insert_all` does not go through `Ecto.Changeset`, so we need to be a bit more
careful about the data we insert, and we check the inserted LSNs to determine
what to advance the acknowledged LSN pointer to.

The logic that decides when to call the new `on_flush/1` callback lives in the
replication connection module, but the actual behavior of `on_flush/1` is left
to the child modules to implement. The `Events.ReplicationConnection` module
does not need flush behavior, so it keeps the defaults, which disable the
flush mechanism.

Related: #949
---
 elixir/apps/domain/lib/domain/change_logs.ex | 10 +-
 .../change_logs/change_log/changeset.ex | 81 --
 .../change_logs/replication_connection.ex | 127 +-
 .../domain/events/replication_connection.ex | 47 +-
 .../lib/domain/replication/connection.ex | 224 ++--
 ...0630191454_set_defaults_on_change_logs.exs | 17 +
 ..._foreign_key_constraint_on_change_logs.exs | 13 +
 ...020715_add_change_logs_data_constraint.exs | 18 +
 .../replication_connection_test.exs | 1034 ++++++++++-----
 .../domain/test/domain/change_logs_test.exs | 249 ++--
 .../events/replication_connection_test.exs | 148 ++-
 .../domain/replication/connection_test.exs | 1138 +++++++----------
 elixir/config/config.exs | 25 +-
 elixir/config/test.exs | 3 +-
 14 files changed, 1669 insertions(+), 1465 deletions(-)
 delete mode 100644 elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex
 create mode 100644 elixir/apps/domain/priv/repo/migrations/20250630191454_set_defaults_on_change_logs.exs
 create mode 100644 elixir/apps/domain/priv/repo/migrations/20250630210514_drop_foreign_key_constraint_on_change_logs.exs
 create mode 100644 elixir/apps/domain/priv/repo/migrations/20250702020715_add_change_logs_data_constraint.exs

diff --git a/elixir/apps/domain/lib/domain/change_logs.ex b/elixir/apps/domain/lib/domain/change_logs.ex
index 8c665f0a4..1a78708ea 100644
--- a/elixir/apps/domain/lib/domain/change_logs.ex
+++ b/elixir/apps/domain/lib/domain/change_logs.ex
@@ -2,9 +2,11 @@ defmodule Domain.ChangeLogs do
   alias Domain.ChangeLogs.ChangeLog
   alias Domain.Repo
 
-  def create_change_log(attrs) do
-    attrs
-    |> ChangeLog.Changeset.changeset()
-    |> Repo.insert()
+  def bulk_insert(list_of_attrs) do
+    ChangeLog
+    |> Repo.insert_all(list_of_attrs,
+      on_conflict: :nothing,
+      conflict_target: [:lsn]
+    )
   end
 end
diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex
deleted file mode 100644
index 608e6263b..000000000
--- a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex
+++ /dev/null
@@ -1,81 +0,0 @@
-defmodule Domain.ChangeLogs.ChangeLog.Changeset do
-  use Domain, :changeset
-
-  @fields ~w[account_id lsn table op old_data data vsn]a
-
-  def changeset(attrs) do
-    %Domain.ChangeLogs.ChangeLog{}
-    |> cast(attrs, @fields)
-    |> validate_inclusion(:op, [:insert, :update,
:delete]) - |> validate_correct_data_present() - |> validate_same_account() - |> put_account_id() - |> validate_required([:account_id, :lsn, :table, :op, :vsn]) - |> unique_constraint(:lsn) - |> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey) - end - - # :insert requires old_data = nil and data != nil - # :update requires old_data != nil and data != nil - # :delete requires old_data != nil and data = nil - def validate_correct_data_present(changeset) do - op = get_field(changeset, :op) - old_data = get_field(changeset, :old_data) - data = get_field(changeset, :data) - - case {op, old_data, data} do - {:insert, nil, %{} = _data} -> - changeset - - {:update, %{} = _old_data, %{} = _data} -> - changeset - - {:delete, %{} = _old_data, nil} -> - changeset - - _ -> - add_error(changeset, :base, "Invalid combination of operation and data") - end - end - - # Add an error if data["account_id"] != old_data["account_id"] - defp validate_same_account(changeset) do - old_data = get_field(changeset, :old_data) - data = get_field(changeset, :data) - - account_id_key = account_id_field(changeset) - - if old_data && data && old_data[account_id_key] != data[account_id_key] do - add_error(changeset, :base, "Account ID cannot be changed") - else - changeset - end - end - - # Populate account_id from one of data, old_data - defp put_account_id(changeset) do - old_data = get_field(changeset, :old_data) - data = get_field(changeset, :data) - - account_id_key = account_id_field(changeset) - - account_id = - case {old_data, data} do - {nil, nil} -> nil - {_, %{^account_id_key => id}} -> id - {%{^account_id_key => id}, _} -> id - _ -> nil - end - - put_change(changeset, :account_id, account_id) - end - - # For accounts table updates, the account_id is in the "id" field - # For other tables, it is in the "account_id" field - defp account_id_field(changeset) do - case get_field(changeset, :table) do - "accounts" -> "id" - _ -> "account_id" - end - end -end diff --git a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex index 920024402..c38189f82 100644 --- a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex @@ -1,90 +1,83 @@ defmodule Domain.ChangeLogs.ReplicationConnection do + use Domain.Replication.Connection alias Domain.ChangeLogs - use Domain.Replication.Connection, - # Allow up to 5 minutes of processing lag before alerting. This needs to be able to survive - # deploys without alerting. - warning_threshold_ms: 5 * 60 * 1_000, - - # 1 month in ms - we never want to bypass changelog inserts - error_threshold_ms: 30 * 24 * 60 * 60 * 1_000 - # Bump this to signify a change in the audit log schema. Use with care. @vsn 0 - def on_insert(lsn, table, data) do - log(:insert, lsn, table, nil, data) + # Avoid overwhelming the change log with soft-deleted records getting hard-deleted en masse. + # Can be removed after https://github.com/firezone/firezone/issues/8187 is shipped. 
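For illustration, the guard clause below and the buffering clauses that follow it can be exercised directly with a bare state map, much like the tests later in this patch do. A minimal sketch, assuming an IEx session and made-up ids ("acct-1", "res-1") that are not part of this change:

    alias Domain.ChangeLogs.ReplicationConnection

    state = %{flush_buffer: %{}}

    # Hard-delete of a row that was already soft-deleted: matched by the
    # `deleted_at` guard below, so nothing is buffered.
    state =
      ReplicationConnection.on_write(
        state,
        100,
        :delete,
        "resources",
        %{"id" => "res-1", "account_id" => "acct-1", "deleted_at" => "2025-01-01T00:00:00Z"},
        nil
      )

    # Hard-delete of a live row: picked up by the `old_data` clause further
    # down and added to the flush buffer with `vsn: 0`.
    state =
      ReplicationConnection.on_write(
        state,
        101,
        :delete,
        "resources",
        %{"id" => "res-1", "account_id" => "acct-1", "deleted_at" => nil},
        nil
      )

    map_size(state.flush_buffer)
    #=> 1
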
+ def on_write(state, _lsn, :delete, _table, %{"deleted_at" => deleted_at}, _data) + when not is_nil(deleted_at) do + state end - def on_update(lsn, table, old_data, data) do - log(:update, lsn, table, old_data, data) + # Ignore token writes for relay_groups since these are not expected to have an account_id + def on_write(state, _lsn, _op, "tokens", %{"type" => "relay_group"}, _data), do: state + def on_write(state, _lsn, _op, "tokens", _old_data, %{"type" => "relay_group"}), do: state + + # Handle accounts specially + def on_write(state, lsn, op, "accounts", old_data, %{"id" => account_id} = data) do + buffer(state, lsn, op, "accounts", account_id, old_data, data) end - def on_delete(lsn, table, old_data) do - if is_nil(old_data["deleted_at"]) do - log(:delete, lsn, table, old_data, nil) - else - # Avoid overwhelming the change log with soft-deleted records getting hard-deleted en masse. - # Can be removed after https://github.com/firezone/firezone/issues/8187 is shipped. - :ok - end + # Handle other writes where an account_id is present + def on_write(state, lsn, op, table, old_data, %{"account_id" => account_id} = data) + when not is_nil(account_id) do + buffer(state, lsn, op, table, account_id, old_data, data) end - # Relay group tokens don't have account_ids - - defp log(_op, _lsn, "tokens", %{"type" => "relay_group"}, _data) do - :ok + def on_write(state, lsn, op, table, %{"account_id" => account_id} = old_data, data) + when not is_nil(account_id) do + buffer(state, lsn, op, table, account_id, old_data, data) end - defp log(_op, _lsn, "tokens", _old_data, %{"type" => "relay_group"}) do - :ok - end - - defp log(op, lsn, table, old_data, data) do - attrs = %{ - op: op, + # If we get here, raise the alarm as it means we encountered a change we didn't expect. + def on_write(state, lsn, op, table, _old_data, _data) do + Logger.error( + "Unexpected write operation!", lsn: lsn, - table: table, - old_data: old_data, - data: data, - vsn: @vsn - } + op: op, + table: table + ) - case ChangeLogs.create_change_log(attrs) do - {:ok, _change_log} -> - :ok - - {:error, %Ecto.Changeset{errors: errors} = changeset} -> - if Enum.any?(errors, &should_skip_change_log?/1) do - # Expected under normal operation when an account is deleted or we are catching up on - # already-processed but not acknowledged WAL data. - :ok - else - Logger.warning("Failed to create change log", - errors: inspect(changeset.errors), - table: table, - op: op, - lsn: lsn, - vsn: @vsn - ) - - # TODO: WAL - # Don't ignore failures to insert change logs. Improve this after we have some - # operational experience with the data flowing in here. - :ok - end - end + state end - defp should_skip_change_log?({:account_id, {"does not exist", _violations}}) do - true + def on_flush(%{flush_buffer: flush_buffer} = state) when map_size(flush_buffer) == 0, do: state + + def on_flush(state) do + to_insert = Map.values(state.flush_buffer) + attempted_count = Enum.count(state.flush_buffer) + + {successful_count, _change_logs} = ChangeLogs.bulk_insert(to_insert) + + Logger.info("Flushed #{successful_count}/#{attempted_count} change logs") + + # We always advance the LSN to the highest LSN in the flush buffer because + # LSN conflicts just mean the data is already inserted, and other insert_all + # issues like a missing account_id will raise an exception. 
+ last_lsn = + state.flush_buffer + |> Map.keys() + |> Enum.max() + + %{state | flush_buffer: %{}, last_flushed_lsn: last_lsn} end - defp should_skip_change_log?({:lsn, {"has already been taken", _violations}}) do - true - end + defp buffer(state, lsn, op, table, account_id, old_data, data) do + flush_buffer = + state.flush_buffer + |> Map.put_new(lsn, %{ + lsn: lsn, + op: op, + table: table, + account_id: account_id, + old_data: old_data, + data: data, + vsn: @vsn + }) - defp should_skip_change_log?(_error) do - false + %{state | flush_buffer: flush_buffer} end end diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex index 48428764e..736dd8513 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -1,15 +1,7 @@ defmodule Domain.Events.ReplicationConnection do + use Domain.Replication.Connection alias Domain.Events.Hooks - use Domain.Replication.Connection, - # Allow up to 60 seconds of lag before alerting - warning_threshold_ms: 60 * 1_000, - - # Allow up to 30 minutes of lag before bypassing hooks - error_threshold_ms: 30 * 60 * 1_000 - - require Logger - @tables_to_hooks %{ "accounts" => Hooks.Accounts, "actor_group_memberships" => Hooks.ActorGroupMemberships, @@ -26,37 +18,18 @@ defmodule Domain.Events.ReplicationConnection do "tokens" => Hooks.Tokens } - def on_insert(_lsn, table, data) do - hook = Map.get(@tables_to_hooks, table) - - if hook do - hook.on_insert(data) + def on_write(state, _lsn, op, table, old_data, data) do + if hook = Map.get(@tables_to_hooks, table) do + case op do + :insert -> hook.on_insert(data) + :update -> hook.on_update(old_data, data) + :delete -> hook.on_delete(old_data) + end else - log_warning(:insert, table) - :ok + log_warning(op, table) end - end - def on_update(_lsn, table, old_data, data) do - hook = Map.get(@tables_to_hooks, table) - - if hook do - hook.on_update(old_data, data) - else - log_warning(:update, table) - :ok - end - end - - def on_delete(_lsn, table, old_data) do - hook = Map.get(@tables_to_hooks, table) - - if hook do - hook.on_delete(old_data) - else - log_warning(:delete, table) - :ok - end + state end defp log_warning(op, table) do diff --git a/elixir/apps/domain/lib/domain/replication/connection.ex b/elixir/apps/domain/lib/domain/replication/connection.ex index 129267b7b..f43ba34bb 100644 --- a/elixir/apps/domain/lib/domain/replication/connection.ex +++ b/elixir/apps/domain/lib/domain/replication/connection.ex @@ -15,17 +15,8 @@ defmodule Domain.Replication.Connection do ## Usage defmodule MyApp.ReplicationConnection do - use Domain.Replication.Connection, - status_log_interval_s: 60, - warning_threshold_ms: 30_000, - error_threshold_ms: 60 * 1_000 + use Domain.Replication.Connection end - - ## Options - - * `:warning_threshold_ms`: How long to allow the WAL stream to lag before logging a warning - * `:error_threshold_ms`: How long to allow the WAL stream to lag before logging an error and - bypassing side effect handlers. 
""" defmacro __using__(opts \\ []) do @@ -65,10 +56,6 @@ defmodule Domain.Replication.Connection do # Extract struct definition and constants defp struct_and_constants(opts) do quote bind_quoted: [opts: opts] do - @warning_threshold_ms Keyword.fetch!(opts, :warning_threshold_ms) - @error_threshold_ms Keyword.fetch!(opts, :error_threshold_ms) - @status_log_interval :timer.seconds(Keyword.get(opts, :status_log_interval_s, 60)) - # Everything else uses defaults @schema "public" @output_plugin "pgoutput" @@ -93,19 +80,56 @@ defmodule Domain.Replication.Connection do table_subscriptions: list(), relations: map(), counter: integer(), - tables_to_remove: MapSet.t() + tables_to_remove: MapSet.t(), + flush_interval: integer(), + flush_buffer: map(), + last_flushed_lsn: integer(), + warning_threshold_exceeded?: boolean(), + error_threshold_exceeded?: boolean(), + flush_buffer_size: integer(), + status_log_interval: integer(), + warning_threshold: integer(), + error_threshold: integer() } - defstruct schema: @schema, - step: :disconnected, - publication_name: nil, - replication_slot_name: nil, - output_plugin: @output_plugin, - proto_version: @proto_version, - table_subscriptions: [], - relations: %{}, - counter: 0, - tables_to_remove: MapSet.new() + defstruct( + # schema to use for the publication + schema: @schema, + # starting step + step: :disconnected, + # publication name to check/create + publication_name: nil, + # replication slot name to check/create + replication_slot_name: nil, + # output plugin to use for logical replication + output_plugin: @output_plugin, + # protocol version to use for logical replication + proto_version: @proto_version, + # tables we want to subscribe to in the publication + table_subscriptions: [], + # relations we have seen so far + relations: %{}, + # counter for the number of messages processed + counter: 0, + # calculated tables to remove from the publication + tables_to_remove: MapSet.new(), + # flush interval in milliseconds, set to 0 to use immediate processing + flush_interval: 0, + # buffer for data to flush + flush_buffer: %{}, + # last flushed LSN, used to track progress while flushing + last_flushed_lsn: 0, + # flags to track if we have exceeded warning/error thresholds + warning_threshold_exceeded?: false, + error_threshold_exceeded?: false, + # size of the flush buffer, used to determine when to flush + flush_buffer_size: 0, + # interval for logging status updates + status_log_interval: :timer.minutes(1), + # thresholds for warning and error logging + warning_threshold: :timer.seconds(30), + error_threshold: :timer.seconds(60) + ) end end @@ -119,10 +143,9 @@ defmodule Domain.Replication.Connection do @impl true def init(state) do - state = - state - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) + if state.flush_interval > 0 do + Process.send_after(self(), :flush, state.flush_interval) + end {:ok, state} end @@ -365,7 +388,14 @@ defmodule Domain.Replication.Connection do def handle_data(data, state) when is_keep_alive(data) do %KeepAlive{reply: reply, wal_end: wal_end} = parse(data) - wal_end = wal_end + 1 + wal_end = + if state.flush_interval > 0 do + # If we are flushing, we use the tracked last_flushed_lsn + state.last_flushed_lsn + 1 + else + # Otherwise, we assume to be current with the server + wal_end + 1 + end message = case reply do @@ -382,7 +412,7 @@ defmodule Domain.Replication.Connection do message |> decode_message() - |> handle_message(server_wal_end, %{state | counter: state.counter + 1}) 
+ |> handle_write(server_wal_end, %{state | counter: state.counter + 1}) end end @@ -392,7 +422,7 @@ defmodule Domain.Replication.Connection do state: inspect(state) ) - {:noreply, [], state} + {:noreply, state} end end end @@ -402,14 +432,15 @@ defmodule Domain.Replication.Connection do quote do # Handles messages received: # - # 1. Insert/Update/Delete/Begin/Commit - send to appropriate hook - # 2. Relation messages - store the relation data in our state so we can use it later + # 1. Insert/Update/Delete - send to on_write/5 + # 2. Begin - check how far we are lagging behind + # 3. Relation messages - store the relation data in our state so we can use it later # to associate column names etc with the data we receive. In practice, we'll always # see a Relation message before we see any data for that relation. - # 3. Origin/Truncate/Type - we ignore these messages for now - # 4. Graceful shutdown - we respond with {:disconnect, :normal} to + # 4. Origin/Truncate/Type - we ignore these messages for now + # 5. Graceful shutdown - we respond with {:disconnect, :normal} to # indicate that we are shutting down gracefully and prevent auto reconnecting. - defp handle_message( + defp handle_write( %Decoder.Messages.Relation{ id: id, namespace: namespace, @@ -425,67 +456,57 @@ defmodule Domain.Replication.Connection do columns: columns } - {:noreply, ack(server_wal_end), - %{state | relations: Map.put(state.relations, id, relation)}} + {:noreply, %{state | relations: Map.put(state.relations, id, relation)}} end - defp handle_message(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do - unless state.error_threshold_exceeded? do - {op, table, _old_data, data} = transform(msg, state.relations) - :ok = on_insert(server_wal_end, table, data) - end - - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do + process_write(msg, server_wal_end, state) end - defp handle_message(%Decoder.Messages.Update{} = msg, server_wal_end, state) do - unless state.error_threshold_exceeded? do - {op, table, old_data, data} = transform(msg, state.relations) - :ok = on_update(server_wal_end, table, old_data, data) - end - - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Update{} = msg, server_wal_end, state) do + process_write(msg, server_wal_end, state) end - defp handle_message(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do - unless state.error_threshold_exceeded? 
do - {op, table, old_data, _data} = transform(msg, state.relations) - :ok = on_delete(server_wal_end, table, old_data) - end - - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do + process_write(msg, server_wal_end, state) end - defp ack(server_wal_end) do - wal_end = server_wal_end + 1 - standby_status(wal_end, wal_end, wal_end, :now) + defp process_write(_msg, _server_wal_end, %{error_threshold_exceeded?: true} = state) do + {:noreply, state} end + + defp process_write(msg, server_wal_end, state) do + {op, table, old_data, data} = transform(msg, state.relations) + + state + |> on_write(server_wal_end, op, table, old_data, data) + |> maybe_flush() + |> then(&{:noreply, &1}) + end + + defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state) + when map_size(buffer) >= size do + on_flush(state) + end + + defp maybe_flush(state), do: state end end # Extract transaction and ignored message handlers defp transaction_handlers do quote do - defp handle_message( + defp handle_write( %Decoder.Messages.Begin{commit_timestamp: commit_timestamp} = msg, server_wal_end, state ) do - # Since we receive a commit for each operation and we process each operation - # one-by-one, we can use the commit timestamp to check if we are lagging behind. + # We can use the commit timestamp to check how far we are lagging behind lag_ms = DateTime.diff(DateTime.utc_now(), commit_timestamp, :millisecond) send(self(), {:check_warning_threshold, lag_ms}) send(self(), {:check_error_threshold, lag_ms}) - {:noreply, ack(server_wal_end), state} - end - - defp handle_message( - %Decoder.Messages.Commit{commit_timestamp: commit_timestamp}, - server_wal_end, - state - ) do - {:noreply, ack(server_wal_end), state} + {:noreply, state} end end end @@ -494,25 +515,30 @@ defmodule Domain.Replication.Connection do defp ignored_message_handlers do quote do # These messages are not relevant for our use case, so we ignore them. 
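One detail worth spelling out before the ignored-message handlers: batching now has two triggers. `maybe_flush/1` above calls `on_flush/1` as soon as `flush_buffer` reaches `flush_buffer_size`, and the recurring `:flush` message (scheduled in `init/1` and handled with the other info handlers below) calls it every `flush_interval` milliseconds. A rough sketch of the size-based trigger, assuming the `ChangeLogs.ReplicationConnection` child module, made-up ids, and a deliberately tiny buffer size of `2` (the production values, per the commit message, are `500` rows and `30` seconds):

    alias Domain.ChangeLogs.ReplicationConnection

    # Shrunk state for illustration; the real state is the %__MODULE__{} struct above.
    state = %{flush_buffer: %{}, flush_buffer_size: 2, last_flushed_lsn: 0}

    state =
      ReplicationConnection.on_write(state, 1, :insert, "resources", nil, %{
        "id" => "res-1",
        "account_id" => "acct-1"
      })

    # map_size(state.flush_buffer) == 1, still below flush_buffer_size, so
    # process_write/3 would not flush yet.

    state =
      ReplicationConnection.on_write(state, 2, :insert, "resources", nil, %{
        "id" => "res-2",
        "account_id" => "acct-1"
      })

    # The buffer has now reached flush_buffer_size, so maybe_flush/1 would call
    # on_flush/1, which bulk-inserts both rows and advances last_flushed_lsn to 2.
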
- defp handle_message(%Decoder.Messages.Origin{}, server_wal_end, state) do - {:noreply, ack(server_wal_end), state} + + defp handle_write(%Decoder.Messages.Commit{}, _server_wal_end, state) do + {:noreply, state} end - defp handle_message(%Decoder.Messages.Truncate{}, server_wal_end, state) do - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Origin{}, _server_wal_end, state) do + {:noreply, state} end - defp handle_message(%Decoder.Messages.Type{}, server_wal_end, state) do - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Truncate{}, _server_wal_end, state) do + {:noreply, state} end - defp handle_message(%Decoder.Messages.Unsupported{data: data}, server_wal_end, state) do + defp handle_write(%Decoder.Messages.Type{}, _server_wal_end, state) do + {:noreply, state} + end + + defp handle_write(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do Logger.warning("#{__MODULE__}: Unsupported message received", data: inspect(data), counter: state.counter ) - {:noreply, ack(server_wal_end), state} + {:noreply, state} end end end @@ -559,25 +585,24 @@ defmodule Domain.Replication.Connection do end end - # Extract info handlers defp info_handlers(opts) do quote bind_quoted: [opts: opts] do @impl true def handle_info( {:check_warning_threshold, lag_ms}, - %{warning_threshold_exceeded?: false} = state + %{warning_threshold_exceeded?: false, warning_threshold: warning_threshold} = state ) - when lag_ms >= @warning_threshold_ms do + when lag_ms >= warning_threshold do Logger.warning("#{__MODULE__}: Processing lag exceeds warning threshold", lag_ms: lag_ms) {:noreply, %{state | warning_threshold_exceeded?: true}} end def handle_info( {:check_warning_threshold, lag_ms}, - %{warning_threshold_exceeded?: true} = state + %{warning_threshold_exceeded?: true, warning_threshold: warning_threshold} = state ) - when lag_ms < @warning_threshold_ms do + when lag_ms < warning_threshold do Logger.info("#{__MODULE__}: Processing lag is back below warning threshold", lag_ms: lag_ms ) @@ -587,9 +612,9 @@ defmodule Domain.Replication.Connection do def handle_info( {:check_error_threshold, lag_ms}, - %{error_threshold_exceeded?: false} = state + %{error_threshold_exceeded?: false, error_threshold: error_threshold} = state ) - when lag_ms >= @error_threshold_ms do + when lag_ms >= error_threshold do Logger.error( "#{__MODULE__}: Processing lag exceeds error threshold; skipping side effects!", lag_ms: lag_ms @@ -600,9 +625,9 @@ defmodule Domain.Replication.Connection do def handle_info( {:check_error_threshold, lag_ms}, - %{error_threshold_exceeded?: true} = state + %{error_threshold_exceeded?: true, error_threshold: error_threshold} = state ) - when lag_ms < @error_threshold_ms do + when lag_ms < error_threshold do Logger.info("#{__MODULE__}: Processing lag is back below error threshold", lag_ms: lag_ms) {:noreply, %{state | error_threshold_exceeded?: false}} end @@ -612,11 +637,17 @@ defmodule Domain.Replication.Connection do "#{__MODULE__}: Processed #{state.counter} write messages from the WAL stream" ) - Process.send_after(self(), :interval_logger, @status_log_interval) + Process.send_after(self(), :interval_logger, state.status_log_interval) {:noreply, state} end + def handle_info(:flush, state) do + Process.send_after(self(), :flush, state.flush_interval) + + {:noreply, on_flush(state)} + end + def handle_info(:shutdown, _), do: {:disconnect, :normal} def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal} @@ -628,11 
+659,10 @@ defmodule Domain.Replication.Connection do defp default_callbacks do quote do # Default implementations for required callbacks - modules using this should implement these - def on_insert(_lsn, _table, _data), do: :ok - def on_update(_lsn, _table, _old_data, _data), do: :ok - def on_delete(_lsn, _table, _old_data), do: :ok + def on_write(state, _lsn, _op, _table, _old_data, _data), do: state + def on_flush(state), do: state - defoverridable on_insert: 3, on_update: 4, on_delete: 3 + defoverridable on_write: 6, on_flush: 1 end end end diff --git a/elixir/apps/domain/priv/repo/migrations/20250630191454_set_defaults_on_change_logs.exs b/elixir/apps/domain/priv/repo/migrations/20250630191454_set_defaults_on_change_logs.exs new file mode 100644 index 000000000..35130e0a2 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250630191454_set_defaults_on_change_logs.exs @@ -0,0 +1,17 @@ +defmodule Domain.Repo.Migrations.SetDefaultsOnChangeLogs do + use Ecto.Migration + + def up do + alter table(:change_logs) do + modify(:inserted_at, :utc_datetime_usec, default: fragment("now()")) + modify(:id, :binary_id, default: fragment("gen_random_uuid()")) + end + end + + def down do + alter table(:change_logs) do + modify(:inserted_at, :utc_datetime_usec, default: nil) + modify(:id, :binary_id, default: nil) + end + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250630210514_drop_foreign_key_constraint_on_change_logs.exs b/elixir/apps/domain/priv/repo/migrations/20250630210514_drop_foreign_key_constraint_on_change_logs.exs new file mode 100644 index 000000000..cb9a2fd28 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250630210514_drop_foreign_key_constraint_on_change_logs.exs @@ -0,0 +1,13 @@ +defmodule Domain.Repo.Migrations.DropForeignKeyConstraintOnChangeLogs do + use Ecto.Migration + + def up do + drop(constraint(:change_logs, :change_logs_account_id_fkey)) + end + + def down do + alter table(:change_logs) do + modify(:account_id, references(:accounts, type: :binary_id, on_delete: :delete_all)) + end + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250702020715_add_change_logs_data_constraint.exs b/elixir/apps/domain/priv/repo/migrations/20250702020715_add_change_logs_data_constraint.exs new file mode 100644 index 000000000..9ba9c1161 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250702020715_add_change_logs_data_constraint.exs @@ -0,0 +1,18 @@ +defmodule Domain.Repo.Migrations.AddChangeLogsDataConstraint do + use Ecto.Migration + + def change do + create( + constraint(:change_logs, :valid_data_for_operation, + check: """ + CASE op + WHEN 'insert' THEN data IS NOT NULL AND old_data IS NULL + WHEN 'update' THEN data IS NOT NULL AND old_data IS NOT NULL + WHEN 'delete' THEN data IS NULL AND old_data IS NOT NULL + ELSE false + END + """ + ) + ) + end +end diff --git a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs index 39b856134..d178cc1b9 100644 --- a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs @@ -1,9 +1,8 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do use Domain.DataCase, async: true - import ExUnit.CaptureLog import Ecto.Query - import Domain.ChangeLogs.ReplicationConnection + alias Domain.ChangeLogs.ReplicationConnection alias Domain.ChangeLogs.ChangeLog alias Domain.Repo @@ -12,351 +11,762 @@ 
defmodule Domain.ChangeLogs.ReplicationConnectionTest do %{account: account} end - describe "on_insert/2" do - test "ignores flows table - no record created" do - table = "flows" - data = %{"id" => 1, "name" => "test flow"} + describe "on_write/6 for inserts" do + test "ignores account inserts", %{account: account} do + initial_state = %{flush_buffer: %{}} - initial_count = Repo.aggregate(ChangeLog, :count, :id) + result_state = + ReplicationConnection.on_write( + initial_state, + 12345, + :insert, + "accounts", + nil, + %{"id" => account.id, "name" => "test account"} + ) - assert :ok = on_insert(0, table, data) - - # No record should be created for flows - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + assert result_state == %{ + flush_buffer: %{ + 12345 => %{ + data: %{ + "id" => account.id, + "name" => "test account" + }, + table: "accounts", + vsn: 0, + op: :insert, + account_id: account.id, + lsn: 12345, + old_data: nil + } + } + } end - test "creates change log record for non-flows tables", %{account: account} do - table = "accounts" - data = %{"id" => account.id, "name" => "test account"} + test "adds insert operation to flush buffer for non-account tables", %{account: account} do + table = "resources" - assert :ok = on_insert(0, table, data) + data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "test resource" + } - # Verify the record was created - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + lsn = 12345 - assert change_log.op == :insert - assert change_log.table == table - assert change_log.old_data == nil - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :insert, + table, + nil, + data + ) + + assert map_size(result_state.flush_buffer) == 1 + assert Map.has_key?(result_state.flush_buffer, lsn) + + attrs = result_state.flush_buffer[lsn] + assert attrs.lsn == lsn + assert attrs.table == table + assert attrs.op == :insert + assert attrs.data == data + assert attrs.old_data == nil + assert attrs.account_id == account.id + assert attrs.vsn == 0 end - test "creates records for different table types", %{account: account} do - test_cases = [ - {"accounts", %{"id" => account.id, "name" => "test account"}}, - {"resources", - %{"id" => Ecto.UUID.generate(), "name" => "test resource", "account_id" => account.id}}, - {"policies", - %{"id" => Ecto.UUID.generate(), "name" => "test policy", "account_id" => account.id}}, - {"actors", - %{"id" => Ecto.UUID.generate(), "name" => "test actor", "account_id" => account.id}} - ] + test "preserves existing buffer items", %{account: account} do + existing_lsn = 100 - for {{table, data}, idx} <- Enum.with_index(test_cases) do - initial_count = Repo.aggregate(ChangeLog, :count, :id) + existing_item = %{ + lsn: existing_lsn, + table: "other_table", + op: :update, + account_id: account.id, + data: %{"id" => "existing"}, + old_data: nil, + vsn: 0 + } - assert :ok = on_insert(idx, table, data) + initial_state = %{flush_buffer: %{existing_lsn => existing_item}} - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + 1 + new_lsn = 101 - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + result_state = + ReplicationConnection.on_write( + initial_state, + new_lsn, + :insert, + "resources", + 
nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id} + ) - assert change_log.op == :insert - assert change_log.table == table - assert change_log.old_data == nil - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - end - end - - describe "on_update/3" do - test "ignores flows table - no record created" do - table = "flows" - old_data = %{"id" => 1, "name" => "old flow"} - data = %{"id" => 1, "name" => "new flow"} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - assert :ok = on_update(0, table, old_data, data) - - # No record should be created for flows - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + assert map_size(result_state.flush_buffer) == 2 + assert result_state.flush_buffer[existing_lsn] == existing_item + assert Map.has_key?(result_state.flush_buffer, new_lsn) end - test "creates change log record for non-flows tables", %{account: account} do - table = "accounts" - old_data = %{"id" => account.id, "name" => "old name"} - data = %{"id" => account.id, "name" => "new name"} + test "ignores relay_group tokens" do + initial_state = %{flush_buffer: %{}} - assert :ok = on_update(0, table, old_data, data) + result_state = + ReplicationConnection.on_write( + initial_state, + 12345, + :insert, + "tokens", + nil, + %{"id" => Ecto.UUID.generate(), "type" => "relay_group"} + ) - # Verify the record was created - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :update - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id + assert result_state == initial_state end test "handles complex data structures", %{account: account} do - table = "resources" - resource_id = Ecto.UUID.generate() - - old_data = %{ - "id" => resource_id, - "name" => "old name", - "account_id" => account.id, - "settings" => %{"theme" => "dark", "notifications" => true} - } - - data = %{ - "id" => resource_id, - "name" => "new name", - "account_id" => account.id, - "settings" => %{"theme" => "light", "notifications" => false}, - "tags" => ["updated", "important"] - } - - assert :ok = on_update(0, table, old_data, data) - - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :update - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - end - - describe "on_delete/2" do - test "ignores soft-deleted records" do - table = "resources" - old_data = %{"id" => Ecto.UUID.generate(), "deleted_at" => "#{DateTime.utc_now()}"} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - assert :ok = on_delete(0, table, old_data) - - # No record should be created for soft-deleted records - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - - test "ignores flows table - no record created" do - table = "flows" - old_data = %{"id" => 1, "name" => "deleted flow"} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - assert :ok = on_delete(0, table, old_data) - - # No record should be created for flows - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - - test "creates change log record for non-flows tables", 
%{account: account} do - table = "accounts" - old_data = %{"id" => account.id, "name" => "deleted account"} - - assert :ok = on_delete(0, table, old_data) - - # Verify the record was created - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :delete - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == nil - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - - test "handles various data types in old_data", %{account: account} do - table = "resources" - resource_id = Ecto.UUID.generate() - - old_data = %{ - "id" => resource_id, - "name" => "complex resource", - "account_id" => account.id, - "metadata" => %{ - "created_by" => "system", - "permissions" => ["read", "write"], - "config" => %{"timeout" => 30, "retries" => 3} - }, - "active" => true, - "count" => 42 - } - - assert :ok = on_delete(0, table, old_data) - - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :delete - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == nil - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - end - - describe "error handling" do - test "handles foreign key errors gracefully" do - # Create a change log entry that references a non-existent account - table = "resources" - # Non-existent account_id - data = %{"id" => Ecto.UUID.generate(), "account_id" => Ecto.UUID.generate()} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - # Should return :ok even if foreign key constraint fails - assert :ok = on_insert(0, table, data) - - # No record should be created due to foreign key error - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - - test "logs and handles non-foreign-key validation errors gracefully", %{account: account} do - # Test with invalid data that would cause validation errors (not foreign key) - table = "accounts" - # Missing required fields but valid FK - data = %{"account_id" => account.id} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - log_output = - capture_log(fn -> - assert :ok = on_insert(0, table, data) - end) - - # Should log the error - assert log_output =~ "Failed to create change log" - - # No record should be created - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - end - - describe "data integrity" do - test "preserves exact data structures", %{account: account} do - table = "policies" - policy_id = Ecto.UUID.generate() - - # Test with various data types complex_data = %{ - "id" => policy_id, + "id" => Ecto.UUID.generate(), "account_id" => account.id, - "string_field" => "test string", - "integer_field" => 42, - "boolean_field" => true, + "nested" => %{"key" => "value", "array" => [1, 2, 3]}, "null_field" => nil, - "array_field" => [1, "two", %{"three" => 3}], - "nested_object" => %{ - "level1" => %{ - "level2" => %{ - "deep_value" => "preserved" - } - } + "boolean" => true + } + + state = %{flush_buffer: %{}} + lsn = 200 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :insert, + "resources", + nil, + complex_data + ) + + attrs = result_state.flush_buffer[lsn] + assert attrs.data == complex_data + end + end + + describe "on_write/6 for updates" do + test "adds update operation to flush buffer", %{account: account} do + table = "resources" + old_data 
= %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "old name"} + data = %{"id" => old_data["id"], "account_id" => account.id, "name" => "new name"} + lsn = 12346 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :update, + table, + old_data, + data + ) + + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + + assert attrs.lsn == lsn + assert attrs.table == table + assert attrs.op == :update + assert attrs.data == data + assert attrs.old_data == old_data + assert attrs.account_id == account.id + assert attrs.vsn == 0 + end + + test "handles account updates specially", %{account: account} do + old_data = %{"id" => account.id, "name" => "old name"} + data = %{"id" => account.id, "name" => "new name"} + lsn = 12346 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :update, + "accounts", + old_data, + data + ) + + # Account updates should be buffered + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + assert attrs.table == "accounts" + assert attrs.op == :update + assert attrs.account_id == account.id + end + + test "handles complex data changes", %{account: account} do + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "settings" => %{"theme" => "dark"}, + "tags" => ["old"] + } + + new_data = %{ + "id" => old_data["id"], + "account_id" => account.id, + "settings" => %{"theme" => "light", "language" => "en"}, + "tags" => ["new", "updated"] + } + + state = %{flush_buffer: %{}} + lsn = 300 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :update, + "resources", + old_data, + new_data + ) + + attrs = result_state.flush_buffer[lsn] + assert attrs.old_data == old_data + assert attrs.data == new_data + end + end + + describe "on_write/6 for deletes" do + test "ignores account deletes", %{account: account} do + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + 12345, + :delete, + "accounts", + %{"id" => account.id, "name" => "deleted account"}, + nil + ) + + assert result_state == initial_state + end + + test "adds delete operation to flush buffer for non-soft-deleted records", %{account: account} do + table = "resources" + + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "deleted resource", + "deleted_at" => nil + } + + lsn = 12347 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :delete, + table, + old_data, + nil + ) + + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + + assert attrs.lsn == lsn + assert attrs.table == table + assert attrs.op == :delete + assert attrs.data == nil + assert attrs.old_data == old_data + assert attrs.account_id == account.id + assert attrs.vsn == 0 + end + + test "ignores soft-deleted records", %{account: account} do + table = "resources" + + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "soft deleted", + "deleted_at" => "2024-01-01T00:00:00Z" + } + + lsn = 12348 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :delete, + table, + old_data, + nil + ) + + # Buffer should remain unchanged + assert map_size(result_state.flush_buffer) == 0 + assert result_state == 
initial_state + end + + test "processes record without deleted_at field", %{account: account} do + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "no deleted_at field" + } + + state = %{flush_buffer: %{}} + lsn = 400 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :delete, + "resources", + old_data, + nil + ) + + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + assert attrs.op == :delete + end + end + + describe "multiple operations and buffer accumulation" do + test "operations accumulate in flush buffer correctly", %{account: account} do + initial_state = %{flush_buffer: %{}} + + # Insert + state1 = + ReplicationConnection.on_write( + initial_state, + 100, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test"} + ) + + assert map_size(state1.flush_buffer) == 1 + + # Update + resource_id = Ecto.UUID.generate() + + state2 = + ReplicationConnection.on_write( + state1, + 101, + :update, + "resources", + %{"id" => resource_id, "account_id" => account.id, "name" => "test"}, + %{"id" => resource_id, "account_id" => account.id, "name" => "updated"} + ) + + assert map_size(state2.flush_buffer) == 2 + + # Delete (non-soft) + state3 = + ReplicationConnection.on_write( + state2, + 102, + :delete, + "resources", + %{"id" => resource_id, "account_id" => account.id, "name" => "updated"}, + nil + ) + + assert map_size(state3.flush_buffer) == 3 + + # Verify LSNs + assert Map.has_key?(state3.flush_buffer, 100) + assert Map.has_key?(state3.flush_buffer, 101) + assert Map.has_key?(state3.flush_buffer, 102) + + assert state3.flush_buffer[100].op == :insert + assert state3.flush_buffer[101].op == :update + assert state3.flush_buffer[102].op == :delete + end + + test "mixed operations with soft deletes", %{account: account} do + state = %{flush_buffer: %{}} + + # Regular insert + state1 = + ReplicationConnection.on_write( + state, + 100, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test"} + ) + + # Regular update + resource_id = Ecto.UUID.generate() + + state2 = + ReplicationConnection.on_write( + state1, + 101, + :update, + "resources", + %{"id" => resource_id, "account_id" => account.id, "name" => "test"}, + %{"id" => resource_id, "account_id" => account.id, "name" => "updated"} + ) + + # Soft delete (should be ignored) + state3 = + ReplicationConnection.on_write( + state2, + 102, + :delete, + "resources", + %{ + "id" => resource_id, + "account_id" => account.id, + "name" => "updated", + "deleted_at" => "2024-01-01T00:00:00Z" + }, + nil + ) + + # Hard delete (should be included) + state4 = + ReplicationConnection.on_write( + state3, + 103, + :delete, + "resources", + %{ + "id" => resource_id, + "account_id" => account.id, + "name" => "updated", + "deleted_at" => nil + }, + nil + ) + + # Should have 3 operations: insert, update, hard delete (soft delete ignored) + assert map_size(state4.flush_buffer) == 3 + assert Map.has_key?(state4.flush_buffer, 100) + assert Map.has_key?(state4.flush_buffer, 101) + refute Map.has_key?(state4.flush_buffer, 102) + assert Map.has_key?(state4.flush_buffer, 103) + end + end + + describe "on_flush/1" do + test "handles empty flush buffer" do + state = %{flush_buffer: %{}} + + result_state = ReplicationConnection.on_flush(state) + + assert result_state == state + end + + test "successfully flushes buffer and clears it", %{account: account} do + # Create valid 
change log data + attrs1 = %{ + lsn: 100, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test1"}, + old_data: nil, + vsn: 0 + } + + attrs2 = %{ + lsn: 101, + table: "resources", + op: :update, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test2"}, + old_data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test1"}, + vsn: 0 + } + + state = %{ + flush_buffer: %{100 => attrs1, 101 => attrs2}, + last_flushed_lsn: 99 + } + + result_state = ReplicationConnection.on_flush(state) + + assert result_state.flush_buffer == %{} + # Should be the highest LSN + assert result_state.last_flushed_lsn == 101 + + # Verify actual records were created in database + change_logs = Repo.all(from cl in ChangeLog, where: cl.lsn in [100, 101], order_by: cl.lsn) + assert length(change_logs) == 2 + + [log1, log2] = change_logs + assert log1.lsn == 100 + assert log1.op == :insert + assert log2.lsn == 101 + assert log2.op == :update + end + + test "calculates last_flushed_lsn correctly as max LSN", %{account: account} do + # Create multiple records with non-sequential LSNs + attrs_map = %{ + 400 => %{ + lsn: 400, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + old_data: nil, + vsn: 0 + }, + 402 => %{ + lsn: 402, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + old_data: nil, + vsn: 0 + }, + 401 => %{ + lsn: 401, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + old_data: nil, + vsn: 0 } } - assert :ok = on_insert(0, table, complex_data) + state = %{flush_buffer: attrs_map, last_flushed_lsn: 399} - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + result_state = ReplicationConnection.on_flush(state) - # Data should be preserved exactly as provided - assert change_log.data == complex_data - assert change_log.op == :insert - assert change_log.table == table - assert change_log.account_id == account.id - end - - test "tracks operation sequence correctly", %{account: account} do - table = "accounts" - initial_data = %{"id" => account.id, "name" => "initial"} - updated_data = %{"id" => account.id, "name" => "updated"} - - # Insert - assert :ok = on_insert(0, table, initial_data) - - # Update - assert :ok = on_update(1, table, initial_data, updated_data) - - # Delete - assert :ok = on_delete(2, table, updated_data) - - # Get the three most recent records in reverse chronological order - logs = - Repo.all( - from cl in ChangeLog, - where: cl.account_id == ^account.id, - order_by: [desc: cl.inserted_at], - limit: 3 - ) - - # Should have 3 records (delete, update, insert in that order) - assert length(logs) >= 3 - [delete_log, update_log, insert_log] = Enum.take(logs, 3) - - # Verify sequence (most recent first) - assert delete_log.op == :delete - assert delete_log.old_data == updated_data - assert delete_log.data == nil - assert delete_log.account_id == account.id - - assert update_log.op == :update - assert update_log.old_data == initial_data - assert update_log.data == updated_data - assert update_log.account_id == account.id - - assert insert_log.op == :insert - assert insert_log.old_data == nil - assert insert_log.data == initial_data - assert 
insert_log.account_id == account.id - - # All should have same version - assert insert_log.vsn == 0 - assert update_log.vsn == 0 - assert delete_log.vsn == 0 + # Should update to max LSN (402) + assert result_state.last_flushed_lsn == 402 + assert result_state.flush_buffer == %{} end end - describe "flows table comprehensive test" do - test "flows table never creates records regardless of operation or data" do - initial_count = Repo.aggregate(ChangeLog, :count, :id) + describe "LSN tracking and ordering" do + test "LSNs are preserved correctly in buffer" do + lsns = [1000, 1001, 1002] - # Test various data shapes and operations - test_data_sets = [ - %{}, - %{"id" => 1}, - %{"complex" => %{"nested" => ["data", 1, true, nil]}}, - nil - ] + state = %{flush_buffer: %{}} - for data <- test_data_sets do - assert :ok = on_insert(0, "flows", data) - assert :ok = on_update(1, "flows", data, data) - assert :ok = on_delete(2, "flows", data) - end + # Add multiple operations with specific LSNs + final_state = + Enum.reduce(lsns, state, fn lsn, acc_state -> + ReplicationConnection.on_write( + acc_state, + lsn, + :insert, + "resources", + nil, + %{ + "id" => Ecto.UUID.generate(), + "account_id" => "test-account", + "name" => "test_#{lsn}" + } + ) + end) - # No records should have been created - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + # Verify LSNs are preserved as keys + assert Map.keys(final_state.flush_buffer) |> Enum.sort() == lsns + + # Verify each entry has correct LSN + Enum.each(lsns, fn lsn -> + assert final_state.flush_buffer[lsn].lsn == lsn + end) + end + + test "handles large LSN values", %{account: account} do + large_lsn = 999_999_999_999_999 + + state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + state, + large_lsn, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id} + ) + + attrs = result_state.flush_buffer[large_lsn] + assert attrs.lsn == large_lsn + end + + test "preserves LSN ordering through flush", %{account: account} do + # Add operations with non-sequential LSNs + lsns = [1005, 1003, 1007, 1001] + + state = %{flush_buffer: %{}, last_flushed_lsn: 0} + + final_state = + Enum.reduce(lsns, state, fn lsn, acc_state -> + ReplicationConnection.on_write( + acc_state, + lsn, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id} + ) + end) + + # Flush to database + ReplicationConnection.on_flush(final_state) + + # Verify records in database have correct LSNs + change_logs = Repo.all(from cl in ChangeLog, where: cl.lsn in ^lsns, order_by: cl.lsn) + db_lsns = Enum.map(change_logs, & &1.lsn) + assert db_lsns == Enum.sort(lsns) + end + end + + describe "edge cases and error scenarios" do + test "logs error for writes without account_id" do + import ExUnit.CaptureLog + + state = %{flush_buffer: %{}} + + log = + capture_log(fn -> + result = + ReplicationConnection.on_write( + state, + 500, + :insert, + "some_table", + nil, + %{"id" => Ecto.UUID.generate(), "name" => "no account_id"} + ) + + # State should remain unchanged + assert result == state + end) + + assert log =~ "Unexpected write operation!" 
+ assert log =~ "lsn=500" + end + + test "handles account_id in old_data for deletes", %{account: account} do + state = %{flush_buffer: %{}} + lsn = 600 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :delete, + "resources", + %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + nil + ) + + assert map_size(result_state.flush_buffer) == 1 + assert result_state.flush_buffer[lsn].account_id == account.id + end + + test "handles very large buffers" do + account_id = Ecto.UUID.generate() + state = %{flush_buffer: %{}, last_flushed_lsn: 0} + + # Simulate adding many operations + operations = 1..100 + + final_state = + Enum.reduce(operations, state, fn i, acc_state -> + ReplicationConnection.on_write( + acc_state, + i, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account_id, "name" => "user#{i}"} + ) + end) + + assert map_size(final_state.flush_buffer) == 100 + + # Verify all LSNs are present + buffer_lsns = Map.keys(final_state.flush_buffer) |> Enum.sort() + assert buffer_lsns == Enum.to_list(1..100) + end + end + + describe "special table handling" do + test "ignores relay_group token updates" do + state = %{flush_buffer: %{}} + + # Update where old_data has relay_group type + result_state1 = + ReplicationConnection.on_write( + state, + 100, + :update, + "tokens", + %{"id" => Ecto.UUID.generate(), "type" => "relay_group"}, + %{"id" => Ecto.UUID.generate(), "type" => "relay_group", "updated" => true} + ) + + assert result_state1 == state + + # Update where new data has relay_group type + result_state2 = + ReplicationConnection.on_write( + state, + 101, + :update, + "tokens", + %{"id" => Ecto.UUID.generate(), "type" => "other"}, + %{"id" => Ecto.UUID.generate(), "type" => "relay_group"} + ) + + assert result_state2 == state + end + + test "processes non-relay_group tokens normally", %{account: account} do + state = %{flush_buffer: %{}} + lsn = 102 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :insert, + "tokens", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "type" => "browser"} + ) + + assert map_size(result_state.flush_buffer) == 1 + assert result_state.flush_buffer[lsn].table == "tokens" end end end diff --git a/elixir/apps/domain/test/domain/change_logs_test.exs b/elixir/apps/domain/test/domain/change_logs_test.exs index 48b8cf85d..f9af0b863 100644 --- a/elixir/apps/domain/test/domain/change_logs_test.exs +++ b/elixir/apps/domain/test/domain/change_logs_test.exs @@ -2,15 +2,15 @@ defmodule Domain.ChangeLogsTest do use Domain.DataCase, async: true import Domain.ChangeLogs - describe "create/1" do + describe "bulk_insert/1" do setup do account = Fixtures.Accounts.create_account() - %{account: account} end - test "inserts a change_log for an account", %{account: account} do + test "skips duplicate lsn", %{account: account} do attrs = %{ + account_id: account.id, lsn: 1, table: "resources", op: :insert, @@ -19,152 +19,16 @@ defmodule Domain.ChangeLogsTest do vsn: 1 } - assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) - - assert change_log.account_id == account.id - assert change_log.op == :insert - assert change_log.old_data == nil - assert change_log.data == %{"account_id" => account.id, "key" => "value"} - end - - test "uses the 'id' field accounts table updates", %{account: account} do - attrs = %{ - lsn: 1, - table: "accounts", - op: :update, - old_data: %{"id" => account.id, "name" => "Old Name"}, - data: %{"id" => account.id, "name" => 
"New Name"}, - vsn: 1 - } - - assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) - - assert change_log.account_id == account.id - assert change_log.op == :update - assert change_log.old_data == %{"id" => account.id, "name" => "Old Name"} - assert change_log.data == %{"id" => account.id, "name" => "New Name"} - end - - test "requires vsn field", %{account: account} do - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"} - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert changeset.errors[:vsn] == {"can't be blank", [validation: :required]} - end - - test "requires table field", %{account: account} do - attrs = %{ - lsn: 1, - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"}, - vsn: 1 - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert changeset.errors[:table] == {"can't be blank", [validation: :required]} - end - - test "prevents inserting duplicate lsn", %{account: account} do - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"}, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) + assert {1, nil} = bulk_insert([attrs]) + # Try to insert with same LSN but different data dupe_lsn_attrs = Map.put(attrs, :data, %{"account_id" => account.id, "key" => "new_value"}) - - assert {:error, changeset} = create_change_log(dupe_lsn_attrs) - assert changeset.valid? == false - - assert changeset.errors[:lsn] == - {"has already been taken", - [constraint: :unique, constraint_name: "change_logs_lsn_index"]} + assert {0, nil} = bulk_insert([dupe_lsn_attrs]) end - test "requires op field to be one of :insert, :update, :delete", %{account: account} do - attrs = %{ - lsn: 1, - table: "resources", - op: :invalid_op, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"}, - vsn: 1 - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert {"is invalid", errors} = changeset.errors[:op] - assert {:validation, :inclusion} in errors - end - - test "requires correct combination of operation and data", %{account: account} do - # Invalid combination: :insert with old_data present - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: %{"account_id" => account.id, "key" => "new_value"}, - vsn: 1 - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? 
== false - assert changeset.errors[:base] == {"Invalid combination of operation and data", []} - - # Valid combination: :insert with old_data nil and data present - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "new_value"}, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) - - # Valid combination: :update with both old_data and data present - attrs = %{ - lsn: 2, - table: "resources", - op: :update, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: %{"account_id" => account.id, "key" => "new_value"}, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) - - # Valid combination: :delete with old_data present and data nil - attrs = %{ - lsn: 3, - table: "resources", - op: :delete, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: nil, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) - end - - test "requires account_id to be populated from old_data or data" do + test "raises not null constraint when account_id is missing" do attrs = %{ + # account_id is missing lsn: 1, table: "resources", op: :insert, @@ -173,26 +37,97 @@ defmodule Domain.ChangeLogsTest do vsn: 1 } - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert changeset.errors[:account_id] == {"can't be blank", [validation: :required]} + assert_raise Postgrex.Error, + ~r/null value in column "account_id".*violates not-null constraint/, + fn -> + bulk_insert([attrs]) + end end - test "requires old_data[\"account_id\"] and data[\"account_id\"] to match", %{ - account: account - } do + test "raises not null constraint when table is missing", %{account: account} do attrs = %{ + account_id: account.id, lsn: 1, - table: "resources", - op: :update, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: %{"account_id" => "different_account_id", "key" => "new_value"}, + # table is missing + op: :insert, + old_data: nil, + data: %{"key" => "value"}, vsn: 1 } - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? 
== false - assert changeset.errors[:base] == {"Account ID cannot be changed", []} + assert_raise Postgrex.Error, + ~r/null value in column "table".*violates not-null constraint/, + fn -> + bulk_insert([attrs]) + end + end + + test "raises not null constraint when op is missing", %{account: account} do + attrs = %{ + account_id: account.id, + lsn: 1, + table: "resources", + # op is missing + old_data: nil, + data: %{"key" => "value"}, + vsn: 1 + } + + assert_raise Postgrex.Error, + ~r/null value in column "op".*violates not-null constraint/, + fn -> + bulk_insert([attrs]) + end + end + + test "enforces data constraints based on operation type", %{account: account} do + # Invalid insert (has old_data) + assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn -> + bulk_insert([ + %{ + account_id: account.id, + lsn: 1, + table: "resources", + op: :insert, + # Should be null for insert + old_data: %{"id" => "123"}, + data: %{"id" => "123"}, + vsn: 1 + } + ]) + end + + # Invalid update (missing old_data) + assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn -> + bulk_insert([ + %{ + account_id: account.id, + lsn: 2, + table: "resources", + op: :update, + # Should not be null for update + old_data: nil, + data: %{"id" => "123"}, + vsn: 1 + } + ]) + end + + # Invalid delete (has data) + assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn -> + bulk_insert([ + %{ + account_id: account.id, + lsn: 3, + table: "resources", + op: :delete, + old_data: %{"id" => "123"}, + # Should be null for delete + data: %{"id" => "123"}, + vsn: 1 + } + ]) + end end end end diff --git a/elixir/apps/domain/test/domain/events/replication_connection_test.exs b/elixir/apps/domain/test/domain/events/replication_connection_test.exs index b95a0773a..fdbc63b17 100644 --- a/elixir/apps/domain/test/domain/events/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/events/replication_connection_test.exs @@ -2,7 +2,7 @@ defmodule Domain.Events.ReplicationConnectionTest do use ExUnit.Case, async: true import ExUnit.CaptureLog - import Domain.Events.ReplicationConnection + alias Domain.Events.ReplicationConnection setup do tables = @@ -12,14 +12,15 @@ defmodule Domain.Events.ReplicationConnectionTest do %{tables: tables} end - describe "on_insert/2" do + describe "on_write/6 for inserts" do test "logs warning for unknown table" do table = "unknown_table" data = %{"id" => Ecto.UUID.generate(), "name" => "test"} log_output = capture_log(fn -> - assert :ok = on_insert(0, table, data) + result = ReplicationConnection.on_write(%{}, 0, :insert, table, nil, data) + assert result == %{} end) assert log_output =~ "No hook defined for insert on table unknown_table" @@ -33,9 +34,8 @@ defmodule Domain.Events.ReplicationConnectionTest do # The actual hook call might fail if the hook modules aren't available, # but we can test that our routing logic works try do - result = on_insert(0, table, data) - # Should either succeed or fail gracefully - assert result in [:ok, :error] or match?({:error, _}, result) + result = ReplicationConnection.on_write(%{}, 0, :insert, table, nil, data) + assert result == %{} rescue # Depending on the shape of the data we might get a function clause error. This is ok, # as we are testing the routing logic, not the actual hook implementations. 
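          # A minimal sketch of the dispatch these tests exercise -- assumed shape only, not the
          # shipped Domain.Events.ReplicationConnection. Unknown tables log a warning, known
          # tables hand the row off to their Domain.Events.Hooks.* module, and the incoming
          # state is returned unchanged either way. The hook callback signatures below are
          # hypothetical.
          defmodule RoutingSketch do
            require Logger

            # One entry shown for brevity; the full mapping is asserted in the
            # "table_to_hooks mapping" test near the end of this file.
            @tables_to_hooks %{"accounts" => Domain.Events.Hooks.Accounts}

            def on_write(state, _lsn, op, table, old_data, data) do
              case Map.fetch(@tables_to_hooks, table) do
                {:ok, hook} ->
                  dispatch(hook, op, old_data, data)

                :error ->
                  Logger.warning(
                    "No hook defined for #{op} on table #{table}. " <>
                      "Please implement Domain.Events.Hooks for this table."
                  )
              end

              state
            end

            # Hypothetical hook callback shapes, for illustration only.
            defp dispatch(hook, :insert, _old_data, data), do: hook.on_insert(data)
            defp dispatch(hook, :update, old_data, data), do: hook.on_update(old_data, data)
            defp dispatch(hook, :delete, old_data, _data), do: hook.on_delete(old_data)
          end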
@@ -50,7 +50,9 @@ defmodule Domain.Events.ReplicationConnectionTest do log_output = capture_log(fn -> try do - on_insert(0, table, %{"id" => Ecto.UUID.generate()}) + ReplicationConnection.on_write(%{}, 0, :insert, table, nil, %{ + "id" => Ecto.UUID.generate() + }) rescue FunctionClauseError -> # Shape of the data might not match the expected one, which is fine @@ -63,15 +65,16 @@ defmodule Domain.Events.ReplicationConnectionTest do end end - describe "on_update/3" do + describe "on_write/6 for updates" do test "logs warning for unknown table" do table = "unknown_table" old_data = %{"id" => Ecto.UUID.generate(), "name" => "old"} - data = %{"id" => Ecto.UUID.generate(), "name" => "new"} + data = %{"id" => old_data["id"], "name" => "new"} log_output = capture_log(fn -> - assert :ok = on_update(0, table, old_data, data) + result = ReplicationConnection.on_write(%{}, 0, :update, table, old_data, data) + assert result == %{} end) assert log_output =~ "No hook defined for update on table unknown_table" @@ -80,12 +83,12 @@ defmodule Domain.Events.ReplicationConnectionTest do test "handles known tables", %{tables: tables} do old_data = %{"id" => Ecto.UUID.generate(), "name" => "old name"} - data = %{"id" => Ecto.UUID.generate(), "name" => "new name"} + data = %{"id" => old_data["id"], "name" => "new name"} for table <- tables do try do - result = on_update(0, table, old_data, data) - assert result in [:ok, :error] or match?({:error, _}, result) + result = ReplicationConnection.on_write(%{}, 0, :update, table, old_data, data) + assert result == %{} rescue FunctionClauseError -> # Shape of the data might not match the expected one, which is fine @@ -95,14 +98,15 @@ defmodule Domain.Events.ReplicationConnectionTest do end end - describe "on_delete/2" do + describe "on_write/6 for deletes" do test "logs warning for unknown table" do table = "unknown_table" old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted"} log_output = capture_log(fn -> - assert :ok = on_delete(0, table, old_data) + result = ReplicationConnection.on_write(%{}, 0, :delete, table, old_data, nil) + assert result == %{} end) assert log_output =~ "No hook defined for delete on table unknown_table" @@ -110,11 +114,12 @@ defmodule Domain.Events.ReplicationConnectionTest do end test "handles known tables", %{tables: tables} do - old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted gateway"} + old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted item"} for table <- tables do try do - assert :ok = on_delete(0, table, old_data) + result = ReplicationConnection.on_write(%{}, 0, :delete, table, old_data, nil) + assert result == %{} rescue # Shape of the data might not match the expected one, which is fine FunctionClauseError -> :ok @@ -123,31 +128,132 @@ defmodule Domain.Events.ReplicationConnectionTest do end end + describe "operation routing" do + test "routes to correct hook based on operation type" do + # Test that we dispatch to the right operation + # Since we can't directly test the hook calls without the actual hook modules, + # we can at least verify the routing logic doesn't crash + + state = %{} + table = "accounts" + + # Insert + try do + result = ReplicationConnection.on_write(state, 1, :insert, table, nil, %{"id" => "123"}) + assert result == state + rescue + FunctionClauseError -> :ok + end + + # Update + try do + result = + ReplicationConnection.on_write(state, 2, :update, table, %{"id" => "123"}, %{ + "id" => "123", + "updated" => true + }) + + assert result == state + rescue + FunctionClauseError -> 
:ok + end + + # Delete + try do + result = ReplicationConnection.on_write(state, 3, :delete, table, %{"id" => "123"}, nil) + assert result == state + rescue + FunctionClauseError -> :ok + end + end + end + describe "warning message formatting" do - test "log_warning generates correct message format" do + test "log_warning generates correct message format for each operation" do + # Test insert log_output = capture_log(fn -> - assert :ok = on_insert(0, "test_table_insert", %{}) + ReplicationConnection.on_write(%{}, 0, :insert, "test_table_insert", nil, %{}) end) assert log_output =~ "No hook defined for insert on table test_table_insert" assert log_output =~ "Please implement Domain.Events.Hooks for this table" + # Test update log_output = capture_log(fn -> - assert :ok = on_update(0, "test_table_update", %{}, %{}) + ReplicationConnection.on_write(%{}, 0, :update, "test_table_update", %{}, %{}) end) assert log_output =~ "No hook defined for update on table test_table_update" assert log_output =~ "Please implement Domain.Events.Hooks for this table" + # Test delete log_output = capture_log(fn -> - assert :ok = on_delete(0, "test_table_delete", %{}) + ReplicationConnection.on_write(%{}, 0, :delete, "test_table_delete", %{}, nil) end) assert log_output =~ "No hook defined for delete on table test_table_delete" assert log_output =~ "Please implement Domain.Events.Hooks for this table" end end + + describe "state preservation" do + test "always returns the state unchanged" do + initial_state = %{some: "data", counter: 42} + + # Unknown table - should log warning and return state unchanged + result1 = ReplicationConnection.on_write(initial_state, 1, :insert, "unknown", nil, %{}) + assert result1 == initial_state + + # Known table (might error in hook, but should still preserve state) + try do + result2 = ReplicationConnection.on_write(initial_state, 2, :insert, "accounts", nil, %{}) + assert result2 == initial_state + rescue + FunctionClauseError -> :ok + end + end + end + + describe "table_to_hooks mapping" do + test "all configured tables have hook modules" do + # This test ensures our tables_to_hooks map is properly configured + tables_to_hooks = %{ + "accounts" => Domain.Events.Hooks.Accounts, + "actor_group_memberships" => Domain.Events.Hooks.ActorGroupMemberships, + "actor_groups" => Domain.Events.Hooks.ActorGroups, + "actors" => Domain.Events.Hooks.Actors, + "auth_identities" => Domain.Events.Hooks.AuthIdentities, + "auth_providers" => Domain.Events.Hooks.AuthProviders, + "clients" => Domain.Events.Hooks.Clients, + "gateway_groups" => Domain.Events.Hooks.GatewayGroups, + "gateways" => Domain.Events.Hooks.Gateways, + "policies" => Domain.Events.Hooks.Policies, + "resource_connections" => Domain.Events.Hooks.ResourceConnections, + "resources" => Domain.Events.Hooks.Resources, + "tokens" => Domain.Events.Hooks.Tokens + } + + # Verify the mapping includes all expected tables + assert Map.keys(tables_to_hooks) |> Enum.sort() == + [ + "accounts", + "actor_group_memberships", + "actor_groups", + "actors", + "auth_identities", + "auth_providers", + "clients", + "gateway_groups", + "gateways", + "policies", + "resource_connections", + "resources", + "tokens" + ] + |> Enum.sort() + end + end end diff --git a/elixir/apps/domain/test/domain/replication/connection_test.exs b/elixir/apps/domain/test/domain/replication/connection_test.exs index 720ece52b..bf8cd61a8 100644 --- a/elixir/apps/domain/test/domain/replication/connection_test.exs +++ 
b/elixir/apps/domain/test/domain/replication/connection_test.exs @@ -1,728 +1,494 @@ defmodule Domain.Replication.ConnectionTest do - # Only one ReplicationConnection should be started in the cluster - use ExUnit.Case, async: false + use ExUnit.Case, async: true - # Create a test module that uses the macro + import ExUnit.CaptureLog + + # Define a test module that uses the Domain.Replication.Connection macro defmodule TestReplicationConnection do - use Domain.Replication.Connection, - warning_threshold_ms: 5_000, - error_threshold_ms: 60_000 - end + use Domain.Replication.Connection - alias TestReplicationConnection + def on_write(state, lsn, op, table, old_data, data) do + # Simple test implementation that tracks operations + operations = Map.get(state, :operations, []) - # Used to test callbacks, not used for live connection - def mock_state, - do: %TestReplicationConnection{ - schema: "test_schema", - step: :disconnected, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["accounts", "resources"], - relations: %{}, - counter: 0, - tables_to_remove: MapSet.new() - } - - # Used to test live connection - setup do - {connection_opts, config} = - Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) - |> Keyword.pop(:connection_opts) - - init_state = %{ - connection_opts: connection_opts, - instance: struct(TestReplicationConnection, config) - } - - child_spec = %{ - id: TestReplicationConnection, - start: {TestReplicationConnection, :start_link, [init_state]} - } - - {:ok, pid} = - case start_supervised(child_spec) do - {:ok, pid} -> - {:ok, pid} - - {:error, {:already_started, pid}} -> - {:ok, pid} - end - - {:ok, pid: pid} - end - - describe "handle_connect/1 callback" do - test "handle_connect initiates publication check" do - state = mock_state() - expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" - expected_next_state = %{state | step: :check_publication} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_connect(state) - end - end - - describe "handle_result/2 callback" do - test "handle_result transitions from check_publication to check_publication_tables when publication exists" do - state = %{mock_state() | step: :check_publication} - result = [%Postgrex.Result{num_rows: 1}] - - expected_query = """ - SELECT schemaname, tablename - FROM pg_publication_tables - WHERE pubname = '#{state.publication_name}' - ORDER BY schemaname, tablename - """ - - expected_next_state = %{state | step: :check_publication_tables} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result creates publication if it doesn't exist" do - state = %{mock_state() | step: :check_publication} - result = [%Postgrex.Result{num_rows: 0}] - - expected_tables = "test_schema.accounts,test_schema.resources" - expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result proceeds to replication slot when publication tables are up to date" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that match our desired tables exactly - result = [ - %Postgrex.Result{rows: 
[["test_schema", "accounts"], ["test_schema", "resources"]]} - ] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result adds new tables when they are missing from publication" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that are missing "resources" - result = [%Postgrex.Result{rows: [["test_schema", "accounts"]]}] - - expected_query = - "ALTER PUBLICATION #{state.publication_name} ADD TABLE test_schema.resources" - - expected_next_state = %{ - state - | step: :remove_publication_tables, - tables_to_remove: MapSet.new() + operation = %{ + lsn: lsn, + op: op, + table: table, + old_data: old_data, + data: data } - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) + Map.put(state, :operations, [operation | operations]) end - test "handle_result removes unwanted tables when they exist in publication" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that include an unwanted "old_table" - result = [ - %Postgrex.Result{ - rows: [ - ["test_schema", "accounts"], - ["test_schema", "resources"], - ["test_schema", "old_table"] - ] - } - ] + def on_flush(state) do + # Test implementation that counts flushes + flush_count = Map.get(state, :flush_count, 0) - expected_query = - "ALTER PUBLICATION #{state.publication_name} DROP TABLE test_schema.old_table" - - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result adds tables first, then removes unwanted tables" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that have "old_table" but missing "resources" - result = [ - %Postgrex.Result{rows: [["test_schema", "accounts"], ["test_schema", "old_table"]]} - ] - - expected_query = - "ALTER PUBLICATION #{state.publication_name} ADD TABLE test_schema.resources" - - expected_tables_to_remove = MapSet.new(["test_schema.old_table"]) - - expected_next_state = %{ - state - | step: :remove_publication_tables, - tables_to_remove: expected_tables_to_remove - } - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "verify MapSet behavior for debugging" do - # Verify that our MapSet is not empty - tables_to_remove = MapSet.new(["test_schema.old_table"]) - refute Enum.empty?(tables_to_remove) - assert MapSet.size(tables_to_remove) == 1 - assert MapSet.member?(tables_to_remove, "test_schema.old_table") - end - - test "handle_result removes tables after adding when tables_to_remove is not empty" do - tables_to_remove = MapSet.new(["test_schema.old_table"]) - - state = %{ - mock_state() - | step: :remove_publication_tables, - tables_to_remove: tables_to_remove - } - - result = [%Postgrex.Result{}] - - # Debug: verify the state is what we think it is - refute Enum.empty?(state.tables_to_remove) - assert state.step == :remove_publication_tables - - expected_query = - "ALTER PUBLICATION #{state.publication_name} DROP TABLE test_schema.old_table" - - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, 
^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result proceeds to replication slot when no tables to remove" do - state = %{mock_state() | step: :remove_publication_tables, tables_to_remove: MapSet.new()} - result = [%Postgrex.Result{}] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from create_slot to start_replication_slot when slot exists" do - state = %{mock_state() | step: :create_slot} - result = [%Postgrex.Result{num_rows: 1}] - - expected_query = "SELECT 1" - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from start_replication_slot to streaming" do - state = %{mock_state() | step: :start_replication_slot} - result = [%Postgrex.Result{num_rows: 1}] - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - expected_next_state = %{state | step: :streaming} - - assert {:stream, ^expected_stream_query, [], ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from check_replication_slot to create_slot after creating publication" do - state = %{mock_state() | step: :check_replication_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result creates replication slot if it doesn't exist" do - state = %{mock_state() | step: :create_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) + state + |> Map.put(:flush_count, flush_count + 1) + |> Map.put(:flush_buffer, %{}) end end - describe "handle_data/2" do - test "handle_data handles KeepAlive with reply :now" do - state = %{mock_state() | step: :streaming} - wal_end = 12345 + # Create a test module that captures relation updates + defmodule RelationTestConnection do + use Domain.Replication.Connection - now = - System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + def on_write(state, _, _, _, _, _), do: state - # 100 milliseconds - grace_period = 100_000 - keepalive_data = <> + def on_flush(state), do: state + end - assert {:noreply, reply, ^state} = - TestReplicationConnection.handle_data(keepalive_data, state) - - assert [<>] = reply - - assert now <= clock - assert clock < now + grace_period - end - - test "handle_data handles KeepAlive with reply :later" do - state = %{mock_state() | step: :streaming} - wal_end = 54321 - - keepalive_data = <> - expected_reply_message = [] - - assert {:noreply, 
^expected_reply_message, ^state} = - TestReplicationConnection.handle_data(keepalive_data, state) - end - - test "handle_data handles Write message and increments counter" do - state = %{mock_state() | step: :streaming} - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - message = "Hello, world!" - - write_data = - <> - - new_state = %{state | counter: state.counter + 1} - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], ^new_state} = - TestReplicationConnection.handle_data(write_data, state) - - # Validate the acknowledgment structure without pinning the timestamp - assert <> = ack_message - end - - test "handle_data handles unknown message" do - state = %{mock_state() | step: :streaming} - unknown_data = <> - - assert {:noreply, [], ^state} = TestReplicationConnection.handle_data(unknown_data, state) - end - - test "sends {:check_warning_threshold, lag_ms} > 5_000 ms" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, false) - - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> - - # Simulate a commit timestamp that exceeds the threshold - timestamp = - DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - assert_receive({:check_warning_threshold, lag_ms}) - assert lag_ms > 5_000 - end - - test "sends {:check_warning_threshold, lag_ms} < 5_000 ms" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, true) - - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> - - # Simulate a commit timestamp that is within the threshold - timestamp = - DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 1_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - assert_receive({:check_warning_threshold, lag_ms}) - assert lag_ms < 5_000 + describe "macro compilation" do + test "creates a module with required functions" do + # Verify the module was created with expected functions + assert function_exported?(TestReplicationConnection, :start_link, 1) + assert function_exported?(TestReplicationConnection, :init, 1) + assert function_exported?(TestReplicationConnection, :handle_connect, 1) + assert function_exported?(TestReplicationConnection, :handle_disconnect, 1) + assert function_exported?(TestReplicationConnection, :handle_result, 2) + assert function_exported?(TestReplicationConnection, :handle_data, 2) + assert function_exported?(TestReplicationConnection, :handle_info, 2) + assert function_exported?(TestReplicationConnection, :on_write, 6) + assert function_exported?(TestReplicationConnection, :on_flush, 1) end end - describe "handle_info/2" do - test "handle_info handles :shutdown message" do - state = mock_state() - assert {:disconnect, :normal} = TestReplicationConnection.handle_info(:shutdown, state) - end + describe "struct 
definition" do + test "defines correct struct with expected fields" do + assert %TestReplicationConnection{} = struct = %TestReplicationConnection{} - test "handle_info handles :DOWN message from monitored process" do - state = mock_state() - monitor_ref = make_ref() - down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} - - assert {:disconnect, :normal} = TestReplicationConnection.handle_info(down_msg, state) - end - - test "handle_info ignores other messages" do - state = mock_state() - random_msg = {:some_other_info, "data"} - - assert {:noreply, ^state} = TestReplicationConnection.handle_info(random_msg, state) - end - - test "handle_info processes warning threshold alerts" do - state = Map.put(mock_state(), :warning_threshold_exceeded?, false) - - # Test crossing threshold - assert {:noreply, %{warning_threshold_exceeded?: true}} = - TestReplicationConnection.handle_info({:check_warning_threshold, 6_000}, state) - - # Test going back below threshold - state_above = %{state | warning_threshold_exceeded?: true} - - assert {:noreply, %{warning_threshold_exceeded?: false}} = - TestReplicationConnection.handle_info( - {:check_warning_threshold, 3_000}, - state_above - ) - - # Test staying below threshold - assert {:noreply, %{warning_threshold_exceeded?: false}} = - TestReplicationConnection.handle_info({:check_warning_threshold, 2_000}, state) - - # Test staying above threshold - assert {:noreply, %{warning_threshold_exceeded?: true}} = - TestReplicationConnection.handle_info( - {:check_warning_threshold, 7_000}, - state_above - ) + # Check default values + assert struct.schema == "public" + assert struct.step == :disconnected + assert struct.output_plugin == "pgoutput" + assert struct.proto_version == 1 + assert struct.table_subscriptions == [] + assert struct.relations == %{} + assert struct.counter == 0 + assert struct.tables_to_remove == MapSet.new() + assert struct.flush_interval == 0 + assert struct.flush_buffer == %{} + assert struct.last_flushed_lsn == 0 + assert struct.warning_threshold_exceeded? == false + assert struct.error_threshold_exceeded? == false + assert struct.flush_buffer_size == 0 + assert struct.status_log_interval == :timer.minutes(1) + assert struct.warning_threshold == :timer.seconds(30) + assert struct.error_threshold == :timer.seconds(60) end end - describe "error threshold functionality" do - test "handle_info sets error_threshold_exceeded? to true when lag exceeds error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, false) - - # Test crossing the error threshold (60_000ms from TestReplicationConnection config) - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 65_000}, state) - - assert updated_state.error_threshold_exceeded? == true + describe "initialization" do + test "init/1 preserves state" do + initial_state = %TestReplicationConnection{counter: 42} + {:ok, state} = TestReplicationConnection.init(initial_state) + assert state.counter == 42 end - test "handle_info sets error_threshold_exceeded? 
to false when lag drops below error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, true) + test "init/1 schedules flush when flush_interval > 0" do + initial_state = %TestReplicationConnection{flush_interval: 1000} + {:ok, _state} = TestReplicationConnection.init(initial_state) - # Test going back below threshold - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 30_000}, state) - - assert updated_state.error_threshold_exceeded? == false + # Should receive flush message after interval + assert_receive :flush, 1100 end - test "handle_info keeps error_threshold_exceeded? true when lag stays above error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, true) + test "init/1 does not schedule flush when flush_interval is 0" do + initial_state = %TestReplicationConnection{flush_interval: 0} + {:ok, _state} = TestReplicationConnection.init(initial_state) - # Test staying above threshold - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 70_000}, state) - - assert updated_state.error_threshold_exceeded? == true - end - - test "handle_info keeps error_threshold_exceeded? false when lag stays below error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, false) - - # Test staying below threshold - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 30_000}, state) - - assert updated_state.error_threshold_exceeded? == false + # Should not receive flush message + refute_receive :flush, 100 end end - describe "BEGIN message lag tracking with error threshold" do - test "sends both check_warning_threshold and check_error_threshold messages" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) + describe "handle_connect/1" do + test "returns query to check publication" do + state = %TestReplicationConnection{publication_name: "test_pub"} - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> + {:query, query, new_state} = TestReplicationConnection.handle_connect(state) - # Simulate a commit timestamp that exceeds both thresholds (70 seconds lag) - timestamp = - DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 70_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - # Should receive both threshold check messages - assert_receive {:check_warning_threshold, warning_lag_ms} - assert warning_lag_ms > 5_000 - - assert_receive {:check_error_threshold, error_lag_ms} - assert error_lag_ms > 60_000 - - # Both should report the same lag time - assert warning_lag_ms == error_lag_ms - end - - test "sends check_error_threshold with lag below error threshold" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) - - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> - - # Simulate a commit timestamp with moderate lag (10 seconds) - timestamp = - 
DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - # Should receive both threshold check messages - assert_receive {:check_warning_threshold, warning_lag_ms} - assert warning_lag_ms > 5_000 - - assert_receive {:check_error_threshold, error_lag_ms} - assert error_lag_ms < 60_000 - # Still above warning threshold - assert error_lag_ms > 5_000 - end - end - - describe "message processing bypass behavior" do - # Note: We can't directly test handle_message/3 since it's private, - # but we can test the behavior by mocking the on_insert/on_update/on_delete callbacks - # and verifying they're not called when error_threshold_exceeded? is true - - defmodule TestCallbackModule do - use Domain.Replication.Connection, - warning_threshold_ms: 5_000, - error_threshold_ms: 60_000 - - def on_insert(lsn, table, data) do - send(self(), {:callback_called, :on_insert, lsn, table, data}) - :ok - end - - def on_update(lsn, table, old_data, data) do - send(self(), {:callback_called, :on_update, lsn, table, old_data, data}) - :ok - end - - def on_delete(lsn, table, old_data) do - send(self(), {:callback_called, :on_delete, lsn, table, old_data}) - :ok - end - end - - test "insert processing is bypassed when error threshold exceeded" do - # Create a relation that can be referenced - relation = %{ - namespace: "test_schema", - name: "test_table", - columns: [%{name: "id"}, %{name: "name"}] - } - - state = - %TestCallbackModule{ - schema: "test_schema", - step: :streaming, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["test_table"], - relations: %{1 => relation}, - counter: 0, - tables_to_remove: MapSet.new() - } - |> Map.put(:error_threshold_exceeded?, true) - - # Since we can't easily construct valid WAL insert messages without more - # complex setup, let's focus on testing the threshold management itself - - # The key test is that when error_threshold_exceeded? is true, - # the system should handle this gracefully - assert state.error_threshold_exceeded? == true - end - - test "callbacks are called when error threshold not exceeded" do - # Similar setup but with error_threshold_exceeded? set to false - relation = %{ - namespace: "test_schema", - name: "test_table", - columns: [%{name: "id"}, %{name: "name"}] - } - - state = - %TestCallbackModule{ - schema: "test_schema", - step: :streaming, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["test_table"], - relations: %{1 => relation}, - counter: 0, - tables_to_remove: MapSet.new() - } - |> Map.put(:error_threshold_exceeded?, false) - - # Test that the state is properly configured for normal processing - assert state.error_threshold_exceeded? 
== false - - # The actual message processing would require constructing complex WAL messages, - # but the important thing is that the state management works correctly - end - end - - describe "error threshold integration with warning threshold" do - test "both thresholds can be managed independently" do - state = - mock_state() - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) - - # Cross warning threshold only - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_warning_threshold, 10_000}, state) - - assert updated_state.warning_threshold_exceeded? == true - assert updated_state.error_threshold_exceeded? == false - - # Cross error threshold - assert {:noreply, updated_state2} = - TestReplicationConnection.handle_info( - {:check_error_threshold, 70_000}, - updated_state - ) - - assert updated_state2.warning_threshold_exceeded? == true - assert updated_state2.error_threshold_exceeded? == true - - # Drop below error threshold but stay above warning - assert {:noreply, updated_state3} = - TestReplicationConnection.handle_info( - {:check_error_threshold, 10_000}, - updated_state2 - ) - - assert updated_state3.warning_threshold_exceeded? == true - assert updated_state3.error_threshold_exceeded? == false - - # Drop below warning threshold - assert {:noreply, updated_state4} = - TestReplicationConnection.handle_info( - {:check_warning_threshold, 2_000}, - updated_state3 - ) - - assert updated_state4.warning_threshold_exceeded? == false - assert updated_state4.error_threshold_exceeded? == false + assert query == "SELECT 1 FROM pg_publication WHERE pubname = 'test_pub'" + assert new_state.step == :check_publication end end describe "handle_disconnect/1" do - test "handle_disconnect resets step to :disconnected" do - state = %{mock_state() | step: :streaming} - expected_state = %{state | step: :disconnected} + test "logs disconnection and updates state" do + state = %TestReplicationConnection{counter: 123, step: :streaming} - assert {:noreply, ^expected_state} = TestReplicationConnection.handle_disconnect(state) + log = + capture_log(fn -> + {:noreply, new_state} = TestReplicationConnection.handle_disconnect(state) + assert new_state.step == :disconnected + assert new_state.counter == 123 + end) + + assert log =~ "Replication connection disconnected" + assert log =~ "counter=123" + end + end + + describe "handle_info/2" do + test "handles :shutdown message" do + assert {:disconnect, :normal} = TestReplicationConnection.handle_info(:shutdown, %{}) + end + + test "handles DOWN message" do + assert {:disconnect, :normal} = + TestReplicationConnection.handle_info({:DOWN, nil, :process, nil, nil}, %{}) + end + + test "handles :flush message when flush_interval > 0" do + state = + %TestReplicationConnection{ + flush_interval: 1000, + flush_buffer: %{1 => %{data: "test"}} + } + |> Map.put(:operations, []) + |> Map.put(:flush_count, 0) + + {:noreply, new_state} = TestReplicationConnection.handle_info(:flush, state) + + # Our test on_flush implementation increments flush_count + assert Map.get(new_state, :flush_count) == 1 + assert new_state.flush_buffer == %{} + + # Should schedule next flush + assert_receive :flush, 1100 + end + + test "handles :interval_logger message" do + state = %TestReplicationConnection{counter: 456, status_log_interval: 100} + + log = + capture_log(fn -> + {:noreply, _new_state} = TestReplicationConnection.handle_info(:interval_logger, state) + end) + + assert log =~ "Processed 456 write messages from 
the WAL stream" + + # Should schedule next log + assert_receive :interval_logger, 150 + end + + test "handles warning threshold checks" do + # Below threshold + state = %TestReplicationConnection{ + warning_threshold_exceeded?: false, + warning_threshold: 1000 + } + + {:noreply, new_state} = + TestReplicationConnection.handle_info({:check_warning_threshold, 500}, state) + + assert new_state.warning_threshold_exceeded? == false + + # Above threshold + log = + capture_log(fn -> + {:noreply, new_state2} = + TestReplicationConnection.handle_info({:check_warning_threshold, 1500}, state) + + assert new_state2.warning_threshold_exceeded? == true + end) + + assert log =~ "Processing lag exceeds warning threshold" + + # Back below threshold + exceeded_state = %{state | warning_threshold_exceeded?: true} + + log2 = + capture_log(fn -> + {:noreply, new_state3} = + TestReplicationConnection.handle_info({:check_warning_threshold, 500}, exceeded_state) + + assert new_state3.warning_threshold_exceeded? == false + end) + + assert log2 =~ "Processing lag is back below warning threshold" + end + + test "handles error threshold checks" do + # Below threshold + state = %TestReplicationConnection{ + error_threshold_exceeded?: false, + error_threshold: 2000 + } + + {:noreply, new_state} = + TestReplicationConnection.handle_info({:check_error_threshold, 1000}, state) + + assert new_state.error_threshold_exceeded? == false + + # Above threshold + log = + capture_log(fn -> + {:noreply, new_state2} = + TestReplicationConnection.handle_info({:check_error_threshold, 3000}, state) + + assert new_state2.error_threshold_exceeded? == true + end) + + assert log =~ "Processing lag exceeds error threshold; skipping side effects!" + + # Back below threshold + exceeded_state = %{state | error_threshold_exceeded?: true} + + log2 = + capture_log(fn -> + {:noreply, new_state3} = + TestReplicationConnection.handle_info({:check_error_threshold, 1000}, exceeded_state) + + assert new_state3.error_threshold_exceeded? 
== false + end) + + assert log2 =~ "Processing lag is back below error threshold" + end + + test "handles unknown messages" do + state = %TestReplicationConnection{counter: 789} + {:noreply, new_state} = TestReplicationConnection.handle_info(:unknown_message, state) + assert new_state == state + end + end + + describe "write message handling" do + test "processes insert messages" do + state = + %TestReplicationConnection{ + relations: %{ + 1 => %{ + namespace: "public", + name: "users", + columns: [ + %{name: "id"}, + %{name: "name"} + ] + } + }, + counter: 0 + } + |> Map.put(:operations, []) + + # Test on_write callback directly + # In a real scenario, handle_data would parse WAL messages and eventually + # call on_write with the decoded operation data + new_state = + TestReplicationConnection.on_write( + state, + 100, + :insert, + "users", + nil, + %{"id" => "123", "name" => "John Doe"} + ) + + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + [operation] = operations + assert operation.op == :insert + assert operation.table == "users" + assert operation.lsn == 100 + end + + test "processes update messages" do + state = + %TestReplicationConnection{} + |> Map.put(:operations, []) + + new_state = + TestReplicationConnection.on_write( + state, + 101, + :update, + "users", + %{"id" => "123", "name" => "John"}, + %{"id" => "123", "name" => "John Doe"} + ) + + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + [operation] = operations + assert operation.op == :update + assert operation.old_data == %{"id" => "123", "name" => "John"} + assert operation.data == %{"id" => "123", "name" => "John Doe"} + end + + test "processes delete messages" do + state = + %TestReplicationConnection{} + |> Map.put(:operations, []) + + new_state = + TestReplicationConnection.on_write( + state, + 102, + :delete, + "users", + %{"id" => "123", "name" => "John Doe"}, + nil + ) + + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + [operation] = operations + assert operation.op == :delete + assert operation.old_data == %{"id" => "123", "name" => "John Doe"} + assert operation.data == nil + end + + test "on_write always processes operations in test implementation" do + # Note: In the real implementation, process_write checks error_threshold_exceeded? 
+ # but our test implementation doesn't, so operations are always processed + state = + %TestReplicationConnection{ + error_threshold_exceeded?: true + } + |> Map.put(:operations, []) + + new_state = + TestReplicationConnection.on_write( + state, + 103, + :insert, + "users", + nil, + %{"id" => "456"} + ) + + # Our test implementation always processes operations + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + end + end + + describe "flush behavior" do + test "calls on_flush when buffer size reached" do + state = + %TestReplicationConnection{ + flush_buffer: %{1 => %{}, 2 => %{}}, + flush_buffer_size: 3 + } + |> Map.put(:flush_count, 0) + |> Map.put(:operations, []) + + # Adding one more should trigger flush + # In the real implementation, this would happen in process_write + # when maybe_flush is called + new_state = %{state | flush_buffer: Map.put(state.flush_buffer, 3, %{})} + + # Simulate maybe_flush logic + flushed_state = + if map_size(new_state.flush_buffer) >= new_state.flush_buffer_size do + TestReplicationConnection.on_flush(new_state) + else + new_state + end + + assert Map.get(flushed_state, :flush_count) == 1 + assert flushed_state.flush_buffer == %{} + end + end + + describe "publication and slot management flow" do + test "creates publication when it doesn't exist" do + state = %TestReplicationConnection{ + publication_name: "test_pub", + table_subscriptions: ["users", "posts"], + schema: "public", + step: :check_publication + } + + # Simulate publication not existing + {:query, query, new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{num_rows: 0}], + state + ) + + assert query == "CREATE PUBLICATION test_pub FOR TABLE public.users,public.posts" + assert new_state.step == :check_replication_slot + end + + test "checks publication tables when publication exists" do + state = %TestReplicationConnection{ + publication_name: "test_pub", + step: :check_publication + } + + {:query, query, new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{num_rows: 1}], + state + ) + + assert query =~ "SELECT schemaname, tablename" + assert query =~ "FROM pg_publication_tables" + assert query =~ "WHERE pubname = 'test_pub'" + assert new_state.step == :check_publication_tables + end + + test "creates replication slot when it doesn't exist" do + state = %TestReplicationConnection{ + replication_slot_name: "test_slot", + output_plugin: "pgoutput", + step: :create_slot + } + + {:query, query, new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{num_rows: 0}], + state + ) + + assert query == "CREATE_REPLICATION_SLOT test_slot LOGICAL pgoutput NOEXPORT_SNAPSHOT" + assert new_state.step == :start_replication_slot + end + + test "starts replication when slot exists" do + state = %TestReplicationConnection{ + replication_slot_name: "test_slot", + publication_name: "test_pub", + proto_version: 1, + step: :start_replication_slot + } + + {:stream, query, [], new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{}], + state + ) + + assert query =~ "START_REPLICATION SLOT \"test_slot\"" + assert query =~ "publication_names 'test_pub'" + assert query =~ "proto_version '1'" + assert new_state.step == :streaming + end + end + + describe "relation message handling" do + test "stores relation information" do + state = %RelationTestConnection{relations: %{}} + + # In the real implementation, this would be called from handle_data + # when a Relation message is received. 
Since handle_write is private, + # we can't test it directly, but we know it updates the relations map + relation = %{ + id: 1, + namespace: "public", + name: "test_table", + columns: [%{name: "id"}, %{name: "data"}] + } + + # The relation would be stored with id as key + new_state = %{state | relations: Map.put(state.relations, relation.id, relation)} + + assert new_state.relations[1].name == "test_table" + assert length(new_state.relations[1].columns) == 2 end end end diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 31d929588..6d66a3c75 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -64,7 +64,19 @@ config :domain, Domain.ChangeLogs.ReplicationConnection, resource_connections resources tokens - ] + ], + # Allow up to 5 minutes of processing lag before alerting. This needs to be able to survive + # deploys without alerting. + warning_threshold: :timer.minutes(5), + + # We almost never want to bypass changelog inserts + error_threshold: :timer.hours(30 * 24), + + # Flush change logs data at least every 30 seconds + flush_interval: :timer.seconds(30), + + # We want to flush at most 500 change logs at a time + flush_buffer_size: 500 config :domain, Domain.Events.ReplicationConnection, replication_slot_name: "events_slot", @@ -98,7 +110,16 @@ config :domain, Domain.Events.ReplicationConnection, resource_connections resources tokens - ] + ], + # Allow up to 60 seconds of lag before alerting + warning_threshold: :timer.seconds(60), + + # Allow up to 30 minutes of lag before bypassing hooks + error_threshold: :timer.minutes(30), + + # Disable flush + flush_interval: 0, + flush_buffer_size: 0 config :domain, Domain.Tokens, key_base: "5OVYJ83AcoQcPmdKNksuBhJFBhjHD1uUa9mDOHV/6EIdBQ6pXksIhkVeWIzFk5S2", diff --git a/elixir/config/test.exs b/elixir/config/test.exs index 87eb6ae2c..7603f00bb 100644 --- a/elixir/config/test.exs +++ b/elixir/config/test.exs @@ -107,7 +107,8 @@ config :api, ############################### config :domain, Domain.Mailer, adapter: Domain.Mailer.TestAdapter -config :logger, level: :warning +# Allow asserting on info logs and higher +config :logger, level: :info config :argon2_elixir, t_cost: 1, m_cost: 8
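For reference, a minimal sketch of how the flush settings above could be consumed by the change-log connection -- assumed shape only, not the replication_connection module shipped in this patch. When the buffer reaches flush_buffer_size or the flush_interval timer fires, on_flush/1 hands the buffered attrs to ChangeLogs.bulk_insert/1; because bulk_insert skips duplicate LSNs, re-flushing the same rows after a reconnect is harmless, and last_flushed_lsn can then be advanced to the highest buffered LSN:

defmodule ChangeLogFlushSketch do
  # Illustrative only: the module name and exact flow here are assumptions.
  alias Domain.ChangeLogs

  # Nothing buffered, nothing to do.
  def on_flush(%{flush_buffer: buffer} = state) when map_size(buffer) == 0, do: state

  def on_flush(%{flush_buffer: buffer} = state) do
    # The buffer is keyed by LSN; insert the rows in LSN order.
    attrs =
      buffer
      |> Enum.sort_by(fn {lsn, _attrs} -> lsn end)
      |> Enum.map(fn {_lsn, attrs} -> attrs end)

    # insert_all with on_conflict: :nothing returns {count, nil}; duplicate LSNs are skipped.
    {_inserted, nil} = ChangeLogs.bulk_insert(attrs)

    %{state | flush_buffer: %{}, last_flushed_lsn: buffer |> Map.keys() |> Enum.max()}
  end
end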