diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex index ee2482fc4..608e6263b 100644 --- a/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex +++ b/elixir/apps/domain/lib/domain/change_logs/change_log/changeset.ex @@ -11,6 +11,7 @@ defmodule Domain.ChangeLogs.ChangeLog.Changeset do |> validate_same_account() |> put_account_id() |> validate_required([:account_id, :lsn, :table, :op, :vsn]) + |> unique_constraint(:lsn) |> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey) end diff --git a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex index 92242a19e..687f0b7ad 100644 --- a/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/change_logs/replication_connection.ex @@ -55,8 +55,9 @@ defmodule Domain.ChangeLogs.ReplicationConnection do :ok {:error, %Ecto.Changeset{errors: errors} = changeset} -> - if foreign_key_error?(errors) do - # Expected under normal operation when an account is deleted + if Enum.any?(errors, &should_skip_change_log?/1) do + # Expected under normal operation when an account is deleted or we are catching up on + # already-processed but not acknowledged WAL data. :ok else Logger.warning("Failed to create change log", @@ -75,9 +76,15 @@ defmodule Domain.ChangeLogs.ReplicationConnection do end end - defp foreign_key_error?(errors) do - Enum.any?(errors, fn {field, {message, _}} -> - field == :account_id and message == "does not exist" - end) + defp should_skip_change_log?({:account_id, {"does not exist", _violations}}) do + true + end + + defp should_skip_change_log?({:lsn, {"has already been taken", _violations}}) do + true + end + + defp should_skip_change_log?(_error) do + false end end diff --git a/elixir/apps/domain/priv/repo/migrations/20250627171443_unique_index_lsn_on_change_logs.exs b/elixir/apps/domain/priv/repo/migrations/20250627171443_unique_index_lsn_on_change_logs.exs new file mode 100644 index 000000000..d424df130 --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250627171443_unique_index_lsn_on_change_logs.exs @@ -0,0 +1,20 @@ +defmodule Domain.Repo.Migrations.UniqueIndexLsnOnChangeLogs do + use Ecto.Migration + + def up do + execute(""" + DELETE FROM change_logs + WHERE (lsn, inserted_at) NOT IN ( + SELECT lsn, MIN(inserted_at) + FROM change_logs + GROUP BY lsn + ) + """) + + create(index(:change_logs, :lsn, unique: true)) + end + + def down do + drop(index(:change_logs, :lsn, unique: true)) + end +end diff --git a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs index 87d3dbd0f..39b856134 100644 --- a/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs +++ b/elixir/apps/domain/test/domain/change_logs/replication_connection_test.exs @@ -54,10 +54,10 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do %{"id" => Ecto.UUID.generate(), "name" => "test actor", "account_id" => account.id}} ] - for {table, data} <- test_cases do + for {{table, data}, idx} <- Enum.with_index(test_cases) do initial_count = Repo.aggregate(ChangeLog, :count, :id) - assert :ok = on_insert(0, table, data) + assert :ok = on_insert(idx, table, data) final_count = Repo.aggregate(ChangeLog, :count, :id) assert final_count == initial_count + 1 @@ -295,10 +295,10 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do assert :ok = on_insert(0, table, initial_data) # Update - assert :ok = on_update(0, table, initial_data, updated_data) + assert :ok = on_update(1, table, initial_data, updated_data) # Delete - assert :ok = on_delete(0, table, updated_data) + assert :ok = on_delete(2, table, updated_data) # Get the three most recent records in reverse chronological order logs = @@ -350,8 +350,8 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do for data <- test_data_sets do assert :ok = on_insert(0, "flows", data) - assert :ok = on_update(0, "flows", data, data) - assert :ok = on_delete(0, "flows", data) + assert :ok = on_update(1, "flows", data, data) + assert :ok = on_delete(2, "flows", data) end # No records should have been created diff --git a/elixir/apps/domain/test/domain/change_logs_test.exs b/elixir/apps/domain/test/domain/change_logs_test.exs index a7d4037e5..48b8cf85d 100644 --- a/elixir/apps/domain/test/domain/change_logs_test.exs +++ b/elixir/apps/domain/test/domain/change_logs_test.exs @@ -73,6 +73,28 @@ defmodule Domain.ChangeLogsTest do assert changeset.errors[:table] == {"can't be blank", [validation: :required]} end + test "prevents inserting duplicate lsn", %{account: account} do + attrs = %{ + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"account_id" => account.id, "key" => "value"}, + vsn: 1 + } + + assert {:ok, _change_log} = create_change_log(attrs) + + dupe_lsn_attrs = Map.put(attrs, :data, %{"account_id" => account.id, "key" => "new_value"}) + + assert {:error, changeset} = create_change_log(dupe_lsn_attrs) + assert changeset.valid? == false + + assert changeset.errors[:lsn] == + {"has already been taken", + [constraint: :unique, constraint_name: "change_logs_lsn_index"]} + end + test "requires op field to be one of :insert, :update, :delete", %{account: account} do attrs = %{ lsn: 1, @@ -118,7 +140,7 @@ defmodule Domain.ChangeLogsTest do # Valid combination: :update with both old_data and data present attrs = %{ - lsn: 1, + lsn: 2, table: "resources", op: :update, old_data: %{"account_id" => account.id, "key" => "old_value"}, @@ -130,7 +152,7 @@ defmodule Domain.ChangeLogsTest do # Valid combination: :delete with old_data present and data nil attrs = %{ - lsn: 1, + lsn: 3, table: "resources", op: :delete, old_data: %{"account_id" => account.id, "key" => "old_value"},