chore(portal): add unique index to lsn (#9699)

This commit is contained in:
Jamil
2025-06-27 13:58:20 -07:00
committed by GitHub
parent dddd1b57fc
commit 3760536afd
5 changed files with 64 additions and 14 deletions

View File

@@ -11,6 +11,7 @@ defmodule Domain.ChangeLogs.ChangeLog.Changeset do
|> validate_same_account()
|> put_account_id()
|> validate_required([:account_id, :lsn, :table, :op, :vsn])
|> unique_constraint(:lsn)
|> foreign_key_constraint(:account_id, name: :change_logs_account_id_fkey)
end

View File

@@ -55,8 +55,9 @@ defmodule Domain.ChangeLogs.ReplicationConnection do
:ok
{:error, %Ecto.Changeset{errors: errors} = changeset} ->
if foreign_key_error?(errors) do
# Expected under normal operation when an account is deleted
if Enum.any?(errors, &should_skip_change_log?/1) do
# Expected under normal operation when an account is deleted or we are catching up on
# already-processed but not acknowledged WAL data.
:ok
else
Logger.warning("Failed to create change log",
@@ -75,9 +76,15 @@ defmodule Domain.ChangeLogs.ReplicationConnection do
end
end
defp foreign_key_error?(errors) do
Enum.any?(errors, fn {field, {message, _}} ->
field == :account_id and message == "does not exist"
end)
defp should_skip_change_log?({:account_id, {"does not exist", _violations}}) do
true
end
defp should_skip_change_log?({:lsn, {"has already been taken", _violations}}) do
true
end
defp should_skip_change_log?(_error) do
false
end
end

View File

@@ -0,0 +1,20 @@
defmodule Domain.Repo.Migrations.UniqueIndexLsnOnChangeLogs do
use Ecto.Migration
def up do
execute("""
DELETE FROM change_logs
WHERE (lsn, inserted_at) NOT IN (
SELECT lsn, MIN(inserted_at)
FROM change_logs
GROUP BY lsn
)
""")
create(index(:change_logs, :lsn, unique: true))
end
def down do
drop(index(:change_logs, :lsn, unique: true))
end
end

View File

@@ -54,10 +54,10 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do
%{"id" => Ecto.UUID.generate(), "name" => "test actor", "account_id" => account.id}}
]
for {table, data} <- test_cases do
for {{table, data}, idx} <- Enum.with_index(test_cases) do
initial_count = Repo.aggregate(ChangeLog, :count, :id)
assert :ok = on_insert(0, table, data)
assert :ok = on_insert(idx, table, data)
final_count = Repo.aggregate(ChangeLog, :count, :id)
assert final_count == initial_count + 1
@@ -295,10 +295,10 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do
assert :ok = on_insert(0, table, initial_data)
# Update
assert :ok = on_update(0, table, initial_data, updated_data)
assert :ok = on_update(1, table, initial_data, updated_data)
# Delete
assert :ok = on_delete(0, table, updated_data)
assert :ok = on_delete(2, table, updated_data)
# Get the three most recent records in reverse chronological order
logs =
@@ -350,8 +350,8 @@ defmodule Domain.ChangeLogs.ReplicationConnectionTest do
for data <- test_data_sets do
assert :ok = on_insert(0, "flows", data)
assert :ok = on_update(0, "flows", data, data)
assert :ok = on_delete(0, "flows", data)
assert :ok = on_update(1, "flows", data, data)
assert :ok = on_delete(2, "flows", data)
end
# No records should have been created

View File

@@ -73,6 +73,28 @@ defmodule Domain.ChangeLogsTest do
assert changeset.errors[:table] == {"can't be blank", [validation: :required]}
end
test "prevents inserting duplicate lsn", %{account: account} do
attrs = %{
lsn: 1,
table: "resources",
op: :insert,
old_data: nil,
data: %{"account_id" => account.id, "key" => "value"},
vsn: 1
}
assert {:ok, _change_log} = create_change_log(attrs)
dupe_lsn_attrs = Map.put(attrs, :data, %{"account_id" => account.id, "key" => "new_value"})
assert {:error, changeset} = create_change_log(dupe_lsn_attrs)
assert changeset.valid? == false
assert changeset.errors[:lsn] ==
{"has already been taken",
[constraint: :unique, constraint_name: "change_logs_lsn_index"]}
end
test "requires op field to be one of :insert, :update, :delete", %{account: account} do
attrs = %{
lsn: 1,
@@ -118,7 +140,7 @@ defmodule Domain.ChangeLogsTest do
# Valid combination: :update with both old_data and data present
attrs = %{
lsn: 1,
lsn: 2,
table: "resources",
op: :update,
old_data: %{"account_id" => account.id, "key" => "old_value"},
@@ -130,7 +152,7 @@ defmodule Domain.ChangeLogsTest do
# Valid combination: :delete with old_data present and data nil
attrs = %{
lsn: 1,
lsn: 3,
table: "resources",
op: :delete,
old_data: %{"account_id" => account.id, "key" => "old_value"},