From a9f49629aecdec13a35b4034ff949e32d600709e Mon Sep 17 00:00:00 2001
From: Jamil
Date: Tue, 24 Jun 2025 19:06:20 -0700
Subject: [PATCH] feat(portal): add change_logs table and insert data (#9553)

Building on the WAL consumer that's been in development over the past several
weeks, we introduce a new `change_logs` table that stores very lightly
up-fitted data decoded from the WAL:

- `account_id` (indexed): a foreign key reference to an account.
- `lsn`: the WAL log sequence number at which the operation was received.
- `inserted_at` (indexed): the timestamp of insert, for truncating rows later.
- `table`: the table where the op took place.
- `op`: the operation performed (insert/update/delete).
- `old_data`: a nullable map of the old row data (update/delete).
- `data`: a nullable map of the new row data (insert/update).
- `vsn`: an integer version field we can bump to signify schema changes in the
  data, in case we need to apply operations to only new or only old data.

Judging from our prod metrics, we're currently averaging about 1,000 write
operations a minute, which will generate about 1-2 dozen change logs per
second. Doing the math, 30 days at our current volume will yield roughly 50M
rows, which should be ok for some time, since this is an append-only table
that is rarely (if ever) read from.

The one aspect of this we may need to handle sooner rather than later is
batch-inserting these rows. That raises an issue though - currently, in this
PR, we process each WAL event serially, ending with the final acknowledgement
`:ok` that signals to Postgres our status in processing the WAL. If we do
anything async here, this processing "cursor" becomes inaccurate, so we may
need to think about what to track and which data we care about.

Related: #7124
---
 elixir/apps/domain/lib/domain/application.ex | 25 +-
 elixir/apps/domain/lib/domain/change_logs.ex | 10 +
 .../lib/domain/change_logs/change_log.ex | 16 +
 .../change_logs/change_log/changeset.ex | 80 +++
 .../change_logs/replication_connection.ex | 82 +++
 elixir/apps/domain/lib/domain/events/event.ex | 329 -----------
 .../domain/lib/domain/events/hooks/flows.ex | 2 +
 .../domain/events/replication_connection.ex | 346 ++----------
 .../lib/domain/replication/connection.ex | 476 ++++++++++++++++
 .../domain/{events => replication}/decoder.ex | 6 +-
 .../manager.ex} | 26 +-
 .../{events => replication}/oid_database.ex | 2 +-
 .../{events => replication}/protocol.ex | 6 +-
 .../protocol/keep_alive.ex | 2 +-
 .../{events => replication}/protocol/write.ex | 2 +-
 .../20250616205321_create_change_logs.exs | 32 ++
 ...50616230420_remove_replica_from_relays.exs | 20 +
 .../replication_connection_test.exs | 349 ++++++++++++
 .../domain/test/domain/change_logs_test.exs | 176 ++++++
 .../domain/test/domain/events/event_test.exs | 28 -
 .../events/replication_connection_test.exs | 297 ++++------
 .../domain/replication/connection_test.exs | 308 +++++++++++
 .../{events => replication}/decoder_test.exs | 6 +-
 elixir/config/config.exs | 35 +-
 elixir/config/runtime.exs | 15 +
 elixir/config/test.exs | 12 +-
 elixir/test.exs | 515 ------------------
 27 files changed, 1815 insertions(+), 1388 deletions(-)
 create mode 100644 elixir/apps/domain/lib/domain/change_logs.ex
 create mode 100644 elixir/apps/domain/lib/domain/change_logs/change_log.ex
 create mode 100644 elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex
 create mode 100644 elixir/apps/domain/lib/domain/change_logs/replication_connection.ex
 delete mode 100644 elixir/apps/domain/lib/domain/events/event.ex
 create mode 100644 elixir/apps/domain/lib/domain/replication/connection.ex
 rename
elixir/apps/domain/lib/domain/{events => replication}/decoder.ex (98%) rename elixir/apps/domain/lib/domain/{events/replication_connection_manager.ex => replication/manager.ex} (60%) rename elixir/apps/domain/lib/domain/{events => replication}/oid_database.ex (98%) rename elixir/apps/domain/lib/domain/{events => replication}/protocol.ex (94%) rename elixir/apps/domain/lib/domain/{events => replication}/protocol/keep_alive.ex (93%) rename elixir/apps/domain/lib/domain/{events => replication}/protocol/write.ex (94%) create mode 100644 elixir/apps/domain/priv/repo/migrations/20250616205321_create_change_logs.exs create mode 100644 elixir/apps/domain/priv/repo/migrations/20250616230420_remove_replica_from_relays.exs create mode 100644 elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs create mode 100644 elixir/apps/domain/test/domain/change_logs_test.exs delete mode 100644 elixir/apps/domain/test/domain/events/event_test.exs create mode 100644 elixir/apps/domain/test/domain/replication/connection_test.exs rename elixir/apps/domain/test/domain/{events => replication}/decoder_test.exs (99%) delete mode 100644 elixir/test.exs diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex index 68a37dfa7..85ccd90a0 100644 --- a/elixir/apps/domain/lib/domain/application.ex +++ b/elixir/apps/domain/lib/domain/application.ex @@ -81,12 +81,25 @@ defmodule Domain.Application do end defp replication do - config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) + connection_modules = [ + Domain.Events.ReplicationConnection, + Domain.ChangeLogs.ReplicationConnection + ] - if config[:enabled] do - [Domain.Events.ReplicationConnectionManager] - else - [] - end + # Filter out disabled replication connections + Enum.reduce(connection_modules, [], fn module, enabled -> + config = Application.fetch_env!(:domain, module) + + if config[:enabled] do + spec = %{ + id: module, + start: {Domain.Replication.Manager, :start_link, [module, []]} + } + + [spec | enabled] + else + enabled + end + end) end end diff --git a/elixir/apps/domain/lib/domain/change_logs.ex b/elixir/apps/domain/lib/domain/change_logs.ex new file mode 100644 index 000000000..8c665f0a4 --- /dev/null +++ b/elixir/apps/domain/lib/domain/change_logs.ex @@ -0,0 +1,10 @@ +defmodule Domain.ChangeLogs do + alias Domain.ChangeLogs.ChangeLog + alias Domain.Repo + + def create_change_log(attrs) do + attrs + |> ChangeLog.Changeset.changeset() + |> Repo.insert() + end +end diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log.ex b/elixir/apps/domain/lib/domain/change_logs/change_log.ex new file mode 100644 index 000000000..48b7037b4 --- /dev/null +++ b/elixir/apps/domain/lib/domain/change_logs/change_log.ex @@ -0,0 +1,16 @@ +defmodule Domain.ChangeLogs.ChangeLog do + use Domain, :schema + + schema "change_logs" do + belongs_to :account, Domain.Accounts.Account + + field :lsn, :integer + field :table, :string + field :op, Ecto.Enum, values: [:insert, :update, :delete] + field :old_data, :map + field :data, :map + field :vsn, :integer + + timestamps(type: :utc_datetime_usec, updated_at: false) + end +end diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex new file mode 100644 index 000000000..ee2482fc4 --- /dev/null +++ b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex @@ -0,0 +1,80 @@ +defmodule Domain.ChangeLogs.ChangeLog.Changeset do + use Domain, 
:changeset + + @fields ~w[account_id lsn table op old_data data vsn]a + + def changeset(attrs) do + %Domain.ChangeLogs.ChangeLog{} + |> cast(attrs, @fields) + |> validate_inclusion(:op, [:insert, :update, :delete]) + |> validate_correct_data_present() + |> validate_same_account() + |> put_account_id() + |> validate_required([:account_id, :lsn, :table, :op, :vsn]) + |> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey) + end + + # :insert requires old_data = nil and data != nil + # :update requires old_data != nil and data != nil + # :delete requires old_data != nil and data = nil + def validate_correct_data_present(changeset) do + op = get_field(changeset, :op) + old_data = get_field(changeset, :old_data) + data = get_field(changeset, :data) + + case {op, old_data, data} do + {:insert, nil, %{} = _data} -> + changeset + + {:update, %{} = _old_data, %{} = _data} -> + changeset + + {:delete, %{} = _old_data, nil} -> + changeset + + _ -> + add_error(changeset, :base, "Invalid combination of operation and data") + end + end + + # Add an error if data["account_id"] != old_data["account_id"] + defp validate_same_account(changeset) do + old_data = get_field(changeset, :old_data) + data = get_field(changeset, :data) + + account_id_key = account_id_field(changeset) + + if old_data && data && old_data[account_id_key] != data[account_id_key] do + add_error(changeset, :base, "Account ID cannot be changed") + else + changeset + end + end + + # Populate account_id from one of data, old_data + defp put_account_id(changeset) do + old_data = get_field(changeset, :old_data) + data = get_field(changeset, :data) + + account_id_key = account_id_field(changeset) + + account_id = + case {old_data, data} do + {nil, nil} -> nil + {_, %{^account_id_key => id}} -> id + {%{^account_id_key => id}, _} -> id + _ -> nil + end + + put_change(changeset, :account_id, account_id) + end + + # For accounts table updates, the account_id is in the "id" field + # For other tables, it is in the "account_id" field + defp account_id_field(changeset) do + case get_field(changeset, :table) do + "accounts" -> "id" + _ -> "account_id" + end + end +end diff --git a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex new file mode 100644 index 000000000..63fa6aecd --- /dev/null +++ b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex @@ -0,0 +1,82 @@ +defmodule Domain.ChangeLogs.ReplicationConnection do + alias Domain.ChangeLogs + + use Domain.Replication.Connection, + # Allow up to 30 seconds of processing lag before alerting - this should be less + # than or equal to the wal_sender_timeout setting, which is typically 60s. + alert_threshold_ms: 30_000, + publication_name: "change_logs" + + # Bump this to signify a change in the audit log schema. Use with care. 
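+ # For example, if the shape or encoding of `old_data`/`data` changes, bumping @vsn lets
+ # readers apply version-specific handling to rows written before vs. after the change.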
+ @vsn 0 + + def on_insert(lsn, table, data) do + log(:insert, lsn, table, nil, data) + end + + def on_update(lsn, table, old_data, data) do + log(:update, lsn, table, old_data, data) + end + + def on_delete(lsn, table, old_data) do + log(:delete, lsn, table, old_data, nil) + end + + # Relay group tokens don't have account_ids + + defp log(_op, _lsn, "tokens", %{"type" => "relay_group"}, _data) do + :ok + end + + defp log(_op, _lsn, "tokens", _old_data, %{"type" => "relay_group"}) do + :ok + end + + defp log(_op, _lsn, "flows", _old_data, _data) do + # TODO: WAL + # Flows are not logged to the change log as they are used only to trigger side effects which + # will be removed. Remove the flows table publication when that happens. + :ok + end + + defp log(op, lsn, table, old_data, data) do + attrs = %{ + op: op, + lsn: lsn, + table: table, + old_data: old_data, + data: data, + vsn: @vsn + } + + case ChangeLogs.create_change_log(attrs) do + {:ok, _change_log} -> + :ok + + {:error, %Ecto.Changeset{errors: errors} = changeset} -> + if foreign_key_error?(errors) do + # Expected under normal operation when an account is deleted + :ok + else + Logger.warning("Failed to create change log", + errors: inspect(changeset.errors), + table: table, + op: op, + lsn: lsn, + vsn: @vsn + ) + + # TODO: WAL + # Don't ignore failures to insert change logs. Improve this after we have some + # operational experience with the data flowing in here. + :ok + end + end + end + + defp foreign_key_error?(errors) do + Enum.any?(errors, fn {field, {message, _}} -> + field == :account_id and message == "does not exist" + end) + end +end diff --git a/elixir/apps/domain/lib/domain/events/event.ex b/elixir/apps/domain/lib/domain/events/event.ex deleted file mode 100644 index ba0e832ca..000000000 --- a/elixir/apps/domain/lib/domain/events/event.ex +++ /dev/null @@ -1,329 +0,0 @@ -defmodule Domain.Events.Event do - alias Domain.Events.Decoder - alias Domain.Events.Hooks - - require Logger - - @doc """ - Ingests a WAL write message from Postgres, transforms it into an event, and sends - it to the appropriate hook module for processing. - """ - def ingest(msg, relations) do - {op, old_tuple_data, tuple_data} = extract_msg_data(msg) - {:ok, relation} = Map.fetch(relations, msg.relation_id) - - table = relation.name - old_data = zip(old_tuple_data, relation.columns) - data = zip(tuple_data, relation.columns) - - process(op, table, old_data, data) - - # TODO: WAL - # This is only for load testing. Remove this. 
- Domain.PubSub.broadcast("events", {op, table, old_data, data}) - end - - ############ - # accounts # - ############ - - defp process(:insert, "accounts", _old_data, data) do - Hooks.Accounts.on_insert(data) - end - - defp process(:update, "accounts", old_data, data) do - Hooks.Accounts.on_update(old_data, data) - end - - defp process(:delete, "accounts", old_data, _data) do - Hooks.Accounts.on_delete(old_data) - end - - ########################### - # actor_group_memberships # - ########################### - - defp process(:insert, "actor_group_memberships", _old_data, data) do - Hooks.ActorGroupMemberships.on_insert(data) - end - - defp process(:update, "actor_group_memberships", old_data, data) do - Hooks.ActorGroupMemberships.on_update(old_data, data) - end - - defp process(:delete, "actor_group_memberships", old_data, _data) do - Hooks.ActorGroupMemberships.on_delete(old_data) - end - - ################ - # actor_groups # - ################ - - defp process(:insert, "actor_groups", _old_data, data) do - Hooks.ActorGroups.on_insert(data) - end - - defp process(:update, "actor_groups", old_data, data) do - Hooks.ActorGroups.on_update(old_data, data) - end - - defp process(:delete, "actor_groups", old_data, _data) do - Hooks.ActorGroups.on_delete(old_data) - end - - ########## - # actors # - ########## - - defp process(:insert, "actors", _old_data, data) do - Hooks.Actors.on_insert(data) - end - - defp process(:update, "actors", old_data, data) do - Hooks.Actors.on_update(old_data, data) - end - - defp process(:delete, "actors", old_data, _data) do - Hooks.Actors.on_delete(old_data) - end - - ################### - # auth_identities # - ################### - - defp process(:insert, "auth_identities", _old_data, data) do - Hooks.AuthIdentities.on_insert(data) - end - - defp process(:update, "auth_identities", old_data, data) do - Hooks.AuthIdentities.on_update(old_data, data) - end - - defp process(:delete, "auth_identities", old_data, _data) do - Hooks.AuthIdentities.on_delete(old_data) - end - - ################## - # auth_providers # - ################## - - defp process(:insert, "auth_providers", _old_data, data) do - Hooks.AuthProviders.on_insert(data) - end - - defp process(:update, "auth_providers", old_data, data) do - Hooks.AuthProviders.on_update(old_data, data) - end - - defp process(:delete, "auth_providers", old_data, _data) do - Hooks.AuthProviders.on_delete(old_data) - end - - ########### - # clients # - ########### - - defp process(:insert, "clients", _old_data, data) do - Hooks.Clients.on_insert(data) - end - - defp process(:update, "clients", old_data, data) do - Hooks.Clients.on_update(old_data, data) - end - - defp process(:delete, "clients", old_data, _data) do - Hooks.Clients.on_delete(old_data) - end - - ################### - # flow_activities # - ################### - - defp process(:insert, "flow_activities", _old_data, data) do - Hooks.FlowActivities.on_insert(data) - end - - defp process(:update, "flow_activities", old_data, data) do - Hooks.FlowActivities.on_update(old_data, data) - end - - defp process(:delete, "flow_activities", old_data, _data) do - Hooks.FlowActivities.on_delete(old_data) - end - - ######### - # flows # - ######### - - defp process(:insert, "flows", _old_data, data) do - Hooks.Flows.on_insert(data) - end - - defp process(:update, "flows", old_data, data) do - Hooks.Flows.on_update(old_data, data) - end - - defp process(:delete, "flows", old_data, _data) do - Hooks.Flows.on_delete(old_data) - end - - ################## - # gateway_groups # - 
################## - - defp process(:insert, "gateway_groups", _old_data, data) do - Hooks.GatewayGroups.on_insert(data) - end - - defp process(:update, "gateway_groups", old_data, data) do - Hooks.GatewayGroups.on_update(old_data, data) - end - - defp process(:delete, "gateway_groups", old_data, _data) do - Hooks.GatewayGroups.on_delete(old_data) - end - - ############ - # gateways # - ############ - - defp process(:insert, "gateways", _old_data, data) do - Hooks.Gateways.on_insert(data) - end - - defp process(:update, "gateways", old_data, data) do - Hooks.Gateways.on_update(old_data, data) - end - - defp process(:delete, "gateways", old_data, _data) do - Hooks.Gateways.on_delete(old_data) - end - - ############ - # policies # - ############ - - defp process(:insert, "policies", _old_data, data) do - Hooks.Policies.on_insert(data) - end - - defp process(:update, "policies", old_data, data) do - Hooks.Policies.on_update(old_data, data) - end - - defp process(:delete, "policies", old_data, _data) do - Hooks.Policies.on_delete(old_data) - end - - ################ - # relay_groups # - ################ - - defp process(:insert, "relay_groups", _old_data, data) do - Hooks.RelayGroups.on_insert(data) - end - - defp process(:update, "relay_groups", old_data, data) do - Hooks.RelayGroups.on_update(old_data, data) - end - - defp process(:delete, "relay_groups", old_data, _data) do - Hooks.RelayGroups.on_delete(old_data) - end - - ########## - # relays # - ########## - - defp process(:insert, "relays", _old_data, data) do - Hooks.Relays.on_insert(data) - end - - defp process(:update, "relays", old_data, data) do - Hooks.Relays.on_update(old_data, data) - end - - defp process(:delete, "relays", old_data, _data) do - Hooks.Relays.on_delete(old_data) - end - - ######################## - # resource_connections # - ######################## - - defp process(:insert, "resource_connections", _old_data, data) do - Hooks.ResourceConnections.on_insert(data) - end - - defp process(:update, "resource_connections", old_data, data) do - Hooks.ResourceConnections.on_update(old_data, data) - end - - defp process(:delete, "resource_connections", old_data, _data) do - Hooks.ResourceConnections.on_delete(old_data) - end - - ############# - # resources # - ############# - - defp process(:insert, "resources", _old_data, data) do - Hooks.Resources.on_insert(data) - end - - defp process(:update, "resources", old_data, data) do - Hooks.Resources.on_update(old_data, data) - end - - defp process(:delete, "resources", old_data, _data) do - Hooks.Resources.on_delete(old_data) - end - - ########## - # tokens # - ########## - - defp process(:insert, "tokens", _old_data, data) do - Hooks.Tokens.on_insert(data) - end - - defp process(:update, "tokens", old_data, data) do - Hooks.Tokens.on_update(old_data, data) - end - - defp process(:delete, "tokens", old_data, _data) do - Hooks.Tokens.on_delete(old_data) - end - - ############# - # CATCH-ALL # - ############# - - defp process(op, table, _old_data, _data) do - Logger.warning("Unhandled event type!", op: op, table: table) - - :ok - end - - defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do - {:insert, nil, data} - end - - defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do - {:update, old, data} - end - - defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do - {:delete, old, nil} - end - - defp zip(nil, _), do: nil - - defp zip(tuple_data, columns) do - tuple_data - |> Tuple.to_list() - |> Enum.zip(columns) 
- |> Map.new(fn {value, column} -> {column.name, value} end) - |> Enum.into(%{}) - end -end diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex index c6fbcac3f..d0806ab03 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/flows.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/flows.ex @@ -1,3 +1,5 @@ +# TODO: WAL +# Move side-effects from flows to state table in clients and gateways defmodule Domain.Events.Hooks.Flows do @behaviour Domain.Events.Hooks alias Domain.PubSub diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex index afb869f88..46e913c84 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -1,313 +1,67 @@ -# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex defmodule Domain.Events.ReplicationConnection do - @moduledoc """ - Receives WAL events from PostgreSQL and broadcasts them where they need to go. + alias Domain.Events.Hooks - Generally, we only want to start one of these connections per cluster in order - to obtain a serial stream of the WAL. We can then fanout these events to the - appropriate consumers. + use Domain.Replication.Connection, + # Allow up to 5 seconds of lag before alerting + alert_threshold_ms: 5_000, + publication_name: "events" - The ReplicationConnection is started with a durable slot so that whatever data we - fail to acknowledge is retained in the slot on the server's disk. The server will - then send us the data when we reconnect. This is important because we want to - ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy. - - The WAL data we receive is sent only once a COMMIT completes on the server. So even though - COMMIT is one of the message types we receive here, we can safely ignore it and process - insert/update/delete messages one-by-one in this module as we receive them. 
- """ - use Postgrex.ReplicationConnection require Logger - import Domain.Events.Protocol - import Domain.Events.Decoder + @tables_to_hooks %{ + "accounts" => Hooks.Accounts, + "actor_group_memberships" => Hooks.ActorGroupMemberships, + "actor_groups" => Hooks.ActorGroups, + "actors" => Hooks.Actors, + "auth_identities" => Hooks.AuthIdentities, + "auth_providers" => Hooks.AuthProviders, + "clients" => Hooks.Clients, + "flow_activities" => Hooks.FlowActivities, + "flows" => Hooks.Flows, + "gateway_groups" => Hooks.GatewayGroups, + "gateways" => Hooks.Gateways, + "policies" => Hooks.Policies, + "resource_connections" => Hooks.ResourceConnections, + "resources" => Hooks.Resources, + "tokens" => Hooks.Tokens + } - alias Domain.Events.Event - alias Domain.Events.Decoder - alias Domain.Events.Protocol.{KeepAlive, Write} + def on_insert(_lsn, table, data) do + hook = Map.get(@tables_to_hooks, table) - @status_log_interval :timer.minutes(5) - - @type t :: %__MODULE__{ - schema: String.t(), - step: - :disconnected - | :check_publication - | :create_publication - | :check_replication_slot - | :create_slot - | :start_replication_slot - | :streaming, - publication_name: String.t(), - replication_slot_name: String.t(), - output_plugin: String.t(), - proto_version: integer(), - table_subscriptions: list(), - relations: map(), - counter: integer() - } - defstruct schema: "public", - step: :disconnected, - publication_name: "events", - replication_slot_name: "events_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: [], - relations: %{}, - counter: 0 - - def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do - # Start only one ReplicationConnection in the cluster. - opts = connection_opts ++ [name: {:global, __MODULE__}] - - Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts) + if hook do + hook.on_insert(data) + else + log_warning(:insert, table) + :ok + end end - @impl true - def init(state) do - {:ok, state} + def on_update(_lsn, table, old_data, data) do + hook = Map.get(@tables_to_hooks, table) + + if hook do + hook.on_update(old_data, data) + else + log_warning(:update, table) + :ok + end end - @doc """ - Called when we make a successful connection to the PostgreSQL server. - """ + def on_delete(_lsn, table, old_data) do + hook = Map.get(@tables_to_hooks, table) - @impl true - def handle_connect(state) do - query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" - {:query, query, %{state | step: :create_publication}} + if hook do + hook.on_delete(old_data) + else + log_warning(:delete, table) + :ok + end end - @doc """ - Generic callback that handles replies to the queries we send. - - We use a simple state machine to issue queries one at a time to Postgres in order to: - - 1. Check if the publication exists - 2. Check if the replication slot exists - 3. Create the publication if it doesn't exist - 4. Create the replication slot if it doesn't exist - 5. Start the replication slot - 6. 
Start streaming data from the replication slot - """ - - @impl true - - def handle_result( - [%Postgrex.Result{num_rows: 1}], - %__MODULE__{step: :create_publication} = state - ) do - query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - {:query, query, %{state | step: :create_replication_slot}} - end - - def handle_result( - [%Postgrex.Result{num_rows: 1}], - %__MODULE__{step: :create_replication_slot} = state - ) do - {:query, "SELECT 1", %{state | step: :start_replication_slot}} - end - - def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do - Logger.info("Starting replication slot", state: inspect(state)) - - # Start logging regular status updates - send(self(), :interval_logger) - - query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - {:stream, query, [], %{state | step: :streaming}} - end - - def handle_result( - [%Postgrex.Result{num_rows: 0}], - %__MODULE__{step: :create_publication} = state - ) do - tables = - state.table_subscriptions - |> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end) - - query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}" - {:query, query, %{state | step: :check_replication_slot}} - end - - def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do - query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - {:query, query, %{state | step: :create_replication_slot}} - end - - def handle_result( - [%Postgrex.Result{num_rows: 0}], - %__MODULE__{step: :create_replication_slot} = state - ) do - query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - {:query, query, %{state | step: :start_replication_slot}} - end - - @doc """ - Called when we receive a message from the PostgreSQL server. - - We handle the following messages: - 1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge - processed data by responding with the current WAL position. - 2. Write: A message containing a WAL event - the actual data we are interested in. - 3. Unknown: Any other message that we don't know how to handle - we log and ignore it. - - For the KeepAlive message, we respond immediately with the current WAL position. Note: it is expected - that we receive many more of these messages than expected. That is because the rate at which the server - sends these messages scales proportionally to the number of Write messages it sends. - - For the Write message, we send broadcast for each message one-by-one as we receive it. This is important - because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL position, - we should have already processed all the messages up to that point. 
- """ - - @impl true - - def handle_data(data, state) when is_keep_alive(data) do - %KeepAlive{reply: reply, wal_end: wal_end} = parse(data) - - wal_end = wal_end + 1 - - message = - case reply do - :now -> standby_status(wal_end, wal_end, wal_end, reply) - :later -> hold() - end - - {:noreply, message, state} - end - - def handle_data(data, state) when is_write(data) do - %Write{message: message} = parse(data) - - message - |> decode_message() - |> handle_message(%{state | counter: state.counter + 1}) - end - - def handle_data(data, state) do - Logger.error("Unknown WAL message received!", - data: inspect(data), - state: inspect(state) + defp log_warning(op, table) do + Logger.warning( + "No hook defined for #{op} on table #{table}. Please implement Domain.Events.Hooks for this table." ) - - {:noreply, [], state} - end - - # Handles messages received: - # - # 1. Insert/Update/Delete - send to Event.ingest/2 for further processing - # 2. Relation messages - store the relation data in our state so we can use it later - # to associate column names etc with the data we receive. In practice, we'll always - # see a Relation message before we see any data for that relation. - # 3. Begin/Commit/Origin/Truncate/Type - we ignore these messages for now - # 4. Graceful shutdown - we respond with {:disconnect, :normal} to - # indicate that we are shutting down gracefully and prevent auto reconnecting. - defp handle_message( - %Decoder.Messages.Relation{ - id: id, - namespace: namespace, - name: name, - columns: columns - }, - state - ) do - relation = %{ - namespace: namespace, - name: name, - columns: columns - } - - {:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}} - end - - defp handle_message(%Decoder.Messages.Insert{} = msg, state) do - :ok = Event.ingest(msg, state.relations) - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Update{} = msg, state) do - :ok = Event.ingest(msg, state.relations) - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Delete{} = msg, state) do - :ok = Event.ingest(msg, state.relations) - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Begin{}, state) do - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Commit{}, state) do - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Origin{}, state) do - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Truncate{}, state) do - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Type{}, state) do - {:noreply, [], state} - end - - defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do - Logger.warning("Unsupported message received", - data: inspect(data), - counter: state.counter - ) - - {:noreply, [], state} - end - - @impl true - - def handle_info(:interval_logger, state) do - Logger.info("Processed #{state.counter} write messages from the WAL stream") - - Process.send_after(self(), :interval_logger, @status_log_interval) - - {:noreply, state} - end - - def handle_info(:shutdown, _), do: {:disconnect, :normal} - def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal} - - def handle_info(_, state), do: {:noreply, state} - - @doc """ - Called when the connection is disconnected unexpectedly. - - This will happen if: - 1. Postgres is restarted such as during a maintenance window - 2. The connection is closed by the server due to our failure to acknowledge - Keepalive messages in a timely manner - 3. 
The connection is cut due to a network error - 4. The ReplicationConnection process crashes or is killed abruptly for any reason - 5. Potentially during a deploy if the connection is not closed gracefully. - - Our Supervisor will restart this process automatically so this is not an error. - """ - - @impl true - def handle_disconnect(state) do - Logger.info("Replication connection disconnected", - counter: state.counter - ) - - {:noreply, %{state | step: :disconnected}} end end diff --git a/elixir/apps/domain/lib/domain/replication/connection.ex b/elixir/apps/domain/lib/domain/replication/connection.ex new file mode 100644 index 000000000..3ec398b4c --- /dev/null +++ b/elixir/apps/domain/lib/domain/replication/connection.ex @@ -0,0 +1,476 @@ +# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex +defmodule Domain.Replication.Connection do + @moduledoc """ + Receives WAL events from PostgreSQL and broadcasts them where they need to go. + + The ReplicationConnection is started with a durable slot so that whatever data we + fail to acknowledge is retained in the slot on the server's disk. The server will + then send us the data when we reconnect. This is important because we want to + ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy. + + The WAL data we receive is sent only once a COMMIT completes on the server. So even though + COMMIT is one of the message types we receive here, we can safely ignore it and process + insert/update/delete messages one-by-one in this module as we receive them. + + ## Usage + + defmodule MyApp.ReplicationConnection do + use Domain.Replication.Connection, + alert_threshold_ms: 30_000, + publication_name: "my_events" + end + + ## Options + + * `:alert_threshold_ms` - How long to allow the WAL stream to lag before logging a warning (default: 5000) + * `:publication_name` - Name of the PostgreSQL publication (default: "events") + """ + + defmacro __using__(opts \\ []) do + # Compose all the quote blocks without nesting + [ + basic_setup(), + struct_and_constants(opts), + connection_functions(), + query_handlers(), + data_handlers(), + message_handlers(), + transaction_handlers(), + ignored_message_handlers(), + utility_functions(), + info_handlers(opts), + default_callbacks() + ] + end + + # Extract basic imports and aliases + defp basic_setup do + quote do + use Postgrex.ReplicationConnection + require Logger + require OpenTelemetry.Tracer + + import Domain.Replication.Protocol + import Domain.Replication.Decoder + + alias Domain.Replication.Decoder + alias Domain.Replication.Protocol.{KeepAlive, Write} + end + end + + # Extract struct definition and constants + defp struct_and_constants(opts) do + quote bind_quoted: [opts: opts] do + # Only these two are configurable + @alert_threshold_ms Keyword.fetch!(opts, :alert_threshold_ms) + @publication_name Keyword.fetch!(opts, :publication_name) + + # Everything else uses defaults + @status_log_interval :timer.minutes(5) + @replication_slot_name "#{@publication_name}_slot" + @schema "public" + @output_plugin "pgoutput" + @proto_version 1 + + @type t :: %__MODULE__{ + schema: String.t(), + step: + :disconnected + | :check_publication + | :create_publication + | :check_replication_slot + | :create_slot + | :start_replication_slot + | :streaming, + publication_name: String.t(), + replication_slot_name: String.t(), + output_plugin: String.t(), + proto_version: integer(), + table_subscriptions: list(), + relations: map(), + counter: 
integer() + } + + defstruct schema: @schema, + step: :disconnected, + publication_name: @publication_name, + replication_slot_name: @replication_slot_name, + output_plugin: @output_plugin, + proto_version: @proto_version, + table_subscriptions: [], + relations: %{}, + counter: 0 + end + end + + # Extract connection setup functions + defp connection_functions do + quote do + def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do + opts = connection_opts ++ [name: {:global, __MODULE__}] + Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts) + end + + @impl true + def init(state) do + {:ok, Map.put(state, :lag_threshold_exceeded, false)} + end + + @doc """ + Called when we make a successful connection to the PostgreSQL server. + """ + @impl true + def handle_connect(state) do + query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" + {:query, query, %{state | step: :create_publication}} + end + + @doc """ + Called when the connection is disconnected unexpectedly. + + This will happen if: + 1. Postgres is restarted such as during a maintenance window + 2. The connection is closed by the server due to our failure to acknowledge + Keepalive messages in a timely manner + 3. The connection is cut due to a network error + 4. The ReplicationConnection process crashes or is killed abruptly for any reason + 5. Potentially during a deploy if the connection is not closed gracefully. + + Our Supervisor will restart this process automatically so this is not an error. + """ + @impl true + def handle_disconnect(state) do + Logger.info("#{__MODULE__}: Replication connection disconnected", + counter: state.counter + ) + + {:noreply, %{state | step: :disconnected}} + end + end + end + + # Extract query result handlers + defp query_handlers do + quote do + @doc """ + Generic callback that handles replies to the queries we send. + + We use a simple state machine to issue queries one at a time to Postgres in order to: + + 1. Check if the publication exists + 2. Check if the replication slot exists + 3. Create the publication if it doesn't exist + 4. Create the replication slot if it doesn't exist + 5. Start the replication slot + 6. 
Start streaming data from the replication slot + """ + @impl true + def handle_result( + [%Postgrex.Result{num_rows: 1}], + %__MODULE__{step: :create_publication} = state + ) do + query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + {:query, query, %{state | step: :create_replication_slot}} + end + + def handle_result( + [%Postgrex.Result{num_rows: 1}], + %__MODULE__{step: :create_replication_slot} = state + ) do + {:query, "SELECT 1", %{state | step: :start_replication_slot}} + end + + def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do + Logger.info("Starting replication slot #{state.replication_slot_name}", + state: inspect(state) + ) + + # Start logging regular status updates + send(self(), :interval_logger) + + query = + "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" + + {:stream, query, [], %{state | step: :streaming}} + end + + def handle_result( + [%Postgrex.Result{num_rows: 0}], + %__MODULE__{step: :create_publication} = state + ) do + tables = + state.table_subscriptions + |> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end) + + query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}" + {:query, query, %{state | step: :check_replication_slot}} + end + + def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do + query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + {:query, query, %{state | step: :create_replication_slot}} + end + + def handle_result( + [%Postgrex.Result{num_rows: 0}], + %__MODULE__{step: :create_replication_slot} = state + ) do + query = + "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" + + {:query, query, %{state | step: :start_replication_slot}} + end + end + end + + # Extract data handling functions + defp data_handlers do + quote do + @doc """ + Called when we receive a message from the PostgreSQL server. + + We handle the following messages: + 1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge + processed data by responding with the current WAL position. + 2. Write: A message containing a WAL event - the actual data we are interested in. + 3. Unknown: Any other message that we don't know how to handle - we log and ignore it. + + For the KeepAlive message, we respond immediately with the current WAL position. Note: it is expected + that we receive many more of these messages than expected. That is because the rate at which the server + sends these messages scales proportionally to the number of Write messages it sends. + + For the Write message, we send broadcast for each message one-by-one as we receive it. This is important + because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL position, + we should have already processed all the messages up to that point. 
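+ Note that we reply with `wal_end + 1`: the standby status fields are defined as the
+ position of the last WAL byte received/flushed/applied plus one.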
+ """ + @impl true + def handle_data(data, state) when is_keep_alive(data) do + %KeepAlive{reply: reply, wal_end: wal_end} = parse(data) + + wal_end = wal_end + 1 + + message = + case reply do + :now -> standby_status(wal_end, wal_end, wal_end, reply) + :later -> hold() + end + + {:noreply, message, state} + end + + def handle_data(data, state) when is_write(data) do + OpenTelemetry.Tracer.with_span "#{__MODULE__}.handle_data/2" do + %Write{server_wal_end: server_wal_end, message: message} = parse(data) + + message + |> decode_message() + |> handle_message(server_wal_end, %{state | counter: state.counter + 1}) + end + end + + def handle_data(data, state) do + Logger.error("#{__MODULE__}: Unknown WAL message received!", + data: inspect(data), + state: inspect(state) + ) + + {:noreply, [], state} + end + end + end + + # Extract core message handling functions + defp message_handlers do + quote do + # Handles messages received: + # + # 1. Insert/Update/Delete/Begin/Commit - send to appropriate hook + # 2. Relation messages - store the relation data in our state so we can use it later + # to associate column names etc with the data we receive. In practice, we'll always + # see a Relation message before we see any data for that relation. + # 3. Origin/Truncate/Type - we ignore these messages for now + # 4. Graceful shutdown - we respond with {:disconnect, :normal} to + # indicate that we are shutting down gracefully and prevent auto reconnecting. + defp handle_message( + %Decoder.Messages.Relation{ + id: id, + namespace: namespace, + name: name, + columns: columns + }, + _server_wal_end, + state + ) do + relation = %{ + namespace: namespace, + name: name, + columns: columns + } + + {:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}} + end + + defp handle_message(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do + {op, table, _old_data, data} = transform(msg, state.relations) + :ok = on_insert(server_wal_end, table, data) + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Update{} = msg, server_wal_end, state) do + {op, table, old_data, data} = transform(msg, state.relations) + :ok = on_update(server_wal_end, table, old_data, data) + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do + {op, table, old_data, _data} = transform(msg, state.relations) + :ok = on_delete(server_wal_end, table, old_data) + {:noreply, [], state} + end + end + end + + # Extract transaction and ignored message handlers + defp transaction_handlers do + quote do + defp handle_message(%Decoder.Messages.Begin{} = msg, server_wal_end, state) do + {:noreply, [], state} + end + + defp handle_message( + %Decoder.Messages.Commit{commit_timestamp: commit_timestamp} = msg, + _server_wal_end, + state + ) do + # Since we receive a commit for each operation and we process each operation + # one-by-one, we can use the commit timestamp to check if we are lagging behind. + lag_ms = DateTime.diff(commit_timestamp, DateTime.utc_now(), :millisecond) + send(self(), {:check_alert, lag_ms}) + + {:noreply, [], state} + end + end + end + + # Extract handlers for ignored message types + defp ignored_message_handlers do + quote do + # These messages are not relevant for our use case, so we ignore them. 
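+ # Origin identifies the upstream origin of a replicated transaction, Truncate signals a
+ # TRUNCATE statement, and Type describes a custom column type - none of these need handling here.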
+ defp handle_message(%Decoder.Messages.Origin{}, _server_wal_end, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Truncate{}, _server_wal_end, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Type{}, _server_wal_end, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do + Logger.warning("#{__MODULE__}: Unsupported message received", + data: inspect(data), + counter: state.counter + ) + + {:noreply, [], state} + end + end + end + + # Extract data transformation utilities + defp utility_functions do + quote do + defp transform(msg, relations) do + {op, old_tuple_data, tuple_data} = extract_msg_data(msg) + {:ok, relation} = Map.fetch(relations, msg.relation_id) + table = relation.name + old_data = zip(old_tuple_data, relation.columns) + data = zip(tuple_data, relation.columns) + + { + op, + table, + old_data, + data + } + end + + defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do + {:insert, nil, data} + end + + defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do + {:update, old, data} + end + + defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do + {:delete, old, nil} + end + + defp zip(nil, _), do: nil + + defp zip(tuple_data, columns) do + tuple_data + |> Tuple.to_list() + |> Enum.zip(columns) + |> Map.new(fn {value, column} -> {column.name, value} end) + |> Enum.into(%{}) + end + end + end + + # Extract info handlers + defp info_handlers(opts) do + quote bind_quoted: [opts: opts] do + @alert_threshold_ms Keyword.fetch!(opts, :alert_threshold_ms) + @status_log_interval :timer.minutes(5) + + @impl true + # Log only once when crossing the threshold + def handle_info({:check_alert, lag_ms}, %{lag_threshold_exceeded: false} = state) + when lag_ms >= @alert_threshold_ms do + Logger.warning("#{__MODULE__}: Processing lag exceeds threshold", lag_ms: lag_ms) + {:noreply, %{state | lag_threshold_exceeded: true}} + end + + def handle_info({:check_alert, lag_ms}, %{lag_threshold_exceeded: true} = state) + when lag_ms < @alert_threshold_ms do + Logger.info("#{__MODULE__}: Processing lag is back below threshold", lag_ms: lag_ms) + {:noreply, %{state | lag_threshold_exceeded: false}} + end + + def handle_info(:interval_logger, state) do + Logger.info( + "#{__MODULE__}: Processed #{state.counter} write messages from the WAL stream" + ) + + Process.send_after(self(), :interval_logger, @status_log_interval) + + {:noreply, state} + end + + def handle_info(:shutdown, _), do: {:disconnect, :normal} + def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal} + + def handle_info(_, state), do: {:noreply, state} + end + end + + # Extract default callback implementations + defp default_callbacks do + quote do + # Default implementations for required callbacks - modules using this should implement these + def on_insert(_lsn, _table, _data), do: :ok + def on_update(_lsn, _table, _old_data, _data), do: :ok + def on_delete(_lsn, _table, _old_data), do: :ok + + defoverridable on_insert: 3, on_update: 4, on_delete: 3 + end + end +end diff --git a/elixir/apps/domain/lib/domain/events/decoder.ex b/elixir/apps/domain/lib/domain/replication/decoder.ex similarity index 98% rename from elixir/apps/domain/lib/domain/events/decoder.ex rename to elixir/apps/domain/lib/domain/replication/decoder.ex index e2660f7ce..62a4d7456 100644 --- a/elixir/apps/domain/lib/domain/events/decoder.ex +++ 
b/elixir/apps/domain/lib/domain/replication/decoder.ex @@ -1,5 +1,5 @@ # CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/decoder.ex -defmodule Domain.Events.Decoder do +defmodule Domain.Replication.Decoder do @moduledoc """ Functions for decoding different types of logical replication messages. """ @@ -148,7 +148,7 @@ defmodule Domain.Events.Decoder do Unsupported } - alias Domain.Events.OidDatabase + alias Domain.Replication.OidDatabase @doc """ Parses logical replication messages from Postgres @@ -156,7 +156,7 @@ defmodule Domain.Events.Decoder do ## Examples iex> decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>) - %Domain.Events.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}} + %Domain.Replication.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}} """ def decode_message(message) when is_binary(message) do diff --git a/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex b/elixir/apps/domain/lib/domain/replication/manager.ex similarity index 60% rename from elixir/apps/domain/lib/domain/events/replication_connection_manager.ex rename to elixir/apps/domain/lib/domain/replication/manager.ex index d1541f707..e4da21786 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex +++ b/elixir/apps/domain/lib/domain/replication/manager.ex @@ -1,4 +1,4 @@ -defmodule Domain.Events.ReplicationConnectionManager do +defmodule Domain.Replication.Manager do @moduledoc """ Manages the Postgrex.ReplicationConnection to ensure that we always have one running to prevent unbounded growth of the WAL log and ensure we are processing events. @@ -12,22 +12,22 @@ defmodule Domain.Events.ReplicationConnectionManager do # but not too long to avoid broadcasting needed events. 
@max_retries 10 - def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: __MODULE__) + def start_link(connection_module, opts) do + GenServer.start_link(__MODULE__, connection_module, opts) end @impl true - def init(_opts) do - send(self(), :connect) + def init(connection_module) do + send(self(), {:connect, connection_module}) {:ok, %{retries: 0}} end @impl true - def handle_info(:connect, %{retries: retries} = state) do - Process.send_after(self(), :connect, @retry_interval) + def handle_info({:connect, connection_module}, %{retries: retries} = state) do + Process.send_after(self(), {:connect, connection_module}, @retry_interval) - case Domain.Events.ReplicationConnection.start_link(replication_child_spec()) do + case connection_module.start_link(replication_child_spec(connection_module)) do {:ok, _pid} -> # Our process won {:noreply, %{state | retries: 0}} @@ -38,7 +38,7 @@ defmodule Domain.Events.ReplicationConnectionManager do {:error, reason} -> if retries < @max_retries do - Logger.info("Failed to start replication connection", + Logger.info("Failed to start replication connection #{connection_module}", retries: retries, max_retries: @max_retries, reason: inspect(reason) @@ -47,7 +47,7 @@ defmodule Domain.Events.ReplicationConnectionManager do {:noreply, %{state | retries: retries + 1}} else Logger.error( - "Failed to start replication connection after #{@max_retries} attempts, giving up!", + "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!", reason: inspect(reason) ) @@ -57,14 +57,14 @@ defmodule Domain.Events.ReplicationConnectionManager do end end - defp replication_child_spec do + def replication_child_spec(connection_module) do {connection_opts, config} = - Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) + Application.fetch_env!(:domain, connection_module) |> Keyword.pop(:connection_opts) %{ connection_opts: connection_opts, - instance: struct(Domain.Events.ReplicationConnection, config) + instance: struct(connection_module, config) } end end diff --git a/elixir/apps/domain/lib/domain/events/oid_database.ex b/elixir/apps/domain/lib/domain/replication/oid_database.ex similarity index 98% rename from elixir/apps/domain/lib/domain/events/oid_database.ex rename to elixir/apps/domain/lib/domain/replication/oid_database.ex index c1626385f..0cee74c6a 100644 --- a/elixir/apps/domain/lib/domain/events/oid_database.ex +++ b/elixir/apps/domain/lib/domain/replication/oid_database.ex @@ -1,5 +1,5 @@ # CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/oid_database.ex -defmodule Domain.Events.OidDatabase do +defmodule Domain.Replication.OidDatabase do @moduledoc "This module maps a numeric PostgreSQL type ID to a descriptive string." @doc """ diff --git a/elixir/apps/domain/lib/domain/events/protocol.ex b/elixir/apps/domain/lib/domain/replication/protocol.ex similarity index 94% rename from elixir/apps/domain/lib/domain/events/protocol.ex rename to elixir/apps/domain/lib/domain/replication/protocol.ex index 6b3dd68b6..9fd38f8e3 100644 --- a/elixir/apps/domain/lib/domain/events/protocol.ex +++ b/elixir/apps/domain/lib/domain/replication/protocol.ex @@ -1,10 +1,10 @@ # CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/protocol.ex -defmodule Domain.Events.Protocol do +defmodule Domain.Replication.Protocol do @moduledoc """ This module is responsible for parsing the Postgres WAL messages. 
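  It distinguishes XLogData ('w') and Primary Keepalive ('k') messages by their first byte.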
""" - alias Domain.Events.Protocol.Write - alias Domain.Events.Protocol.KeepAlive + alias Domain.Replication.Protocol.Write + alias Domain.Replication.Protocol.KeepAlive defguard is_write(value) when binary_part(value, 0, 1) == <> defguard is_keep_alive(value) when binary_part(value, 0, 1) == <> diff --git a/elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex b/elixir/apps/domain/lib/domain/replication/protocol/keep_alive.ex similarity index 93% rename from elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex rename to elixir/apps/domain/lib/domain/replication/protocol/keep_alive.ex index 21e2cf978..916b980d9 100644 --- a/elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex +++ b/elixir/apps/domain/lib/domain/replication/protocol/keep_alive.ex @@ -1,4 +1,4 @@ -defmodule Domain.Events.Protocol.KeepAlive do +defmodule Domain.Replication.Protocol.KeepAlive do @moduledoc """ Primary keepalive message (B) Byte1('k') diff --git a/elixir/apps/domain/lib/domain/events/protocol/write.ex b/elixir/apps/domain/lib/domain/replication/protocol/write.ex similarity index 94% rename from elixir/apps/domain/lib/domain/events/protocol/write.ex rename to elixir/apps/domain/lib/domain/replication/protocol/write.ex index 9af5f146b..87178950f 100644 --- a/elixir/apps/domain/lib/domain/events/protocol/write.ex +++ b/elixir/apps/domain/lib/domain/replication/protocol/write.ex @@ -1,4 +1,4 @@ -defmodule Domain.Events.Protocol.Write do +defmodule Domain.Replication.Protocol.Write do @moduledoc """ XLogData (B) Byte1('w') diff --git a/elixir/apps/domain/priv/repo/migrations/20250616205321_create_change_logs.exs b/elixir/apps/domain/priv/repo/migrations/20250616205321_create_change_logs.exs new file mode 100644 index 000000000..be714db6c --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250616205321_create_change_logs.exs @@ -0,0 +1,32 @@ +defmodule Domain.Repo.Migrations.CreateChangeLogs do + use Ecto.Migration + + def up do + create table(:change_logs, primary_key: false) do + add(:id, :binary_id, primary_key: true) + + add(:account_id, references(:accounts, type: :binary_id, on_delete: :delete_all), + null: false + ) + + add(:lsn, :bigint, null: false) + add(:table, :string, null: false) + add(:op, :string, null: false) + add(:old_data, :map) + add(:data, :map) + add(:vsn, :integer, null: false) + + timestamps(type: :utc_datetime_usec, updated_at: false) + end + + # For pulling logs for a particular customer + create(index(:change_logs, [:account_id])) + + # For truncating logs by date + create(index(:change_logs, [:inserted_at])) + end + + def down do + drop(table(:change_logs)) + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250616230420_remove_replica_from_relays.exs b/elixir/apps/domain/priv/repo/migrations/20250616230420_remove_replica_from_relays.exs new file mode 100644 index 000000000..8493def13 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250616230420_remove_replica_from_relays.exs @@ -0,0 +1,20 @@ +defmodule Domain.Repo.Migrations.RemoveReplicaFromRelays do + use Ecto.Migration + + @relations ~w[ + relay_groups + relays + ] + + def up do + for relation <- @relations do + execute("ALTER TABLE #{relation} REPLICA IDENTITY DEFAULT") + end + end + + def down do + for relation <- @relations do + execute("ALTER TABLE #{relation} REPLICA IDENTITY FULL") + end + end +end diff --git a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs new file 
mode 100644 index 000000000..5c4049d6f --- /dev/null +++ b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs @@ -0,0 +1,349 @@ +defmodule Domain.ChangeLogs.ReplicationConnectionTest do + use Domain.DataCase, async: true + + import ExUnit.CaptureLog + import Ecto.Query + import Domain.ChangeLogs.ReplicationConnection + alias Domain.ChangeLogs.ChangeLog + alias Domain.Repo + + setup do + account = Fixtures.Accounts.create_account() + %{account: account} + end + + describe "on_insert/2" do + test "ignores flows table - no record created" do + table = "flows" + data = %{"id" => 1, "name" => "test flow"} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_insert(0, table, data) + + # No record should be created for flows + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "creates change log record for non-flows tables", %{account: account} do + table = "accounts" + data = %{"id" => account.id, "name" => "test account"} + + assert :ok = on_insert(0, table, data) + + # Verify the record was created + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :insert + assert change_log.table == table + assert change_log.old_data == nil + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + + test "creates records for different table types", %{account: account} do + test_cases = [ + {"accounts", %{"id" => account.id, "name" => "test account"}}, + {"resources", + %{"id" => Ecto.UUID.generate(), "name" => "test resource", "account_id" => account.id}}, + {"policies", + %{"id" => Ecto.UUID.generate(), "name" => "test policy", "account_id" => account.id}}, + {"actors", + %{"id" => Ecto.UUID.generate(), "name" => "test actor", "account_id" => account.id}} + ] + + for {table, data} <- test_cases do + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_insert(0, table, data) + + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + 1 + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :insert + assert change_log.table == table + assert change_log.old_data == nil + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + end + end + + describe "on_update/3" do + test "ignores flows table - no record created" do + table = "flows" + old_data = %{"id" => 1, "name" => "old flow"} + data = %{"id" => 1, "name" => "new flow"} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_update(0, table, old_data, data) + + # No record should be created for flows + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "creates change log record for non-flows tables", %{account: account} do + table = "accounts" + old_data = %{"id" => account.id, "name" => "old name"} + data = %{"id" => account.id, "name" => "new name"} + + assert :ok = on_update(0, table, old_data, data) + + # Verify the record was created + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :update + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + + test "handles complex 
data structures", %{account: account} do + table = "resources" + resource_id = Ecto.UUID.generate() + + old_data = %{ + "id" => resource_id, + "name" => "old name", + "account_id" => account.id, + "settings" => %{"theme" => "dark", "notifications" => true} + } + + data = %{ + "id" => resource_id, + "name" => "new name", + "account_id" => account.id, + "settings" => %{"theme" => "light", "notifications" => false}, + "tags" => ["updated", "important"] + } + + assert :ok = on_update(0, table, old_data, data) + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :update + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + end + + describe "on_delete/2" do + test "ignores flows table - no record created" do + table = "flows" + old_data = %{"id" => 1, "name" => "deleted flow"} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_delete(0, table, old_data) + + # No record should be created for flows + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "creates change log record for non-flows tables", %{account: account} do + table = "accounts" + old_data = %{"id" => account.id, "name" => "deleted account"} + + assert :ok = on_delete(0, table, old_data) + + # Verify the record was created + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :delete + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == nil + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + + test "handles various data types in old_data", %{account: account} do + table = "resources" + resource_id = Ecto.UUID.generate() + + old_data = %{ + "id" => resource_id, + "name" => "complex resource", + "account_id" => account.id, + "metadata" => %{ + "created_by" => "system", + "permissions" => ["read", "write"], + "config" => %{"timeout" => 30, "retries" => 3} + }, + "active" => true, + "count" => 42 + } + + assert :ok = on_delete(0, table, old_data) + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :delete + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == nil + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + end + + describe "error handling" do + test "handles foreign key errors gracefully" do + # Create a change log entry that references a non-existent account + table = "resources" + # Non-existent account_id + data = %{"id" => Ecto.UUID.generate(), "account_id" => Ecto.UUID.generate()} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + # Should return :ok even if foreign key constraint fails + assert :ok = on_insert(0, table, data) + + # No record should be created due to foreign key error + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "logs and handles non-foreign-key validation errors gracefully", %{account: account} do + # Test with invalid data that would cause validation errors (not foreign key) + table = "accounts" + # Missing required fields but valid FK + data = %{"account_id" => account.id} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + log_output = + 
capture_log(fn -> + assert :ok = on_insert(0, table, data) + end) + + # Should log the error + assert log_output =~ "Failed to create change log" + + # No record should be created + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + end + + describe "data integrity" do + test "preserves exact data structures", %{account: account} do + table = "policies" + policy_id = Ecto.UUID.generate() + + # Test with various data types + complex_data = %{ + "id" => policy_id, + "account_id" => account.id, + "string_field" => "test string", + "integer_field" => 42, + "boolean_field" => true, + "null_field" => nil, + "array_field" => [1, "two", %{"three" => 3}], + "nested_object" => %{ + "level1" => %{ + "level2" => %{ + "deep_value" => "preserved" + } + } + } + } + + assert :ok = on_insert(0, table, complex_data) + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + # Data should be preserved exactly as provided + assert change_log.data == complex_data + assert change_log.op == :insert + assert change_log.table == table + assert change_log.account_id == account.id + end + + test "tracks operation sequence correctly", %{account: account} do + table = "accounts" + initial_data = %{"id" => account.id, "name" => "initial"} + updated_data = %{"id" => account.id, "name" => "updated"} + + # Insert + assert :ok = on_insert(0, table, initial_data) + + # Update + assert :ok = on_update(0, table, initial_data, updated_data) + + # Delete + assert :ok = on_delete(0, table, updated_data) + + # Get the three most recent records in reverse chronological order + logs = + Repo.all( + from cl in ChangeLog, + where: cl.account_id == ^account.id, + order_by: [desc: cl.inserted_at], + limit: 3 + ) + + # Should have 3 records (delete, update, insert in that order) + assert length(logs) >= 3 + [delete_log, update_log, insert_log] = Enum.take(logs, 3) + + # Verify sequence (most recent first) + assert delete_log.op == :delete + assert delete_log.old_data == updated_data + assert delete_log.data == nil + assert delete_log.account_id == account.id + + assert update_log.op == :update + assert update_log.old_data == initial_data + assert update_log.data == updated_data + assert update_log.account_id == account.id + + assert insert_log.op == :insert + assert insert_log.old_data == nil + assert insert_log.data == initial_data + assert insert_log.account_id == account.id + + # All should have same version + assert insert_log.vsn == 0 + assert update_log.vsn == 0 + assert delete_log.vsn == 0 + end + end + + describe "flows table comprehensive test" do + test "flows table never creates records regardless of operation or data" do + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + # Test various data shapes and operations + test_data_sets = [ + %{}, + %{"id" => 1}, + %{"complex" => %{"nested" => ["data", 1, true, nil]}}, + nil + ] + + for data <- test_data_sets do + assert :ok = on_insert(0, "flows", data) + assert :ok = on_update(0, "flows", data, data) + assert :ok = on_delete(0, "flows", data) + end + + # No records should have been created + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + end +end diff --git a/elixir/apps/domain/test/domain/change_logs_test.exs b/elixir/apps/domain/test/domain/change_logs_test.exs new file mode 100644 index 000000000..a7d4037e5 --- /dev/null +++ b/elixir/apps/domain/test/domain/change_logs_test.exs @@ -0,0 +1,176 @@ +defmodule Domain.ChangeLogsTest do + 
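+ # Exercises Domain.ChangeLogs.create_change_log/1: successful inserts, required-field validations (vsn, table, op inclusion), allowed op/old_data/data combinations, and the requirement that account_id is populated from old_data or data and stays consistent between the two.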
use Domain.DataCase, async: true + import Domain.ChangeLogs + + describe "create_change_log/1" do + setup do + account = Fixtures.Accounts.create_account() + + %{account: account} + end + + test "inserts a change_log for an account", %{account: account} do + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"}, + vsn: 1 + } + + assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) + + assert change_log.account_id == account.id + assert change_log.op == :insert + assert change_log.old_data == nil + assert change_log.data == %{"account_id" => account.id, "key" => "value"} + end + + test "uses the 'id' field for accounts table updates", %{account: account} do + attrs = %{ + lsn: 1, + table: "accounts", + op: :update, + old_data: %{"id" => account.id, "name" => "Old Name"}, + data: %{"id" => account.id, "name" => "New Name"}, + vsn: 1 + } + + assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) + + assert change_log.account_id == account.id + assert change_log.op == :update + assert change_log.old_data == %{"id" => account.id, "name" => "Old Name"} + assert change_log.data == %{"id" => account.id, "name" => "New Name"} + end + + test "requires vsn field", %{account: account} do + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"} + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:vsn] == {"can't be blank", [validation: :required]} + end + + test "requires table field", %{account: account} do + attrs = %{ + lsn: 1, + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:table] == {"can't be blank", [validation: :required]} + end + + test "requires op field to be one of :insert, :update, :delete", %{account: account} do + attrs = %{ + lsn: 1, + table: "resources", + op: :invalid_op, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert {"is invalid", errors} = changeset.errors[:op] + assert {:validation, :inclusion} in errors + end + + test "requires correct combination of operation and data", %{account: account} do + # Invalid combination: :insert with old_data present + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: %{"account_id" => account.id, "key" => "new_value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid?
== false + assert changeset.errors[:base] == {"Invalid combination of operation and data", []} + + # Valid combination: :insert with old_data nil and data present + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "new_value"}, + vsn: 1 + } + + assert {:ok, _change_log} = create_change_log(attrs) + + # Valid combination: :update with both old_data and data present + attrs = %{ + lsn: 1, + table: "resources", + op: :update, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: %{"account_id" => account.id, "key" => "new_value"}, + vsn: 1 + } + + assert {:ok, _change_log} = create_change_log(attrs) + + # Valid combination: :delete with old_data present and data nil + attrs = %{ + lsn: 1, + table: "resources", + op: :delete, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: nil, + vsn: 1 + } + + assert {:ok, _change_log} = create_change_log(attrs) + end + + test "requires account_id to be populated from old_data or data" do + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"key" => "value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:account_id] == {"can't be blank", [validation: :required]} + end + + test "requires old_data[\"account_id\"] and data[\"account_id\"] to match", %{ + account: account + } do + attrs = %{ + lsn: 1, + table: "resources", + op: :update, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: %{"account_id" => "different_account_id", "key" => "new_value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:base] == {"Account ID cannot be changed", []} + end + end +end diff --git a/elixir/apps/domain/test/domain/events/event_test.exs b/elixir/apps/domain/test/domain/events/event_test.exs deleted file mode 100644 index 184b720d3..000000000 --- a/elixir/apps/domain/test/domain/events/event_test.exs +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Domain.Events.EventTest do - use ExUnit.Case, async: true - # import Domain.Events.Event - # alias Domain.Events.Decoder - - setup do - config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) - table_subscriptions = config[:table_subscriptions] - - %{table_subscriptions: table_subscriptions} - end - - # TODO: WAL - # Refactor this to test ingest of all table subscriptions as structs with stringified - # keys in order to assert on the shape of the data. 
- # describe "ingest/2" do - # test "returns :ok for insert on all configured table subscriptions", %{ - # table_subscriptions: table_subscriptions - # } do - # for table <- table_subscriptions do - # relations = %{"1" => %{name: table, columns: []}} - # msg = %Decoder.Messages.Insert{tuple_data: {}, relation_id: "1"} - # - # assert :ok == ingest(msg, relations) - # end - # end - # end -end diff --git a/elixir/apps/domain/test/domain/events/replication_connection_test.exs b/elixir/apps/domain/test/domain/events/replication_connection_test.exs index f9a1b6ed5..b95a0773a 100644 --- a/elixir/apps/domain/test/domain/events/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/events/replication_connection_test.exs @@ -1,222 +1,153 @@ defmodule Domain.Events.ReplicationConnectionTest do - # Only one ReplicationConnection should be started in the cluster - use ExUnit.Case, async: false + use ExUnit.Case, async: true - alias Domain.Events.ReplicationConnection + import ExUnit.CaptureLog + import Domain.Events.ReplicationConnection - # Used to test callbacks, not used for live connection - @mock_state %ReplicationConnection{ - schema: "test_schema", - step: :disconnected, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["accounts", "resources"], - relations: %{}, - counter: 0 - } - - # Used to test live connection setup do - {connection_opts, config} = + tables = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) - |> Keyword.pop(:connection_opts) + |> Keyword.fetch!(:table_subscriptions) - init_state = %{ - connection_opts: connection_opts, - instance: struct(Domain.Events.ReplicationConnection, config) - } + %{tables: tables} + end - child_spec = %{ - id: Domain.Events.ReplicationConnection, - start: {Domain.Events.ReplicationConnection, :start_link, [init_state]} - } + describe "on_insert/2" do + test "logs warning for unknown table" do + table = "unknown_table" + data = %{"id" => Ecto.UUID.generate(), "name" => "test"} - {:ok, pid} = - case start_supervised(child_spec) do - {:ok, pid} -> - {:ok, pid} + log_output = + capture_log(fn -> + assert :ok = on_insert(0, table, data) + end) - {:error, {:already_started, pid}} -> - {:ok, pid} + assert log_output =~ "No hook defined for insert on table unknown_table" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" + end + + test "handles known tables without errors", %{tables: tables} do + for table <- tables do + data = %{"id" => Ecto.UUID.generate(), "table" => table} + + # The actual hook call might fail if the hook modules aren't available, + # but we can test that our routing logic works + try do + result = on_insert(0, table, data) + # Should either succeed or fail gracefully + assert result in [:ok, :error] or match?({:error, _}, result) + rescue + # Depending on the shape of the data we might get a function clause error. This is ok, + # as we are testing the routing logic, not the actual hook implementations. 
+ FunctionClauseError -> :ok + end end + end - {:ok, pid: pid} - end + test "handles all configured tables", %{tables: tables} do + for table <- tables do + # Should not log warnings for configured tables + log_output = + capture_log(fn -> + try do + on_insert(0, table, %{"id" => Ecto.UUID.generate()}) + rescue + FunctionClauseError -> + # Shape of the data might not match the expected one, which is fine + :ok + end + end) - describe "handle_connect/1 callback" do - test "handle_connect initiates publication check" do - state = @mock_state - expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" - expected_next_state = %{state | step: :create_publication} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_connect(state) + refute log_output =~ "No hook defined for insert" + end end end - describe "handle_result/2 callback" do - test "handle_result transitions from create_publication to create_replication_slot when publication exists" do - state = %{@mock_state | step: :create_publication} - result = [%Postgrex.Result{num_rows: 1}] + describe "on_update/3" do + test "logs warning for unknown table" do + table = "unknown_table" + old_data = %{"id" => Ecto.UUID.generate(), "name" => "old"} + data = %{"id" => Ecto.UUID.generate(), "name" => "new"} - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + log_output = + capture_log(fn -> + assert :ok = on_update(0, table, old_data, data) + end) - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) + assert log_output =~ "No hook defined for update on table unknown_table" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" end - test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do - state = %{@mock_state | step: :create_replication_slot} - result = [%Postgrex.Result{num_rows: 1}] + test "handles known tables", %{tables: tables} do + old_data = %{"id" => Ecto.UUID.generate(), "name" => "old name"} + data = %{"id" => Ecto.UUID.generate(), "name" => "new name"} - expected_query = "SELECT 1" - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from start_replication_slot to streaming" do - state = %{@mock_state | step: :start_replication_slot} - result = [%Postgrex.Result{num_rows: 1}] - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - expected_next_state = %{state | step: :streaming} - - assert {:stream, ^expected_stream_query, [], ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates publication if it doesn't exist" do - state = %{@mock_state | step: :create_publication} - result = [%Postgrex.Result{num_rows: 0}] - - expected_tables = "test_schema.accounts,test_schema.resources" - expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end 
- - test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do - state = %{@mock_state | step: :check_replication_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates replication slot if it doesn't exist" do - state = %{@mock_state | step: :create_replication_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) + for table <- tables do + try do + result = on_update(0, table, old_data, data) + assert result in [:ok, :error] or match?({:error, _}, result) + rescue + FunctionClauseError -> + # Shape of the data might not match the expected one, which is fine + :ok + end + end end end - # In-depth decoding tests are handled in Domain.Events.DecoderTest - describe "handle_data/2" do - test "handle_data handles KeepAlive with reply :now" do - state = %{@mock_state | step: :streaming} - wal_end = 12345 + describe "on_delete/2" do + test "logs warning for unknown table" do + table = "unknown_table" + old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted"} - now = - System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + log_output = + capture_log(fn -> + assert :ok = on_delete(0, table, old_data) + end) - # 100 milliseconds - grace_period = 100_000 - keepalive_data = <> - - assert {:noreply, reply, ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - - assert [<>] = reply - - assert now <= clock - assert clock < now + grace_period + assert log_output =~ "No hook defined for delete on table unknown_table" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" end - test "handle_data handles KeepAlive with reply :later" do - state = %{@mock_state | step: :streaming} - wal_end = 54321 + test "handles known tables", %{tables: tables} do + old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted gateway"} - keepalive_data = <> - expected_reply_message = [] - - assert {:noreply, ^expected_reply_message, ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - end - - test "handle_data handles Write message" do - state = %{@mock_state | step: :streaming} - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - message = "Hello, world!" 
- - write_data = - <> - - new_state = %{state | counter: state.counter + 1} - - assert {:noreply, [], ^new_state} = ReplicationConnection.handle_data(write_data, state) - end - - test "handle_data handles unknown message" do - state = %{@mock_state | step: :streaming} - unknown_data = <> - - assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state) + for table <- tables do + try do + assert :ok = on_delete(0, table, old_data) + rescue + # Shape of the data might not match the expected one, which is fine + FunctionClauseError -> :ok + end + end end end - describe "handle_info/2" do - test "handle_info handles :shutdown message" do - state = @mock_state - assert {:disconnect, :normal} = ReplicationConnection.handle_info(:shutdown, state) - end + describe "warning message formatting" do + test "log_warning generates correct message format" do + log_output = + capture_log(fn -> + assert :ok = on_insert(0, "test_table_insert", %{}) + end) - test "handle_info handles :DOWN message from monitored process" do - state = @mock_state - monitor_ref = make_ref() - down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} + assert log_output =~ "No hook defined for insert on table test_table_insert" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" - assert {:disconnect, :normal} = ReplicationConnection.handle_info(down_msg, state) - end + log_output = + capture_log(fn -> + assert :ok = on_update(0, "test_table_update", %{}, %{}) + end) - test "handle_info ignores other messages" do - state = @mock_state - random_msg = {:some_other_info, "data"} + assert log_output =~ "No hook defined for update on table test_table_update" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" - assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state) - end - end + log_output = + capture_log(fn -> + assert :ok = on_delete(0, "test_table_delete", %{}) + end) - describe "handle_disconnect/1" do - test "handle_disconnect resets step to :disconnected" do - state = %{@mock_state | step: :streaming} - expected_state = %{state | step: :disconnected} - - assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state) + assert log_output =~ "No hook defined for delete on table test_table_delete" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" end end end diff --git a/elixir/apps/domain/test/domain/replication/connection_test.exs b/elixir/apps/domain/test/domain/replication/connection_test.exs new file mode 100644 index 000000000..729bf1b35 --- /dev/null +++ b/elixir/apps/domain/test/domain/replication/connection_test.exs @@ -0,0 +1,308 @@ +defmodule Domain.Replication.ConnectionTest do + # Only one ReplicationConnection should be started in the cluster + use ExUnit.Case, async: false + + # Create a test module that uses the macro + defmodule TestReplicationConnection do + use Domain.Replication.Connection, + alert_threshold_ms: 5_000, + publication_name: "test_events" + end + + alias TestReplicationConnection + + # Used to test callbacks, not used for live connection + def mock_state, + do: %TestReplicationConnection{ + schema: "test_schema", + step: :disconnected, + publication_name: "test_pub", + replication_slot_name: "test_slot", + output_plugin: "pgoutput", + proto_version: 1, + table_subscriptions: ["accounts", "resources"], + relations: %{}, + counter: 0 + } + + # Used to test live connection + setup do + {connection_opts, config} = + Application.fetch_env!(:domain, 
Domain.Events.ReplicationConnection) + |> Keyword.pop(:connection_opts) + + init_state = %{ + connection_opts: connection_opts, + instance: struct(TestReplicationConnection, config) + } + + child_spec = %{ + id: TestReplicationConnection, + start: {TestReplicationConnection, :start_link, [init_state]} + } + + {:ok, pid} = + case start_supervised(child_spec) do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + {:ok, pid} + end + + {:ok, pid: pid} + end + + describe "handle_connect/1 callback" do + test "handle_connect initiates publication check" do + state = mock_state() + expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" + expected_next_state = %{state | step: :create_publication} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_connect(state) + end + end + + describe "handle_result/2 callback" do + test "handle_result transitions from create_publication to create_replication_slot when publication exists" do + state = %{mock_state() | step: :create_publication} + result = [%Postgrex.Result{num_rows: 1}] + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do + state = %{mock_state() | step: :create_replication_slot} + result = [%Postgrex.Result{num_rows: 1}] + + expected_query = "SELECT 1" + expected_next_state = %{state | step: :start_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from start_replication_slot to streaming" do + state = %{mock_state() | step: :start_replication_slot} + result = [%Postgrex.Result{num_rows: 1}] + + expected_stream_query = + "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" + + expected_next_state = %{state | step: :streaming} + + assert {:stream, ^expected_stream_query, [], ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result creates publication if it doesn't exist" do + state = %{mock_state() | step: :create_publication} + result = [%Postgrex.Result{num_rows: 0}] + + expected_tables = "test_schema.accounts,test_schema.resources" + expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" + expected_next_state = %{state | step: :check_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do + state = %{mock_state() | step: :check_replication_slot} + result = [%Postgrex.Result{num_rows: 0}] + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result creates replication slot if it doesn't exist" do + state 
= %{mock_state() | step: :create_replication_slot} + result = [%Postgrex.Result{num_rows: 0}] + + expected_query = + "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" + + expected_next_state = %{state | step: :start_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + end + + describe "handle_data/2" do + test "handle_data handles KeepAlive with reply :now" do + state = %{mock_state() | step: :streaming} + wal_end = 12345 + + now = + System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + + # 100 milliseconds + grace_period = 100_000 + keepalive_data = <> + + assert {:noreply, reply, ^state} = + TestReplicationConnection.handle_data(keepalive_data, state) + + assert [<>] = reply + + assert now <= clock + assert clock < now + grace_period + end + + test "handle_data handles KeepAlive with reply :later" do + state = %{mock_state() | step: :streaming} + wal_end = 54321 + + keepalive_data = <> + expected_reply_message = [] + + assert {:noreply, ^expected_reply_message, ^state} = + TestReplicationConnection.handle_data(keepalive_data, state) + end + + test "handle_data handles Write message and increments counter" do + state = %{mock_state() | step: :streaming} + server_wal_start = 123_456_789 + server_wal_end = 987_654_321 + server_system_clock = 1_234_567_890 + message = "Hello, world!" + + write_data = + <> + + new_state = %{state | counter: state.counter + 1} + + assert {:noreply, [], ^new_state} = TestReplicationConnection.handle_data(write_data, state) + end + + test "handle_data handles unknown message" do + state = %{mock_state() | step: :streaming} + unknown_data = <> + + assert {:noreply, [], ^state} = TestReplicationConnection.handle_data(unknown_data, state) + end + + test "sends {:check_alert, lag_ms} > 5_000 ms" do + state = + %{mock_state() | step: :streaming} + |> Map.put(:lag_threshold_exceeded, false) + + server_wal_start = 123_456_789 + server_wal_end = 987_654_321 + server_system_clock = 1_234_567_890 + flags = <<0>> + lsn = <<0::32, 100::32>> + end_lsn = <<0::32, 200::32>> + + # Simulate a commit timestamp that exceeds the threshold + timestamp = + DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) + 10_000_000 + + commit_data = <> + + write_message = + <> + + assert {:noreply, [], _state} = + TestReplicationConnection.handle_data(write_message, state) + + assert_receive({:check_alert, lag_ms}) + assert lag_ms > 5_000 + end + + test "sends {:check_alert, lag_ms} < 5_000 ms" do + state = + %{mock_state() | step: :streaming} + |> Map.put(:lag_threshold_exceeded, true) + + server_wal_start = 123_456_789 + server_wal_end = 987_654_321 + server_system_clock = 1_234_567_890 + flags = <<0>> + lsn = <<0::32, 100::32>> + end_lsn = <<0::32, 200::32>> + # Simulate a commit timestamp that is within the threshold + timestamp = + DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) + 1_000_000 + + commit_data = <> + + write_message = + <> + + assert {:noreply, [], _state} = + TestReplicationConnection.handle_data(write_message, state) + + assert_receive({:check_alert, lag_ms}) + assert lag_ms < 5_000 + end + end + + describe "handle_info/2" do + test "handle_info handles :shutdown message" do + state = mock_state() + assert {:disconnect, :normal} = TestReplicationConnection.handle_info(:shutdown, state) + end + + test "handle_info handles :DOWN message from monitored 
process" do + state = mock_state() + monitor_ref = make_ref() + down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} + + assert {:disconnect, :normal} = TestReplicationConnection.handle_info(down_msg, state) + end + + test "handle_info ignores other messages" do + state = mock_state() + random_msg = {:some_other_info, "data"} + + assert {:noreply, ^state} = TestReplicationConnection.handle_info(random_msg, state) + end + + test "handle_info processes lag alerts" do + state = Map.put(mock_state(), :lag_threshold_exceeded, false) + + # Test crossing threshold + assert {:noreply, %{lag_threshold_exceeded: true}} = + TestReplicationConnection.handle_info({:check_alert, 6_000}, state) + + # Test going back below threshold + state_above = %{state | lag_threshold_exceeded: true} + + assert {:noreply, %{lag_threshold_exceeded: false}} = + TestReplicationConnection.handle_info({:check_alert, 3_000}, state_above) + + # Test staying below threshold + assert {:noreply, %{lag_threshold_exceeded: false}} = + TestReplicationConnection.handle_info({:check_alert, 2_000}, state) + + # Test staying above threshold + assert {:noreply, %{lag_threshold_exceeded: true}} = + TestReplicationConnection.handle_info({:check_alert, 7_000}, state_above) + end + end + + describe "handle_disconnect/1" do + test "handle_disconnect resets step to :disconnected" do + state = %{mock_state() | step: :streaming} + expected_state = %{state | step: :disconnected} + + assert {:noreply, ^expected_state} = TestReplicationConnection.handle_disconnect(state) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/decoder_test.exs b/elixir/apps/domain/test/domain/replication/decoder_test.exs similarity index 99% rename from elixir/apps/domain/test/domain/events/decoder_test.exs rename to elixir/apps/domain/test/domain/replication/decoder_test.exs index 085b0cb8f..723b77857 100644 --- a/elixir/apps/domain/test/domain/events/decoder_test.exs +++ b/elixir/apps/domain/test/domain/replication/decoder_test.exs @@ -1,8 +1,8 @@ -defmodule Domain.Events.DecoderTest do +defmodule Domain.Replication.DecoderTest do use ExUnit.Case, async: true - alias Domain.Events.Decoder - alias Domain.Events.Decoder.Messages + alias Domain.Replication.Decoder + alias Domain.Replication.Decoder.Messages @lsn_binary <<0::integer-32, 23_785_280::integer-32>> @lsn_decoded {0, 23_785_280} diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 366f4fb04..88734ff93 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -33,6 +33,39 @@ config :domain, Domain.Repo, migration_lock: :pg_advisory_lock, start_apps_before_migration: [:ssl, :logger_json] +config :domain, Domain.ChangeLogs.ReplicationConnection, + enabled: true, + connection_opts: [ + hostname: "localhost", + port: 5432, + ssl: false, + ssl_opts: [], + parameters: [], + username: "postgres", + database: "firezone_dev", + password: "postgres" + ], + # When changing these, make sure to also: + # 1. Make appropriate changes to `Domain.Events.Event` + # 2. 
Add an appropriate `Domain.Events.Hooks` module + table_subscriptions: ~w[ + accounts + actor_group_memberships + actor_groups + actors + auth_identities + auth_providers + clients + flow_activities + flows + gateway_groups + gateways + policies + resource_connections + resources + tokens + ] + config :domain, Domain.Events.ReplicationConnection, enabled: true, connection_opts: [ @@ -61,8 +94,6 @@ config :domain, Domain.Events.ReplicationConnection, gateway_groups gateways policies - relay_groups - relays resource_connections resources tokens diff --git a/elixir/config/runtime.exs b/elixir/config/runtime.exs index d68e1f649..a43cae0c0 100644 --- a/elixir/config/runtime.exs +++ b/elixir/config/runtime.exs @@ -27,6 +27,21 @@ if config_env() == :prod do else: [{:hostname, env_var_to_config!(:database_host)}] ) + config :domain, Domain.ChangeLogs.ReplicationConnection, + enabled: env_var_to_config!(:background_jobs_enabled), + replication_slot_name: env_var_to_config!(:database_replication_slot_name), + publication_name: env_var_to_config!(:database_publication_name), + connection_opts: [ + hostname: env_var_to_config!(:database_host), + port: env_var_to_config!(:database_port), + ssl: env_var_to_config!(:database_ssl_enabled), + ssl_opts: env_var_to_config!(:database_ssl_opts), + parameters: env_var_to_config!(:database_parameters), + username: env_var_to_config!(:database_user), + password: env_var_to_config!(:database_password), + database: env_var_to_config!(:database_name) + ] + config :domain, Domain.Events.ReplicationConnection, enabled: env_var_to_config!(:background_jobs_enabled), replication_slot_name: env_var_to_config!(:database_replication_slot_name), diff --git a/elixir/config/test.exs b/elixir/config/test.exs index 4b7481cc2..2bfb3b1cf 100644 --- a/elixir/config/test.exs +++ b/elixir/config/test.exs @@ -20,11 +20,15 @@ config :domain, Domain.Repo, pool: Ecto.Adapters.SQL.Sandbox, queue_target: 1000 -config :domain, Domain.Events.ReplicationConnection, - publication_name: "events_test", - replication_slot_name: "events_slot_test", +config :domain, Domain.ChangeLogs.ReplicationConnection, + enabled: false, + connection_opts: [ + database: "firezone_test#{partition_suffix}" + ] + +config :domain, Domain.Events.ReplicationConnection, + enabled: false, connection_opts: [ - auto_reconnect: false, database: "firezone_test#{partition_suffix}" ] diff --git a/elixir/test.exs b/elixir/test.exs deleted file mode 100644 index 41ca8bbbf..000000000 --- a/elixir/test.exs +++ /dev/null @@ -1,515 +0,0 @@ -defmodule Domain.Events.ReplicationConnectionTest do - # Only one ReplicationConnection should be started in the cluster - use ExUnit.Case, async: false - - alias Domain.Events.Decoder.Messages - alias Domain.Events.ReplicationConnection - - # Used to test callbacks, not used for live connection - @mock_state %ReplicationConnection{ - schema: "test_schema", - connection_opts: [], - step: :disconnected, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - # Example, adjust if needed - table_subscriptions: ["accounts", "resources"], - relations: %{} - } - - # Used to test live connection (Setup remains unchanged) - setup do - # Ensure Postgrex is started if your tests rely on it implicitly - {:ok, pid} = start_supervised(Domain.Events.ReplicationConnection) - - {:ok, pid: pid} - end - - describe "handle_connect/1 callback" do - test "handle_connect initiates publication check" do - state = @mock_state - expected_query = "SELECT 1 
FROM pg_publication WHERE pubname = '#{state.publication_name}'" - expected_next_state = %{state | step: :create_publication} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_connect(state) - end - end - - describe "handle_result/2 callback" do - test "handle_result transitions from create_publication to create_replication_slot when publication exists" do - state = %{@mock_state | step: :create_publication} - # Mock a successful result for the SELECT query - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 1, - rows: [[1]] - } - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do - state = %{@mock_state | step: :create_replication_slot} - # Mock a successful result for the SELECT query - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 1, - rows: [[1]] - } - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - # Should be :streaming directly? Check impl. - expected_next_state_direct = %{state | step: :start_replication_slot} - - # Let's assume it first goes to :start_replication_slot step, then handle_result for *that* step triggers START_REPLICATION - assert {:query, _query, ^expected_next_state_direct} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from start_replication_slot to streaming" do - state = %{@mock_state | step: :start_replication_slot} - # Mock a successful result for the CREATE_REPLICATION_SLOT or preceding step - result = %Postgrex.Result{ - # Or whatever command led here - command: :create_replication_slot, - columns: nil, - num_rows: 0, - rows: nil - } - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - expected_next_state = %{state | step: :streaming} - - assert {:stream, ^expected_stream_query, [], ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates publication if it doesn't exist" do - state = %{@mock_state | step: :create_publication} - # Mock result indicating publication doesn't exist - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 0, - rows: [] - } - - # Combine schema and table names correctly - expected_tables = - state.table_subscriptions - |> Enum.map(fn table -> "#{state.schema}.#{table}" end) - |> Enum.join(",") - - expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" - # The original test expected the next step to be :check_replication_slot, let's keep that - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - 
- test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do - state = %{@mock_state | step: :check_replication_slot} - # Mock a successful result from the CREATE PUBLICATION command - result = %Postgrex.Result{ - command: :create_publication, - columns: nil, - num_rows: 0, - rows: nil - } - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates replication slot if it doesn't exist" do - state = %{@mock_state | step: :create_replication_slot} - # Mock result indicating slot doesn't exist - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 0, - rows: [] - } - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - end - - # --- handle_data tests remain unchanged --- - # In-depth decoding tests are handled in Domain.Events.DecoderTest - describe "handle_data/2" do - test "handle_data handles KeepAlive with reply :now" do - state = %{@mock_state | step: :streaming} - wal_end = 12345 - # Keepalive doesn't use this field meaningfully here - server_wal_start = 0 - # Reply requested - reply_requested = 1 - - now_microseconds = - System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) - - # 100 milliseconds tolerance for clock check - grace_period_microseconds = 100_000 - - keepalive_data = <> - - # Expected reply format: 'r', confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8, clock::64 - # The actual implementation might construct the reply differently. - # This assertion needs to match the exact binary structure returned by handle_data. - # Let's assume the implementation sends back the received wal_end as confirmed LSNs, - # and the current time. The no_reply and high_priority flags might be 0. - assert {:reply, reply_binary, ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - - # Deconstruct the reply to verify its parts - assert <> = reply_binary - - assert confirmed_lsn == wal_end - # Or potentially server_wal_start? Check impl. 
- assert confirmed_lsn_commit == wal_end - assert no_reply == 0 - assert high_priority == 0 - assert now_microseconds <= clock < now_microseconds + grace_period_microseconds - end - - test "handle_data handles KeepAlive with reply :later" do - state = %{@mock_state | step: :streaming} - wal_end = 54321 - server_wal_start = 0 - # No reply requested - reply_requested = 0 - - keepalive_data = <> - - # When no reply is requested, it should return :noreply with no binary message - assert {:noreply, [], ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - end - - test "handle_data handles Write message (XLogData)" do - state = %{@mock_state | step: :streaming} - server_wal_start = 123_456_789 - # This is the LSN of the end of the WAL data in this message - server_wal_end = 987_654_321 - # Timestamp in microseconds since PG epoch - server_system_clock = 1_234_567_890 - # Example decoded message data (e.g., a BEGIN message binary) - # This data should be passed to handle_info via send(self(), decoded_msg) - message_binary = - <<"B", @lsn_binary || <<0::64>>::binary-8, @timestamp_int || 0::integer-64, - @xid || 0::integer-32>> - - write_data = - <> - - # handle_data for 'w' should decode the message_binary and send it to self() - # It returns {:noreply, [], state} because the reply/acknowledgement happens - # via the KeepAlive ('k') mechanism. - assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state) - - # Assert that the decoded message was sent to self() - # Note: This requires the test process to receive the message. - # You might need `allow_receive` or similar testing patterns if handle_data - # directly uses `send`. If it calls another function that sends, test that function. - # Let's assume handle_data directly sends for this example. 
- # Need some sample data defined earlier for the assertion - @lsn_binary <<0::integer-32, 23_785_280::integer-32>> - @timestamp_int 704_521_200_000 - @xid 1234 - @timestamp_decoded ~U[2022-04-29 12:20:00.000000Z] - @lsn_decoded {0, 23_785_280} - - expected_decoded_msg = %Messages.Begin{ - final_lsn: @lsn_decoded, - commit_timestamp: @timestamp_decoded, - xid: @xid - } - - assert_receive(^expected_decoded_msg) - end - - test "handle_data handles unknown message type" do - state = %{@mock_state | step: :streaming} - # Using 'q' as an example unknown type - unknown_data = <> - - # Expect it to ignore unknown types and return noreply - assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state) - # Optionally, assert that a warning was logged if applicable - end - end - - # --- handle_info tests are CORRECTED below --- - describe "handle_info/2" do - test "handle_info updates relations on Relation message" do - state = @mock_state - - # Use the correct fields from Messages.Relation struct - relation_msg = %Messages.Relation{ - id: 101, - namespace: "public", - name: "accounts", - # Added replica_identity - replica_identity: :default, - columns: [ - %Messages.Relation.Column{ - flags: [:key], - name: "id", - type: "int4", - type_modifier: -1 - }, - %Messages.Relation.Column{ - flags: [], - name: "name", - type: "text", - type_modifier: -1 - } - ] - } - - # The state should store the relevant parts of the relation message, keyed by ID - expected_relation_data = %{ - namespace: "public", - name: "accounts", - replica_identity: :default, - columns: [ - %Messages.Relation.Column{ - flags: [:key], - name: "id", - type: "int4", - type_modifier: -1 - }, - %Messages.Relation.Column{ - flags: [], - name: "name", - type: "text", - type_modifier: -1 - } - ] - } - - expected_relations = %{101 => expected_relation_data} - expected_state = %{state | relations: expected_relations} - - assert {:noreply, ^expected_state} = ReplicationConnection.handle_info(relation_msg, state) - end - - test "handle_info returns noreply for Insert message" do - # Pre-populate state with relation info if the handler needs it - state = %{ - @mock_state - | relations: %{ - 101 => %{ - name: "accounts", - namespace: "public", - columns: [ - %Messages.Relation.Column{name: "id", type: "int4"}, - %Messages.Relation.Column{name: "name", type: "text"} - ] - } - } - } - - # Use the correct field: tuple_data (which is a tuple) - insert_msg = %Messages.Insert{relation_id: 101, tuple_data: {1, "Alice"}} - - # handle_info likely broadcasts or processes the insert, but returns noreply - assert {:noreply, ^state} = ReplicationConnection.handle_info(insert_msg, state) - # Add assertions here if handle_info is expected to send messages or call other funcs - end - - test "handle_info returns noreply for Update message" do - state = %{ - @mock_state - | relations: %{ - 101 => %{ - name: "accounts", - namespace: "public", - columns: [ - %Messages.Relation.Column{name: "id", type: "int4"}, - %Messages.Relation.Column{name: "name", type: "text"} - ] - } - } - } - - # Use the correct fields: relation_id, old_tuple_data, tuple_data, changed_key_tuple_data - update_msg = %Messages.Update{ - relation_id: 101, - # Example: only old data provided - old_tuple_data: {1, "Alice"}, - # Example: new data - tuple_data: {1, "Bob"}, - # Example: key didn't change or wasn't provided - changed_key_tuple_data: nil - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(update_msg, state) - # Add assertions for side effects 
(broadcasts etc.) if needed - end - - test "handle_info returns noreply for Delete message" do - state = %{ - @mock_state - | relations: %{ - 101 => %{ - name: "accounts", - namespace: "public", - columns: [ - %Messages.Relation.Column{name: "id", type: "int4"}, - %Messages.Relation.Column{name: "name", type: "text"} - ] - } - } - } - - # Use the correct fields: relation_id, old_tuple_data, changed_key_tuple_data - delete_msg = %Messages.Delete{ - relation_id: 101, - # Example: old data provided - old_tuple_data: {1, "Bob"}, - # Example: key data not provided - changed_key_tuple_data: nil - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(delete_msg, state) - # Add assertions for side effects if needed - end - - test "handle_info ignores Begin message" do - state = @mock_state - # Use correct fields: final_lsn, commit_timestamp, xid - begin_msg = %Messages.Begin{ - final_lsn: {0, 123}, - commit_timestamp: ~U[2023-01-01 10:00:00Z], - xid: 789 - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(begin_msg, state) - end - - test "handle_info ignores Commit message" do - state = @mock_state - # Use correct fields: flags, lsn, end_lsn, commit_timestamp - commit_msg = %Messages.Commit{ - flags: [], - lsn: {0, 123}, - end_lsn: {0, 456}, - commit_timestamp: ~U[2023-01-01 10:00:01Z] - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(commit_msg, state) - end - - test "handle_info ignores Origin message" do - state = @mock_state - # Use correct fields: origin_commit_lsn, name - origin_msg = %Messages.Origin{origin_commit_lsn: {0, 1}, name: "origin_name"} - assert {:noreply, ^state} = ReplicationConnection.handle_info(origin_msg, state) - end - - test "handle_info ignores Truncate message" do - state = @mock_state - # Use correct fields: number_of_relations, options, truncated_relations - truncate_msg = %Messages.Truncate{ - number_of_relations: 2, - options: [:cascade], - truncated_relations: [101, 102] - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(truncate_msg, state) - end - - test "handle_info ignores Type message" do - state = @mock_state - # Use correct fields: id, namespace, name - type_msg = %Messages.Type{id: 23, namespace: "pg_catalog", name: "int4"} - assert {:noreply, ^state} = ReplicationConnection.handle_info(type_msg, state) - end - - test "handle_info returns noreply for Unsupported message" do - state = @mock_state - unsupported_msg = %Messages.Unsupported{data: <<1, 2, 3>>} - # We cannot easily verify Logger.warning was called without mocks/capture. - assert {:noreply, ^state} = ReplicationConnection.handle_info(unsupported_msg, state) - end - - test "handle_info handles :shutdown message" do - state = @mock_state - # Expect :disconnect tuple based on common GenServer patterns for shutdown - assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(:shutdown, state) - # Note: The original test asserted {:disconnect, :normal}. {:stop, :normal, state} is - # the standard GenServer return for a clean stop triggered by handle_info. Adjust - # if your implementation specifically returns :disconnect. 
- end - - test "handle_info handles :DOWN message from monitored process" do - state = @mock_state - monitor_ref = make_ref() - # Example DOWN message structure - down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} - - # Expect the server to stop itself upon receiving DOWN for a critical process - assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(down_msg, state) - # Again, adjust the expected return (:disconnect vs :stop) based on implementation. - end - - test "handle_info ignores other messages" do - state = @mock_state - random_msg = {:some_other_info, "data"} - assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state) - end - end - - # --- Moved handle_disconnect test to its own describe block --- - describe "handle_disconnect/1" do - test "handle_disconnect resets step to :disconnected and logs warning" do - state = %{@mock_state | step: :streaming} - expected_state = %{state | step: :disconnected} - - # Capture log to verify warning (requires ExUnit config) - log_output = - ExUnit.CaptureLog.capture_log(fn -> - assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state) - end) - - assert log_output =~ "Replication connection disconnected." - # Or match the exact log message if needed - end - end -end
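A note on the lag-alert behaviour exercised in Domain.Replication.ConnectionTest above: the {:check_alert, lag_ms} assertions only pin down how the lag_threshold_exceeded flag transitions around the configured 5_000 ms threshold. Below is a minimal sketch of a handler clause that would satisfy those assertions; the module name LagAlertSketch, the @alert_threshold_ms attribute, and the clause shape are illustrative assumptions, not the code shipped in elixir/apps/domain/lib/domain/replication/connection.ex by this patch.

defmodule LagAlertSketch do
  # Hypothetical threshold; the test instead passes alert_threshold_ms: 5_000 to the macro.
  @alert_threshold_ms 5_000

  # The flag simply tracks whether the reported replication lag exceeds the threshold.
  def handle_info({:check_alert, lag_ms}, state) do
    {:noreply, Map.put(state, :lag_threshold_exceeded, lag_ms > @alert_threshold_ms)}
  end
end

Fed the four cases from the test (6_000 and 7_000 ms with the flag in either starting state, 3_000 and 2_000 ms likewise), this yields the true/false/false/true sequence the assertions expect.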