mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
chore(portal): add ChangeLogs.truncate/2 and tests (#10155)
In preparation to delete old change_logs based on account and insertion time, we introduce a simple `truncate` function that removes old change logs past a cutoff date. Related: https://github.com/firezone/firezone/issues/10146 --------- Signed-off-by: Jamil <jamilbk@users.noreply.github.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
defmodule Domain.ChangeLogs do
|
||||
alias Domain.ChangeLogs.ChangeLog
|
||||
alias Domain.Repo
|
||||
alias Domain.{Accounts, Repo}
|
||||
|
||||
def bulk_insert(list_of_attrs) do
|
||||
ChangeLog
|
||||
@@ -9,4 +9,11 @@ defmodule Domain.ChangeLogs do
|
||||
conflict_target: [:lsn]
|
||||
)
|
||||
end
|
||||
|
||||
def truncate(%Accounts.Account{} = account, %DateTime{} = cutoff) do
|
||||
ChangeLog.Query.all()
|
||||
|> ChangeLog.Query.by_account_id(account.id)
|
||||
|> ChangeLog.Query.before_cutoff(cutoff)
|
||||
|> Repo.delete_all()
|
||||
end
|
||||
end
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
defmodule Domain.ChangeLogs.ChangeLog.Query do
|
||||
use Domain, :query
|
||||
|
||||
def all do
|
||||
Domain.ChangeLogs.ChangeLog
|
||||
end
|
||||
|
||||
def by_account_id(queryable, account_id) do
|
||||
queryable
|
||||
|> where([c], c.account_id == ^account_id)
|
||||
end
|
||||
|
||||
# Note: This will return change_logs that were inserted before this date, which means it will
|
||||
# omit change_logs that were generated before the cut off but inserted after it. In practice,
|
||||
# this likely is not a major issue since:
|
||||
# (1) our replication lag should be fairly low
|
||||
# (2) at worst, we will omit changes older than the cut
|
||||
#
|
||||
# The "fix" is to add a commit_timestamp to change_logs and use that instead.
|
||||
# However, that adds a non-trivial amount of complexity to the ingestion processor.
|
||||
def before_cutoff(queryable, %DateTime{} = cutoff) do
|
||||
queryable
|
||||
|> where([c], c.inserted_at < ^cutoff)
|
||||
end
|
||||
end
|
||||
@@ -1,6 +1,7 @@
|
||||
defmodule Domain.ChangeLogsTest do
|
||||
use Domain.DataCase, async: true
|
||||
import Domain.ChangeLogs
|
||||
alias Domain.ChangeLogs.ChangeLog
|
||||
|
||||
describe "bulk_insert/1" do
|
||||
setup do
|
||||
@@ -130,4 +131,220 @@ defmodule Domain.ChangeLogsTest do
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "truncate/2" do
|
||||
setup do
|
||||
account1 = Fixtures.Accounts.create_account()
|
||||
account2 = Fixtures.Accounts.create_account()
|
||||
%{account1: account1, account2: account2}
|
||||
end
|
||||
|
||||
test "deletes change logs before cutoff for specific account", %{
|
||||
account1: account1
|
||||
} do
|
||||
now = DateTime.utc_now()
|
||||
|
||||
# Create some old records (before cutoff)
|
||||
old_attrs = [
|
||||
%{
|
||||
account_id: account1.id,
|
||||
lsn: 1,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "1"},
|
||||
vsn: 1,
|
||||
inserted_at: now |> DateTime.add(-10, :second)
|
||||
},
|
||||
%{
|
||||
account_id: account1.id,
|
||||
lsn: 2,
|
||||
table: "resources",
|
||||
op: :update,
|
||||
old_data: %{"id" => "1", "name" => "old"},
|
||||
data: %{"id" => "1", "name" => "new"},
|
||||
vsn: 1,
|
||||
inserted_at: now |> DateTime.add(-10, :second)
|
||||
}
|
||||
]
|
||||
|
||||
assert {2, nil} = bulk_insert(old_attrs)
|
||||
|
||||
# Create some new records (after cutoff)
|
||||
new_attrs = %{
|
||||
account_id: account1.id,
|
||||
lsn: 3,
|
||||
table: "resources",
|
||||
op: :delete,
|
||||
old_data: %{"id" => "1"},
|
||||
data: nil,
|
||||
vsn: 1,
|
||||
inserted_at: now |> DateTime.add(10, :second)
|
||||
}
|
||||
|
||||
assert {1, nil} = bulk_insert([new_attrs])
|
||||
|
||||
# Truncate old records
|
||||
assert {2, nil} = truncate(account1, now)
|
||||
|
||||
# Verify only the new record remains
|
||||
remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id))
|
||||
assert length(remaining) == 1
|
||||
assert hd(remaining).lsn == 3
|
||||
end
|
||||
|
||||
test "does not delete records from other accounts", %{account1: account1, account2: account2} do
|
||||
now = DateTime.utc_now()
|
||||
|
||||
# Create records for both accounts before cutoff
|
||||
account1_attrs = %{
|
||||
account_id: account1.id,
|
||||
lsn: 1,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "1"},
|
||||
vsn: 1,
|
||||
inserted_at: now |> DateTime.add(-10, :second)
|
||||
}
|
||||
|
||||
account2_attrs = %{
|
||||
account_id: account2.id,
|
||||
lsn: 2,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "2"},
|
||||
vsn: 1,
|
||||
inserted_at: now |> DateTime.add(-10, :second)
|
||||
}
|
||||
|
||||
assert {1, nil} = bulk_insert([account1_attrs])
|
||||
assert {1, nil} = bulk_insert([account2_attrs])
|
||||
|
||||
# Truncate only account1's records
|
||||
assert {1, nil} = truncate(account1, DateTime.utc_now())
|
||||
|
||||
# Verify account1's records are gone
|
||||
account1_remaining =
|
||||
Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id))
|
||||
|
||||
assert length(account1_remaining) == 0
|
||||
|
||||
# Verify account2's records remain
|
||||
account2_remaining =
|
||||
Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account2.id))
|
||||
|
||||
assert length(account2_remaining) == 1
|
||||
assert hd(account2_remaining).lsn == 2
|
||||
end
|
||||
|
||||
test "does not delete records inserted after cutoff", %{account1: account1} do
|
||||
now = DateTime.utc_now()
|
||||
|
||||
# Create record before cutoff
|
||||
old_attrs = %{
|
||||
account_id: account1.id,
|
||||
lsn: 1,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "1"},
|
||||
vsn: 1,
|
||||
inserted_at: now |> DateTime.add(-10, :second)
|
||||
}
|
||||
|
||||
assert {1, nil} = bulk_insert([old_attrs])
|
||||
|
||||
# Create record after cutoff
|
||||
new_attrs = %{
|
||||
account_id: account1.id,
|
||||
lsn: 2,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "2"},
|
||||
vsn: 1,
|
||||
inserted_at: now |> DateTime.add(10, :second)
|
||||
}
|
||||
|
||||
assert {1, nil} = bulk_insert([new_attrs])
|
||||
|
||||
# Truncate should only delete the old record
|
||||
assert {1, nil} = truncate(account1, now)
|
||||
|
||||
# Verify only the new record remains
|
||||
remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id))
|
||||
assert length(remaining) == 1
|
||||
assert hd(remaining).lsn == 2
|
||||
end
|
||||
|
||||
test "returns {0, nil} when no records match criteria", %{account1: account1} do
|
||||
now = DateTime.utc_now()
|
||||
|
||||
# Create record right at cutoff
|
||||
attrs = %{
|
||||
account_id: account1.id,
|
||||
lsn: 1,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "1"},
|
||||
vsn: 1,
|
||||
inserted_at: now
|
||||
}
|
||||
|
||||
assert {1, nil} = bulk_insert([attrs])
|
||||
|
||||
# Truncate with cutoff before any records
|
||||
assert {0, nil} = truncate(account1, now)
|
||||
|
||||
# Verify record still exists
|
||||
remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id))
|
||||
assert length(remaining) == 1
|
||||
end
|
||||
|
||||
test "handles empty table gracefully", %{account1: account1} do
|
||||
cutoff = DateTime.utc_now()
|
||||
|
||||
# No records exist
|
||||
assert {0, nil} = truncate(account1, cutoff)
|
||||
end
|
||||
|
||||
test "deletes all records when cutoff is in the future", %{account1: account1} do
|
||||
# Create some records
|
||||
attrs = [
|
||||
%{
|
||||
account_id: account1.id,
|
||||
lsn: 1,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "1"},
|
||||
vsn: 1
|
||||
},
|
||||
%{
|
||||
account_id: account1.id,
|
||||
lsn: 2,
|
||||
table: "resources",
|
||||
op: :insert,
|
||||
old_data: nil,
|
||||
data: %{"id" => "2"},
|
||||
vsn: 1
|
||||
}
|
||||
]
|
||||
|
||||
assert {2, nil} = bulk_insert(attrs)
|
||||
|
||||
# Set cutoff far in the future
|
||||
future_cutoff = DateTime.utc_now() |> DateTime.add(1, :hour)
|
||||
|
||||
# All records should be deleted
|
||||
assert {2, nil} = truncate(account1, future_cutoff)
|
||||
|
||||
# Verify no records remain
|
||||
remaining = Repo.all(ChangeLog.Query.by_account_id(ChangeLog.Query.all(), account1.id))
|
||||
assert length(remaining) == 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user