diff --git a/elixir/apps/domain/lib/domain/change_logs.ex b/elixir/apps/domain/lib/domain/change_logs.ex index 8c665f0a4..1a78708ea 100644 --- a/elixir/apps/domain/lib/domain/change_logs.ex +++ b/elixir/apps/domain/lib/domain/change_logs.ex @@ -2,9 +2,11 @@ defmodule Domain.ChangeLogs do alias Domain.ChangeLogs.ChangeLog alias Domain.Repo - def create_change_log(attrs) do - attrs - |> ChangeLog.Changeset.changeset() - |> Repo.insert() + def bulk_insert(list_of_attrs) do + ChangeLog + |> Repo.insert_all(list_of_attrs, + on_conflict: :nothing, + conflict_target: [:lsn] + ) end end diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex deleted file mode 100644 index 608e6263b..000000000 --- a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex +++ /dev/null @@ -1,81 +0,0 @@ -defmodule Domain.ChangeLogs.ChangeLog.Changeset do - use Domain, :changeset - - @fields ~w[account_id lsn table op old_data data vsn]a - - def changeset(attrs) do - %Domain.ChangeLogs.ChangeLog{} - |> cast(attrs, @fields) - |> validate_inclusion(:op, [:insert, :update, :delete]) - |> validate_correct_data_present() - |> validate_same_account() - |> put_account_id() - |> validate_required([:account_id, :lsn, :table, :op, :vsn]) - |> unique_constraint(:lsn) - |> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey) - end - - # :insert requires old_data = nil and data != nil - # :update requires old_data != nil and data != nil - # :delete requires old_data != nil and data = nil - def validate_correct_data_present(changeset) do - op = get_field(changeset, :op) - old_data = get_field(changeset, :old_data) - data = get_field(changeset, :data) - - case {op, old_data, data} do - {:insert, nil, %{} = _data} -> - changeset - - {:update, %{} = _old_data, %{} = _data} -> - changeset - - {:delete, %{} = _old_data, nil} -> - changeset - - _ -> - add_error(changeset, :base, "Invalid combination of operation and data") - end - end - - # Add an error if data["account_id"] != old_data["account_id"] - defp validate_same_account(changeset) do - old_data = get_field(changeset, :old_data) - data = get_field(changeset, :data) - - account_id_key = account_id_field(changeset) - - if old_data && data && old_data[account_id_key] != data[account_id_key] do - add_error(changeset, :base, "Account ID cannot be changed") - else - changeset - end - end - - # Populate account_id from one of data, old_data - defp put_account_id(changeset) do - old_data = get_field(changeset, :old_data) - data = get_field(changeset, :data) - - account_id_key = account_id_field(changeset) - - account_id = - case {old_data, data} do - {nil, nil} -> nil - {_, %{^account_id_key => id}} -> id - {%{^account_id_key => id}, _} -> id - _ -> nil - end - - put_change(changeset, :account_id, account_id) - end - - # For accounts table updates, the account_id is in the "id" field - # For other tables, it is in the "account_id" field - defp account_id_field(changeset) do - case get_field(changeset, :table) do - "accounts" -> "id" - _ -> "account_id" - end - end -end diff --git a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex index 920024402..c38189f82 100644 --- a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex @@ -1,90 +1,83 @@ defmodule Domain.ChangeLogs.ReplicationConnection do + 
use Domain.Replication.Connection alias Domain.ChangeLogs - use Domain.Replication.Connection, - # Allow up to 5 minutes of processing lag before alerting. This needs to be able to survive - # deploys without alerting. - warning_threshold_ms: 5 * 60 * 1_000, - - # 1 month in ms - we never want to bypass changelog inserts - error_threshold_ms: 30 * 24 * 60 * 60 * 1_000 - # Bump this to signify a change in the audit log schema. Use with care. @vsn 0 - def on_insert(lsn, table, data) do - log(:insert, lsn, table, nil, data) + # Avoid overwhelming the change log with soft-deleted records getting hard-deleted en masse. + # Can be removed after https://github.com/firezone/firezone/issues/8187 is shipped. + def on_write(state, _lsn, :delete, _table, %{"deleted_at" => deleted_at}, _data) + when not is_nil(deleted_at) do + state end - def on_update(lsn, table, old_data, data) do - log(:update, lsn, table, old_data, data) + # Ignore token writes for relay_groups since these are not expected to have an account_id + def on_write(state, _lsn, _op, "tokens", %{"type" => "relay_group"}, _data), do: state + def on_write(state, _lsn, _op, "tokens", _old_data, %{"type" => "relay_group"}), do: state + + # Handle accounts specially + def on_write(state, lsn, op, "accounts", old_data, %{"id" => account_id} = data) do + buffer(state, lsn, op, "accounts", account_id, old_data, data) end - def on_delete(lsn, table, old_data) do - if is_nil(old_data["deleted_at"]) do - log(:delete, lsn, table, old_data, nil) - else - # Avoid overwhelming the change log with soft-deleted records getting hard-deleted en masse. - # Can be removed after https://github.com/firezone/firezone/issues/8187 is shipped. - :ok - end + # Handle other writes where an account_id is present + def on_write(state, lsn, op, table, old_data, %{"account_id" => account_id} = data) + when not is_nil(account_id) do + buffer(state, lsn, op, table, account_id, old_data, data) end - # Relay group tokens don't have account_ids - - defp log(_op, _lsn, "tokens", %{"type" => "relay_group"}, _data) do - :ok + def on_write(state, lsn, op, table, %{"account_id" => account_id} = old_data, data) + when not is_nil(account_id) do + buffer(state, lsn, op, table, account_id, old_data, data) end - defp log(_op, _lsn, "tokens", _old_data, %{"type" => "relay_group"}) do - :ok - end - - defp log(op, lsn, table, old_data, data) do - attrs = %{ - op: op, + # If we get here, raise the alarm as it means we encountered a change we didn't expect. + def on_write(state, lsn, op, table, _old_data, _data) do + Logger.error( + "Unexpected write operation!", lsn: lsn, - table: table, - old_data: old_data, - data: data, - vsn: @vsn - } + op: op, + table: table + ) - case ChangeLogs.create_change_log(attrs) do - {:ok, _change_log} -> - :ok - - {:error, %Ecto.Changeset{errors: errors} = changeset} -> - if Enum.any?(errors, &should_skip_change_log?/1) do - # Expected under normal operation when an account is deleted or we are catching up on - # already-processed but not acknowledged WAL data. - :ok - else - Logger.warning("Failed to create change log", - errors: inspect(changeset.errors), - table: table, - op: op, - lsn: lsn, - vsn: @vsn - ) - - # TODO: WAL - # Don't ignore failures to insert change logs. Improve this after we have some - # operational experience with the data flowing in here. 
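For orientation: the replacement for the removed error handling above is the buffer/flush pair. on_write/6 accumulates one attrs map per LSN in flush_buffer, and on_flush/1 hands the whole batch to ChangeLogs.bulk_insert/1. A rough sketch of how the pieces compose; the state is pared down to the two fields used here, and the LSN and payload values are made up:

    state = %{flush_buffer: %{}, last_flushed_lsn: 0}

    state =
      Domain.ChangeLogs.ReplicationConnection.on_write(
        state,
        42,
        :insert,
        "resources",
        nil,
        %{"id" => Ecto.UUID.generate(), "account_id" => Ecto.UUID.generate()}
      )

    # on_flush/1 bulk-inserts the buffer; since bulk_insert/1 uses
    # on_conflict: :nothing with conflict_target: [:lsn], replaying
    # already-inserted WAL data is a no-op rather than an error.
    state = Domain.ChangeLogs.ReplicationConnection.on_flush(state)
    # => %{flush_buffer: %{}, last_flushed_lsn: 42}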
- :ok - end - end + state end - defp should_skip_change_log?({:account_id, {"does not exist", _violations}}) do - true + def on_flush(%{flush_buffer: flush_buffer} = state) when map_size(flush_buffer) == 0, do: state + + def on_flush(state) do + to_insert = Map.values(state.flush_buffer) + attempted_count = Enum.count(state.flush_buffer) + + {successful_count, _change_logs} = ChangeLogs.bulk_insert(to_insert) + + Logger.info("Flushed #{successful_count}/#{attempted_count} change logs") + + # We always advance the LSN to the highest LSN in the flush buffer because + # LSN conflicts just mean the data is already inserted, and other insert_all + # issues like a missing account_id will raise an exception. + last_lsn = + state.flush_buffer + |> Map.keys() + |> Enum.max() + + %{state | flush_buffer: %{}, last_flushed_lsn: last_lsn} end - defp should_skip_change_log?({:lsn, {"has already been taken", _violations}}) do - true - end + defp buffer(state, lsn, op, table, account_id, old_data, data) do + flush_buffer = + state.flush_buffer + |> Map.put_new(lsn, %{ + lsn: lsn, + op: op, + table: table, + account_id: account_id, + old_data: old_data, + data: data, + vsn: @vsn + }) - defp should_skip_change_log?(_error) do - false + %{state | flush_buffer: flush_buffer} end end diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex index 48428764e..736dd8513 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -1,15 +1,7 @@ defmodule Domain.Events.ReplicationConnection do + use Domain.Replication.Connection alias Domain.Events.Hooks - use Domain.Replication.Connection, - # Allow up to 60 seconds of lag before alerting - warning_threshold_ms: 60 * 1_000, - - # Allow up to 30 minutes of lag before bypassing hooks - error_threshold_ms: 30 * 60 * 1_000 - - require Logger - @tables_to_hooks %{ "accounts" => Hooks.Accounts, "actor_group_memberships" => Hooks.ActorGroupMemberships, @@ -26,37 +18,18 @@ defmodule Domain.Events.ReplicationConnection do "tokens" => Hooks.Tokens } - def on_insert(_lsn, table, data) do - hook = Map.get(@tables_to_hooks, table) - - if hook do - hook.on_insert(data) + def on_write(state, _lsn, op, table, old_data, data) do + if hook = Map.get(@tables_to_hooks, table) do + case op do + :insert -> hook.on_insert(data) + :update -> hook.on_update(old_data, data) + :delete -> hook.on_delete(old_data) + end else - log_warning(:insert, table) - :ok + log_warning(op, table) end - end - def on_update(_lsn, table, old_data, data) do - hook = Map.get(@tables_to_hooks, table) - - if hook do - hook.on_update(old_data, data) - else - log_warning(:update, table) - :ok - end - end - - def on_delete(_lsn, table, old_data) do - hook = Map.get(@tables_to_hooks, table) - - if hook do - hook.on_delete(old_data) - else - log_warning(:delete, table) - :ok - end + state end defp log_warning(op, table) do diff --git a/elixir/apps/domain/lib/domain/replication/connection.ex b/elixir/apps/domain/lib/domain/replication/connection.ex index 129267b7b..f43ba34bb 100644 --- a/elixir/apps/domain/lib/domain/replication/connection.ex +++ b/elixir/apps/domain/lib/domain/replication/connection.ex @@ -15,17 +15,8 @@ defmodule Domain.Replication.Connection do ## Usage defmodule MyApp.ReplicationConnection do - use Domain.Replication.Connection, - status_log_interval_s: 60, - warning_threshold_ms: 30_000, - error_threshold_ms: 60 * 1_000 + use 
Domain.Replication.Connection end - - ## Options - - * `:warning_threshold_ms`: How long to allow the WAL stream to lag before logging a warning - * `:error_threshold_ms`: How long to allow the WAL stream to lag before logging an error and - bypassing side effect handlers. """ defmacro __using__(opts \\ []) do @@ -65,10 +56,6 @@ defmodule Domain.Replication.Connection do # Extract struct definition and constants defp struct_and_constants(opts) do quote bind_quoted: [opts: opts] do - @warning_threshold_ms Keyword.fetch!(opts, :warning_threshold_ms) - @error_threshold_ms Keyword.fetch!(opts, :error_threshold_ms) - @status_log_interval :timer.seconds(Keyword.get(opts, :status_log_interval_s, 60)) - # Everything else uses defaults @schema "public" @output_plugin "pgoutput" @@ -93,19 +80,56 @@ defmodule Domain.Replication.Connection do table_subscriptions: list(), relations: map(), counter: integer(), - tables_to_remove: MapSet.t() + tables_to_remove: MapSet.t(), + flush_interval: integer(), + flush_buffer: map(), + last_flushed_lsn: integer(), + warning_threshold_exceeded?: boolean(), + error_threshold_exceeded?: boolean(), + flush_buffer_size: integer(), + status_log_interval: integer(), + warning_threshold: integer(), + error_threshold: integer() } - defstruct schema: @schema, - step: :disconnected, - publication_name: nil, - replication_slot_name: nil, - output_plugin: @output_plugin, - proto_version: @proto_version, - table_subscriptions: [], - relations: %{}, - counter: 0, - tables_to_remove: MapSet.new() + defstruct( + # schema to use for the publication + schema: @schema, + # starting step + step: :disconnected, + # publication name to check/create + publication_name: nil, + # replication slot name to check/create + replication_slot_name: nil, + # output plugin to use for logical replication + output_plugin: @output_plugin, + # protocol version to use for logical replication + proto_version: @proto_version, + # tables we want to subscribe to in the publication + table_subscriptions: [], + # relations we have seen so far + relations: %{}, + # counter for the number of messages processed + counter: 0, + # calculated tables to remove from the publication + tables_to_remove: MapSet.new(), + # flush interval in milliseconds, set to 0 to use immediate processing + flush_interval: 0, + # buffer for data to flush + flush_buffer: %{}, + # last flushed LSN, used to track progress while flushing + last_flushed_lsn: 0, + # flags to track if we have exceeded warning/error thresholds + warning_threshold_exceeded?: false, + error_threshold_exceeded?: false, + # size of the flush buffer, used to determine when to flush + flush_buffer_size: 0, + # interval for logging status updates + status_log_interval: :timer.minutes(1), + # thresholds for warning and error logging + warning_threshold: :timer.seconds(30), + error_threshold: :timer.seconds(60) + ) end end @@ -119,10 +143,9 @@ defmodule Domain.Replication.Connection do @impl true def init(state) do - state = - state - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) + if state.flush_interval > 0 do + Process.send_after(self(), :flush, state.flush_interval) + end {:ok, state} end @@ -365,7 +388,14 @@ defmodule Domain.Replication.Connection do def handle_data(data, state) when is_keep_alive(data) do %KeepAlive{reply: reply, wal_end: wal_end} = parse(data) - wal_end = wal_end + 1 + wal_end = + if state.flush_interval > 0 do + # If we are flushing, we use the tracked last_flushed_lsn + state.last_flushed_lsn + 
1 + else + # Otherwise, we assume to be current with the server + wal_end + 1 + end message = case reply do @@ -382,7 +412,7 @@ message |> decode_message() - |> handle_message(server_wal_end, %{state | counter: state.counter + 1}) + |> handle_write(server_wal_end, %{state | counter: state.counter + 1}) end end @@ -392,7 +422,7 @@ state: inspect(state) ) - {:noreply, [], state} + {:noreply, state} end end end @@ -402,14 +432,15 @@ quote do # Handles messages received: # - # 1. Insert/Update/Delete/Begin/Commit - send to appropriate hook - # 2. Relation messages - store the relation data in our state so we can use it later + # 1. Insert/Update/Delete - send to on_write/6 + # 2. Begin - check how far we are lagging behind + # 3. Relation messages - store the relation data in our state so we can use it later # to associate column names etc with the data we receive. In practice, we'll always # see a Relation message before we see any data for that relation. - # 3. Origin/Truncate/Type - we ignore these messages for now - # 4. Graceful shutdown - we respond with {:disconnect, :normal} to + # 4. Origin/Truncate/Type - we ignore these messages for now + # 5. Graceful shutdown - we respond with {:disconnect, :normal} to # indicate that we are shutting down gracefully and prevent auto reconnecting. - defp handle_message( + defp handle_write( %Decoder.Messages.Relation{ id: id, namespace: namespace, @@ -425,67 +456,57 @@ columns: columns } - {:noreply, ack(server_wal_end), - %{state | relations: Map.put(state.relations, id, relation)}} + {:noreply, %{state | relations: Map.put(state.relations, id, relation)}} end - defp handle_message(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do - unless state.error_threshold_exceeded? do - {op, table, _old_data, data} = transform(msg, state.relations) - :ok = on_insert(server_wal_end, table, data) - end - - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do + process_write(msg, server_wal_end, state) end - defp handle_message(%Decoder.Messages.Update{} = msg, server_wal_end, state) do - unless state.error_threshold_exceeded? do - {op, table, old_data, data} = transform(msg, state.relations) - :ok = on_update(server_wal_end, table, old_data, data) - end - - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Update{} = msg, server_wal_end, state) do + process_write(msg, server_wal_end, state) end - defp handle_message(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do - unless state.error_threshold_exceeded?
do - {op, table, old_data, _data} = transform(msg, state.relations) - :ok = on_delete(server_wal_end, table, old_data) - end - - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do + process_write(msg, server_wal_end, state) end - defp ack(server_wal_end) do - wal_end = server_wal_end + 1 - standby_status(wal_end, wal_end, wal_end, :now) + defp process_write(_msg, _server_wal_end, %{error_threshold_exceeded?: true} = state) do + {:noreply, state} end + + defp process_write(msg, server_wal_end, state) do + {op, table, old_data, data} = transform(msg, state.relations) + + state + |> on_write(server_wal_end, op, table, old_data, data) + |> maybe_flush() + |> then(&{:noreply, &1}) + end + + defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state) + when map_size(buffer) >= size do + on_flush(state) + end + + defp maybe_flush(state), do: state end end # Extract transaction and ignored message handlers defp transaction_handlers do quote do - defp handle_message( + defp handle_write( %Decoder.Messages.Begin{commit_timestamp: commit_timestamp} = msg, server_wal_end, state ) do - # Since we receive a commit for each operation and we process each operation - # one-by-one, we can use the commit timestamp to check if we are lagging behind. + # We can use the commit timestamp to check how far we are lagging behind lag_ms = DateTime.diff(DateTime.utc_now(), commit_timestamp, :millisecond) send(self(), {:check_warning_threshold, lag_ms}) send(self(), {:check_error_threshold, lag_ms}) - {:noreply, ack(server_wal_end), state} - end - - defp handle_message( - %Decoder.Messages.Commit{commit_timestamp: commit_timestamp}, - server_wal_end, - state - ) do - {:noreply, ack(server_wal_end), state} + {:noreply, state} end end end @@ -494,25 +515,30 @@ defmodule Domain.Replication.Connection do defp ignored_message_handlers do quote do # These messages are not relevant for our use case, so we ignore them. 
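Aside on the defaults: with the struct defaults above (flush_buffer_size: 0), map_size(buffer) >= size holds after every buffered write, so maybe_flush/1 degenerates to immediate per-write flushing, while a positive flush_buffer_size batches writes between the timer-driven :flush messages. A hedged example of a state that opts into batching; the field names come from the defstruct above, but the publication/slot names are placeholders and the startup wiring is not part of this diff:

    %Domain.ChangeLogs.ReplicationConnection{
      publication_name: "changelog_publication",
      replication_slot_name: "changelog_slot",
      table_subscriptions: ["resources", "policies"],
      flush_interval: :timer.seconds(5),
      flush_buffer_size: 500
    }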
- defp handle_message(%Decoder.Messages.Origin{}, server_wal_end, state) do - {:noreply, ack(server_wal_end), state} + + defp handle_write(%Decoder.Messages.Commit{}, _server_wal_end, state) do + {:noreply, state} end - defp handle_message(%Decoder.Messages.Truncate{}, server_wal_end, state) do - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Origin{}, _server_wal_end, state) do + {:noreply, state} end - defp handle_message(%Decoder.Messages.Type{}, server_wal_end, state) do - {:noreply, ack(server_wal_end), state} + defp handle_write(%Decoder.Messages.Truncate{}, _server_wal_end, state) do + {:noreply, state} end - defp handle_message(%Decoder.Messages.Unsupported{data: data}, server_wal_end, state) do + defp handle_write(%Decoder.Messages.Type{}, _server_wal_end, state) do + {:noreply, state} + end + + defp handle_write(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do Logger.warning("#{__MODULE__}: Unsupported message received", data: inspect(data), counter: state.counter ) - {:noreply, ack(server_wal_end), state} + {:noreply, state} end end end @@ -559,25 +585,24 @@ defmodule Domain.Replication.Connection do end end - # Extract info handlers defp info_handlers(opts) do quote bind_quoted: [opts: opts] do @impl true def handle_info( {:check_warning_threshold, lag_ms}, - %{warning_threshold_exceeded?: false} = state + %{warning_threshold_exceeded?: false, warning_threshold: warning_threshold} = state ) - when lag_ms >= @warning_threshold_ms do + when lag_ms >= warning_threshold do Logger.warning("#{__MODULE__}: Processing lag exceeds warning threshold", lag_ms: lag_ms) {:noreply, %{state | warning_threshold_exceeded?: true}} end def handle_info( {:check_warning_threshold, lag_ms}, - %{warning_threshold_exceeded?: true} = state + %{warning_threshold_exceeded?: true, warning_threshold: warning_threshold} = state ) - when lag_ms < @warning_threshold_ms do + when lag_ms < warning_threshold do Logger.info("#{__MODULE__}: Processing lag is back below warning threshold", lag_ms: lag_ms ) @@ -587,9 +612,9 @@ defmodule Domain.Replication.Connection do def handle_info( {:check_error_threshold, lag_ms}, - %{error_threshold_exceeded?: false} = state + %{error_threshold_exceeded?: false, error_threshold: error_threshold} = state ) - when lag_ms >= @error_threshold_ms do + when lag_ms >= error_threshold do Logger.error( "#{__MODULE__}: Processing lag exceeds error threshold; skipping side effects!", lag_ms: lag_ms @@ -600,9 +625,9 @@ defmodule Domain.Replication.Connection do def handle_info( {:check_error_threshold, lag_ms}, - %{error_threshold_exceeded?: true} = state + %{error_threshold_exceeded?: true, error_threshold: error_threshold} = state ) - when lag_ms < @error_threshold_ms do + when lag_ms < error_threshold do Logger.info("#{__MODULE__}: Processing lag is back below error threshold", lag_ms: lag_ms) {:noreply, %{state | error_threshold_exceeded?: false}} end @@ -612,11 +637,17 @@ defmodule Domain.Replication.Connection do "#{__MODULE__}: Processed #{state.counter} write messages from the WAL stream" ) - Process.send_after(self(), :interval_logger, @status_log_interval) + Process.send_after(self(), :interval_logger, state.status_log_interval) {:noreply, state} end + def handle_info(:flush, state) do + Process.send_after(self(), :flush, state.flush_interval) + + {:noreply, on_flush(state)} + end + def handle_info(:shutdown, _), do: {:disconnect, :normal} def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal} @@ -628,11 
+659,10 @@ defmodule Domain.Replication.Connection do defp default_callbacks do quote do # Default implementations for required callbacks - modules using this should implement these - def on_insert(_lsn, _table, _data), do: :ok - def on_update(_lsn, _table, _old_data, _data), do: :ok - def on_delete(_lsn, _table, _old_data), do: :ok + def on_write(state, _lsn, _op, _table, _old_data, _data), do: state + def on_flush(state), do: state - defoverridable on_insert: 3, on_update: 4, on_delete: 3 + defoverridable on_write: 6, on_flush: 1 end end end diff --git a/elixir/apps/domain/priv/repo/migrations/20250630191454_set_defaults_on_change_logs.exs b/elixir/apps/domain/priv/repo/migrations/20250630191454_set_defaults_on_change_logs.exs new file mode 100644 index 000000000..35130e0a2 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250630191454_set_defaults_on_change_logs.exs @@ -0,0 +1,17 @@ +defmodule Domain.Repo.Migrations.SetDefaultsOnChangeLogs do + use Ecto.Migration + + def up do + alter table(:change_logs) do + modify(:inserted_at, :utc_datetime_usec, default: fragment("now()")) + modify(:id, :binary_id, default: fragment("gen_random_uuid()")) + end + end + + def down do + alter table(:change_logs) do + modify(:inserted_at, :utc_datetime_usec, default: nil) + modify(:id, :binary_id, default: nil) + end + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250630210514_drop_foreign_key_constraint_on_change_logs.exs b/elixir/apps/domain/priv/repo/migrations/20250630210514_drop_foreign_key_constraint_on_change_logs.exs new file mode 100644 index 000000000..cb9a2fd28 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250630210514_drop_foreign_key_constraint_on_change_logs.exs @@ -0,0 +1,13 @@ +defmodule Domain.Repo.Migrations.DropForeignKeyConstraintOnChangeLogs do + use Ecto.Migration + + def up do + drop(constraint(:change_logs, :change_logs_account_id_fkey)) + end + + def down do + alter table(:change_logs) do + modify(:account_id, references(:accounts, type: :binary_id, on_delete: :delete_all)) + end + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250702020715_add_change_logs_data_constraint.exs b/elixir/apps/domain/priv/repo/migrations/20250702020715_add_change_logs_data_constraint.exs new file mode 100644 index 000000000..9ba9c1161 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250702020715_add_change_logs_data_constraint.exs @@ -0,0 +1,18 @@ +defmodule Domain.Repo.Migrations.AddChangeLogsDataConstraint do + use Ecto.Migration + + def change do + create( + constraint(:change_logs, :valid_data_for_operation, + check: """ + CASE op + WHEN 'insert' THEN data IS NOT NULL AND old_data IS NULL + WHEN 'update' THEN data IS NOT NULL AND old_data IS NOT NULL + WHEN 'delete' THEN data IS NULL AND old_data IS NOT NULL + ELSE false + END + """ + ) + ) + end +end diff --git a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs index 39b856134..d178cc1b9 100644 --- a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs @@ -1,9 +1,8 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do use Domain.DataCase, async: true - import ExUnit.CaptureLog import Ecto.Query - import Domain.ChangeLogs.ReplicationConnection + alias Domain.ChangeLogs.ReplicationConnection alias Domain.ChangeLogs.ChangeLog alias Domain.Repo @@ -12,351 +11,762 @@ 
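The valid_data_for_operation check added above enforces at the database layer what the deleted changeset used to validate in application code. For reference, the combinations it admits:

    op     | old_data | data
    -------+----------+----------
    insert | NULL     | NOT NULL
    update | NOT NULL | NOT NULL
    delete | NOT NULL | NULL

Any other combination now raises a Postgrex.Error instead of returning an invalid changeset, which the tests below rely on.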
defmodule Domain.ChangeLogs.ReplicationConnectionTest do %{account: account} end - describe "on_insert/2" do - test "ignores flows table - no record created" do - table = "flows" - data = %{"id" => 1, "name" => "test flow"} + describe "on_write/6 for inserts" do + test "buffers account inserts", %{account: account} do + initial_state = %{flush_buffer: %{}} - initial_count = Repo.aggregate(ChangeLog, :count, :id) + result_state = + ReplicationConnection.on_write( + initial_state, + 12345, + :insert, + "accounts", + nil, + %{"id" => account.id, "name" => "test account"} + ) - assert :ok = on_insert(0, table, data) - - # No record should be created for flows - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + assert result_state == %{ + flush_buffer: %{ + 12345 => %{ + data: %{ + "id" => account.id, + "name" => "test account" + }, + table: "accounts", + vsn: 0, + op: :insert, + account_id: account.id, + lsn: 12345, + old_data: nil + } + } + } end - test "creates change log record for non-flows tables", %{account: account} do - table = "accounts" - data = %{"id" => account.id, "name" => "test account"} + test "adds insert operation to flush buffer for non-account tables", %{account: account} do + table = "resources" + data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "test resource" + } - assert :ok = on_insert(0, table, data) + lsn = 12345 - # Verify the record was created - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + initial_state = %{flush_buffer: %{}} - assert change_log.op == :insert - assert change_log.table == table - assert change_log.old_data == nil - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :insert, + table, + nil, + data + ) + + assert map_size(result_state.flush_buffer) == 1 + assert Map.has_key?(result_state.flush_buffer, lsn) + + attrs = result_state.flush_buffer[lsn] + assert attrs.lsn == lsn + assert attrs.table == table + assert attrs.op == :insert + assert attrs.data == data + assert attrs.old_data == nil + assert attrs.account_id == account.id + assert attrs.vsn == 0 end - test "creates records for different table types", %{account: account} do - test_cases = [ - {"accounts", %{"id" => account.id, "name" => "test account"}}, - {"resources", - %{"id" => Ecto.UUID.generate(), "name" => "test resource", "account_id" => account.id}}, - {"policies", - %{"id" => Ecto.UUID.generate(), "name" => "test policy", "account_id" => account.id}}, - {"actors", - %{"id" => Ecto.UUID.generate(), "name" => "test actor", "account_id" => account.id}} - ] + test "preserves existing buffer items", %{account: account} do + existing_lsn = 100 - for {{table, data}, idx} <- Enum.with_index(test_cases) do - initial_count = Repo.aggregate(ChangeLog, :count, :id) + existing_item = %{ + lsn: existing_lsn, + table: "other_table", + op: :update, + account_id: account.id, + data: %{"id" => "existing"}, + old_data: nil, + vsn: 0 + } - assert :ok = on_insert(idx, table, data) + initial_state = %{flush_buffer: %{existing_lsn => existing_item}} - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + 1 + new_lsn = 101 - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + result_state = + ReplicationConnection.on_write( + initial_state, + new_lsn, + :insert, + "resources",
nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id} + ) - assert change_log.op == :insert - assert change_log.table == table - assert change_log.old_data == nil - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - end - end - - describe "on_update/3" do - test "ignores flows table - no record created" do - table = "flows" - old_data = %{"id" => 1, "name" => "old flow"} - data = %{"id" => 1, "name" => "new flow"} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - assert :ok = on_update(0, table, old_data, data) - - # No record should be created for flows - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + assert map_size(result_state.flush_buffer) == 2 + assert result_state.flush_buffer[existing_lsn] == existing_item + assert Map.has_key?(result_state.flush_buffer, new_lsn) end - test "creates change log record for non-flows tables", %{account: account} do - table = "accounts" - old_data = %{"id" => account.id, "name" => "old name"} - data = %{"id" => account.id, "name" => "new name"} + test "ignores relay_group tokens" do + initial_state = %{flush_buffer: %{}} - assert :ok = on_update(0, table, old_data, data) + result_state = + ReplicationConnection.on_write( + initial_state, + 12345, + :insert, + "tokens", + nil, + %{"id" => Ecto.UUID.generate(), "type" => "relay_group"} + ) - # Verify the record was created - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :update - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id + assert result_state == initial_state end test "handles complex data structures", %{account: account} do - table = "resources" - resource_id = Ecto.UUID.generate() - - old_data = %{ - "id" => resource_id, - "name" => "old name", - "account_id" => account.id, - "settings" => %{"theme" => "dark", "notifications" => true} - } - - data = %{ - "id" => resource_id, - "name" => "new name", - "account_id" => account.id, - "settings" => %{"theme" => "light", "notifications" => false}, - "tags" => ["updated", "important"] - } - - assert :ok = on_update(0, table, old_data, data) - - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :update - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == data - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - end - - describe "on_delete/2" do - test "ignores soft-deleted records" do - table = "resources" - old_data = %{"id" => Ecto.UUID.generate(), "deleted_at" => "#{DateTime.utc_now()}"} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - assert :ok = on_delete(0, table, old_data) - - # No record should be created for soft-deleted records - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - - test "ignores flows table - no record created" do - table = "flows" - old_data = %{"id" => 1, "name" => "deleted flow"} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - assert :ok = on_delete(0, table, old_data) - - # No record should be created for flows - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - - test "creates change log record for non-flows tables", 
%{account: account} do - table = "accounts" - old_data = %{"id" => account.id, "name" => "deleted account"} - - assert :ok = on_delete(0, table, old_data) - - # Verify the record was created - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :delete - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == nil - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - - test "handles various data types in old_data", %{account: account} do - table = "resources" - resource_id = Ecto.UUID.generate() - - old_data = %{ - "id" => resource_id, - "name" => "complex resource", - "account_id" => account.id, - "metadata" => %{ - "created_by" => "system", - "permissions" => ["read", "write"], - "config" => %{"timeout" => 30, "retries" => 3} - }, - "active" => true, - "count" => 42 - } - - assert :ok = on_delete(0, table, old_data) - - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) - - assert change_log.op == :delete - assert change_log.table == table - assert change_log.old_data == old_data - assert change_log.data == nil - assert change_log.vsn == 0 - assert change_log.account_id == account.id - end - end - - describe "error handling" do - test "handles foreign key errors gracefully" do - # Create a change log entry that references a non-existent account - table = "resources" - # Non-existent account_id - data = %{"id" => Ecto.UUID.generate(), "account_id" => Ecto.UUID.generate()} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - # Should return :ok even if foreign key constraint fails - assert :ok = on_insert(0, table, data) - - # No record should be created due to foreign key error - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - - test "logs and handles non-foreign-key validation errors gracefully", %{account: account} do - # Test with invalid data that would cause validation errors (not foreign key) - table = "accounts" - # Missing required fields but valid FK - data = %{"account_id" => account.id} - - initial_count = Repo.aggregate(ChangeLog, :count, :id) - - log_output = - capture_log(fn -> - assert :ok = on_insert(0, table, data) - end) - - # Should log the error - assert log_output =~ "Failed to create change log" - - # No record should be created - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count - end - end - - describe "data integrity" do - test "preserves exact data structures", %{account: account} do - table = "policies" - policy_id = Ecto.UUID.generate() - - # Test with various data types complex_data = %{ - "id" => policy_id, + "id" => Ecto.UUID.generate(), "account_id" => account.id, - "string_field" => "test string", - "integer_field" => 42, - "boolean_field" => true, + "nested" => %{"key" => "value", "array" => [1, 2, 3]}, "null_field" => nil, - "array_field" => [1, "two", %{"three" => 3}], - "nested_object" => %{ - "level1" => %{ - "level2" => %{ - "deep_value" => "preserved" - } - } + "boolean" => true + } + + state = %{flush_buffer: %{}} + lsn = 200 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :insert, + "resources", + nil, + complex_data + ) + + attrs = result_state.flush_buffer[lsn] + assert attrs.data == complex_data + end + end + + describe "on_write/6 for updates" do + test "adds update operation to flush buffer", %{account: account} do + table = "resources" + old_data 
= %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "old name"} + data = %{"id" => old_data["id"], "account_id" => account.id, "name" => "new name"} + lsn = 12346 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :update, + table, + old_data, + data + ) + + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + + assert attrs.lsn == lsn + assert attrs.table == table + assert attrs.op == :update + assert attrs.data == data + assert attrs.old_data == old_data + assert attrs.account_id == account.id + assert attrs.vsn == 0 + end + + test "handles account updates specially", %{account: account} do + old_data = %{"id" => account.id, "name" => "old name"} + data = %{"id" => account.id, "name" => "new name"} + lsn = 12346 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :update, + "accounts", + old_data, + data + ) + + # Account updates should be buffered + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + assert attrs.table == "accounts" + assert attrs.op == :update + assert attrs.account_id == account.id + end + + test "handles complex data changes", %{account: account} do + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "settings" => %{"theme" => "dark"}, + "tags" => ["old"] + } + + new_data = %{ + "id" => old_data["id"], + "account_id" => account.id, + "settings" => %{"theme" => "light", "language" => "en"}, + "tags" => ["new", "updated"] + } + + state = %{flush_buffer: %{}} + lsn = 300 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :update, + "resources", + old_data, + new_data + ) + + attrs = result_state.flush_buffer[lsn] + assert attrs.old_data == old_data + assert attrs.data == new_data + end + end + + describe "on_write/6 for deletes" do + test "ignores account deletes", %{account: account} do + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + 12345, + :delete, + "accounts", + %{"id" => account.id, "name" => "deleted account"}, + nil + ) + + assert result_state == initial_state + end + + test "adds delete operation to flush buffer for non-soft-deleted records", %{account: account} do + table = "resources" + + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "deleted resource", + "deleted_at" => nil + } + + lsn = 12347 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :delete, + table, + old_data, + nil + ) + + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + + assert attrs.lsn == lsn + assert attrs.table == table + assert attrs.op == :delete + assert attrs.data == nil + assert attrs.old_data == old_data + assert attrs.account_id == account.id + assert attrs.vsn == 0 + end + + test "ignores soft-deleted records", %{account: account} do + table = "resources" + + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "soft deleted", + "deleted_at" => "2024-01-01T00:00:00Z" + } + + lsn = 12348 + + initial_state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + initial_state, + lsn, + :delete, + table, + old_data, + nil + ) + + # Buffer should remain unchanged + assert map_size(result_state.flush_buffer) == 0 + assert result_state == 
initial_state + end + + test "processes record without deleted_at field", %{account: account} do + old_data = %{ + "id" => Ecto.UUID.generate(), + "account_id" => account.id, + "name" => "no deleted_at field" + } + + state = %{flush_buffer: %{}} + lsn = 400 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :delete, + "resources", + old_data, + nil + ) + + assert map_size(result_state.flush_buffer) == 1 + attrs = result_state.flush_buffer[lsn] + assert attrs.op == :delete + end + end + + describe "multiple operations and buffer accumulation" do + test "operations accumulate in flush buffer correctly", %{account: account} do + initial_state = %{flush_buffer: %{}} + + # Insert + state1 = + ReplicationConnection.on_write( + initial_state, + 100, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test"} + ) + + assert map_size(state1.flush_buffer) == 1 + + # Update + resource_id = Ecto.UUID.generate() + + state2 = + ReplicationConnection.on_write( + state1, + 101, + :update, + "resources", + %{"id" => resource_id, "account_id" => account.id, "name" => "test"}, + %{"id" => resource_id, "account_id" => account.id, "name" => "updated"} + ) + + assert map_size(state2.flush_buffer) == 2 + + # Delete (non-soft) + state3 = + ReplicationConnection.on_write( + state2, + 102, + :delete, + "resources", + %{"id" => resource_id, "account_id" => account.id, "name" => "updated"}, + nil + ) + + assert map_size(state3.flush_buffer) == 3 + + # Verify LSNs + assert Map.has_key?(state3.flush_buffer, 100) + assert Map.has_key?(state3.flush_buffer, 101) + assert Map.has_key?(state3.flush_buffer, 102) + + assert state3.flush_buffer[100].op == :insert + assert state3.flush_buffer[101].op == :update + assert state3.flush_buffer[102].op == :delete + end + + test "mixed operations with soft deletes", %{account: account} do + state = %{flush_buffer: %{}} + + # Regular insert + state1 = + ReplicationConnection.on_write( + state, + 100, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test"} + ) + + # Regular update + resource_id = Ecto.UUID.generate() + + state2 = + ReplicationConnection.on_write( + state1, + 101, + :update, + "resources", + %{"id" => resource_id, "account_id" => account.id, "name" => "test"}, + %{"id" => resource_id, "account_id" => account.id, "name" => "updated"} + ) + + # Soft delete (should be ignored) + state3 = + ReplicationConnection.on_write( + state2, + 102, + :delete, + "resources", + %{ + "id" => resource_id, + "account_id" => account.id, + "name" => "updated", + "deleted_at" => "2024-01-01T00:00:00Z" + }, + nil + ) + + # Hard delete (should be included) + state4 = + ReplicationConnection.on_write( + state3, + 103, + :delete, + "resources", + %{ + "id" => resource_id, + "account_id" => account.id, + "name" => "updated", + "deleted_at" => nil + }, + nil + ) + + # Should have 3 operations: insert, update, hard delete (soft delete ignored) + assert map_size(state4.flush_buffer) == 3 + assert Map.has_key?(state4.flush_buffer, 100) + assert Map.has_key?(state4.flush_buffer, 101) + refute Map.has_key?(state4.flush_buffer, 102) + assert Map.has_key?(state4.flush_buffer, 103) + end + end + + describe "on_flush/1" do + test "handles empty flush buffer" do + state = %{flush_buffer: %{}} + + result_state = ReplicationConnection.on_flush(state) + + assert result_state == state + end + + test "successfully flushes buffer and clears it", %{account: account} do + # Create valid 
change log data + attrs1 = %{ + lsn: 100, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test1"}, + old_data: nil, + vsn: 0 + } + + attrs2 = %{ + lsn: 101, + table: "resources", + op: :update, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test2"}, + old_data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "name" => "test1"}, + vsn: 0 + } + + state = %{ + flush_buffer: %{100 => attrs1, 101 => attrs2}, + last_flushed_lsn: 99 + } + + result_state = ReplicationConnection.on_flush(state) + + assert result_state.flush_buffer == %{} + # Should be the highest LSN + assert result_state.last_flushed_lsn == 101 + + # Verify actual records were created in database + change_logs = Repo.all(from cl in ChangeLog, where: cl.lsn in [100, 101], order_by: cl.lsn) + assert length(change_logs) == 2 + + [log1, log2] = change_logs + assert log1.lsn == 100 + assert log1.op == :insert + assert log2.lsn == 101 + assert log2.op == :update + end + + test "calculates last_flushed_lsn correctly as max LSN", %{account: account} do + # Create multiple records with non-sequential LSNs + attrs_map = %{ + 400 => %{ + lsn: 400, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + old_data: nil, + vsn: 0 + }, + 402 => %{ + lsn: 402, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + old_data: nil, + vsn: 0 + }, + 401 => %{ + lsn: 401, + table: "resources", + op: :insert, + account_id: account.id, + data: %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + old_data: nil, + vsn: 0 } } - assert :ok = on_insert(0, table, complex_data) + state = %{flush_buffer: attrs_map, last_flushed_lsn: 399} - change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + result_state = ReplicationConnection.on_flush(state) - # Data should be preserved exactly as provided - assert change_log.data == complex_data - assert change_log.op == :insert - assert change_log.table == table - assert change_log.account_id == account.id - end - - test "tracks operation sequence correctly", %{account: account} do - table = "accounts" - initial_data = %{"id" => account.id, "name" => "initial"} - updated_data = %{"id" => account.id, "name" => "updated"} - - # Insert - assert :ok = on_insert(0, table, initial_data) - - # Update - assert :ok = on_update(1, table, initial_data, updated_data) - - # Delete - assert :ok = on_delete(2, table, updated_data) - - # Get the three most recent records in reverse chronological order - logs = - Repo.all( - from cl in ChangeLog, - where: cl.account_id == ^account.id, - order_by: [desc: cl.inserted_at], - limit: 3 - ) - - # Should have 3 records (delete, update, insert in that order) - assert length(logs) >= 3 - [delete_log, update_log, insert_log] = Enum.take(logs, 3) - - # Verify sequence (most recent first) - assert delete_log.op == :delete - assert delete_log.old_data == updated_data - assert delete_log.data == nil - assert delete_log.account_id == account.id - - assert update_log.op == :update - assert update_log.old_data == initial_data - assert update_log.data == updated_data - assert update_log.account_id == account.id - - assert insert_log.op == :insert - assert insert_log.old_data == nil - assert insert_log.data == initial_data - assert 
insert_log.account_id == account.id - - # All should have same version - assert insert_log.vsn == 0 - assert update_log.vsn == 0 - assert delete_log.vsn == 0 + # Should update to max LSN (402) + assert result_state.last_flushed_lsn == 402 + assert result_state.flush_buffer == %{} end end - describe "flows table comprehensive test" do - test "flows table never creates records regardless of operation or data" do - initial_count = Repo.aggregate(ChangeLog, :count, :id) + describe "LSN tracking and ordering" do + test "LSNs are preserved correctly in buffer" do + lsns = [1000, 1001, 1002] - # Test various data shapes and operations - test_data_sets = [ - %{}, - %{"id" => 1}, - %{"complex" => %{"nested" => ["data", 1, true, nil]}}, - nil - ] + state = %{flush_buffer: %{}} - for data <- test_data_sets do - assert :ok = on_insert(0, "flows", data) - assert :ok = on_update(1, "flows", data, data) - assert :ok = on_delete(2, "flows", data) - end + # Add multiple operations with specific LSNs + final_state = + Enum.reduce(lsns, state, fn lsn, acc_state -> + ReplicationConnection.on_write( + acc_state, + lsn, + :insert, + "resources", + nil, + %{ + "id" => Ecto.UUID.generate(), + "account_id" => "test-account", + "name" => "test_#{lsn}" + } + ) + end) - # No records should have been created - final_count = Repo.aggregate(ChangeLog, :count, :id) - assert final_count == initial_count + # Verify LSNs are preserved as keys + assert Map.keys(final_state.flush_buffer) |> Enum.sort() == lsns + + # Verify each entry has correct LSN + Enum.each(lsns, fn lsn -> + assert final_state.flush_buffer[lsn].lsn == lsn + end) + end + + test "handles large LSN values", %{account: account} do + large_lsn = 999_999_999_999_999 + + state = %{flush_buffer: %{}} + + result_state = + ReplicationConnection.on_write( + state, + large_lsn, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id} + ) + + attrs = result_state.flush_buffer[large_lsn] + assert attrs.lsn == large_lsn + end + + test "preserves LSN ordering through flush", %{account: account} do + # Add operations with non-sequential LSNs + lsns = [1005, 1003, 1007, 1001] + + state = %{flush_buffer: %{}, last_flushed_lsn: 0} + + final_state = + Enum.reduce(lsns, state, fn lsn, acc_state -> + ReplicationConnection.on_write( + acc_state, + lsn, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id} + ) + end) + + # Flush to database + ReplicationConnection.on_flush(final_state) + + # Verify records in database have correct LSNs + change_logs = Repo.all(from cl in ChangeLog, where: cl.lsn in ^lsns, order_by: cl.lsn) + db_lsns = Enum.map(change_logs, & &1.lsn) + assert db_lsns == Enum.sort(lsns) + end + end + + describe "edge cases and error scenarios" do + test "logs error for writes without account_id" do + import ExUnit.CaptureLog + + state = %{flush_buffer: %{}} + + log = + capture_log(fn -> + result = + ReplicationConnection.on_write( + state, + 500, + :insert, + "some_table", + nil, + %{"id" => Ecto.UUID.generate(), "name" => "no account_id"} + ) + + # State should remain unchanged + assert result == state + end) + + assert log =~ "Unexpected write operation!" 
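# Note: the "lsn=500" match below relies on the test Logger configuration
# rendering metadata (e.g. metadata: :all); the LSN is attached as metadata
# in the Logger.error/2 call above, not embedded in the message text.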
+ assert log =~ "lsn=500" + end + + test "handles account_id in old_data for deletes", %{account: account} do + state = %{flush_buffer: %{}} + lsn = 600 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :delete, + "resources", + %{"id" => Ecto.UUID.generate(), "account_id" => account.id}, + nil + ) + + assert map_size(result_state.flush_buffer) == 1 + assert result_state.flush_buffer[lsn].account_id == account.id + end + + test "handles very large buffers" do + account_id = Ecto.UUID.generate() + state = %{flush_buffer: %{}, last_flushed_lsn: 0} + + # Simulate adding many operations + operations = 1..100 + + final_state = + Enum.reduce(operations, state, fn i, acc_state -> + ReplicationConnection.on_write( + acc_state, + i, + :insert, + "resources", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account_id, "name" => "user#{i}"} + ) + end) + + assert map_size(final_state.flush_buffer) == 100 + + # Verify all LSNs are present + buffer_lsns = Map.keys(final_state.flush_buffer) |> Enum.sort() + assert buffer_lsns == Enum.to_list(1..100) + end + end + + describe "special table handling" do + test "ignores relay_group token updates" do + state = %{flush_buffer: %{}} + + # Update where old_data has relay_group type + result_state1 = + ReplicationConnection.on_write( + state, + 100, + :update, + "tokens", + %{"id" => Ecto.UUID.generate(), "type" => "relay_group"}, + %{"id" => Ecto.UUID.generate(), "type" => "relay_group", "updated" => true} + ) + + assert result_state1 == state + + # Update where new data has relay_group type + result_state2 = + ReplicationConnection.on_write( + state, + 101, + :update, + "tokens", + %{"id" => Ecto.UUID.generate(), "type" => "other"}, + %{"id" => Ecto.UUID.generate(), "type" => "relay_group"} + ) + + assert result_state2 == state + end + + test "processes non-relay_group tokens normally", %{account: account} do + state = %{flush_buffer: %{}} + lsn = 102 + + result_state = + ReplicationConnection.on_write( + state, + lsn, + :insert, + "tokens", + nil, + %{"id" => Ecto.UUID.generate(), "account_id" => account.id, "type" => "browser"} + ) + + assert map_size(result_state.flush_buffer) == 1 + assert result_state.flush_buffer[lsn].table == "tokens" end end end diff --git a/elixir/apps/domain/test/domain/change_logs_test.exs b/elixir/apps/domain/test/domain/change_logs_test.exs index 48b8cf85d..f9af0b863 100644 --- a/elixir/apps/domain/test/domain/change_logs_test.exs +++ b/elixir/apps/domain/test/domain/change_logs_test.exs @@ -2,15 +2,15 @@ defmodule Domain.ChangeLogsTest do use Domain.DataCase, async: true import Domain.ChangeLogs - describe "create/1" do + describe "bulk_insert/1" do setup do account = Fixtures.Accounts.create_account() - %{account: account} end - test "inserts a change_log for an account", %{account: account} do + test "skips duplicate lsn", %{account: account} do attrs = %{ + account_id: account.id, lsn: 1, table: "resources", op: :insert, @@ -19,152 +19,16 @@ defmodule Domain.ChangeLogsTest do vsn: 1 } - assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) - - assert change_log.account_id == account.id - assert change_log.op == :insert - assert change_log.old_data == nil - assert change_log.data == %{"account_id" => account.id, "key" => "value"} - end - - test "uses the 'id' field accounts table updates", %{account: account} do - attrs = %{ - lsn: 1, - table: "accounts", - op: :update, - old_data: %{"id" => account.id, "name" => "Old Name"}, - data: %{"id" => account.id, "name" => 
"New Name"}, - vsn: 1 - } - - assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) - - assert change_log.account_id == account.id - assert change_log.op == :update - assert change_log.old_data == %{"id" => account.id, "name" => "Old Name"} - assert change_log.data == %{"id" => account.id, "name" => "New Name"} - end - - test "requires vsn field", %{account: account} do - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"} - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert changeset.errors[:vsn] == {"can't be blank", [validation: :required]} - end - - test "requires table field", %{account: account} do - attrs = %{ - lsn: 1, - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"}, - vsn: 1 - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert changeset.errors[:table] == {"can't be blank", [validation: :required]} - end - - test "prevents inserting duplicate lsn", %{account: account} do - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"}, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) + assert {1, nil} = bulk_insert([attrs]) + # Try to insert with same LSN but different data dupe_lsn_attrs = Map.put(attrs, :data, %{"account_id" => account.id, "key" => "new_value"}) - - assert {:error, changeset} = create_change_log(dupe_lsn_attrs) - assert changeset.valid? == false - - assert changeset.errors[:lsn] == - {"has already been taken", - [constraint: :unique, constraint_name: "change_logs_lsn_index"]} + assert {0, nil} = bulk_insert([dupe_lsn_attrs]) end - test "requires op field to be one of :insert, :update, :delete", %{account: account} do - attrs = %{ - lsn: 1, - table: "resources", - op: :invalid_op, - old_data: nil, - data: %{"account_id" => account.id, "key" => "value"}, - vsn: 1 - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert {"is invalid", errors} = changeset.errors[:op] - assert {:validation, :inclusion} in errors - end - - test "requires correct combination of operation and data", %{account: account} do - # Invalid combination: :insert with old_data present - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: %{"account_id" => account.id, "key" => "new_value"}, - vsn: 1 - } - - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? 
== false - assert changeset.errors[:base] == {"Invalid combination of operation and data", []} - - # Valid combination: :insert with old_data nil and data present - attrs = %{ - lsn: 1, - table: "resources", - op: :insert, - old_data: nil, - data: %{"account_id" => account.id, "key" => "new_value"}, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) - - # Valid combination: :update with both old_data and data present - attrs = %{ - lsn: 2, - table: "resources", - op: :update, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: %{"account_id" => account.id, "key" => "new_value"}, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) - - # Valid combination: :delete with old_data present and data nil - attrs = %{ - lsn: 3, - table: "resources", - op: :delete, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: nil, - vsn: 1 - } - - assert {:ok, _change_log} = create_change_log(attrs) - end - - test "requires account_id to be populated from old_data or data" do + test "raises not null constraint when account_id is missing" do attrs = %{ + # account_id is missing lsn: 1, table: "resources", op: :insert, @@ -173,26 +37,97 @@ defmodule Domain.ChangeLogsTest do vsn: 1 } - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? == false - assert changeset.errors[:account_id] == {"can't be blank", [validation: :required]} + assert_raise Postgrex.Error, + ~r/null value in column "account_id".*violates not-null constraint/, + fn -> + bulk_insert([attrs]) + end end - test "requires old_data[\"account_id\"] and data[\"account_id\"] to match", %{ - account: account - } do + test "raises not null constraint when table is missing", %{account: account} do attrs = %{ + account_id: account.id, lsn: 1, - table: "resources", - op: :update, - old_data: %{"account_id" => account.id, "key" => "old_value"}, - data: %{"account_id" => "different_account_id", "key" => "new_value"}, + # table is missing + op: :insert, + old_data: nil, + data: %{"key" => "value"}, vsn: 1 } - assert {:error, changeset} = create_change_log(attrs) - assert changeset.valid? 
== false - assert changeset.errors[:base] == {"Account ID cannot be changed", []} + assert_raise Postgrex.Error, + ~r/null value in column "table".*violates not-null constraint/, + fn -> + bulk_insert([attrs]) + end + end + + test "raises not null constraint when op is missing", %{account: account} do + attrs = %{ + account_id: account.id, + lsn: 1, + table: "resources", + # op is missing + old_data: nil, + data: %{"key" => "value"}, + vsn: 1 + } + + assert_raise Postgrex.Error, + ~r/null value in column "op".*violates not-null constraint/, + fn -> + bulk_insert([attrs]) + end + end + + test "enforces data constraints based on operation type", %{account: account} do + # Invalid insert (has old_data) + assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn -> + bulk_insert([ + %{ + account_id: account.id, + lsn: 1, + table: "resources", + op: :insert, + # Should be null for insert + old_data: %{"id" => "123"}, + data: %{"id" => "123"}, + vsn: 1 + } + ]) + end + + # Invalid update (missing old_data) + assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn -> + bulk_insert([ + %{ + account_id: account.id, + lsn: 2, + table: "resources", + op: :update, + # Should not be null for update + old_data: nil, + data: %{"id" => "123"}, + vsn: 1 + } + ]) + end + + # Invalid delete (has data) + assert_raise Postgrex.Error, ~r/valid_data_for_operation/, fn -> + bulk_insert([ + %{ + account_id: account.id, + lsn: 3, + table: "resources", + op: :delete, + old_data: %{"id" => "123"}, + # Should be null for delete + data: %{"id" => "123"}, + vsn: 1 + } + ]) + end end end end diff --git a/elixir/apps/domain/test/domain/events/replication_connection_test.exs b/elixir/apps/domain/test/domain/events/replication_connection_test.exs index b95a0773a..fdbc63b17 100644 --- a/elixir/apps/domain/test/domain/events/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/events/replication_connection_test.exs @@ -2,7 +2,7 @@ defmodule Domain.Events.ReplicationConnectionTest do use ExUnit.Case, async: true import ExUnit.CaptureLog - import Domain.Events.ReplicationConnection + alias Domain.Events.ReplicationConnection setup do tables = @@ -12,14 +12,15 @@ defmodule Domain.Events.ReplicationConnectionTest do %{tables: tables} end - describe "on_insert/2" do + describe "on_write/6 for inserts" do test "logs warning for unknown table" do table = "unknown_table" data = %{"id" => Ecto.UUID.generate(), "name" => "test"} log_output = capture_log(fn -> - assert :ok = on_insert(0, table, data) + result = ReplicationConnection.on_write(%{}, 0, :insert, table, nil, data) + assert result == %{} end) assert log_output =~ "No hook defined for insert on table unknown_table" @@ -33,9 +34,8 @@ defmodule Domain.Events.ReplicationConnectionTest do # The actual hook call might fail if the hook modules aren't available, # but we can test that our routing logic works try do - result = on_insert(0, table, data) - # Should either succeed or fail gracefully - assert result in [:ok, :error] or match?({:error, _}, result) + result = ReplicationConnection.on_write(%{}, 0, :insert, table, nil, data) + assert result == %{} rescue # Depending on the shape of the data we might get a function clause error. This is ok, # as we are testing the routing logic, not the actual hook implementations. 
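A minimal sketch of a conforming on_write/6 implementation, assuming only the contract these tests exercise: the callback receives the accumulated state, the LSN, the operation, the table name, and the old/new row data, and must return the (possibly updated) state. MyApp.TestConnection and record/6 are hypothetical names; the guards mirror the shapes enforced earlier by the valid_data_for_operation constraint, where inserts carry no old_data, updates carry both, and deletes carry no data:

defmodule MyApp.TestConnection do
  use Domain.Replication.Connection

  # Inserts arrive with old_data = nil, updates carry both maps,
  # and deletes arrive with data = nil.
  def on_write(state, lsn, :insert, table, nil, data) when is_map(data) do
    record(state, lsn, :insert, table, nil, data)
  end

  def on_write(state, lsn, :update, table, old_data, data)
      when is_map(old_data) and is_map(data) do
    record(state, lsn, :update, table, old_data, data)
  end

  def on_write(state, lsn, :delete, table, old_data, nil) when is_map(old_data) do
    record(state, lsn, :delete, table, old_data, nil)
  end

  # No buffering in this sketch, so a flush has nothing to write out.
  def on_flush(state), do: state

  defp record(state, lsn, op, table, old_data, data) do
    entry = %{lsn: lsn, op: op, table: table, old_data: old_data, data: data}
    Map.update(state, :operations, [entry], &[entry | &1])
  end
end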
@@ -50,7 +50,9 @@ defmodule Domain.Events.ReplicationConnectionTest do log_output = capture_log(fn -> try do - on_insert(0, table, %{"id" => Ecto.UUID.generate()}) + ReplicationConnection.on_write(%{}, 0, :insert, table, nil, %{ + "id" => Ecto.UUID.generate() + }) rescue FunctionClauseError -> # Shape of the data might not match the expected one, which is fine @@ -63,15 +65,16 @@ defmodule Domain.Events.ReplicationConnectionTest do end end - describe "on_update/3" do + describe "on_write/6 for updates" do test "logs warning for unknown table" do table = "unknown_table" old_data = %{"id" => Ecto.UUID.generate(), "name" => "old"} - data = %{"id" => Ecto.UUID.generate(), "name" => "new"} + data = %{"id" => old_data["id"], "name" => "new"} log_output = capture_log(fn -> - assert :ok = on_update(0, table, old_data, data) + result = ReplicationConnection.on_write(%{}, 0, :update, table, old_data, data) + assert result == %{} end) assert log_output =~ "No hook defined for update on table unknown_table" @@ -80,12 +83,12 @@ defmodule Domain.Events.ReplicationConnectionTest do test "handles known tables", %{tables: tables} do old_data = %{"id" => Ecto.UUID.generate(), "name" => "old name"} - data = %{"id" => Ecto.UUID.generate(), "name" => "new name"} + data = %{"id" => old_data["id"], "name" => "new name"} for table <- tables do try do - result = on_update(0, table, old_data, data) - assert result in [:ok, :error] or match?({:error, _}, result) + result = ReplicationConnection.on_write(%{}, 0, :update, table, old_data, data) + assert result == %{} rescue FunctionClauseError -> # Shape of the data might not match the expected one, which is fine @@ -95,14 +98,15 @@ defmodule Domain.Events.ReplicationConnectionTest do end end - describe "on_delete/2" do + describe "on_write/6 for deletes" do test "logs warning for unknown table" do table = "unknown_table" old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted"} log_output = capture_log(fn -> - assert :ok = on_delete(0, table, old_data) + result = ReplicationConnection.on_write(%{}, 0, :delete, table, old_data, nil) + assert result == %{} end) assert log_output =~ "No hook defined for delete on table unknown_table" @@ -110,11 +114,12 @@ defmodule Domain.Events.ReplicationConnectionTest do end test "handles known tables", %{tables: tables} do - old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted gateway"} + old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted item"} for table <- tables do try do - assert :ok = on_delete(0, table, old_data) + result = ReplicationConnection.on_write(%{}, 0, :delete, table, old_data, nil) + assert result == %{} rescue # Shape of the data might not match the expected one, which is fine FunctionClauseError -> :ok @@ -123,31 +128,132 @@ defmodule Domain.Events.ReplicationConnectionTest do end end + describe "operation routing" do + test "routes to correct hook based on operation type" do + # Test that we dispatch to the right operation + # Since we can't directly test the hook calls without the actual hook modules, + # we can at least verify the routing logic doesn't crash + + state = %{} + table = "accounts" + + # Insert + try do + result = ReplicationConnection.on_write(state, 1, :insert, table, nil, %{"id" => "123"}) + assert result == state + rescue + FunctionClauseError -> :ok + end + + # Update + try do + result = + ReplicationConnection.on_write(state, 2, :update, table, %{"id" => "123"}, %{ + "id" => "123", + "updated" => true + }) + + assert result == state + rescue + FunctionClauseError -> 
:ok + end + + # Delete + try do + result = ReplicationConnection.on_write(state, 3, :delete, table, %{"id" => "123"}, nil) + assert result == state + rescue + FunctionClauseError -> :ok + end + end + end + describe "warning message formatting" do - test "log_warning generates correct message format" do + test "log_warning generates correct message format for each operation" do + # Test insert log_output = capture_log(fn -> - assert :ok = on_insert(0, "test_table_insert", %{}) + ReplicationConnection.on_write(%{}, 0, :insert, "test_table_insert", nil, %{}) end) assert log_output =~ "No hook defined for insert on table test_table_insert" assert log_output =~ "Please implement Domain.Events.Hooks for this table" + # Test update log_output = capture_log(fn -> - assert :ok = on_update(0, "test_table_update", %{}, %{}) + ReplicationConnection.on_write(%{}, 0, :update, "test_table_update", %{}, %{}) end) assert log_output =~ "No hook defined for update on table test_table_update" assert log_output =~ "Please implement Domain.Events.Hooks for this table" + # Test delete log_output = capture_log(fn -> - assert :ok = on_delete(0, "test_table_delete", %{}) + ReplicationConnection.on_write(%{}, 0, :delete, "test_table_delete", %{}, nil) end) assert log_output =~ "No hook defined for delete on table test_table_delete" assert log_output =~ "Please implement Domain.Events.Hooks for this table" end end + + describe "state preservation" do + test "always returns the state unchanged" do + initial_state = %{some: "data", counter: 42} + + # Unknown table - should log warning and return state unchanged + result1 = ReplicationConnection.on_write(initial_state, 1, :insert, "unknown", nil, %{}) + assert result1 == initial_state + + # Known table (might error in hook, but should still preserve state) + try do + result2 = ReplicationConnection.on_write(initial_state, 2, :insert, "accounts", nil, %{}) + assert result2 == initial_state + rescue + FunctionClauseError -> :ok + end + end + end + + describe "table_to_hooks mapping" do + test "all configured tables have hook modules" do + # This test ensures our tables_to_hooks map is properly configured + tables_to_hooks = %{ + "accounts" => Domain.Events.Hooks.Accounts, + "actor_group_memberships" => Domain.Events.Hooks.ActorGroupMemberships, + "actor_groups" => Domain.Events.Hooks.ActorGroups, + "actors" => Domain.Events.Hooks.Actors, + "auth_identities" => Domain.Events.Hooks.AuthIdentities, + "auth_providers" => Domain.Events.Hooks.AuthProviders, + "clients" => Domain.Events.Hooks.Clients, + "gateway_groups" => Domain.Events.Hooks.GatewayGroups, + "gateways" => Domain.Events.Hooks.Gateways, + "policies" => Domain.Events.Hooks.Policies, + "resource_connections" => Domain.Events.Hooks.ResourceConnections, + "resources" => Domain.Events.Hooks.Resources, + "tokens" => Domain.Events.Hooks.Tokens + } + + # Verify the mapping includes all expected tables + assert Map.keys(tables_to_hooks) |> Enum.sort() == + [ + "accounts", + "actor_group_memberships", + "actor_groups", + "actors", + "auth_identities", + "auth_providers", + "clients", + "gateway_groups", + "gateways", + "policies", + "resource_connections", + "resources", + "tokens" + ] + |> Enum.sort() + end + end end diff --git a/elixir/apps/domain/test/domain/replication/connection_test.exs b/elixir/apps/domain/test/domain/replication/connection_test.exs index 720ece52b..bf8cd61a8 100644 --- a/elixir/apps/domain/test/domain/replication/connection_test.exs +++ 
b/elixir/apps/domain/test/domain/replication/connection_test.exs @@ -1,728 +1,494 @@ defmodule Domain.Replication.ConnectionTest do - # Only one ReplicationConnection should be started in the cluster - use ExUnit.Case, async: false + use ExUnit.Case, async: true - # Create a test module that uses the macro + import ExUnit.CaptureLog + + # Define a test module that uses the Domain.Replication.Connection macro defmodule TestReplicationConnection do - use Domain.Replication.Connection, - warning_threshold_ms: 5_000, - error_threshold_ms: 60_000 - end + use Domain.Replication.Connection - alias TestReplicationConnection + def on_write(state, lsn, op, table, old_data, data) do + # Simple test implementation that tracks operations + operations = Map.get(state, :operations, []) - # Used to test callbacks, not used for live connection - def mock_state, - do: %TestReplicationConnection{ - schema: "test_schema", - step: :disconnected, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["accounts", "resources"], - relations: %{}, - counter: 0, - tables_to_remove: MapSet.new() - } - - # Used to test live connection - setup do - {connection_opts, config} = - Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) - |> Keyword.pop(:connection_opts) - - init_state = %{ - connection_opts: connection_opts, - instance: struct(TestReplicationConnection, config) - } - - child_spec = %{ - id: TestReplicationConnection, - start: {TestReplicationConnection, :start_link, [init_state]} - } - - {:ok, pid} = - case start_supervised(child_spec) do - {:ok, pid} -> - {:ok, pid} - - {:error, {:already_started, pid}} -> - {:ok, pid} - end - - {:ok, pid: pid} - end - - describe "handle_connect/1 callback" do - test "handle_connect initiates publication check" do - state = mock_state() - expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" - expected_next_state = %{state | step: :check_publication} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_connect(state) - end - end - - describe "handle_result/2 callback" do - test "handle_result transitions from check_publication to check_publication_tables when publication exists" do - state = %{mock_state() | step: :check_publication} - result = [%Postgrex.Result{num_rows: 1}] - - expected_query = """ - SELECT schemaname, tablename - FROM pg_publication_tables - WHERE pubname = '#{state.publication_name}' - ORDER BY schemaname, tablename - """ - - expected_next_state = %{state | step: :check_publication_tables} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result creates publication if it doesn't exist" do - state = %{mock_state() | step: :check_publication} - result = [%Postgrex.Result{num_rows: 0}] - - expected_tables = "test_schema.accounts,test_schema.resources" - expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result proceeds to replication slot when publication tables are up to date" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that match our desired tables exactly - result = [ - %Postgrex.Result{rows: 
[["test_schema", "accounts"], ["test_schema", "resources"]]} - ] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result adds new tables when they are missing from publication" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that are missing "resources" - result = [%Postgrex.Result{rows: [["test_schema", "accounts"]]}] - - expected_query = - "ALTER PUBLICATION #{state.publication_name} ADD TABLE test_schema.resources" - - expected_next_state = %{ - state - | step: :remove_publication_tables, - tables_to_remove: MapSet.new() + operation = %{ + lsn: lsn, + op: op, + table: table, + old_data: old_data, + data: data } - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) + Map.put(state, :operations, [operation | operations]) end - test "handle_result removes unwanted tables when they exist in publication" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that include an unwanted "old_table" - result = [ - %Postgrex.Result{ - rows: [ - ["test_schema", "accounts"], - ["test_schema", "resources"], - ["test_schema", "old_table"] - ] - } - ] + def on_flush(state) do + # Test implementation that counts flushes + flush_count = Map.get(state, :flush_count, 0) - expected_query = - "ALTER PUBLICATION #{state.publication_name} DROP TABLE test_schema.old_table" - - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result adds tables first, then removes unwanted tables" do - state = %{mock_state() | step: :check_publication_tables} - # Mock existing tables that have "old_table" but missing "resources" - result = [ - %Postgrex.Result{rows: [["test_schema", "accounts"], ["test_schema", "old_table"]]} - ] - - expected_query = - "ALTER PUBLICATION #{state.publication_name} ADD TABLE test_schema.resources" - - expected_tables_to_remove = MapSet.new(["test_schema.old_table"]) - - expected_next_state = %{ - state - | step: :remove_publication_tables, - tables_to_remove: expected_tables_to_remove - } - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "verify MapSet behavior for debugging" do - # Verify that our MapSet is not empty - tables_to_remove = MapSet.new(["test_schema.old_table"]) - refute Enum.empty?(tables_to_remove) - assert MapSet.size(tables_to_remove) == 1 - assert MapSet.member?(tables_to_remove, "test_schema.old_table") - end - - test "handle_result removes tables after adding when tables_to_remove is not empty" do - tables_to_remove = MapSet.new(["test_schema.old_table"]) - - state = %{ - mock_state() - | step: :remove_publication_tables, - tables_to_remove: tables_to_remove - } - - result = [%Postgrex.Result{}] - - # Debug: verify the state is what we think it is - refute Enum.empty?(state.tables_to_remove) - assert state.step == :remove_publication_tables - - expected_query = - "ALTER PUBLICATION #{state.publication_name} DROP TABLE test_schema.old_table" - - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, 
^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result proceeds to replication slot when no tables to remove" do - state = %{mock_state() | step: :remove_publication_tables, tables_to_remove: MapSet.new()} - result = [%Postgrex.Result{}] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from create_slot to start_replication_slot when slot exists" do - state = %{mock_state() | step: :create_slot} - result = [%Postgrex.Result{num_rows: 1}] - - expected_query = "SELECT 1" - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from start_replication_slot to streaming" do - state = %{mock_state() | step: :start_replication_slot} - result = [%Postgrex.Result{num_rows: 1}] - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - expected_next_state = %{state | step: :streaming} - - assert {:stream, ^expected_stream_query, [], ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from check_replication_slot to create_slot after creating publication" do - state = %{mock_state() | step: :check_replication_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) - end - - test "handle_result creates replication slot if it doesn't exist" do - state = %{mock_state() | step: :create_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - TestReplicationConnection.handle_result(result, state) + state + |> Map.put(:flush_count, flush_count + 1) + |> Map.put(:flush_buffer, %{}) end end - describe "handle_data/2" do - test "handle_data handles KeepAlive with reply :now" do - state = %{mock_state() | step: :streaming} - wal_end = 12345 + # Create a test module that captures relation updates + defmodule RelationTestConnection do + use Domain.Replication.Connection - now = - System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + def on_write(state, _, _, _, _, _), do: state - # 100 milliseconds - grace_period = 100_000 - keepalive_data = <> + def on_flush(state), do: state + end - assert {:noreply, reply, ^state} = - TestReplicationConnection.handle_data(keepalive_data, state) - - assert [<>] = reply - - assert now <= clock - assert clock < now + grace_period - end - - test "handle_data handles KeepAlive with reply :later" do - state = %{mock_state() | step: :streaming} - wal_end = 54321 - - keepalive_data = <> - expected_reply_message = [] - - assert {:noreply, 
^expected_reply_message, ^state} = - TestReplicationConnection.handle_data(keepalive_data, state) - end - - test "handle_data handles Write message and increments counter" do - state = %{mock_state() | step: :streaming} - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - message = "Hello, world!" - - write_data = - <> - - new_state = %{state | counter: state.counter + 1} - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], ^new_state} = - TestReplicationConnection.handle_data(write_data, state) - - # Validate the acknowledgment structure without pinning the timestamp - assert <> = ack_message - end - - test "handle_data handles unknown message" do - state = %{mock_state() | step: :streaming} - unknown_data = <> - - assert {:noreply, [], ^state} = TestReplicationConnection.handle_data(unknown_data, state) - end - - test "sends {:check_warning_threshold, lag_ms} > 5_000 ms" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, false) - - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> - - # Simulate a commit timestamp that exceeds the threshold - timestamp = - DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - assert_receive({:check_warning_threshold, lag_ms}) - assert lag_ms > 5_000 - end - - test "sends {:check_warning_threshold, lag_ms} < 5_000 ms" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, true) - - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> - - # Simulate a commit timestamp that is within the threshold - timestamp = - DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 1_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - assert_receive({:check_warning_threshold, lag_ms}) - assert lag_ms < 5_000 + describe "macro compilation" do + test "creates a module with required functions" do + # Verify the module was created with expected functions + assert function_exported?(TestReplicationConnection, :start_link, 1) + assert function_exported?(TestReplicationConnection, :init, 1) + assert function_exported?(TestReplicationConnection, :handle_connect, 1) + assert function_exported?(TestReplicationConnection, :handle_disconnect, 1) + assert function_exported?(TestReplicationConnection, :handle_result, 2) + assert function_exported?(TestReplicationConnection, :handle_data, 2) + assert function_exported?(TestReplicationConnection, :handle_info, 2) + assert function_exported?(TestReplicationConnection, :on_write, 6) + assert function_exported?(TestReplicationConnection, :on_flush, 1) end end - describe "handle_info/2" do - test "handle_info handles :shutdown message" do - state = mock_state() - assert {:disconnect, :normal} = TestReplicationConnection.handle_info(:shutdown, state) - end + describe "struct 
definition" do + test "defines correct struct with expected fields" do + assert %TestReplicationConnection{} = struct = %TestReplicationConnection{} - test "handle_info handles :DOWN message from monitored process" do - state = mock_state() - monitor_ref = make_ref() - down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} - - assert {:disconnect, :normal} = TestReplicationConnection.handle_info(down_msg, state) - end - - test "handle_info ignores other messages" do - state = mock_state() - random_msg = {:some_other_info, "data"} - - assert {:noreply, ^state} = TestReplicationConnection.handle_info(random_msg, state) - end - - test "handle_info processes warning threshold alerts" do - state = Map.put(mock_state(), :warning_threshold_exceeded?, false) - - # Test crossing threshold - assert {:noreply, %{warning_threshold_exceeded?: true}} = - TestReplicationConnection.handle_info({:check_warning_threshold, 6_000}, state) - - # Test going back below threshold - state_above = %{state | warning_threshold_exceeded?: true} - - assert {:noreply, %{warning_threshold_exceeded?: false}} = - TestReplicationConnection.handle_info( - {:check_warning_threshold, 3_000}, - state_above - ) - - # Test staying below threshold - assert {:noreply, %{warning_threshold_exceeded?: false}} = - TestReplicationConnection.handle_info({:check_warning_threshold, 2_000}, state) - - # Test staying above threshold - assert {:noreply, %{warning_threshold_exceeded?: true}} = - TestReplicationConnection.handle_info( - {:check_warning_threshold, 7_000}, - state_above - ) + # Check default values + assert struct.schema == "public" + assert struct.step == :disconnected + assert struct.output_plugin == "pgoutput" + assert struct.proto_version == 1 + assert struct.table_subscriptions == [] + assert struct.relations == %{} + assert struct.counter == 0 + assert struct.tables_to_remove == MapSet.new() + assert struct.flush_interval == 0 + assert struct.flush_buffer == %{} + assert struct.last_flushed_lsn == 0 + assert struct.warning_threshold_exceeded? == false + assert struct.error_threshold_exceeded? == false + assert struct.flush_buffer_size == 0 + assert struct.status_log_interval == :timer.minutes(1) + assert struct.warning_threshold == :timer.seconds(30) + assert struct.error_threshold == :timer.seconds(60) end end - describe "error threshold functionality" do - test "handle_info sets error_threshold_exceeded? to true when lag exceeds error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, false) - - # Test crossing the error threshold (60_000ms from TestReplicationConnection config) - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 65_000}, state) - - assert updated_state.error_threshold_exceeded? == true + describe "initialization" do + test "init/1 preserves state" do + initial_state = %TestReplicationConnection{counter: 42} + {:ok, state} = TestReplicationConnection.init(initial_state) + assert state.counter == 42 end - test "handle_info sets error_threshold_exceeded? 
to false when lag drops below error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, true) + test "init/1 schedules flush when flush_interval > 0" do + initial_state = %TestReplicationConnection{flush_interval: 1000} + {:ok, _state} = TestReplicationConnection.init(initial_state) - # Test going back below threshold - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 30_000}, state) - - assert updated_state.error_threshold_exceeded? == false + # Should receive flush message after interval + assert_receive :flush, 1100 end - test "handle_info keeps error_threshold_exceeded? true when lag stays above error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, true) + test "init/1 does not schedule flush when flush_interval is 0" do + initial_state = %TestReplicationConnection{flush_interval: 0} + {:ok, _state} = TestReplicationConnection.init(initial_state) - # Test staying above threshold - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 70_000}, state) - - assert updated_state.error_threshold_exceeded? == true - end - - test "handle_info keeps error_threshold_exceeded? false when lag stays below error threshold" do - state = - mock_state() - |> Map.put(:error_threshold_exceeded?, false) - - # Test staying below threshold - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_error_threshold, 30_000}, state) - - assert updated_state.error_threshold_exceeded? == false + # Should not receive flush message + refute_receive :flush, 100 end end - describe "BEGIN message lag tracking with error threshold" do - test "sends both check_warning_threshold and check_error_threshold messages" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) + describe "handle_connect/1" do + test "returns query to check publication" do + state = %TestReplicationConnection{publication_name: "test_pub"} - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> + {:query, query, new_state} = TestReplicationConnection.handle_connect(state) - # Simulate a commit timestamp that exceeds both thresholds (70 seconds lag) - timestamp = - DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 70_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - # Should receive both threshold check messages - assert_receive {:check_warning_threshold, warning_lag_ms} - assert warning_lag_ms > 5_000 - - assert_receive {:check_error_threshold, error_lag_ms} - assert error_lag_ms > 60_000 - - # Both should report the same lag time - assert warning_lag_ms == error_lag_ms - end - - test "sends check_error_threshold with lag below error threshold" do - state = - %{mock_state() | step: :streaming} - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) - - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - lsn = <<0::32, 100::32>> - xid = <<0::32>> - - # Simulate a commit timestamp with moderate lag (10 seconds) - timestamp = - 
DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000 - - begin_data = <> - - write_message = - <> - - expected_wal_end = server_wal_end + 1 - - assert {:noreply, [ack_message], _state} = - TestReplicationConnection.handle_data(write_message, state) - - # Validate acknowledgment structure - assert <> = ack_message - - # Should receive both threshold check messages - assert_receive {:check_warning_threshold, warning_lag_ms} - assert warning_lag_ms > 5_000 - - assert_receive {:check_error_threshold, error_lag_ms} - assert error_lag_ms < 60_000 - # Still above warning threshold - assert error_lag_ms > 5_000 - end - end - - describe "message processing bypass behavior" do - # Note: We can't directly test handle_message/3 since it's private, - # but we can test the behavior by mocking the on_insert/on_update/on_delete callbacks - # and verifying they're not called when error_threshold_exceeded? is true - - defmodule TestCallbackModule do - use Domain.Replication.Connection, - warning_threshold_ms: 5_000, - error_threshold_ms: 60_000 - - def on_insert(lsn, table, data) do - send(self(), {:callback_called, :on_insert, lsn, table, data}) - :ok - end - - def on_update(lsn, table, old_data, data) do - send(self(), {:callback_called, :on_update, lsn, table, old_data, data}) - :ok - end - - def on_delete(lsn, table, old_data) do - send(self(), {:callback_called, :on_delete, lsn, table, old_data}) - :ok - end - end - - test "insert processing is bypassed when error threshold exceeded" do - # Create a relation that can be referenced - relation = %{ - namespace: "test_schema", - name: "test_table", - columns: [%{name: "id"}, %{name: "name"}] - } - - state = - %TestCallbackModule{ - schema: "test_schema", - step: :streaming, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["test_table"], - relations: %{1 => relation}, - counter: 0, - tables_to_remove: MapSet.new() - } - |> Map.put(:error_threshold_exceeded?, true) - - # Since we can't easily construct valid WAL insert messages without more - # complex setup, let's focus on testing the threshold management itself - - # The key test is that when error_threshold_exceeded? is true, - # the system should handle this gracefully - assert state.error_threshold_exceeded? == true - end - - test "callbacks are called when error threshold not exceeded" do - # Similar setup but with error_threshold_exceeded? set to false - relation = %{ - namespace: "test_schema", - name: "test_table", - columns: [%{name: "id"}, %{name: "name"}] - } - - state = - %TestCallbackModule{ - schema: "test_schema", - step: :streaming, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["test_table"], - relations: %{1 => relation}, - counter: 0, - tables_to_remove: MapSet.new() - } - |> Map.put(:error_threshold_exceeded?, false) - - # Test that the state is properly configured for normal processing - assert state.error_threshold_exceeded? 
== false - - # The actual message processing would require constructing complex WAL messages, - # but the important thing is that the state management works correctly - end - end - - describe "error threshold integration with warning threshold" do - test "both thresholds can be managed independently" do - state = - mock_state() - |> Map.put(:warning_threshold_exceeded?, false) - |> Map.put(:error_threshold_exceeded?, false) - - # Cross warning threshold only - assert {:noreply, updated_state} = - TestReplicationConnection.handle_info({:check_warning_threshold, 10_000}, state) - - assert updated_state.warning_threshold_exceeded? == true - assert updated_state.error_threshold_exceeded? == false - - # Cross error threshold - assert {:noreply, updated_state2} = - TestReplicationConnection.handle_info( - {:check_error_threshold, 70_000}, - updated_state - ) - - assert updated_state2.warning_threshold_exceeded? == true - assert updated_state2.error_threshold_exceeded? == true - - # Drop below error threshold but stay above warning - assert {:noreply, updated_state3} = - TestReplicationConnection.handle_info( - {:check_error_threshold, 10_000}, - updated_state2 - ) - - assert updated_state3.warning_threshold_exceeded? == true - assert updated_state3.error_threshold_exceeded? == false - - # Drop below warning threshold - assert {:noreply, updated_state4} = - TestReplicationConnection.handle_info( - {:check_warning_threshold, 2_000}, - updated_state3 - ) - - assert updated_state4.warning_threshold_exceeded? == false - assert updated_state4.error_threshold_exceeded? == false + assert query == "SELECT 1 FROM pg_publication WHERE pubname = 'test_pub'" + assert new_state.step == :check_publication end end describe "handle_disconnect/1" do - test "handle_disconnect resets step to :disconnected" do - state = %{mock_state() | step: :streaming} - expected_state = %{state | step: :disconnected} + test "logs disconnection and updates state" do + state = %TestReplicationConnection{counter: 123, step: :streaming} - assert {:noreply, ^expected_state} = TestReplicationConnection.handle_disconnect(state) + log = + capture_log(fn -> + {:noreply, new_state} = TestReplicationConnection.handle_disconnect(state) + assert new_state.step == :disconnected + assert new_state.counter == 123 + end) + + assert log =~ "Replication connection disconnected" + assert log =~ "counter=123" + end + end + + describe "handle_info/2" do + test "handles :shutdown message" do + assert {:disconnect, :normal} = TestReplicationConnection.handle_info(:shutdown, %{}) + end + + test "handles DOWN message" do + assert {:disconnect, :normal} = + TestReplicationConnection.handle_info({:DOWN, nil, :process, nil, nil}, %{}) + end + + test "handles :flush message when flush_interval > 0" do + state = + %TestReplicationConnection{ + flush_interval: 1000, + flush_buffer: %{1 => %{data: "test"}} + } + |> Map.put(:operations, []) + |> Map.put(:flush_count, 0) + + {:noreply, new_state} = TestReplicationConnection.handle_info(:flush, state) + + # Our test on_flush implementation increments flush_count + assert Map.get(new_state, :flush_count) == 1 + assert new_state.flush_buffer == %{} + + # Should schedule next flush + assert_receive :flush, 1100 + end + + test "handles :interval_logger message" do + state = %TestReplicationConnection{counter: 456, status_log_interval: 100} + + log = + capture_log(fn -> + {:noreply, _new_state} = TestReplicationConnection.handle_info(:interval_logger, state) + end) + + assert log =~ "Processed 456 write messages from 
the WAL stream" + + # Should schedule next log + assert_receive :interval_logger, 150 + end + + test "handles warning threshold checks" do + # Below threshold + state = %TestReplicationConnection{ + warning_threshold_exceeded?: false, + warning_threshold: 1000 + } + + {:noreply, new_state} = + TestReplicationConnection.handle_info({:check_warning_threshold, 500}, state) + + assert new_state.warning_threshold_exceeded? == false + + # Above threshold + log = + capture_log(fn -> + {:noreply, new_state2} = + TestReplicationConnection.handle_info({:check_warning_threshold, 1500}, state) + + assert new_state2.warning_threshold_exceeded? == true + end) + + assert log =~ "Processing lag exceeds warning threshold" + + # Back below threshold + exceeded_state = %{state | warning_threshold_exceeded?: true} + + log2 = + capture_log(fn -> + {:noreply, new_state3} = + TestReplicationConnection.handle_info({:check_warning_threshold, 500}, exceeded_state) + + assert new_state3.warning_threshold_exceeded? == false + end) + + assert log2 =~ "Processing lag is back below warning threshold" + end + + test "handles error threshold checks" do + # Below threshold + state = %TestReplicationConnection{ + error_threshold_exceeded?: false, + error_threshold: 2000 + } + + {:noreply, new_state} = + TestReplicationConnection.handle_info({:check_error_threshold, 1000}, state) + + assert new_state.error_threshold_exceeded? == false + + # Above threshold + log = + capture_log(fn -> + {:noreply, new_state2} = + TestReplicationConnection.handle_info({:check_error_threshold, 3000}, state) + + assert new_state2.error_threshold_exceeded? == true + end) + + assert log =~ "Processing lag exceeds error threshold; skipping side effects!" + + # Back below threshold + exceeded_state = %{state | error_threshold_exceeded?: true} + + log2 = + capture_log(fn -> + {:noreply, new_state3} = + TestReplicationConnection.handle_info({:check_error_threshold, 1000}, exceeded_state) + + assert new_state3.error_threshold_exceeded? 
== false + end) + + assert log2 =~ "Processing lag is back below error threshold" + end + + test "handles unknown messages" do + state = %TestReplicationConnection{counter: 789} + {:noreply, new_state} = TestReplicationConnection.handle_info(:unknown_message, state) + assert new_state == state + end + end + + describe "write message handling" do + test "processes insert messages" do + state = + %TestReplicationConnection{ + relations: %{ + 1 => %{ + namespace: "public", + name: "users", + columns: [ + %{name: "id"}, + %{name: "name"} + ] + } + }, + counter: 0 + } + |> Map.put(:operations, []) + + # Test on_write callback directly + # In a real scenario, handle_data would parse WAL messages and eventually + # call on_write with the decoded operation data + new_state = + TestReplicationConnection.on_write( + state, + 100, + :insert, + "users", + nil, + %{"id" => "123", "name" => "John Doe"} + ) + + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + [operation] = operations + assert operation.op == :insert + assert operation.table == "users" + assert operation.lsn == 100 + end + + test "processes update messages" do + state = + %TestReplicationConnection{} + |> Map.put(:operations, []) + + new_state = + TestReplicationConnection.on_write( + state, + 101, + :update, + "users", + %{"id" => "123", "name" => "John"}, + %{"id" => "123", "name" => "John Doe"} + ) + + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + [operation] = operations + assert operation.op == :update + assert operation.old_data == %{"id" => "123", "name" => "John"} + assert operation.data == %{"id" => "123", "name" => "John Doe"} + end + + test "processes delete messages" do + state = + %TestReplicationConnection{} + |> Map.put(:operations, []) + + new_state = + TestReplicationConnection.on_write( + state, + 102, + :delete, + "users", + %{"id" => "123", "name" => "John Doe"}, + nil + ) + + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + [operation] = operations + assert operation.op == :delete + assert operation.old_data == %{"id" => "123", "name" => "John Doe"} + assert operation.data == nil + end + + test "on_write always processes operations in test implementation" do + # Note: In the real implementation, process_write checks error_threshold_exceeded? 
+ # but our test implementation doesn't, so operations are always processed + state = + %TestReplicationConnection{ + error_threshold_exceeded?: true + } + |> Map.put(:operations, []) + + new_state = + TestReplicationConnection.on_write( + state, + 103, + :insert, + "users", + nil, + %{"id" => "456"} + ) + + # Our test implementation always processes operations + operations = Map.get(new_state, :operations, []) + assert length(operations) == 1 + end + end + + describe "flush behavior" do + test "calls on_flush when buffer size reached" do + state = + %TestReplicationConnection{ + flush_buffer: %{1 => %{}, 2 => %{}}, + flush_buffer_size: 3 + } + |> Map.put(:flush_count, 0) + |> Map.put(:operations, []) + + # Adding one more should trigger flush + # In the real implementation, this would happen in process_write + # when maybe_flush is called + new_state = %{state | flush_buffer: Map.put(state.flush_buffer, 3, %{})} + + # Simulate maybe_flush logic + flushed_state = + if map_size(new_state.flush_buffer) >= new_state.flush_buffer_size do + TestReplicationConnection.on_flush(new_state) + else + new_state + end + + assert Map.get(flushed_state, :flush_count) == 1 + assert flushed_state.flush_buffer == %{} + end + end + + describe "publication and slot management flow" do + test "creates publication when it doesn't exist" do + state = %TestReplicationConnection{ + publication_name: "test_pub", + table_subscriptions: ["users", "posts"], + schema: "public", + step: :check_publication + } + + # Simulate publication not existing + {:query, query, new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{num_rows: 0}], + state + ) + + assert query == "CREATE PUBLICATION test_pub FOR TABLE public.users,public.posts" + assert new_state.step == :check_replication_slot + end + + test "checks publication tables when publication exists" do + state = %TestReplicationConnection{ + publication_name: "test_pub", + step: :check_publication + } + + {:query, query, new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{num_rows: 1}], + state + ) + + assert query =~ "SELECT schemaname, tablename" + assert query =~ "FROM pg_publication_tables" + assert query =~ "WHERE pubname = 'test_pub'" + assert new_state.step == :check_publication_tables + end + + test "creates replication slot when it doesn't exist" do + state = %TestReplicationConnection{ + replication_slot_name: "test_slot", + output_plugin: "pgoutput", + step: :create_slot + } + + {:query, query, new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{num_rows: 0}], + state + ) + + assert query == "CREATE_REPLICATION_SLOT test_slot LOGICAL pgoutput NOEXPORT_SNAPSHOT" + assert new_state.step == :start_replication_slot + end + + test "starts replication when slot exists" do + state = %TestReplicationConnection{ + replication_slot_name: "test_slot", + publication_name: "test_pub", + proto_version: 1, + step: :start_replication_slot + } + + {:stream, query, [], new_state} = + TestReplicationConnection.handle_result( + [%Postgrex.Result{}], + state + ) + + assert query =~ "START_REPLICATION SLOT \"test_slot\"" + assert query =~ "publication_names 'test_pub'" + assert query =~ "proto_version '1'" + assert new_state.step == :streaming + end + end + + describe "relation message handling" do + test "stores relation information" do + state = %RelationTestConnection{relations: %{}} + + # In the real implementation, this would be called from handle_data + # when a Relation message is received. 
Since handle_write is private,
+      # we can't test it directly, but we know it updates the relations map
+      relation = %{
+        id: 1,
+        namespace: "public",
+        name: "test_table",
+        columns: [%{name: "id"}, %{name: "data"}]
+      }
+
+      # The relation would be stored with its id as the key
+      new_state = %{state | relations: Map.put(state.relations, relation.id, relation)}
+
+      assert new_state.relations[1].name == "test_table"
+      assert length(new_state.relations[1].columns) == 2
     end
   end
 end
diff --git a/elixir/config/config.exs b/elixir/config/config.exs
index 31d929588..6d66a3c75 100644
--- a/elixir/config/config.exs
+++ b/elixir/config/config.exs
@@ -64,7 +64,19 @@ config :domain, Domain.ChangeLogs.ReplicationConnection,
       resource_connections
       resources
       tokens
-  ]
+  ],
+  # Allow up to 5 minutes of processing lag before alerting. This needs to be able to survive
+  # deploys without alerting.
+  warning_threshold: :timer.minutes(5),
+
+  # 30 days: we almost never want to bypass change log inserts
+  error_threshold: :timer.hours(30 * 24),
+
+  # Flush buffered change logs at least every 30 seconds
+  flush_interval: :timer.seconds(30),
+
+  # Flush at most 500 change logs at a time
+  flush_buffer_size: 500

 config :domain, Domain.Events.ReplicationConnection,
   replication_slot_name: "events_slot",
@@ -98,7 +110,16 @@ config :domain, Domain.Events.ReplicationConnection,
       resource_connections
       resources
       tokens
-  ]
+  ],
+  # Allow up to 60 seconds of lag before alerting
+  warning_threshold: :timer.seconds(60),
+
+  # Allow up to 30 minutes of lag before bypassing hooks
+  error_threshold: :timer.minutes(30),
+
+  # Disable flush buffering
+  flush_interval: 0,
+  flush_buffer_size: 0

 config :domain, Domain.Tokens,
   key_base: "5OVYJ83AcoQcPmdKNksuBhJFBhjHD1uUa9mDOHV/6EIdBQ6pXksIhkVeWIzFk5S2",
diff --git a/elixir/config/test.exs b/elixir/config/test.exs
index 87eb6ae2c..7603f00bb 100644
--- a/elixir/config/test.exs
+++ b/elixir/config/test.exs
@@ -107,7 +107,8 @@ config :api,
 ###############################
 config :domain, Domain.Mailer, adapter: Domain.Mailer.TestAdapter

-config :logger, level: :warning
+# Allow asserting on info logs and higher
+config :logger, level: :info

 config :argon2_elixir,
   t_cost: 1,
   m_cost: 8
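Together, these settings give the two consumers different batching profiles: change logs flush at least every 30 seconds and at most 500 rows at a time, while the events connection disables buffering entirely. A minimal sketch of how a consumer might honor flush_buffer_size, assuming the buffer semantics simulated in the connection tests above; MyApp.BufferedConnection is a hypothetical module, and maybe_flush/1 is modeled on the private helper the test comments mention:

defmodule MyApp.BufferedConnection do
  use Domain.Replication.Connection

  # Buffer each write under its LSN, then flush eagerly once the buffer
  # reaches flush_buffer_size. With a flush_interval > 0, init/1 also
  # schedules periodic :flush messages (per the init/1 tests), so a quiet
  # stream still drains on a timer.
  def on_write(state, lsn, op, table, old_data, data) do
    entry = %{op: op, table: table, old_data: old_data, data: data}

    state
    |> Map.update!(:flush_buffer, &Map.put(&1, lsn, entry))
    |> maybe_flush()
  end

  def on_flush(state) do
    # Persist state.flush_buffer here (e.g. hand it to a bulk insert),
    # then reset the buffer so the next batch starts empty.
    %{state | flush_buffer: %{}}
  end

  defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state)
       when size > 0 and map_size(buffer) >= size do
    on_flush(state)
  end

  defp maybe_flush(state), do: state
end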