diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex
index 68a37dfa7..85ccd90a0 100644
--- a/elixir/apps/domain/lib/domain/application.ex
+++ b/elixir/apps/domain/lib/domain/application.ex
@@ -81,12 +81,25 @@ defmodule Domain.Application do
   end
 
   defp replication do
-    config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
+    connection_modules = [
+      Domain.Events.ReplicationConnection,
+      Domain.ChangeLogs.ReplicationConnection
+    ]
 
-    if config[:enabled] do
-      [Domain.Events.ReplicationConnectionManager]
-    else
-      []
-    end
+    # Build a child spec for each enabled replication connection
+    Enum.reduce(connection_modules, [], fn module, enabled ->
+      config = Application.fetch_env!(:domain, module)
+
+      if config[:enabled] do
+        spec = %{
+          id: module,
+          start: {Domain.Replication.Manager, :start_link, [module, []]}
+        }
+
+        [spec | enabled]
+      else
+        enabled
+      end
+    end)
   end
 end
diff --git a/elixir/apps/domain/lib/domain/change_logs.ex b/elixir/apps/domain/lib/domain/change_logs.ex
new file mode 100644
index 000000000..8c665f0a4
--- /dev/null
+++ b/elixir/apps/domain/lib/domain/change_logs.ex
@@ -0,0 +1,10 @@
+defmodule Domain.ChangeLogs do
+  alias Domain.ChangeLogs.ChangeLog
+  alias Domain.Repo
+
+  def create_change_log(attrs) do
+    attrs
+    |> ChangeLog.Changeset.changeset()
+    |> Repo.insert()
+  end
+end
diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log.ex b/elixir/apps/domain/lib/domain/change_logs/change_log.ex
new file mode 100644
index 000000000..48b7037b4
--- /dev/null
+++ b/elixir/apps/domain/lib/domain/change_logs/change_log.ex
@@ -0,0 +1,16 @@
+defmodule Domain.ChangeLogs.ChangeLog do
+  use Domain, :schema
+
+  schema "change_logs" do
+    belongs_to :account, Domain.Accounts.Account
+
+    field :lsn, :integer
+    field :table, :string
+    field :op, Ecto.Enum, values: [:insert, :update, :delete]
+    field :old_data, :map
+    field :data, :map
+    field :vsn, :integer
+
+    timestamps(type: :utc_datetime_usec, updated_at: false)
+  end
+end
diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex
new file mode 100644
index 000000000..ee2482fc4
--- /dev/null
+++ b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex
@@ -0,0 +1,80 @@
+defmodule Domain.ChangeLogs.ChangeLog.Changeset do
+  use Domain, :changeset
+
+  @fields ~w[account_id lsn table op old_data data vsn]a
+
+  def changeset(attrs) do
+    %Domain.ChangeLogs.ChangeLog{}
+    |> cast(attrs, @fields)
+    |> validate_inclusion(:op, [:insert, :update, :delete])
+    |> validate_correct_data_present()
+    |> validate_same_account()
+    |> put_account_id()
+    |> validate_required([:account_id, :lsn, :table, :op, :vsn])
+    |> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey)
+  end
+
+  # :insert requires old_data = nil and data != nil
+  # :update requires old_data != nil and data != nil
+  # :delete requires old_data != nil and data = nil
+  def validate_correct_data_present(changeset) do
+    op = get_field(changeset, :op)
+    old_data = get_field(changeset, :old_data)
+    data = get_field(changeset, :data)
+
+    case {op, old_data, data} do
+      {:insert, nil, %{} = _data} ->
+        changeset
+
+      {:update, %{} = _old_data, %{} = _data} ->
+        changeset
+
+      {:delete, %{} = _old_data, nil} ->
+        changeset
+
+      _ ->
+        add_error(changeset, :base, "Invalid combination of operation and data")
+    end
+  end
+
+  # Add an error if data["account_id"] != old_data["account_id"]
+  defp validate_same_account(changeset) do
+    old_data = get_field(changeset, :old_data)
+    data = get_field(changeset, :data)
+
+    account_id_key = account_id_field(changeset)
+
+    if old_data && data && old_data[account_id_key] != data[account_id_key] do
+      add_error(changeset, :base, "Account ID cannot be changed")
+    else
+      changeset
+    end
+  end
+
+  # Populate account_id from one of data, old_data
+  defp put_account_id(changeset) do
+    old_data = get_field(changeset, :old_data)
+    data = get_field(changeset, :data)
+
+    account_id_key = account_id_field(changeset)
+
+    account_id =
+      case {old_data, data} do
+        {nil, nil} -> nil
+        {_, %{^account_id_key => id}} -> id
+        {%{^account_id_key => id}, _} -> id
+        _ -> nil
+      end
+
+    put_change(changeset, :account_id, account_id)
+  end
+
+  # For accounts table updates, the account_id is in the "id" field
+  # For other tables, it is in the "account_id" field
+  defp account_id_field(changeset) do
+    case get_field(changeset, :table) do
+      "accounts" -> "id"
+      _ -> "account_id"
+    end
+  end
+end
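For reference, a minimal sketch of the attrs this changeset accepts, here a hypothetical update to a `resources` row (UUIDs elided):

    attrs = %{
      lsn: 123_456_789,
      table: "resources",
      op: :update,
      old_data: %{"id" => "<uuid>", "account_id" => "<uuid>", "name" => "old"},
      data: %{"id" => "<uuid>", "account_id" => "<uuid>", "name" => "new"},
      vsn: 0
    }

    {:ok, _change_log} = Domain.ChangeLogs.create_change_log(attrs)

Callers don't pass `account_id` directly: `put_account_id/1` derives it from `data`/`old_data`, reading the `"id"` key for rows from the `accounts` table and `"account_id"` for every other table.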
diff --git a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex
new file mode 100644
index 000000000..63fa6aecd
--- /dev/null
+++ b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex
@@ -0,0 +1,82 @@
+defmodule Domain.ChangeLogs.ReplicationConnection do
+  alias Domain.ChangeLogs
+
+  use Domain.Replication.Connection,
+    # Allow up to 30 seconds of processing lag before alerting - this should be less
+    # than or equal to the wal_sender_timeout setting, which is typically 60s.
+    alert_threshold_ms: 30_000,
+    publication_name: "change_logs"
+
+  # Bump this to signify a change in the audit log schema. Use with care.
+  @vsn 0
+
+  def on_insert(lsn, table, data) do
+    log(:insert, lsn, table, nil, data)
+  end
+
+  def on_update(lsn, table, old_data, data) do
+    log(:update, lsn, table, old_data, data)
+  end
+
+  def on_delete(lsn, table, old_data) do
+    log(:delete, lsn, table, old_data, nil)
+  end
+
+  # Relay group tokens don't have account_ids
+
+  defp log(_op, _lsn, "tokens", %{"type" => "relay_group"}, _data) do
+    :ok
+  end
+
+  defp log(_op, _lsn, "tokens", _old_data, %{"type" => "relay_group"}) do
+    :ok
+  end
+
+  defp log(_op, _lsn, "flows", _old_data, _data) do
+    # TODO: WAL
+    # Flows are not logged to the change log as they are used only to trigger side effects which
+    # will be removed. Remove the flows table publication when that happens.
+    :ok
+  end
+
+  defp log(op, lsn, table, old_data, data) do
+    attrs = %{
+      op: op,
+      lsn: lsn,
+      table: table,
+      old_data: old_data,
+      data: data,
+      vsn: @vsn
+    }
+
+    case ChangeLogs.create_change_log(attrs) do
+      {:ok, _change_log} ->
+        :ok
+
+      {:error, %Ecto.Changeset{errors: errors} = changeset} ->
+        if foreign_key_error?(errors) do
+          # Expected under normal operation when an account is deleted
+          :ok
+        else
+          Logger.warning("Failed to create change log",
+            errors: inspect(changeset.errors),
+            table: table,
+            op: op,
+            lsn: lsn,
+            vsn: @vsn
+          )
+
+          # TODO: WAL
+          # Don't ignore failures to insert change logs. Improve this after we have some
+          # operational experience with the data flowing in here.
+          :ok
+        end
+    end
+  end
+
+  defp foreign_key_error?(errors) do
+    Enum.any?(errors, fn {field, {message, _}} ->
+      field == :account_id and message == "does not exist"
+    end)
+  end
+end
diff --git a/elixir/apps/domain/lib/domain/events/event.ex b/elixir/apps/domain/lib/domain/events/event.ex
deleted file mode 100644
index ba0e832ca..000000000
--- a/elixir/apps/domain/lib/domain/events/event.ex
+++ /dev/null
@@ -1,329 +0,0 @@
-defmodule Domain.Events.Event do
-  alias Domain.Events.Decoder
-  alias Domain.Events.Hooks
-
-  require Logger
-
-  @doc """
-  Ingests a WAL write message from Postgres, transforms it into an event, and sends
-  it to the appropriate hook module for processing.
-  """
-  def ingest(msg, relations) do
-    {op, old_tuple_data, tuple_data} = extract_msg_data(msg)
-    {:ok, relation} = Map.fetch(relations, msg.relation_id)
-
-    table = relation.name
-    old_data = zip(old_tuple_data, relation.columns)
-    data = zip(tuple_data, relation.columns)
-
-    process(op, table, old_data, data)
-
-    # TODO: WAL
-    # This is only for load testing. Remove this.
-    Domain.PubSub.broadcast("events", {op, table, old_data, data})
-  end
-
-  ############
-  # accounts #
-  ############
-
-  defp process(:insert, "accounts", _old_data, data) do
-    Hooks.Accounts.on_insert(data)
-  end
-
-  defp process(:update, "accounts", old_data, data) do
-    Hooks.Accounts.on_update(old_data, data)
-  end
-
-  defp process(:delete, "accounts", old_data, _data) do
-    Hooks.Accounts.on_delete(old_data)
-  end
-
-  ###########################
-  # actor_group_memberships #
-  ###########################
-
-  defp process(:insert, "actor_group_memberships", _old_data, data) do
-    Hooks.ActorGroupMemberships.on_insert(data)
-  end
-
-  defp process(:update, "actor_group_memberships", old_data, data) do
-    Hooks.ActorGroupMemberships.on_update(old_data, data)
-  end
-
-  defp process(:delete, "actor_group_memberships", old_data, _data) do
-    Hooks.ActorGroupMemberships.on_delete(old_data)
-  end
-
-  ################
-  # actor_groups #
-  ################
-
-  defp process(:insert, "actor_groups", _old_data, data) do
-    Hooks.ActorGroups.on_insert(data)
-  end
-
-  defp process(:update, "actor_groups", old_data, data) do
-    Hooks.ActorGroups.on_update(old_data, data)
-  end
-
-  defp process(:delete, "actor_groups", old_data, _data) do
-    Hooks.ActorGroups.on_delete(old_data)
-  end
-
-  ##########
-  # actors #
-  ##########
-
-  defp process(:insert, "actors", _old_data, data) do
-    Hooks.Actors.on_insert(data)
-  end
-
-  defp process(:update, "actors", old_data, data) do
-    Hooks.Actors.on_update(old_data, data)
-  end
-
-  defp process(:delete, "actors", old_data, _data) do
-    Hooks.Actors.on_delete(old_data)
-  end
-
-  ###################
-  # auth_identities #
-  ###################
-
-  defp process(:insert, "auth_identities", _old_data, data) do
-    Hooks.AuthIdentities.on_insert(data)
-  end
-
-  defp process(:update, "auth_identities", old_data, data) do
-    Hooks.AuthIdentities.on_update(old_data, data)
-  end
-
-  defp process(:delete, "auth_identities", old_data, _data) do
-    Hooks.AuthIdentities.on_delete(old_data)
-  end
-
-  ##################
-  # auth_providers #
-  ##################
-
-  defp process(:insert, "auth_providers", _old_data, data) do
-    Hooks.AuthProviders.on_insert(data)
-  end
-
-  defp process(:update, "auth_providers", old_data, data) do
-    Hooks.AuthProviders.on_update(old_data, data)
-  end
-
-  defp process(:delete, "auth_providers", old_data, _data) do
-    Hooks.AuthProviders.on_delete(old_data)
-  end
-
-  ###########
-  # clients #
-  ###########
-
-  defp process(:insert, "clients", _old_data, data) do
-    Hooks.Clients.on_insert(data)
-  end
-
-  defp process(:update, "clients", old_data, data) do
-    Hooks.Clients.on_update(old_data, data)
-  end
-
-  defp process(:delete, "clients", old_data, _data) do
-    Hooks.Clients.on_delete(old_data)
-  end
-
-  ###################
-  # flow_activities #
-  ###################
-
-  defp process(:insert, "flow_activities", _old_data, data) do
-    Hooks.FlowActivities.on_insert(data)
-  end
-
-  defp process(:update, "flow_activities", old_data, data) do
-    Hooks.FlowActivities.on_update(old_data, data)
-  end
-
-  defp process(:delete, "flow_activities", old_data, _data) do
-    Hooks.FlowActivities.on_delete(old_data)
-  end
-
-  #########
-  # flows #
-  #########
-
-  defp process(:insert, "flows", _old_data, data) do
-    Hooks.Flows.on_insert(data)
-  end
-
-  defp process(:update, "flows", old_data, data) do
-    Hooks.Flows.on_update(old_data, data)
-  end
-
-  defp process(:delete, "flows", old_data, _data) do
-    Hooks.Flows.on_delete(old_data)
-  end
-
-  ##################
-  # gateway_groups #
-  ##################
-
-  defp process(:insert, "gateway_groups", _old_data, data) do
-    Hooks.GatewayGroups.on_insert(data)
-  end
-
-  defp process(:update, "gateway_groups", old_data, data) do
-    Hooks.GatewayGroups.on_update(old_data, data)
-  end
-
-  defp process(:delete, "gateway_groups", old_data, _data) do
-    Hooks.GatewayGroups.on_delete(old_data)
-  end
-
-  ############
-  # gateways #
-  ############
-
-  defp process(:insert, "gateways", _old_data, data) do
-    Hooks.Gateways.on_insert(data)
-  end
-
-  defp process(:update, "gateways", old_data, data) do
-    Hooks.Gateways.on_update(old_data, data)
-  end
-
-  defp process(:delete, "gateways", old_data, _data) do
-    Hooks.Gateways.on_delete(old_data)
-  end
-
-  ############
-  # policies #
-  ############
-
-  defp process(:insert, "policies", _old_data, data) do
-    Hooks.Policies.on_insert(data)
-  end
-
-  defp process(:update, "policies", old_data, data) do
-    Hooks.Policies.on_update(old_data, data)
-  end
-
-  defp process(:delete, "policies", old_data, _data) do
-    Hooks.Policies.on_delete(old_data)
-  end
-
-  ################
-  # relay_groups #
-  ################
-
-  defp process(:insert, "relay_groups", _old_data, data) do
-    Hooks.RelayGroups.on_insert(data)
-  end
-
-  defp process(:update, "relay_groups", old_data, data) do
-    Hooks.RelayGroups.on_update(old_data, data)
-  end
-
-  defp process(:delete, "relay_groups", old_data, _data) do
-    Hooks.RelayGroups.on_delete(old_data)
-  end
-
-  ##########
-  # relays #
-  ##########
-
-  defp process(:insert, "relays", _old_data, data) do
-    Hooks.Relays.on_insert(data)
-  end
-
-  defp process(:update, "relays", old_data, data) do
-    Hooks.Relays.on_update(old_data, data)
-  end
-
-  defp process(:delete, "relays", old_data, _data) do
-    Hooks.Relays.on_delete(old_data)
-  end
-
-  ########################
-  # resource_connections #
-  ########################
-
-  defp process(:insert, "resource_connections", _old_data, data) do
-    Hooks.ResourceConnections.on_insert(data)
-  end
-
-  defp process(:update, "resource_connections", old_data, data) do
-    Hooks.ResourceConnections.on_update(old_data, data)
-  end
-
-  defp process(:delete, "resource_connections", old_data, _data) do
-    Hooks.ResourceConnections.on_delete(old_data)
-  end
-
-  #############
-  # resources #
-  #############
-
-  defp process(:insert, "resources", _old_data, data) do
-    Hooks.Resources.on_insert(data)
-  end
-
-  defp process(:update, "resources", old_data, data) do
-    Hooks.Resources.on_update(old_data, data)
-  end
-
-  defp process(:delete, "resources", old_data, _data) do
-    Hooks.Resources.on_delete(old_data)
-  end
-
-  ##########
-  # tokens #
-  ##########
-
-  defp process(:insert, "tokens", _old_data, data) do
-    Hooks.Tokens.on_insert(data)
-  end
-
-  defp process(:update, "tokens", old_data, data) do
-    Hooks.Tokens.on_update(old_data, data)
-  end
-
-  defp process(:delete, "tokens", old_data, _data) do
-    Hooks.Tokens.on_delete(old_data)
-  end
-
-  #############
-  # CATCH-ALL #
-  #############
-
-  defp process(op, table, _old_data, _data) do
-    Logger.warning("Unhandled event type!", op: op, table: table)
-
-    :ok
-  end
-
-  defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do
-    {:insert, nil, data}
-  end
-
-  defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do
-    {:update, old, data}
-  end
-
-  defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do
-    {:delete, old, nil}
-  end
-
-  defp zip(nil, _), do: nil
-
-  defp zip(tuple_data, columns) do
-    tuple_data
-    |> Tuple.to_list()
-    |> Enum.zip(columns)
-    |> Map.new(fn {value, column} -> {column.name, value} end)
-    |> Enum.into(%{})
-  end
-end
diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex
index c6fbcac3f..d0806ab03 100644
--- a/elixir/apps/domain/lib/domain/events/hooks/flows.ex
+++ b/elixir/apps/domain/lib/domain/events/hooks/flows.ex
@@ -1,3 +1,5 @@
+# TODO: WAL
+# Move side-effects from flows to state table in clients and gateways
 defmodule Domain.Events.Hooks.Flows do
   @behaviour Domain.Events.Hooks
   alias Domain.PubSub
diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex
index afb869f88..46e913c84 100644
--- a/elixir/apps/domain/lib/domain/events/replication_connection.ex
+++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex
@@ -1,313 +1,67 @@
-# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex
 defmodule Domain.Events.ReplicationConnection do
-  @moduledoc """
-  Receives WAL events from PostgreSQL and broadcasts them where they need to go.
+  alias Domain.Events.Hooks
 
-  Generally, we only want to start one of these connections per cluster in order
-  to obtain a serial stream of the WAL. We can then fanout these events to the
-  appropriate consumers.
+  use Domain.Replication.Connection,
+    # Allow up to 5 seconds of lag before alerting
+    alert_threshold_ms: 5_000,
+    publication_name: "events"
 
-  The ReplicationConnection is started with a durable slot so that whatever data we
-  fail to acknowledge is retained in the slot on the server's disk. The server will
-  then send us the data when we reconnect. This is important because we want to
-  ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy.
-
-  The WAL data we receive is sent only once a COMMIT completes on the server. So even though
-  COMMIT is one of the message types we receive here, we can safely ignore it and process
-  insert/update/delete messages one-by-one in this module as we receive them.
-  """
-  use Postgrex.ReplicationConnection
   require Logger
 
-  import Domain.Events.Protocol
-  import Domain.Events.Decoder
+  @tables_to_hooks %{
+    "accounts" => Hooks.Accounts,
+    "actor_group_memberships" => Hooks.ActorGroupMemberships,
+    "actor_groups" => Hooks.ActorGroups,
+    "actors" => Hooks.Actors,
+    "auth_identities" => Hooks.AuthIdentities,
+    "auth_providers" => Hooks.AuthProviders,
+    "clients" => Hooks.Clients,
+    "flow_activities" => Hooks.FlowActivities,
+    "flows" => Hooks.Flows,
+    "gateway_groups" => Hooks.GatewayGroups,
+    "gateways" => Hooks.Gateways,
+    "policies" => Hooks.Policies,
+    "resource_connections" => Hooks.ResourceConnections,
+    "resources" => Hooks.Resources,
+    "tokens" => Hooks.Tokens
+  }
 
-  alias Domain.Events.Event
-  alias Domain.Events.Decoder
-  alias Domain.Events.Protocol.{KeepAlive, Write}
+  def on_insert(_lsn, table, data) do
+    hook = Map.get(@tables_to_hooks, table)
 
-  @status_log_interval :timer.minutes(5)
-
-  @type t :: %__MODULE__{
-          schema: String.t(),
-          step:
-            :disconnected
-            | :check_publication
-            | :create_publication
-            | :check_replication_slot
-            | :create_slot
-            | :start_replication_slot
-            | :streaming,
-          publication_name: String.t(),
-          replication_slot_name: String.t(),
-          output_plugin: String.t(),
-          proto_version: integer(),
-          table_subscriptions: list(),
-          relations: map(),
-          counter: integer()
-        }
-  defstruct schema: "public",
-            step: :disconnected,
-            publication_name: "events",
-            replication_slot_name: "events_slot",
-            output_plugin: "pgoutput",
-            proto_version: 1,
-            table_subscriptions: [],
-            relations: %{},
-            counter: 0
-
-  def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do
-    # Start only one ReplicationConnection in the cluster.
-    opts = connection_opts ++ [name: {:global, __MODULE__}]
-
-    Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)
+    if hook do
+      hook.on_insert(data)
+    else
+      log_warning(:insert, table)
+      :ok
+    end
   end
 
-  @impl true
-  def init(state) do
-    {:ok, state}
+  def on_update(_lsn, table, old_data, data) do
+    hook = Map.get(@tables_to_hooks, table)
+
+    if hook do
+      hook.on_update(old_data, data)
+    else
+      log_warning(:update, table)
+      :ok
+    end
   end
 
-  @doc """
-  Called when we make a successful connection to the PostgreSQL server.
-  """
+  def on_delete(_lsn, table, old_data) do
+    hook = Map.get(@tables_to_hooks, table)
 
-  @impl true
-  def handle_connect(state) do
-    query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
-    {:query, query, %{state | step: :create_publication}}
+    if hook do
+      hook.on_delete(old_data)
+    else
+      log_warning(:delete, table)
+      :ok
+    end
   end
 
-  @doc """
-  Generic callback that handles replies to the queries we send.
-
-  We use a simple state machine to issue queries one at a time to Postgres in order to:
-
-  1. Check if the publication exists
-  2. Check if the replication slot exists
-  3. Create the publication if it doesn't exist
-  4. Create the replication slot if it doesn't exist
-  5. Start the replication slot
-  6. Start streaming data from the replication slot
-  """
-
-  @impl true
-
-  def handle_result(
-        [%Postgrex.Result{num_rows: 1}],
-        %__MODULE__{step: :create_publication} = state
-      ) do
-    query =
-      "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
-
-    {:query, query, %{state | step: :create_replication_slot}}
-  end
-
-  def handle_result(
-        [%Postgrex.Result{num_rows: 1}],
-        %__MODULE__{step: :create_replication_slot} = state
-      ) do
-    {:query, "SELECT 1", %{state | step: :start_replication_slot}}
-  end
-
-  def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
-    Logger.info("Starting replication slot", state: inspect(state))
-
-    # Start logging regular status updates
-    send(self(), :interval_logger)
-
-    query =
-      "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
-
-    {:stream, query, [], %{state | step: :streaming}}
-  end
-
-  def handle_result(
-        [%Postgrex.Result{num_rows: 0}],
-        %__MODULE__{step: :create_publication} = state
-      ) do
-    tables =
-      state.table_subscriptions
-      |> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end)
-
-    query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}"
-    {:query, query, %{state | step: :check_replication_slot}}
-  end
-
-  def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do
-    query =
-      "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
-
-    {:query, query, %{state | step: :create_replication_slot}}
-  end
-
-  def handle_result(
-        [%Postgrex.Result{num_rows: 0}],
-        %__MODULE__{step: :create_replication_slot} = state
-      ) do
-    query =
-      "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
-
-    {:query, query, %{state | step: :start_replication_slot}}
-  end
-
-  @doc """
-  Called when we receive a message from the PostgreSQL server.
-
-  We handle the following messages:
-  1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge
-     processed data by responding with the current WAL position.
-  2. Write: A message containing a WAL event - the actual data we are interested in.
-  3. Unknown: Any other message that we don't know how to handle - we log and ignore it.
-
-  For the KeepAlive message, we respond immediately with the current WAL position. Note: it is expected
-  that we receive many more of these messages than expected. That is because the rate at which the server
-  sends these messages scales proportionally to the number of Write messages it sends.
-
-  For the Write message, we send broadcast for each message one-by-one as we receive it. This is important
-  because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL position,
-  we should have already processed all the messages up to that point.
-  """
-
-  @impl true
-
-  def handle_data(data, state) when is_keep_alive(data) do
-    %KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
-
-    wal_end = wal_end + 1
-
-    message =
-      case reply do
-        :now -> standby_status(wal_end, wal_end, wal_end, reply)
-        :later -> hold()
-      end
-
-    {:noreply, message, state}
-  end
-
-  def handle_data(data, state) when is_write(data) do
-    %Write{message: message} = parse(data)
-
-    message
-    |> decode_message()
-    |> handle_message(%{state | counter: state.counter + 1})
-  end
-
-  def handle_data(data, state) do
-    Logger.error("Unknown WAL message received!",
-      data: inspect(data),
-      state: inspect(state)
-    )
-
-    {:noreply, [], state}
-  end
-
-  # Handles messages received:
-  #
-  # 1. Insert/Update/Delete - send to Event.ingest/2 for further processing
-  # 2. Relation messages - store the relation data in our state so we can use it later
-  #    to associate column names etc with the data we receive. In practice, we'll always
-  #    see a Relation message before we see any data for that relation.
-  # 3. Begin/Commit/Origin/Truncate/Type - we ignore these messages for now
-  # 4. Graceful shutdown - we respond with {:disconnect, :normal} to
-  #    indicate that we are shutting down gracefully and prevent auto reconnecting.
-  defp handle_message(
-         %Decoder.Messages.Relation{
-           id: id,
-           namespace: namespace,
-           name: name,
-           columns: columns
-         },
-         state
-       ) do
-    relation = %{
-      namespace: namespace,
-      name: name,
-      columns: columns
-    }
-
-    {:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}}
-  end
-
-  defp handle_message(%Decoder.Messages.Insert{} = msg, state) do
-    :ok = Event.ingest(msg, state.relations)
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Update{} = msg, state) do
-    :ok = Event.ingest(msg, state.relations)
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Delete{} = msg, state) do
-    :ok = Event.ingest(msg, state.relations)
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Begin{}, state) do
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Commit{}, state) do
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Origin{}, state) do
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Truncate{}, state) do
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Type{}, state) do
-    {:noreply, [], state}
-  end
-
-  defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do
-    Logger.warning("Unsupported message received",
-      data: inspect(data),
-      counter: state.counter
-    )
-
-    {:noreply, [], state}
-  end
-
-  @impl true
-
-  def handle_info(:interval_logger, state) do
-    Logger.info("Processed #{state.counter} write messages from the WAL stream")
-
-    Process.send_after(self(), :interval_logger, @status_log_interval)
-
-    {:noreply, state}
-  end
-
-  def handle_info(:shutdown, _), do: {:disconnect, :normal}
-  def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
-
-  def handle_info(_, state), do: {:noreply, state}
-
-  @doc """
-  Called when the connection is disconnected unexpectedly.
-
-  This will happen if:
-  1. Postgres is restarted such as during a maintenance window
-  2. The connection is closed by the server due to our failure to acknowledge
-     Keepalive messages in a timely manner
-  3. The connection is cut due to a network error
-  4. The ReplicationConnection process crashes or is killed abruptly for any reason
-  5. Potentially during a deploy if the connection is not closed gracefully.
-
-  Our Supervisor will restart this process automatically so this is not an error.
-  """
-
-  @impl true
-  def handle_disconnect(state) do
-    Logger.info("Replication connection disconnected",
-      counter: state.counter
-    )
-
-    {:noreply, %{state | step: :disconnected}}
+  defp log_warning(op, table) do
+    Logger.warning(
+      "No hook defined for #{op} on table #{table}. Please implement Domain.Events.Hooks for this table."
+    )
   end
 end
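Each value in `@tables_to_hooks` is a module implementing the `Domain.Events.Hooks` callbacks (compare `Domain.Events.Hooks.Flows` above). As a rough sketch of the shape such a module takes - the callback arities and the `:ok` returns are taken from the call sites above; the module name is hypothetical, and we assume the behaviour declares exactly these three callbacks:

    defmodule Domain.Events.Hooks.Example do
      @behaviour Domain.Events.Hooks

      # data/old_data arrive as string-keyed maps of column name => value,
      # already decoded from the WAL tuple data.
      @impl true
      def on_insert(_data), do: :ok

      @impl true
      def on_update(_old_data, _data), do: :ok

      @impl true
      def on_delete(_old_data), do: :ok
    end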
diff --git a/elixir/apps/domain/lib/domain/replication/connection.ex b/elixir/apps/domain/lib/domain/replication/connection.ex
new file mode 100644
index 000000000..3ec398b4c
--- /dev/null
+++ b/elixir/apps/domain/lib/domain/replication/connection.ex
@@ -0,0 +1,476 @@
+# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex
+defmodule Domain.Replication.Connection do
+  @moduledoc """
+  Receives WAL events from PostgreSQL and dispatches them to the using module's callbacks.
+
+  The ReplicationConnection is started with a durable slot so that whatever data we
+  fail to acknowledge is retained in the slot on the server's disk. The server will
+  then send us the data when we reconnect. This is important because we want to
+  ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy.
+
+  The WAL data we receive is sent only once a COMMIT completes on the server. So even though
+  COMMIT is one of the message types we receive here, we can safely ignore it and process
+  insert/update/delete messages one-by-one in this module as we receive them.
+
+  ## Usage
+
+      defmodule MyApp.ReplicationConnection do
+        use Domain.Replication.Connection,
+          alert_threshold_ms: 30_000,
+          publication_name: "my_events"
+      end
+
+  ## Options
+
+    * `:alert_threshold_ms` - How long to allow the WAL stream to lag before logging a warning (required)
+    * `:publication_name` - Name of the PostgreSQL publication to stream from (required)
+  """
+
+  defmacro __using__(opts \\ []) do
+    # Compose all the quote blocks without nesting
+    [
+      basic_setup(),
+      struct_and_constants(opts),
+      connection_functions(),
+      query_handlers(),
+      data_handlers(),
+      message_handlers(),
+      transaction_handlers(),
+      ignored_message_handlers(),
+      utility_functions(),
+      info_handlers(opts),
+      default_callbacks()
+    ]
+  end
+
+  # Basic imports and aliases
+  defp basic_setup do
+    quote do
+      use Postgrex.ReplicationConnection
+      require Logger
+      require OpenTelemetry.Tracer
+
+      import Domain.Replication.Protocol
+      import Domain.Replication.Decoder
+
+      alias Domain.Replication.Decoder
+      alias Domain.Replication.Protocol.{KeepAlive, Write}
+    end
+  end
+
+  # Struct definition and constants
+  defp struct_and_constants(opts) do
+    quote bind_quoted: [opts: opts] do
+      # Only these two are configurable
+      @alert_threshold_ms Keyword.fetch!(opts, :alert_threshold_ms)
+      @publication_name Keyword.fetch!(opts, :publication_name)
+
+      # Everything else uses defaults
+      @status_log_interval :timer.minutes(5)
+      @replication_slot_name "#{@publication_name}_slot"
+      @schema "public"
+      @output_plugin "pgoutput"
+      @proto_version 1
+
+      @type t :: %__MODULE__{
+              schema: String.t(),
+              step:
+                :disconnected
+                | :check_publication
+                | :create_publication
+                | :check_replication_slot
+                | :create_slot
+                | :start_replication_slot
+                | :streaming,
+              publication_name: String.t(),
+              replication_slot_name: String.t(),
+              output_plugin: String.t(),
+              proto_version: integer(),
+              table_subscriptions: list(),
+              relations: map(),
+              counter: integer()
+            }
+
+      defstruct schema: @schema,
+                step: :disconnected,
+                publication_name: @publication_name,
+                replication_slot_name: @replication_slot_name,
+                output_plugin: @output_plugin,
+                proto_version: @proto_version,
+                table_subscriptions: [],
+                relations: %{},
+                counter: 0
+    end
+  end
+
+  # Connection setup and teardown
+  defp connection_functions do
+    quote do
+      def start_link(%{instance: %__MODULE__{} = instance, connection_opts: connection_opts}) do
+        # Start only one of these connections per module across the cluster
+        opts = connection_opts ++ [name: {:global, __MODULE__}]
+        Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)
+      end
+
+      @impl true
+      def init(state) do
+        # Track whether we have already alerted on processing lag
+        {:ok, Map.put(state, :lag_threshold_exceeded, false)}
+      end
+
+      @doc """
+      Called when we make a successful connection to the PostgreSQL server.
+      """
+      @impl true
+      def handle_connect(state) do
+        query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
+        {:query, query, %{state | step: :create_publication}}
+      end
+
+      @doc """
+      Called when the connection is disconnected unexpectedly.
+
+      This will happen if:
+      1. Postgres is restarted such as during a maintenance window
+      2. The connection is closed by the server due to our failure to acknowledge
+         Keepalive messages in a timely manner
+      3. The connection is cut due to a network error
+      4. The ReplicationConnection process crashes or is killed abruptly for any reason
+      5. Potentially during a deploy if the connection is not closed gracefully.
+
+      Our Supervisor will restart this process automatically so this is not an error.
+      """
+      @impl true
+      def handle_disconnect(state) do
+        Logger.info("#{__MODULE__}: Replication connection disconnected",
+          counter: state.counter
+        )
+
+        {:noreply, %{state | step: :disconnected}}
+      end
+    end
+  end
+
+  # Query result handlers
+  defp query_handlers do
+    quote do
+      @doc """
+      Generic callback that handles replies to the queries we send.
+
+      We use a simple state machine to issue queries one at a time to Postgres in order to:
+
+      1. Check if the publication exists
+      2. Check if the replication slot exists
+      3. Create the publication if it doesn't exist
+      4. Create the replication slot if it doesn't exist
+      5. Start the replication slot
+      6. Start streaming data from the replication slot
+      """
+      @impl true
+      def handle_result(
+            [%Postgrex.Result{num_rows: 1}],
+            %__MODULE__{step: :create_publication} = state
+          ) do
+        query =
+          "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
+
+        {:query, query, %{state | step: :create_replication_slot}}
+      end
+
+      def handle_result(
+            [%Postgrex.Result{num_rows: 1}],
+            %__MODULE__{step: :create_replication_slot} = state
+          ) do
+        {:query, "SELECT 1", %{state | step: :start_replication_slot}}
+      end
+
+      def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
+        Logger.info("Starting replication slot #{state.replication_slot_name}",
+          state: inspect(state)
+        )
+
+        # Start logging regular status updates
+        send(self(), :interval_logger)
+
+        query =
+          "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
+
+        {:stream, query, [], %{state | step: :streaming}}
+      end
+
+      def handle_result(
+            [%Postgrex.Result{num_rows: 0}],
+            %__MODULE__{step: :create_publication} = state
+          ) do
+        tables =
+          state.table_subscriptions
+          |> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end)
+
+        query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}"
+        {:query, query, %{state | step: :check_replication_slot}}
+      end
+
+      def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do
+        query =
+          "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
+
+        {:query, query, %{state | step: :create_replication_slot}}
+      end
+
+      def handle_result(
+            [%Postgrex.Result{num_rows: 0}],
+            %__MODULE__{step: :create_replication_slot} = state
+          ) do
+        query =
+          "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
+
+        {:query, query, %{state | step: :start_replication_slot}}
+      end
+    end
+  end
+
+  # Raw WAL data handlers
+  defp data_handlers do
+    quote do
+      @doc """
+      Called when we receive a message from the PostgreSQL server.
+
+      We handle the following messages:
+      1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge
+         processed data by responding with the current WAL position.
+      2. Write: A message containing a WAL event - the actual data we are interested in.
+      3. Unknown: Any other message that we don't know how to handle - we log and ignore it.
+
+      For the KeepAlive message, we respond immediately with the current WAL position. Note: we
+      receive many more of these messages than you might expect, because the rate at which the
+      server sends them scales proportionally to the number of Write messages it sends.
+
+      For the Write message, we process each message one-by-one as we receive it. This is important
+      because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL
+      position, we should have already processed all the messages up to that point.
+      """
+      @impl true
+      def handle_data(data, state) when is_keep_alive(data) do
+        %KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
+
+        # The standby status reply reports the last WAL byte + 1 that we have processed
+        wal_end = wal_end + 1
+
+        message =
+          case reply do
+            :now -> standby_status(wal_end, wal_end, wal_end, reply)
+            :later -> hold()
+          end
+
+        {:noreply, message, state}
+      end
+
+      def handle_data(data, state) when is_write(data) do
+        OpenTelemetry.Tracer.with_span "#{__MODULE__}.handle_data/2" do
+          %Write{server_wal_end: server_wal_end, message: message} = parse(data)
+
+          message
+          |> decode_message()
+          |> handle_message(server_wal_end, %{state | counter: state.counter + 1})
+        end
+      end
+
+      def handle_data(data, state) do
+        Logger.error("#{__MODULE__}: Unknown WAL message received!",
+          data: inspect(data),
+          state: inspect(state)
+        )
+
+        {:noreply, [], state}
+      end
+    end
+  end
+
+  # Core message handlers
+  defp message_handlers do
+    quote do
+      # Handles messages received:
+      #
+      # 1. Insert/Update/Delete/Begin/Commit - send to the appropriate callback
+      # 2. Relation messages - store the relation data in our state so we can use it later
+      #    to associate column names etc with the data we receive. In practice, we'll always
+      #    see a Relation message before we see any data for that relation.
+      # 3. Origin/Truncate/Type - we ignore these messages for now
+      # 4. Graceful shutdown - we respond with {:disconnect, :normal} to
+      #    indicate that we are shutting down gracefully and prevent auto reconnecting.
+      defp handle_message(
+             %Decoder.Messages.Relation{
+               id: id,
+               namespace: namespace,
+               name: name,
+               columns: columns
+             },
+             _server_wal_end,
+             state
+           ) do
+        relation = %{
+          namespace: namespace,
+          name: name,
+          columns: columns
+        }
+
+        {:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}}
+      end
+
+      defp handle_message(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do
+        {_op, table, _old_data, data} = transform(msg, state.relations)
+        :ok = on_insert(server_wal_end, table, data)
+        {:noreply, [], state}
+      end
+
+      defp handle_message(%Decoder.Messages.Update{} = msg, server_wal_end, state) do
+        {_op, table, old_data, data} = transform(msg, state.relations)
+        :ok = on_update(server_wal_end, table, old_data, data)
+        {:noreply, [], state}
+      end
+
+      defp handle_message(%Decoder.Messages.Delete{} = msg, server_wal_end, state) do
+        {_op, table, old_data, _data} = transform(msg, state.relations)
+        :ok = on_delete(server_wal_end, table, old_data)
+        {:noreply, [], state}
+      end
+    end
+  end
+
+  # Transaction boundary (Begin/Commit) handlers
+  defp transaction_handlers do
+    quote do
+      defp handle_message(%Decoder.Messages.Begin{}, _server_wal_end, state) do
+        {:noreply, [], state}
+      end
+
+      defp handle_message(
+             %Decoder.Messages.Commit{commit_timestamp: commit_timestamp},
+             _server_wal_end,
+             state
+           ) do
+        # Since we receive a commit for each operation and we process each operation
+        # one-by-one, we can use the commit timestamp to check if we are lagging behind.
+        # lag_ms is positive when we are behind the server.
+        lag_ms = DateTime.diff(DateTime.utc_now(), commit_timestamp, :millisecond)
+        send(self(), {:check_alert, lag_ms})
+
+        {:noreply, [], state}
+      end
+    end
+  end
+
+  # Handlers for message types we deliberately ignore
+  defp ignored_message_handlers do
+    quote do
+      # These messages are not relevant for our use case, so we ignore them.
+      defp handle_message(%Decoder.Messages.Origin{}, _server_wal_end, state) do
+        {:noreply, [], state}
+      end
+
+      defp handle_message(%Decoder.Messages.Truncate{}, _server_wal_end, state) do
+        {:noreply, [], state}
+      end
+
+      defp handle_message(%Decoder.Messages.Type{}, _server_wal_end, state) do
+        {:noreply, [], state}
+      end
+
+      defp handle_message(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do
+        Logger.warning("#{__MODULE__}: Unsupported message received",
+          data: inspect(data),
+          counter: state.counter
+        )
+
+        {:noreply, [], state}
+      end
+    end
+  end
+
+  # Data transformation utilities
+  defp utility_functions do
+    quote do
+      defp transform(msg, relations) do
+        {op, old_tuple_data, tuple_data} = extract_msg_data(msg)
+        {:ok, relation} = Map.fetch(relations, msg.relation_id)
+        table = relation.name
+        old_data = zip(old_tuple_data, relation.columns)
+        data = zip(tuple_data, relation.columns)
+
+        {
+          op,
+          table,
+          old_data,
+          data
+        }
+      end
+
+      defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do
+        {:insert, nil, data}
+      end
+
+      defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do
+        {:update, old, data}
+      end
+
+      defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do
+        {:delete, old, nil}
+      end
+
+      defp zip(nil, _), do: nil
+
+      # Pair each value in the tuple with its column, yielding a
+      # string-keyed map of column name => value
+      defp zip(tuple_data, columns) do
+        tuple_data
+        |> Tuple.to_list()
+        |> Enum.zip(columns)
+        |> Map.new(fn {value, column} -> {column.name, value} end)
+      end
+    end
+  end
+
+  # Periodic logging and lag alerting
+  defp info_handlers(opts) do
+    quote bind_quoted: [opts: opts] do
+      # Re-declared with the same values so this quote block stands alone
+      @alert_threshold_ms Keyword.fetch!(opts, :alert_threshold_ms)
+      @status_log_interval :timer.minutes(5)
+
+      @impl true
+      # Log only once when crossing the threshold
+      def handle_info({:check_alert, lag_ms}, %{lag_threshold_exceeded: false} = state)
+          when lag_ms >= @alert_threshold_ms do
+        Logger.warning("#{__MODULE__}: Processing lag exceeds threshold", lag_ms: lag_ms)
+        {:noreply, %{state | lag_threshold_exceeded: true}}
+      end
+
+      def handle_info({:check_alert, lag_ms}, %{lag_threshold_exceeded: true} = state)
+          when lag_ms < @alert_threshold_ms do
+        Logger.info("#{__MODULE__}: Processing lag is back below threshold", lag_ms: lag_ms)
+        {:noreply, %{state | lag_threshold_exceeded: false}}
+      end
+
+      def handle_info(:interval_logger, state) do
+        Logger.info(
+          "#{__MODULE__}: Processed #{state.counter} write messages from the WAL stream"
+        )
+
+        Process.send_after(self(), :interval_logger, @status_log_interval)
+
+        {:noreply, state}
+      end
+
+      def handle_info(:shutdown, _), do: {:disconnect, :normal}
+      def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
+
+      def handle_info(_, state), do: {:noreply, state}
+    end
+  end
+
+  # Default no-op callbacks - using modules override the ones they need
+  defp default_callbacks do
+    quote do
+      def on_insert(_lsn, _table, _data), do: :ok
+      def on_update(_lsn, _table, _old_data, _data), do: :ok
+      def on_delete(_lsn, _table, _old_data), do: :ok
+
+      defoverridable on_insert: 3, on_update: 4, on_delete: 3
+    end
+  end
+end
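Because `default_callbacks/0` injects no-op implementations and marks them `defoverridable`, a consuming module only overrides the callbacks it needs, and each override must return `:ok` since `handle_message/3` asserts on it. A minimal sketch - the module name, threshold, and publication below are hypothetical:

    defmodule MyApp.AuditReplicationConnection do
      use Domain.Replication.Connection,
        alert_threshold_ms: 10_000,
        publication_name: "audit"

      # lsn is the server's WAL position for this write; data is a
      # string-keyed map of column name => value for the affected row.
      def on_insert(lsn, table, data) do
        Logger.info("insert at #{lsn}", table: table, data: inspect(data))
        :ok
      end

      # on_update/4 and on_delete/3 fall back to the injected no-op defaults.
    end

Such a module would still need an `:enabled` config entry under `:domain` and a `Domain.Replication.Manager` child, as `Domain.Application.replication/0` shows above.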
diff --git a/elixir/apps/domain/lib/domain/events/decoder.ex b/elixir/apps/domain/lib/domain/replication/decoder.ex
similarity index 98%
rename from elixir/apps/domain/lib/domain/events/decoder.ex
rename to elixir/apps/domain/lib/domain/replication/decoder.ex
index e2660f7ce..62a4d7456 100644
--- a/elixir/apps/domain/lib/domain/events/decoder.ex
+++ b/elixir/apps/domain/lib/domain/replication/decoder.ex
@@ -1,5 +1,5 @@
 # CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/decoder.ex
-defmodule Domain.Events.Decoder do
+defmodule Domain.Replication.Decoder do
   @moduledoc """
   Functions for decoding different types of logical replication messages.
   """
@@ -148,7 +148,7 @@ defmodule Domain.Events.Decoder do
     Unsupported
   }
 
-  alias Domain.Events.OidDatabase
+  alias Domain.Replication.OidDatabase
 
   @doc """
   Parses logical replication messages from Postgres
@@ -156,7 +156,7 @@ defmodule Domain.Events.Decoder do
   ## Examples
 
       iex> decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>)
-      %Domain.Events.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}}
+      %Domain.Replication.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}}
 
   """
   def decode_message(message) when is_binary(message) do
diff --git a/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex b/elixir/apps/domain/lib/domain/replication/manager.ex
similarity index 60%
rename from elixir/apps/domain/lib/domain/events/replication_connection_manager.ex
rename to elixir/apps/domain/lib/domain/replication/manager.ex
index d1541f707..e4da21786 100644
--- a/elixir/apps/domain/lib/domain/events/replication_connection_manager.ex
+++ b/elixir/apps/domain/lib/domain/replication/manager.ex
@@ -1,4 +1,4 @@
-defmodule Domain.Events.ReplicationConnectionManager do
+defmodule Domain.Replication.Manager do
   @moduledoc """
   Manages the Postgrex.ReplicationConnection to ensure that we always have one running
   to prevent unbounded growth of the WAL log and ensure we are processing events.
@@ -12,22 +12,22 @@ defmodule Domain.Events.ReplicationConnectionManager do
   # but not too long to avoid broadcasting needed events.
   @max_retries 10
 
-  def start_link(opts) do
-    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
+  def start_link(connection_module, opts) do
+    GenServer.start_link(__MODULE__, connection_module, opts)
   end
 
   @impl true
-  def init(_opts) do
-    send(self(), :connect)
+  def init(connection_module) do
+    send(self(), {:connect, connection_module})
     {:ok, %{retries: 0}}
   end
 
   @impl true
-  def handle_info(:connect, %{retries: retries} = state) do
-    Process.send_after(self(), :connect, @retry_interval)
+  def handle_info({:connect, connection_module}, %{retries: retries} = state) do
+    Process.send_after(self(), {:connect, connection_module}, @retry_interval)
 
-    case Domain.Events.ReplicationConnection.start_link(replication_child_spec()) do
+    case connection_module.start_link(replication_child_spec(connection_module)) do
       {:ok, _pid} ->
         # Our process won
         {:noreply, %{state | retries: 0}}
@@ -38,7 +38,7 @@ defmodule Domain.Events.ReplicationConnectionManager do
 
       {:error, reason} ->
         if retries < @max_retries do
-          Logger.info("Failed to start replication connection",
+          Logger.info("Failed to start replication connection #{connection_module}",
             retries: retries,
             max_retries: @max_retries,
             reason: inspect(reason)
@@ -47,7 +47,7 @@ defmodule Domain.Events.ReplicationConnectionManager do
           {:noreply, %{state | retries: retries + 1}}
         else
           Logger.error(
-            "Failed to start replication connection after #{@max_retries} attempts, giving up!",
+            "Failed to start replication connection #{connection_module} after #{@max_retries} attempts, giving up!",
             reason: inspect(reason)
           )
 
@@ -57,14 +57,14 @@ defmodule Domain.Events.ReplicationConnectionManager do
     end
   end
 
-  defp replication_child_spec do
+  def replication_child_spec(connection_module) do
     {connection_opts, config} =
-      Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
+      Application.fetch_env!(:domain, connection_module)
       |> Keyword.pop(:connection_opts)
 
     %{
       connection_opts: connection_opts,
-      instance: struct(Domain.Events.ReplicationConnection, config)
+      instance: struct(connection_module, config)
    }
  end
end
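`replication_child_spec/1` implies the config shape each connection module needs: an `:enabled` flag (read by `Domain.Application.replication/0`), `:connection_opts` passed through to Postgrex, and any remaining keys merged into the connection struct via `struct/2`. A hedged sketch of an entry - the values are illustrative only, not taken from this repo's runtime config:

    config :domain, Domain.ChangeLogs.ReplicationConnection,
      enabled: true,
      connection_opts: [
        hostname: "localhost",
        port: 5432,
        username: "postgres",
        password: "postgres",
        database: "firezone_dev"
      ],
      table_subscriptions: ["accounts", "actors", "resources"]

`struct/2` silently ignores keys that aren't struct fields, so only keys declared in the connection's `defstruct` (such as `table_subscriptions`) end up on the instance.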
""" - alias Domain.Events.Protocol.Write - alias Domain.Events.Protocol.KeepAlive + alias Domain.Replication.Protocol.Write + alias Domain.Replication.Protocol.KeepAlive defguard is_write(value) when binary_part(value, 0, 1) == <> defguard is_keep_alive(value) when binary_part(value, 0, 1) == <> diff --git a/elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex b/elixir/apps/domain/lib/domain/replication/protocol/keep_alive.ex similarity index 93% rename from elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex rename to elixir/apps/domain/lib/domain/replication/protocol/keep_alive.ex index 21e2cf978..916b980d9 100644 --- a/elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex +++ b/elixir/apps/domain/lib/domain/replication/protocol/keep_alive.ex @@ -1,4 +1,4 @@ -defmodule Domain.Events.Protocol.KeepAlive do +defmodule Domain.Replication.Protocol.KeepAlive do @moduledoc """ Primary keepalive message (B) Byte1('k') diff --git a/elixir/apps/domain/lib/domain/events/protocol/write.ex b/elixir/apps/domain/lib/domain/replication/protocol/write.ex similarity index 94% rename from elixir/apps/domain/lib/domain/events/protocol/write.ex rename to elixir/apps/domain/lib/domain/replication/protocol/write.ex index 9af5f146b..87178950f 100644 --- a/elixir/apps/domain/lib/domain/events/protocol/write.ex +++ b/elixir/apps/domain/lib/domain/replication/protocol/write.ex @@ -1,4 +1,4 @@ -defmodule Domain.Events.Protocol.Write do +defmodule Domain.Replication.Protocol.Write do @moduledoc """ XLogData (B) Byte1('w') diff --git a/elixir/apps/domain/priv/repo/migrations/20250616205321_create_change_logs.exs b/elixir/apps/domain/priv/repo/migrations/20250616205321_create_change_logs.exs new file mode 100644 index 000000000..be714db6c --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250616205321_create_change_logs.exs @@ -0,0 +1,32 @@ +defmodule Domain.Repo.Migrations.CreateChangeLogs do + use Ecto.Migration + + def up do + create table(:change_logs, primary_key: false) do + add(:id, :binary_id, primary_key: true) + + add(:account_id, references(:accounts, type: :binary_id, on_delete: :delete_all), + null: false + ) + + add(:lsn, :bigint, null: false) + add(:table, :string, null: false) + add(:op, :string, null: false) + add(:old_data, :map) + add(:data, :map) + add(:vsn, :integer, null: false) + + timestamps(type: :utc_datetime_usec, updated_at: false) + end + + # For pulling logs for a particular customer + create(index(:change_logs, [:account_id])) + + # For truncating logs by date + create(index(:change_logs, [:inserted_at])) + end + + def down do + drop(table(:change_logs)) + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250616230420_remove_replica_from_relays.exs b/elixir/apps/domain/priv/repo/migrations/20250616230420_remove_replica_from_relays.exs new file mode 100644 index 000000000..8493def13 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250616230420_remove_replica_from_relays.exs @@ -0,0 +1,20 @@ +defmodule Domain.Repo.Migrations.RemoveReplicaFromRelays do + use Ecto.Migration + + @relations ~w[ + relay_groups + relays + ] + + def up do + for relation <- @relations do + execute("ALTER TABLE #{relation} REPLICA IDENTITY DEFAULT") + end + end + + def down do + for relation <- @relations do + execute("ALTER TABLE #{relation} REPLICA IDENTITY FULL") + end + end +end diff --git a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs new file 
mode 100644 index 000000000..5c4049d6f --- /dev/null +++ b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs @@ -0,0 +1,349 @@ +defmodule Domain.ChangeLogs.ReplicationConnectionTest do + use Domain.DataCase, async: true + + import ExUnit.CaptureLog + import Ecto.Query + import Domain.ChangeLogs.ReplicationConnection + alias Domain.ChangeLogs.ChangeLog + alias Domain.Repo + + setup do + account = Fixtures.Accounts.create_account() + %{account: account} + end + + describe "on_insert/2" do + test "ignores flows table - no record created" do + table = "flows" + data = %{"id" => 1, "name" => "test flow"} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_insert(0, table, data) + + # No record should be created for flows + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "creates change log record for non-flows tables", %{account: account} do + table = "accounts" + data = %{"id" => account.id, "name" => "test account"} + + assert :ok = on_insert(0, table, data) + + # Verify the record was created + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :insert + assert change_log.table == table + assert change_log.old_data == nil + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + + test "creates records for different table types", %{account: account} do + test_cases = [ + {"accounts", %{"id" => account.id, "name" => "test account"}}, + {"resources", + %{"id" => Ecto.UUID.generate(), "name" => "test resource", "account_id" => account.id}}, + {"policies", + %{"id" => Ecto.UUID.generate(), "name" => "test policy", "account_id" => account.id}}, + {"actors", + %{"id" => Ecto.UUID.generate(), "name" => "test actor", "account_id" => account.id}} + ] + + for {table, data} <- test_cases do + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_insert(0, table, data) + + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + 1 + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :insert + assert change_log.table == table + assert change_log.old_data == nil + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + end + end + + describe "on_update/3" do + test "ignores flows table - no record created" do + table = "flows" + old_data = %{"id" => 1, "name" => "old flow"} + data = %{"id" => 1, "name" => "new flow"} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_update(0, table, old_data, data) + + # No record should be created for flows + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "creates change log record for non-flows tables", %{account: account} do + table = "accounts" + old_data = %{"id" => account.id, "name" => "old name"} + data = %{"id" => account.id, "name" => "new name"} + + assert :ok = on_update(0, table, old_data, data) + + # Verify the record was created + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :update + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + + test "handles complex 
data structures", %{account: account} do + table = "resources" + resource_id = Ecto.UUID.generate() + + old_data = %{ + "id" => resource_id, + "name" => "old name", + "account_id" => account.id, + "settings" => %{"theme" => "dark", "notifications" => true} + } + + data = %{ + "id" => resource_id, + "name" => "new name", + "account_id" => account.id, + "settings" => %{"theme" => "light", "notifications" => false}, + "tags" => ["updated", "important"] + } + + assert :ok = on_update(0, table, old_data, data) + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :update + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == data + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + end + + describe "on_delete/2" do + test "ignores flows table - no record created" do + table = "flows" + old_data = %{"id" => 1, "name" => "deleted flow"} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + assert :ok = on_delete(0, table, old_data) + + # No record should be created for flows + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "creates change log record for non-flows tables", %{account: account} do + table = "accounts" + old_data = %{"id" => account.id, "name" => "deleted account"} + + assert :ok = on_delete(0, table, old_data) + + # Verify the record was created + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :delete + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == nil + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + + test "handles various data types in old_data", %{account: account} do + table = "resources" + resource_id = Ecto.UUID.generate() + + old_data = %{ + "id" => resource_id, + "name" => "complex resource", + "account_id" => account.id, + "metadata" => %{ + "created_by" => "system", + "permissions" => ["read", "write"], + "config" => %{"timeout" => 30, "retries" => 3} + }, + "active" => true, + "count" => 42 + } + + assert :ok = on_delete(0, table, old_data) + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + assert change_log.op == :delete + assert change_log.table == table + assert change_log.old_data == old_data + assert change_log.data == nil + assert change_log.vsn == 0 + assert change_log.account_id == account.id + end + end + + describe "error handling" do + test "handles foreign key errors gracefully" do + # Create a change log entry that references a non-existent account + table = "resources" + # Non-existent account_id + data = %{"id" => Ecto.UUID.generate(), "account_id" => Ecto.UUID.generate()} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + # Should return :ok even if foreign key constraint fails + assert :ok = on_insert(0, table, data) + + # No record should be created due to foreign key error + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + + test "logs and handles non-foreign-key validation errors gracefully", %{account: account} do + # Test with invalid data that would cause validation errors (not foreign key) + table = "accounts" + # Missing required fields but valid FK + data = %{"account_id" => account.id} + + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + log_output = + 
capture_log(fn -> + assert :ok = on_insert(0, table, data) + end) + + # Should log the error + assert log_output =~ "Failed to create change log" + + # No record should be created + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + end + + describe "data integrity" do + test "preserves exact data structures", %{account: account} do + table = "policies" + policy_id = Ecto.UUID.generate() + + # Test with various data types + complex_data = %{ + "id" => policy_id, + "account_id" => account.id, + "string_field" => "test string", + "integer_field" => 42, + "boolean_field" => true, + "null_field" => nil, + "array_field" => [1, "two", %{"three" => 3}], + "nested_object" => %{ + "level1" => %{ + "level2" => %{ + "deep_value" => "preserved" + } + } + } + } + + assert :ok = on_insert(0, table, complex_data) + + change_log = Repo.one!(from cl in ChangeLog, order_by: [desc: cl.inserted_at], limit: 1) + + # Data should be preserved exactly as provided + assert change_log.data == complex_data + assert change_log.op == :insert + assert change_log.table == table + assert change_log.account_id == account.id + end + + test "tracks operation sequence correctly", %{account: account} do + table = "accounts" + initial_data = %{"id" => account.id, "name" => "initial"} + updated_data = %{"id" => account.id, "name" => "updated"} + + # Insert + assert :ok = on_insert(0, table, initial_data) + + # Update + assert :ok = on_update(0, table, initial_data, updated_data) + + # Delete + assert :ok = on_delete(0, table, updated_data) + + # Get the three most recent records in reverse chronological order + logs = + Repo.all( + from cl in ChangeLog, + where: cl.account_id == ^account.id, + order_by: [desc: cl.inserted_at], + limit: 3 + ) + + # Should have 3 records (delete, update, insert in that order) + assert length(logs) >= 3 + [delete_log, update_log, insert_log] = Enum.take(logs, 3) + + # Verify sequence (most recent first) + assert delete_log.op == :delete + assert delete_log.old_data == updated_data + assert delete_log.data == nil + assert delete_log.account_id == account.id + + assert update_log.op == :update + assert update_log.old_data == initial_data + assert update_log.data == updated_data + assert update_log.account_id == account.id + + assert insert_log.op == :insert + assert insert_log.old_data == nil + assert insert_log.data == initial_data + assert insert_log.account_id == account.id + + # All should have same version + assert insert_log.vsn == 0 + assert update_log.vsn == 0 + assert delete_log.vsn == 0 + end + end + + describe "flows table comprehensive test" do + test "flows table never creates records regardless of operation or data" do + initial_count = Repo.aggregate(ChangeLog, :count, :id) + + # Test various data shapes and operations + test_data_sets = [ + %{}, + %{"id" => 1}, + %{"complex" => %{"nested" => ["data", 1, true, nil]}}, + nil + ] + + for data <- test_data_sets do + assert :ok = on_insert(0, "flows", data) + assert :ok = on_update(0, "flows", data, data) + assert :ok = on_delete(0, "flows", data) + end + + # No records should have been created + final_count = Repo.aggregate(ChangeLog, :count, :id) + assert final_count == initial_count + end + end +end diff --git a/elixir/apps/domain/test/domain/change_logs_test.exs b/elixir/apps/domain/test/domain/change_logs_test.exs new file mode 100644 index 000000000..a7d4037e5 --- /dev/null +++ b/elixir/apps/domain/test/domain/change_logs_test.exs @@ -0,0 +1,176 @@ +defmodule Domain.ChangeLogsTest do + 
use Domain.DataCase, async: true + import Domain.ChangeLogs + + describe "create_change_log/1" do + setup do + account = Fixtures.Accounts.create_account() + + %{account: account} + end + + test "inserts a change_log for an account", %{account: account} do + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"}, + vsn: 1 + } + + assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) + + assert change_log.account_id == account.id + assert change_log.op == :insert + assert change_log.old_data == nil + assert change_log.data == %{"account_id" => account.id, "key" => "value"} + end + + test "uses the 'id' field for accounts table updates", %{account: account} do + attrs = %{ + lsn: 1, + table: "accounts", + op: :update, + old_data: %{"id" => account.id, "name" => "Old Name"}, + data: %{"id" => account.id, "name" => "New Name"}, + vsn: 1 + } + + assert {:ok, %Domain.ChangeLogs.ChangeLog{} = change_log} = create_change_log(attrs) + + assert change_log.account_id == account.id + assert change_log.op == :update + assert change_log.old_data == %{"id" => account.id, "name" => "Old Name"} + assert change_log.data == %{"id" => account.id, "name" => "New Name"} + end + + test "requires vsn field", %{account: account} do + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"} + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:vsn] == {"can't be blank", [validation: :required]} + end + + test "requires table field", %{account: account} do + attrs = %{ + lsn: 1, + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:table] == {"can't be blank", [validation: :required]} + end + + test "requires op field to be one of :insert, :update, :delete", %{account: account} do + attrs = %{ + lsn: 1, + table: "resources", + op: :invalid_op, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert {"is invalid", errors} = changeset.errors[:op] + assert {:validation, :inclusion} in errors + end + + test "requires correct combination of operation and data", %{account: account} do + # Invalid combination: :insert with old_data present + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: %{"account_id" => account.id, "key" => "new_value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid?
== false + assert changeset.errors[:base] == {"Invalid combination of operation and data", []} + + # Valid combination: :insert with old_data nil and data present + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "new_value"}, + vsn: 1 + } + + assert {:ok, _change_log} = create_change_log(attrs) + + # Valid combination: :update with both old_data and data present + attrs = %{ + lsn: 1, + table: "resources", + op: :update, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: %{"account_id" => account.id, "key" => "new_value"}, + vsn: 1 + } + + assert {:ok, _change_log} = create_change_log(attrs) + + # Valid combination: :delete with old_data present and data nil + attrs = %{ + lsn: 1, + table: "resources", + op: :delete, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: nil, + vsn: 1 + } + + assert {:ok, _change_log} = create_change_log(attrs) + end + + test "requires account_id to be populated from old_data or data" do + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"key" => "value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:account_id] == {"can't be blank", [validation: :required]} + end + + test "requires old_data[\"account_id\"] and data[\"account_id\"] to match", %{ + account: account + } do + attrs = %{ + lsn: 1, + table: "resources", + op: :update, + old_data: %{"account_id" => account.id, "key" => "old_value"}, + data: %{"account_id" => "different_account_id", "key" => "new_value"}, + vsn: 1 + } + + assert {:error, changeset} = create_change_log(attrs) + assert changeset.valid? == false + assert changeset.errors[:base] == {"Account ID cannot be changed", []} + end + end +end diff --git a/elixir/apps/domain/test/domain/events/event_test.exs b/elixir/apps/domain/test/domain/events/event_test.exs deleted file mode 100644 index 184b720d3..000000000 --- a/elixir/apps/domain/test/domain/events/event_test.exs +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Domain.Events.EventTest do - use ExUnit.Case, async: true - # import Domain.Events.Event - # alias Domain.Events.Decoder - - setup do - config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) - table_subscriptions = config[:table_subscriptions] - - %{table_subscriptions: table_subscriptions} - end - - # TODO: WAL - # Refactor this to test ingest of all table subscriptions as structs with stringified - # keys in order to assert on the shape of the data. 
- # describe "ingest/2" do - # test "returns :ok for insert on all configured table subscriptions", %{ - # table_subscriptions: table_subscriptions - # } do - # for table <- table_subscriptions do - # relations = %{"1" => %{name: table, columns: []}} - # msg = %Decoder.Messages.Insert{tuple_data: {}, relation_id: "1"} - # - # assert :ok == ingest(msg, relations) - # end - # end - # end -end diff --git a/elixir/apps/domain/test/domain/events/replication_connection_test.exs b/elixir/apps/domain/test/domain/events/replication_connection_test.exs index f9a1b6ed5..b95a0773a 100644 --- a/elixir/apps/domain/test/domain/events/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/events/replication_connection_test.exs @@ -1,222 +1,153 @@ defmodule Domain.Events.ReplicationConnectionTest do - # Only one ReplicationConnection should be started in the cluster - use ExUnit.Case, async: false + use ExUnit.Case, async: true - alias Domain.Events.ReplicationConnection + import ExUnit.CaptureLog + import Domain.Events.ReplicationConnection - # Used to test callbacks, not used for live connection - @mock_state %ReplicationConnection{ - schema: "test_schema", - step: :disconnected, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - table_subscriptions: ["accounts", "resources"], - relations: %{}, - counter: 0 - } - - # Used to test live connection setup do - {connection_opts, config} = + tables = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) - |> Keyword.pop(:connection_opts) + |> Keyword.fetch!(:table_subscriptions) - init_state = %{ - connection_opts: connection_opts, - instance: struct(Domain.Events.ReplicationConnection, config) - } + %{tables: tables} + end - child_spec = %{ - id: Domain.Events.ReplicationConnection, - start: {Domain.Events.ReplicationConnection, :start_link, [init_state]} - } + describe "on_insert/2" do + test "logs warning for unknown table" do + table = "unknown_table" + data = %{"id" => Ecto.UUID.generate(), "name" => "test"} - {:ok, pid} = - case start_supervised(child_spec) do - {:ok, pid} -> - {:ok, pid} + log_output = + capture_log(fn -> + assert :ok = on_insert(0, table, data) + end) - {:error, {:already_started, pid}} -> - {:ok, pid} + assert log_output =~ "No hook defined for insert on table unknown_table" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" + end + + test "handles known tables without errors", %{tables: tables} do + for table <- tables do + data = %{"id" => Ecto.UUID.generate(), "table" => table} + + # The actual hook call might fail if the hook modules aren't available, + # but we can test that our routing logic works + try do + result = on_insert(0, table, data) + # Should either succeed or fail gracefully + assert result in [:ok, :error] or match?({:error, _}, result) + rescue + # Depending on the shape of the data we might get a function clause error. This is ok, + # as we are testing the routing logic, not the actual hook implementations. 
+ FunctionClauseError -> :ok + end end + - {:ok, pid: pid} - end + test "handles all configured tables", %{tables: tables} do + for table <- tables do + # Should not log warnings for configured tables + log_output = + capture_log(fn -> + try do + on_insert(0, table, %{"id" => Ecto.UUID.generate()}) + rescue + FunctionClauseError -> + # Shape of the data might not match the expected one, which is fine + :ok + end + end) - describe "handle_connect/1 callback" do - test "handle_connect initiates publication check" do - state = @mock_state - expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" - expected_next_state = %{state | step: :create_publication} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_connect(state) + refute log_output =~ "No hook defined for insert" + end end end - describe "handle_result/2 callback" do - test "handle_result transitions from create_publication to create_replication_slot when publication exists" do - state = %{@mock_state | step: :create_publication} - result = [%Postgrex.Result{num_rows: 1}] + describe "on_update/4" do + test "logs warning for unknown table" do + table = "unknown_table" + old_data = %{"id" => Ecto.UUID.generate(), "name" => "old"} + data = %{"id" => Ecto.UUID.generate(), "name" => "new"} - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + log_output = + capture_log(fn -> + assert :ok = on_update(0, table, old_data, data) + end) - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) + assert log_output =~ "No hook defined for update on table unknown_table" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" end - test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do - state = %{@mock_state | step: :create_replication_slot} - result = [%Postgrex.Result{num_rows: 1}] + test "handles known tables", %{tables: tables} do + old_data = %{"id" => Ecto.UUID.generate(), "name" => "old name"} + data = %{"id" => Ecto.UUID.generate(), "name" => "new name"} - expected_query = "SELECT 1" - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from start_replication_slot to streaming" do - state = %{@mock_state | step: :start_replication_slot} - result = [%Postgrex.Result{num_rows: 1}] - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - expected_next_state = %{state | step: :streaming} - - assert {:stream, ^expected_stream_query, [], ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates publication if it doesn't exist" do - state = %{@mock_state | step: :create_publication} - result = [%Postgrex.Result{num_rows: 0}] - - expected_tables = "test_schema.accounts,test_schema.resources" - expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end
- - test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do - state = %{@mock_state | step: :check_replication_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates replication slot if it doesn't exist" do - state = %{@mock_state | step: :create_replication_slot} - result = [%Postgrex.Result{num_rows: 0}] - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) + for table <- tables do + try do + result = on_update(0, table, old_data, data) + assert result in [:ok, :error] or match?({:error, _}, result) + rescue + FunctionClauseError -> + # Shape of the data might not match the expected one, which is fine + :ok + end + end end end - # In-depth decoding tests are handled in Domain.Events.DecoderTest - describe "handle_data/2" do - test "handle_data handles KeepAlive with reply :now" do - state = %{@mock_state | step: :streaming} - wal_end = 12345 + describe "on_delete/2" do + test "logs warning for unknown table" do + table = "unknown_table" + old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted"} - now = - System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + log_output = + capture_log(fn -> + assert :ok = on_delete(0, table, old_data) + end) - # 100 milliseconds - grace_period = 100_000 - keepalive_data = <> - - assert {:noreply, reply, ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - - assert [<>] = reply - - assert now <= clock - assert clock < now + grace_period + assert log_output =~ "No hook defined for delete on table unknown_table" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" end - test "handle_data handles KeepAlive with reply :later" do - state = %{@mock_state | step: :streaming} - wal_end = 54321 + test "handles known tables", %{tables: tables} do + old_data = %{"id" => Ecto.UUID.generate(), "name" => "deleted gateway"} - keepalive_data = <> - expected_reply_message = [] - - assert {:noreply, ^expected_reply_message, ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - end - - test "handle_data handles Write message" do - state = %{@mock_state | step: :streaming} - server_wal_start = 123_456_789 - server_wal_end = 987_654_321 - server_system_clock = 1_234_567_890 - message = "Hello, world!" 
- - write_data = - <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message::binary>> - - new_state = %{state | counter: state.counter + 1} - - assert {:noreply, [], ^new_state} = ReplicationConnection.handle_data(write_data, state) - end - - test "handle_data handles unknown message" do - state = %{@mock_state | step: :streaming} - unknown_data = <<?q, 1, 2, 3>> - - assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state) + for table <- tables do + try do + assert :ok = on_delete(0, table, old_data) + rescue + # Shape of the data might not match the expected one, which is fine + FunctionClauseError -> :ok + end + end end end - describe "handle_info/2" do - test "handle_info handles :shutdown message" do - state = @mock_state - assert {:disconnect, :normal} = ReplicationConnection.handle_info(:shutdown, state) - end + describe "warning message formatting" do + test "log_warning generates correct message format" do + log_output = + capture_log(fn -> + assert :ok = on_insert(0, "test_table_insert", %{}) + end) - test "handle_info handles :DOWN message from monitored process" do - state = @mock_state - monitor_ref = make_ref() - down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} + assert log_output =~ "No hook defined for insert on table test_table_insert" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" - assert {:disconnect, :normal} = ReplicationConnection.handle_info(down_msg, state) - end + log_output = + capture_log(fn -> + assert :ok = on_update(0, "test_table_update", %{}, %{}) + end) - test "handle_info ignores other messages" do - state = @mock_state - random_msg = {:some_other_info, "data"} + assert log_output =~ "No hook defined for update on table test_table_update" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" - assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state) - end - end + log_output = + capture_log(fn -> + assert :ok = on_delete(0, "test_table_delete", %{}) + end) - describe "handle_disconnect/1" do - test "handle_disconnect resets step to :disconnected" do - state = %{@mock_state | step: :streaming} - expected_state = %{state | step: :disconnected} - - assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state) + assert log_output =~ "No hook defined for delete on table test_table_delete" + assert log_output =~ "Please implement Domain.Events.Hooks for this table" end end end diff --git a/elixir/apps/domain/test/domain/replication/connection_test.exs b/elixir/apps/domain/test/domain/replication/connection_test.exs new file mode 100644 index 000000000..729bf1b35 --- /dev/null +++ b/elixir/apps/domain/test/domain/replication/connection_test.exs @@ -0,0 +1,308 @@ +defmodule Domain.Replication.ConnectionTest do + # Only one ReplicationConnection should be started in the cluster + use ExUnit.Case, async: false + + # Create a test module that uses the macro + defmodule TestReplicationConnection do + use Domain.Replication.Connection, + alert_threshold_ms: 5_000, + publication_name: "test_events" + end + + alias TestReplicationConnection + + # Used to test callbacks, not used for live connection + def mock_state, + do: %TestReplicationConnection{ + schema: "test_schema", + step: :disconnected, + publication_name: "test_pub", + replication_slot_name: "test_slot", + output_plugin: "pgoutput", + proto_version: 1, + table_subscriptions: ["accounts", "resources"], + relations: %{}, + counter: 0 + } + + # Used to test live connection + setup do + {connection_opts, config} = + Application.fetch_env!(:domain, 
Domain.Events.ReplicationConnection) + |> Keyword.pop(:connection_opts) + + init_state = %{ + connection_opts: connection_opts, + instance: struct(TestReplicationConnection, config) + } + + child_spec = %{ + id: TestReplicationConnection, + start: {TestReplicationConnection, :start_link, [init_state]} + } + + {:ok, pid} = + case start_supervised(child_spec) do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + {:ok, pid} + end + + {:ok, pid: pid} + end + + describe "handle_connect/1 callback" do + test "handle_connect initiates publication check" do + state = mock_state() + expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" + expected_next_state = %{state | step: :create_publication} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_connect(state) + end + end + + describe "handle_result/2 callback" do + test "handle_result transitions from create_publication to create_replication_slot when publication exists" do + state = %{mock_state() | step: :create_publication} + result = [%Postgrex.Result{num_rows: 1}] + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do + state = %{mock_state() | step: :create_replication_slot} + result = [%Postgrex.Result{num_rows: 1}] + + expected_query = "SELECT 1" + expected_next_state = %{state | step: :start_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from start_replication_slot to streaming" do + state = %{mock_state() | step: :start_replication_slot} + result = [%Postgrex.Result{num_rows: 1}] + + expected_stream_query = + "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" + + expected_next_state = %{state | step: :streaming} + + assert {:stream, ^expected_stream_query, [], ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result creates publication if it doesn't exist" do + state = %{mock_state() | step: :create_publication} + result = [%Postgrex.Result{num_rows: 0}] + + expected_tables = "test_schema.accounts,test_schema.resources" + expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" + expected_next_state = %{state | step: :check_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do + state = %{mock_state() | step: :check_replication_slot} + result = [%Postgrex.Result{num_rows: 0}] + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + + test "handle_result creates replication slot if it doesn't exist" do + state 
= %{mock_state() | step: :create_replication_slot} + result = [%Postgrex.Result{num_rows: 0}] + + expected_query = + "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" + + expected_next_state = %{state | step: :start_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + TestReplicationConnection.handle_result(result, state) + end + end + + describe "handle_data/2" do + test "handle_data handles KeepAlive with reply :now" do + state = %{mock_state() | step: :streaming} + wal_end = 12345 + + now = + System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + + # 100 milliseconds + grace_period = 100_000 + keepalive_data = <<?k, wal_end::64, 0::64, 1>> + + assert {:noreply, reply, ^state} = + TestReplicationConnection.handle_data(keepalive_data, state) + + assert [<<?r, _::64, _::64, _::64, clock::64, 0>>] = reply + + assert now <= clock + assert clock < now + grace_period + end + + test "handle_data handles KeepAlive with reply :later" do + state = %{mock_state() | step: :streaming} + wal_end = 54321 + + keepalive_data = <<?k, wal_end::64, 0::64, 0>> + expected_reply_message = [] + + assert {:noreply, ^expected_reply_message, ^state} = + TestReplicationConnection.handle_data(keepalive_data, state) + end + + test "handle_data handles Write message and increments counter" do + state = %{mock_state() | step: :streaming} + server_wal_start = 123_456_789 + server_wal_end = 987_654_321 + server_system_clock = 1_234_567_890 + message = "Hello, world!" + + write_data = + <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message::binary>> + + new_state = %{state | counter: state.counter + 1} + + assert {:noreply, [], ^new_state} = TestReplicationConnection.handle_data(write_data, state) + end + + test "handle_data handles unknown message" do + state = %{mock_state() | step: :streaming} + unknown_data = <<?q, 1, 2, 3>> + + assert {:noreply, [], ^state} = TestReplicationConnection.handle_data(unknown_data, state) + end + + test "sends {:check_alert, lag_ms} > 5_000 ms" do + state = + %{mock_state() | step: :streaming} + |> Map.put(:lag_threshold_exceeded, false) + + server_wal_start = 123_456_789 + server_wal_end = 987_654_321 + server_system_clock = 1_234_567_890 + flags = <<0>> + lsn = <<0::32, 100::32>> + end_lsn = <<0::32, 200::32>> + + # Simulate a commit timestamp that exceeds the threshold + timestamp = + DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) + 10_000_000 + + commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>> + + write_message = + <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, commit_data::binary>> + + assert {:noreply, [], _state} = + TestReplicationConnection.handle_data(write_message, state) + + assert_receive({:check_alert, lag_ms}) + assert lag_ms > 5_000 + end + + test "sends {:check_alert, lag_ms} < 5_000 ms" do + state = + %{mock_state() | step: :streaming} + |> Map.put(:lag_threshold_exceeded, true) + + server_wal_start = 123_456_789 + server_wal_end = 987_654_321 + server_system_clock = 1_234_567_890 + flags = <<0>> + lsn = <<0::32, 100::32>> + end_lsn = <<0::32, 200::32>> + # Simulate a commit timestamp that is within the threshold + timestamp = + DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) + 1_000_000 + + commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>> + + write_message = + <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, commit_data::binary>> + + assert {:noreply, [], _state} = + TestReplicationConnection.handle_data(write_message, state) + + assert_receive({:check_alert, lag_ms}) + assert lag_ms < 5_000 + end + end + + describe "handle_info/2" do + test "handle_info handles :shutdown message" do + state = mock_state() + assert {:disconnect, :normal} = TestReplicationConnection.handle_info(:shutdown, state) + end + + test "handle_info handles :DOWN message from monitored
process" do + state = mock_state() + monitor_ref = make_ref() + down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} + + assert {:disconnect, :normal} = TestReplicationConnection.handle_info(down_msg, state) + end + + test "handle_info ignores other messages" do + state = mock_state() + random_msg = {:some_other_info, "data"} + + assert {:noreply, ^state} = TestReplicationConnection.handle_info(random_msg, state) + end + + test "handle_info processes lag alerts" do + state = Map.put(mock_state(), :lag_threshold_exceeded, false) + + # Test crossing threshold + assert {:noreply, %{lag_threshold_exceeded: true}} = + TestReplicationConnection.handle_info({:check_alert, 6_000}, state) + + # Test going back below threshold + state_above = %{state | lag_threshold_exceeded: true} + + assert {:noreply, %{lag_threshold_exceeded: false}} = + TestReplicationConnection.handle_info({:check_alert, 3_000}, state_above) + + # Test staying below threshold + assert {:noreply, %{lag_threshold_exceeded: false}} = + TestReplicationConnection.handle_info({:check_alert, 2_000}, state) + + # Test staying above threshold + assert {:noreply, %{lag_threshold_exceeded: true}} = + TestReplicationConnection.handle_info({:check_alert, 7_000}, state_above) + end + end + + describe "handle_disconnect/1" do + test "handle_disconnect resets step to :disconnected" do + state = %{mock_state() | step: :streaming} + expected_state = %{state | step: :disconnected} + + assert {:noreply, ^expected_state} = TestReplicationConnection.handle_disconnect(state) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/decoder_test.exs b/elixir/apps/domain/test/domain/replication/decoder_test.exs similarity index 99% rename from elixir/apps/domain/test/domain/events/decoder_test.exs rename to elixir/apps/domain/test/domain/replication/decoder_test.exs index 085b0cb8f..723b77857 100644 --- a/elixir/apps/domain/test/domain/events/decoder_test.exs +++ b/elixir/apps/domain/test/domain/replication/decoder_test.exs @@ -1,8 +1,8 @@ -defmodule Domain.Events.DecoderTest do +defmodule Domain.Replication.DecoderTest do use ExUnit.Case, async: true - alias Domain.Events.Decoder - alias Domain.Events.Decoder.Messages + alias Domain.Replication.Decoder + alias Domain.Replication.Decoder.Messages @lsn_binary <<0::integer-32, 23_785_280::integer-32>> @lsn_decoded {0, 23_785_280} diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 366f4fb04..88734ff93 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -33,6 +33,39 @@ config :domain, Domain.Repo, migration_lock: :pg_advisory_lock, start_apps_before_migration: [:ssl, :logger_json] +config :domain, Domain.ChangeLogs.ReplicationConnection, + enabled: true, + connection_opts: [ + hostname: "localhost", + port: 5432, + ssl: false, + ssl_opts: [], + parameters: [], + username: "postgres", + database: "firezone_dev", + password: "postgres" + ], + # When changing these, make sure to also: + # 1. Make appropriate changes to `Domain.Events.Event` + # 2. 
Add an appropriate `Domain.Events.Hooks` module + table_subscriptions: ~w[ + accounts + actor_group_memberships + actor_groups + actors + auth_identities + auth_providers + clients + flow_activities + flows + gateway_groups + gateways + policies + resource_connections + resources + tokens + ] + config :domain, Domain.Events.ReplicationConnection, enabled: true, connection_opts: [ @@ -61,8 +94,6 @@ config :domain, Domain.Events.ReplicationConnection, gateway_groups gateways policies - relay_groups - relays resource_connections resources tokens diff --git a/elixir/config/runtime.exs b/elixir/config/runtime.exs index d68e1f649..a43cae0c0 100644 --- a/elixir/config/runtime.exs +++ b/elixir/config/runtime.exs @@ -27,6 +27,21 @@ if config_env() == :prod do else: [{:hostname, env_var_to_config!(:database_host)}] ) + config :domain, Domain.ChangeLogs.ReplicationConnection, + enabled: env_var_to_config!(:background_jobs_enabled), + replication_slot_name: env_var_to_config!(:database_replication_slot_name), + publication_name: env_var_to_config!(:database_publication_name), + connection_opts: [ + hostname: env_var_to_config!(:database_host), + port: env_var_to_config!(:database_port), + ssl: env_var_to_config!(:database_ssl_enabled), + ssl_opts: env_var_to_config!(:database_ssl_opts), + parameters: env_var_to_config!(:database_parameters), + username: env_var_to_config!(:database_user), + password: env_var_to_config!(:database_password), + database: env_var_to_config!(:database_name) + ] + config :domain, Domain.Events.ReplicationConnection, enabled: env_var_to_config!(:background_jobs_enabled), replication_slot_name: env_var_to_config!(:database_replication_slot_name), diff --git a/elixir/config/test.exs b/elixir/config/test.exs index 4b7481cc2..2bfb3b1cf 100644 --- a/elixir/config/test.exs +++ b/elixir/config/test.exs @@ -20,11 +20,15 @@ config :domain, Domain.Repo, pool: Ecto.Adapters.SQL.Sandbox, queue_target: 1000 -config :domain, Domain.Events.ReplicationConnection, - publication_name: "events_test", - replication_slot_name: "events_slot_test", +config :domain, Domain.ChangeLogs.ReplicationConnection, + enabled: false, + connection_opts: [ + database: "firezone_test#{partition_suffix}" + ] + +config :domain, Domain.Events.ReplicationConnection, + enabled: false, connection_opts: [ - auto_reconnect: false, database: "firezone_test#{partition_suffix}" ] diff --git a/elixir/test.exs b/elixir/test.exs deleted file mode 100644 index 41ca8bbbf..000000000 --- a/elixir/test.exs +++ /dev/null @@ -1,515 +0,0 @@ -defmodule Domain.Events.ReplicationConnectionTest do - # Only one ReplicationConnection should be started in the cluster - use ExUnit.Case, async: false - - alias Domain.Events.Decoder.Messages - alias Domain.Events.ReplicationConnection - - # Used to test callbacks, not used for live connection - @mock_state %ReplicationConnection{ - schema: "test_schema", - connection_opts: [], - step: :disconnected, - publication_name: "test_pub", - replication_slot_name: "test_slot", - output_plugin: "pgoutput", - proto_version: 1, - # Example, adjust if needed - table_subscriptions: ["accounts", "resources"], - relations: %{} - } - - # Used to test live connection (Setup remains unchanged) - setup do - # Ensure Postgrex is started if your tests rely on it implicitly - {:ok, pid} = start_supervised(Domain.Events.ReplicationConnection) - - {:ok, pid: pid} - end - - describe "handle_connect/1 callback" do - test "handle_connect initiates publication check" do - state = @mock_state - expected_query = "SELECT 1 
FROM pg_publication WHERE pubname = '#{state.publication_name}'" - expected_next_state = %{state | step: :create_publication} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_connect(state) - end - end - - describe "handle_result/2 callback" do - test "handle_result transitions from create_publication to create_replication_slot when publication exists" do - state = %{@mock_state | step: :create_publication} - # Mock a successful result for the SELECT query - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 1, - rows: [[1]] - } - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do - state = %{@mock_state | step: :create_replication_slot} - # Mock a successful result for the SELECT query - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 1, - rows: [[1]] - } - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - # Should be :streaming directly? Check impl. - expected_next_state_direct = %{state | step: :start_replication_slot} - - # Let's assume it first goes to :start_replication_slot step, then handle_result for *that* step triggers START_REPLICATION - assert {:query, _query, ^expected_next_state_direct} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result transitions from start_replication_slot to streaming" do - state = %{@mock_state | step: :start_replication_slot} - # Mock a successful result for the CREATE_REPLICATION_SLOT or preceding step - result = %Postgrex.Result{ - # Or whatever command led here - command: :create_replication_slot, - columns: nil, - num_rows: 0, - rows: nil - } - - expected_stream_query = - "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" - - expected_next_state = %{state | step: :streaming} - - assert {:stream, ^expected_stream_query, [], ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates publication if it doesn't exist" do - state = %{@mock_state | step: :create_publication} - # Mock result indicating publication doesn't exist - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 0, - rows: [] - } - - # Combine schema and table names correctly - expected_tables = - state.table_subscriptions - |> Enum.map(fn table -> "#{state.schema}.#{table}" end) - |> Enum.join(",") - - expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" - # The original test expected the next step to be :check_replication_slot, let's keep that - expected_next_state = %{state | step: :check_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - 
- test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do - state = %{@mock_state | step: :check_replication_slot} - # Mock a successful result from the CREATE PUBLICATION command - result = %Postgrex.Result{ - command: :create_publication, - columns: nil, - num_rows: 0, - rows: nil - } - - expected_query = - "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" - - expected_next_state = %{state | step: :create_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - - test "handle_result creates replication slot if it doesn't exist" do - state = %{@mock_state | step: :create_replication_slot} - # Mock result indicating slot doesn't exist - result = %Postgrex.Result{ - command: :select, - columns: ["?column?"], - num_rows: 0, - rows: [] - } - - expected_query = - "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" - - expected_next_state = %{state | step: :start_replication_slot} - - assert {:query, ^expected_query, ^expected_next_state} = - ReplicationConnection.handle_result(result, state) - end - end - - # --- handle_data tests remain unchanged --- - # In-depth decoding tests are handled in Domain.Events.DecoderTest - describe "handle_data/2" do - test "handle_data handles KeepAlive with reply :now" do - state = %{@mock_state | step: :streaming} - wal_end = 12345 - # Keepalive doesn't use this field meaningfully here - server_wal_start = 0 - # Reply requested - reply_requested = 1 - - now_microseconds = - System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) - - # 100 milliseconds tolerance for clock check - grace_period_microseconds = 100_000 - - keepalive_data = <> - - # Expected reply format: 'r', confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8, clock::64 - # The actual implementation might construct the reply differently. - # This assertion needs to match the exact binary structure returned by handle_data. - # Let's assume the implementation sends back the received wal_end as confirmed LSNs, - # and the current time. The no_reply and high_priority flags might be 0. - assert {:reply, reply_binary, ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - - # Deconstruct the reply to verify its parts - assert <> = reply_binary - - assert confirmed_lsn == wal_end - # Or potentially server_wal_start? Check impl. 
- assert confirmed_lsn_commit == wal_end - assert no_reply == 0 - assert high_priority == 0 - assert now_microseconds <= clock < now_microseconds + grace_period_microseconds - end - - test "handle_data handles KeepAlive with reply :later" do - state = %{@mock_state | step: :streaming} - wal_end = 54321 - server_wal_start = 0 - # No reply requested - reply_requested = 0 - - keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>> - - # When no reply is requested, it should return :noreply with no binary message - assert {:noreply, [], ^state} = - ReplicationConnection.handle_data(keepalive_data, state) - end - - test "handle_data handles Write message (XLogData)" do - state = %{@mock_state | step: :streaming} - server_wal_start = 123_456_789 - # This is the LSN of the end of the WAL data in this message - server_wal_end = 987_654_321 - # Timestamp in microseconds since PG epoch - server_system_clock = 1_234_567_890 - # Example decoded message data (e.g., a BEGIN message binary) - # This data should be passed to handle_info via send(self(), decoded_msg) - message_binary = - <<"B", @lsn_binary || <<0::64>>::binary-8, @timestamp_int || 0::integer-64, - @xid || 0::integer-32>> - - write_data = - <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message_binary::binary>> - - # handle_data for 'w' should decode the message_binary and send it to self() - # It returns {:noreply, [], state} because the reply/acknowledgement happens - # via the KeepAlive ('k') mechanism. - assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state) - - # Assert that the decoded message was sent to self() - # Note: This requires the test process to receive the message. - # You might need `allow_receive` or similar testing patterns if handle_data - # directly uses `send`. If it calls another function that sends, test that function. - # Let's assume handle_data directly sends for this example.
- # Need some sample data defined earlier for the assertion - @lsn_binary <<0::integer-32, 23_785_280::integer-32>> - @timestamp_int 704_521_200_000 - @xid 1234 - @timestamp_decoded ~U[2022-04-29 12:20:00.000000Z] - @lsn_decoded {0, 23_785_280} - - expected_decoded_msg = %Messages.Begin{ - final_lsn: @lsn_decoded, - commit_timestamp: @timestamp_decoded, - xid: @xid - } - - assert_receive(^expected_decoded_msg) - end - - test "handle_data handles unknown message type" do - state = %{@mock_state | step: :streaming} - # Using 'q' as an example unknown type - unknown_data = <<?q, 1, 2, 3>> - - # Expect it to ignore unknown types and return noreply - assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state) - # Optionally, assert that a warning was logged if applicable - end - end - - # --- handle_info tests are CORRECTED below --- - describe "handle_info/2" do - test "handle_info updates relations on Relation message" do - state = @mock_state - - # Use the correct fields from Messages.Relation struct - relation_msg = %Messages.Relation{ - id: 101, - namespace: "public", - name: "accounts", - # Added replica_identity - replica_identity: :default, - columns: [ - %Messages.Relation.Column{ - flags: [:key], - name: "id", - type: "int4", - type_modifier: -1 - }, - %Messages.Relation.Column{ - flags: [], - name: "name", - type: "text", - type_modifier: -1 - } - ] - } - - # The state should store the relevant parts of the relation message, keyed by ID - expected_relation_data = %{ - namespace: "public", - name: "accounts", - replica_identity: :default, - columns: [ - %Messages.Relation.Column{ - flags: [:key], - name: "id", - type: "int4", - type_modifier: -1 - }, - %Messages.Relation.Column{ - flags: [], - name: "name", - type: "text", - type_modifier: -1 - } - ] - } - - expected_relations = %{101 => expected_relation_data} - expected_state = %{state | relations: expected_relations} - - assert {:noreply, ^expected_state} = ReplicationConnection.handle_info(relation_msg, state) - end - - test "handle_info returns noreply for Insert message" do - # Pre-populate state with relation info if the handler needs it - state = %{ - @mock_state - | relations: %{ - 101 => %{ - name: "accounts", - namespace: "public", - columns: [ - %Messages.Relation.Column{name: "id", type: "int4"}, - %Messages.Relation.Column{name: "name", type: "text"} - ] - } - } - } - - # Use the correct field: tuple_data (which is a tuple) - insert_msg = %Messages.Insert{relation_id: 101, tuple_data: {1, "Alice"}} - - # handle_info likely broadcasts or processes the insert, but returns noreply - assert {:noreply, ^state} = ReplicationConnection.handle_info(insert_msg, state) - # Add assertions here if handle_info is expected to send messages or call other funcs - end - - test "handle_info returns noreply for Update message" do - state = %{ - @mock_state - | relations: %{ - 101 => %{ - name: "accounts", - namespace: "public", - columns: [ - %Messages.Relation.Column{name: "id", type: "int4"}, - %Messages.Relation.Column{name: "name", type: "text"} - ] - } - } - } - - # Use the correct fields: relation_id, old_tuple_data, tuple_data, changed_key_tuple_data - update_msg = %Messages.Update{ - relation_id: 101, - # Example: only old data provided - old_tuple_data: {1, "Alice"}, - # Example: new data - tuple_data: {1, "Bob"}, - # Example: key didn't change or wasn't provided - changed_key_tuple_data: nil - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(update_msg, state) - # Add assertions for side effects
(broadcasts etc.) if needed - end - - test "handle_info returns noreply for Delete message" do - state = %{ - @mock_state - | relations: %{ - 101 => %{ - name: "accounts", - namespace: "public", - columns: [ - %Messages.Relation.Column{name: "id", type: "int4"}, - %Messages.Relation.Column{name: "name", type: "text"} - ] - } - } - } - - # Use the correct fields: relation_id, old_tuple_data, changed_key_tuple_data - delete_msg = %Messages.Delete{ - relation_id: 101, - # Example: old data provided - old_tuple_data: {1, "Bob"}, - # Example: key data not provided - changed_key_tuple_data: nil - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(delete_msg, state) - # Add assertions for side effects if needed - end - - test "handle_info ignores Begin message" do - state = @mock_state - # Use correct fields: final_lsn, commit_timestamp, xid - begin_msg = %Messages.Begin{ - final_lsn: {0, 123}, - commit_timestamp: ~U[2023-01-01 10:00:00Z], - xid: 789 - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(begin_msg, state) - end - - test "handle_info ignores Commit message" do - state = @mock_state - # Use correct fields: flags, lsn, end_lsn, commit_timestamp - commit_msg = %Messages.Commit{ - flags: [], - lsn: {0, 123}, - end_lsn: {0, 456}, - commit_timestamp: ~U[2023-01-01 10:00:01Z] - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(commit_msg, state) - end - - test "handle_info ignores Origin message" do - state = @mock_state - # Use correct fields: origin_commit_lsn, name - origin_msg = %Messages.Origin{origin_commit_lsn: {0, 1}, name: "origin_name"} - assert {:noreply, ^state} = ReplicationConnection.handle_info(origin_msg, state) - end - - test "handle_info ignores Truncate message" do - state = @mock_state - # Use correct fields: number_of_relations, options, truncated_relations - truncate_msg = %Messages.Truncate{ - number_of_relations: 2, - options: [:cascade], - truncated_relations: [101, 102] - } - - assert {:noreply, ^state} = ReplicationConnection.handle_info(truncate_msg, state) - end - - test "handle_info ignores Type message" do - state = @mock_state - # Use correct fields: id, namespace, name - type_msg = %Messages.Type{id: 23, namespace: "pg_catalog", name: "int4"} - assert {:noreply, ^state} = ReplicationConnection.handle_info(type_msg, state) - end - - test "handle_info returns noreply for Unsupported message" do - state = @mock_state - unsupported_msg = %Messages.Unsupported{data: <<1, 2, 3>>} - # We cannot easily verify Logger.warning was called without mocks/capture. - assert {:noreply, ^state} = ReplicationConnection.handle_info(unsupported_msg, state) - end - - test "handle_info handles :shutdown message" do - state = @mock_state - # Expect :disconnect tuple based on common GenServer patterns for shutdown - assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(:shutdown, state) - # Note: The original test asserted {:disconnect, :normal}. {:stop, :normal, state} is - # the standard GenServer return for a clean stop triggered by handle_info. Adjust - # if your implementation specifically returns :disconnect. 
- end - - test "handle_info handles :DOWN message from monitored process" do - state = @mock_state - monitor_ref = make_ref() - # Example DOWN message structure - down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} - - # Expect the server to stop itself upon receiving DOWN for a critical process - assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(down_msg, state) - # Again, adjust the expected return (:disconnect vs :stop) based on implementation. - end - - test "handle_info ignores other messages" do - state = @mock_state - random_msg = {:some_other_info, "data"} - assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state) - end - end - - # --- Moved handle_disconnect test to its own describe block --- - describe "handle_disconnect/1" do - test "handle_disconnect resets step to :disconnected and logs warning" do - state = %{@mock_state | step: :streaming} - expected_state = %{state | step: :disconnected} - - # Capture log to verify warning (requires ExUnit config) - log_output = - ExUnit.CaptureLog.capture_log(fn -> - assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state) - end) - - assert log_output =~ "Replication connection disconnected." - # Or match the exact log message if needed - end - end -end
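The binary frames used by the handle_data/2 tests above follow the PostgreSQL streaming-replication wire protocol (CopyBoth mode) and, for the commit payload, the pgoutput plugin's Commit layout. As a reference, here is a minimal sketch of those layouts in Elixir; `pg_epoch_us` is an illustrative helper, not part of the codebase:

```elixir
# Both clocks are microseconds since the Postgres epoch, 2000-01-01 00:00:00Z.
pg_epoch_us = fn ->
  System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
end

wal_end = 12_345

# Primary keepalive: ?k, wal_end::64, server_clock::64, reply_requested::8.
# A trailing 1 asks the standby to send a status update immediately.
keepalive = <<?k, wal_end::64, pg_epoch_us.()::64, 1>>

# Standby status update sent in response: ?r, then the written/flushed/applied
# LSNs, the client clock, and a reply_requested byte of 0.
status_update = <<?r, wal_end::64, wal_end::64, wal_end::64, pg_epoch_us.()::64, 0>>

# XLogData: ?w, wal_start::64, wal_end::64, server_clock::64, payload::binary.
# The payload is a pgoutput message, e.g. Commit: ?C, flags::8, commit LSN::64,
# end LSN::64, commit timestamp::64.
commit = <<?C, 0, 100::64, 200::64, pg_epoch_us.()::64>>
xlog_data = <<?w, 0::64, wal_end::64, pg_epoch_us.()::64, commit::binary>>
```

These layouts are taken from the PostgreSQL protocol documentation; the exact LSNs a given connection acknowledges (for example `wal_end` versus `wal_end + 1`) are implementation-specific, which is why the keepalive reply assertions in the tests match the LSN fields with wildcards.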