From e5b2af1d4e2833beaa25d98fe719ee2725ce923a Mon Sep 17 00:00:00 2001 From: Jamil Date: Wed, 6 Aug 2025 15:19:30 -0400 Subject: [PATCH] chore(portal): add ChangeLogs.truncate/2 and tests (#10155) In preparation to delete old change_logs based on account and insertion time, we introduce a simple `truncate` function that removes old change logs past a cutoff date. Related: https://github.com/firezone/firezone/issues/10146 --------- Signed-off-by: Jamil --- elixir/apps/domain/lib/domain/change_logs.ex | 9 +- .../domain/change_logs/change_log/query.ex | 25 ++ .../domain/test/domain/change_logs_test.exs | 217 ++++++++++++++++++ 3 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 elixir/apps/domain/lib/domain/change_logs/change_log/query.ex diff --git a/elixir/apps/domain/lib/domain/change_logs.ex b/elixir/apps/domain/lib/domain/change_logs.ex index 1a78708ea..c45581cf7 100644 --- a/elixir/apps/domain/lib/domain/change_logs.ex +++ b/elixir/apps/domain/lib/domain/change_logs.ex @@ -1,6 +1,6 @@ defmodule Domain.ChangeLogs do alias Domain.ChangeLogs.ChangeLog - alias Domain.Repo + alias Domain.{Accounts, Repo} def bulk_insert(list_of_attrs) do ChangeLog @@ -9,4 +9,11 @@ defmodule Domain.ChangeLogs do conflict_target: [:lsn] ) end + + def truncate(%Accounts.Account{} = account, %DateTime{} = cutoff) do + ChangeLog.Query.all() + |> ChangeLog.Query.by_account_id(account.id) + |> ChangeLog.Query.before_cutoff(cutoff) + |> Repo.delete_all() + end end diff --git a/elixir/apps/domain/lib/domain/change_logs/change_log/query.ex b/elixir/apps/domain/lib/domain/change_logs/change_log/query.ex new file mode 100644 index 000000000..6f5703e52 --- /dev/null +++ b/elixir/apps/domain/lib/domain/change_logs/change_log/query.ex @@ -0,0 +1,25 @@ +defmodule Domain.ChangeLogs.ChangeLog.Query do + use Domain, :query + + def all do + Domain.ChangeLogs.ChangeLog + end + + def by_account_id(queryable, account_id) do + queryable + |> where([c], c.account_id == ^account_id) + end + + # Note: This will return change_logs that were inserted before this date, which means it will + # omit change_logs that were generated before the cut off but inserted after it. In practice, + # this likely is not a major issue since: + # (1) our replication lag should be fairly low + # (2) at worst, we will omit changes older than the cut + # + # The "fix" is to add a commit_timestamp to change_logs and use that instead. + # However, that adds a non-trivial amount of complexity to the ingestion processor. + def before_cutoff(queryable, %DateTime{} = cutoff) do + queryable + |> where([c], c.inserted_at < ^cutoff) + end +end diff --git a/elixir/apps/domain/test/domain/change_logs_test.exs b/elixir/apps/domain/test/domain/change_logs_test.exs index f9af0b863..77cd4eabd 100644 --- a/elixir/apps/domain/test/domain/change_logs_test.exs +++ b/elixir/apps/domain/test/domain/change_logs_test.exs @@ -1,6 +1,7 @@ defmodule Domain.ChangeLogsTest do use Domain.DataCase, async: true import Domain.ChangeLogs + alias Domain.ChangeLogs.ChangeLog describe "bulk_insert/1" do setup do @@ -130,4 +131,220 @@ defmodule Domain.ChangeLogsTest do end end end + + describe "truncate/2" do + setup do + account1 = Fixtures.Accounts.create_account() + account2 = Fixtures.Accounts.create_account() + %{account1: account1, account2: account2} + end + + test "deletes change logs before cutoff for specific account", %{ + account1: account1 + } do + now = DateTime.utc_now() + + # Create some old records (before cutoff) + old_attrs = [ + %{ + account_id: account1.id, + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "1"}, + vsn: 1, + inserted_at: now |> DateTime.add(-10, :second) + }, + %{ + account_id: account1.id, + lsn: 2, + table: "resources", + op: :update, + old_data: %{"id" => "1", "name" => "old"}, + data: %{"id" => "1", "name" => "new"}, + vsn: 1, + inserted_at: now |> DateTime.add(-10, :second) + } + ] + + assert {2, nil} = bulk_insert(old_attrs) + + # Create some new records (after cutoff) + new_attrs = %{ + account_id: account1.id, + lsn: 3, + table: "resources", + op: :delete, + old_data: %{"id" => "1"}, + data: nil, + vsn: 1, + inserted_at: now |> DateTime.add(10, :second) + } + + assert {1, nil} = bulk_insert([new_attrs]) + + # Truncate old records + assert {2, nil} = truncate(account1, now) + + # Verify only the new record remains + remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id)) + assert length(remaining) == 1 + assert hd(remaining).lsn == 3 + end + + test "does not delete records from other accounts", %{account1: account1, account2: account2} do + now = DateTime.utc_now() + + # Create records for both accounts before cutoff + account1_attrs = %{ + account_id: account1.id, + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "1"}, + vsn: 1, + inserted_at: now |> DateTime.add(-10, :second) + } + + account2_attrs = %{ + account_id: account2.id, + lsn: 2, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "2"}, + vsn: 1, + inserted_at: now |> DateTime.add(-10, :second) + } + + assert {1, nil} = bulk_insert([account1_attrs]) + assert {1, nil} = bulk_insert([account2_attrs]) + + # Truncate only account1's records + assert {1, nil} = truncate(account1, DateTime.utc_now()) + + # Verify account1's records are gone + account1_remaining = + Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id)) + + assert length(account1_remaining) == 0 + + # Verify account2's records remain + account2_remaining = + Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account2.id)) + + assert length(account2_remaining) == 1 + assert hd(account2_remaining).lsn == 2 + end + + test "does not delete records inserted after cutoff", %{account1: account1} do + now = DateTime.utc_now() + + # Create record before cutoff + old_attrs = %{ + account_id: account1.id, + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "1"}, + vsn: 1, + inserted_at: now |> DateTime.add(-10, :second) + } + + assert {1, nil} = bulk_insert([old_attrs]) + + # Create record after cutoff + new_attrs = %{ + account_id: account1.id, + lsn: 2, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "2"}, + vsn: 1, + inserted_at: now |> DateTime.add(10, :second) + } + + assert {1, nil} = bulk_insert([new_attrs]) + + # Truncate should only delete the old record + assert {1, nil} = truncate(account1, now) + + # Verify only the new record remains + remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id)) + assert length(remaining) == 1 + assert hd(remaining).lsn == 2 + end + + test "returns {0, nil} when no records match criteria", %{account1: account1} do + now = DateTime.utc_now() + + # Create record right at cutoff + attrs = %{ + account_id: account1.id, + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "1"}, + vsn: 1, + inserted_at: now + } + + assert {1, nil} = bulk_insert([attrs]) + + # Truncate with cutoff before any records + assert {0, nil} = truncate(account1, now) + + # Verify record still exists + remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id)) + assert length(remaining) == 1 + end + + test "handles empty table gracefully", %{account1: account1} do + cutoff = DateTime.utc_now() + + # No records exist + assert {0, nil} = truncate(account1, cutoff) + end + + test "deletes all records when cutoff is in the future", %{account1: account1} do + # Create some records + attrs = [ + %{ + account_id: account1.id, + lsn: 1, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "1"}, + vsn: 1 + }, + %{ + account_id: account1.id, + lsn: 2, + table: "resources", + op: :insert, + old_data: nil, + data: %{"id" => "2"}, + vsn: 1 + } + ] + + assert {2, nil} = bulk_insert(attrs) + + # Set cutoff far in the future + future_cutoff = DateTime.utc_now() |> DateTime.add(1, :hour) + + # All records should be deleted + assert {2, nil} = truncate(account1, future_cutoff) + + # Verify no records remain + remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id)) + assert length(remaining) == 0 + end + end end