chore(portal): handle lag up to 30m (#9681)

Now that we know the bypass system works, it makes sense to allow data to lag
by up to 30 minutes so that events accrued during deploys are not lost.

This PR also fixes a small bug where we triggered the threshold checks _after_
a transaction had already committed (on `COMMIT`) instead of before its data
came through (on `BEGIN`). Since the two timestamps are identical (see below),
it is more accurate to read the transaction's timestamp before acting on the
data it contains.

```
[(domain 0.1.0+dev) lib/domain/change_logs/replication_connection.ex:4: Domain.ChangeLogs.ReplicationConnection.handle_message/3]
"BEGIN #{commit_timestamp}" #=> "BEGIN 2025-06-26 04:22:45.283151Z"

[(domain 0.1.0+dev) lib/domain/change_logs/replication_connection.ex:4: Domain.ChangeLogs.ReplicationConnection.handle_message/3]
"END #{commit_timestamp}" #=> "END 2025-06-26 04:22:45.283151Z"
```
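
To make the ordering concrete, here is a minimal sketch of classifying lag from the `BEGIN` timestamp before any of the transaction's row messages are handled. The module, function names, and return values are made up for illustration; the real checks are generated by `Domain.Replication.Connection` from the thresholds passed to `use`:

```elixir
# Illustrative only: not the macro-generated code from this PR.
defmodule LagSketch do
  # Mirrors the new thresholds configured below.
  @warning_threshold_ms 60 * 1_000
  @error_threshold_ms 30 * 60 * 1_000

  # commit_timestamp is the DateTime carried by Decoder.Messages.Begin.
  def classify(%DateTime{} = commit_timestamp, now \\ DateTime.utc_now()) do
    lag_ms = DateTime.diff(now, commit_timestamp, :millisecond)

    cond do
      lag_ms >= @error_threshold_ms -> :bypass_hooks
      lag_ms >= @warning_threshold_ms -> :warn
      true -> :ok
    end
  end
end

# Using the timestamp from the log above, long after the fact:
LagSketch.classify(~U[2025-06-26 04:22:45.283151Z])
#=> :bypass_hooks
```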

---------

Signed-off-by: Jamil <jamilbk@users.noreply.github.com>
Co-authored-by: Brian Manifold <bmanifold@users.noreply.github.com>


@@ -2,11 +2,11 @@ defmodule Domain.Events.ReplicationConnection do
   alias Domain.Events.Hooks

   use Domain.Replication.Connection,
-    # Allow up to 5 seconds of lag before alerting
-    warning_threshold_ms: 5_000,
-    # Allow up to 5 minutes of lag before bypassing hooks
-    error_threshold_ms: 300_000
+    # Allow up to 60 seconds of lag before alerting
+    warning_threshold_ms: 60 * 1_000,
+    # Allow up to 30 minutes of lag before bypassing hooks
+    error_threshold_ms: 30 * 60 * 1_000

   require Logger
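
For reference, the new option values evaluate to:

```elixir
# warning_threshold_ms
60 * 1_000
#=> 60_000 (60 seconds)

# error_threshold_ms
30 * 60 * 1_000
#=> 1_800_000 (30 minutes)
```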


@@ -17,7 +17,7 @@ defmodule Domain.Replication.Connection do
       defmodule MyApp.ReplicationConnection do
         use Domain.Replication.Connection,
           warning_threshold_ms: 30_000,
-          error_threshold_ms: 60_000,
+          error_threshold_ms: 60 * 1_000
       end

   ## Options
@@ -465,12 +465,8 @@ defmodule Domain.Replication.Connection do
   # Extract transaction and ignored message handlers
   defp transaction_handlers do
     quote do
-      defp handle_message(%Decoder.Messages.Begin{} = msg, server_wal_end, state) do
-        {:noreply, [], state}
-      end
-
       defp handle_message(
-             %Decoder.Messages.Commit{commit_timestamp: commit_timestamp} = msg,
+             %Decoder.Messages.Begin{commit_timestamp: commit_timestamp} = msg,
              _server_wal_end,
              state
            ) do
@@ -482,6 +478,14 @@ defmodule Domain.Replication.Connection do
         {:noreply, [], state}
       end

+      defp handle_message(
+             %Decoder.Messages.Commit{commit_timestamp: commit_timestamp},
+             _server_wal_end,
+             state
+           ) do
+        {:noreply, [], state}
+      end
     end
   end


@@ -331,19 +331,18 @@ defmodule Domain.Replication.ConnectionTest do
       server_wal_start = 123_456_789
       server_wal_end = 987_654_321
       server_system_clock = 1_234_567_890
-      flags = <<0>>
       lsn = <<0::32, 100::32>>
-      end_lsn = <<0::32, 200::32>>
+      xid = <<0::32>>

       # Simulate a commit timestamp that exceeds the threshold
       timestamp =
         DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000

-      commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>>
+      begin_data = <<?B, lsn::binary, timestamp::64, xid::binary>>

       write_message =
         <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
-          commit_data::binary>>
+          begin_data::binary>>

       assert {:noreply, [], _state} =
                TestReplicationConnection.handle_data(write_message, state)
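
For anyone reading the test changes: the `~U[2000-01-01 00:00:00Z]` reference is the PostgreSQL epoch, and pgoutput encodes `commit_timestamp` as microseconds since that epoch. A `Begin` body is the `'B'` tag byte, an 8-byte LSN, the 8-byte timestamp, and a 4-byte xid. A small standalone sketch of the same encoding (variable names are illustrative):

```elixir
# Begin ('B') payload as built in these tests: 1 tag byte, 8-byte LSN,
# 8-byte commit timestamp (microseconds since 2000-01-01 UTC), 4-byte xid.
# Subtracting 10_000_000 simulates ~10 seconds of lag.
pg_epoch = ~U[2000-01-01 00:00:00Z]
timestamp = DateTime.diff(DateTime.utc_now(), pg_epoch, :microsecond) - 10_000_000

lsn = <<0::32, 100::32>>
xid = <<0::32>>
begin_data = <<?B, lsn::binary, timestamp::64, xid::binary>>

byte_size(begin_data)
#=> 21
```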
@@ -360,18 +359,18 @@ defmodule Domain.Replication.ConnectionTest do
       server_wal_start = 123_456_789
       server_wal_end = 987_654_321
       server_system_clock = 1_234_567_890
-      flags = <<0>>
       lsn = <<0::32, 100::32>>
-      end_lsn = <<0::32, 200::32>>
+      xid = <<0::32>>

       # Simulate a commit timestamp that is within the threshold
       timestamp =
         DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 1_000_000

-      commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>>
+      begin_data = <<?B, lsn::binary, timestamp::64, xid::binary>>

       write_message =
         <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
-          commit_data::binary>>
+          begin_data::binary>>

       assert {:noreply, [], _state} =
                TestReplicationConnection.handle_data(write_message, state)
@@ -481,7 +480,7 @@ defmodule Domain.Replication.ConnectionTest do
     end
   end

-  describe "commit message lag tracking with error threshold" do
+  describe "BEGIN message lag tracking with error threshold" do
     test "sends both check_warning_threshold and check_error_threshold messages" do
       state =
         %{mock_state() | step: :streaming}
@@ -491,19 +490,18 @@ defmodule Domain.Replication.ConnectionTest do
       server_wal_start = 123_456_789
       server_wal_end = 987_654_321
       server_system_clock = 1_234_567_890
-      flags = <<0>>
       lsn = <<0::32, 100::32>>
-      end_lsn = <<0::32, 200::32>>
+      xid = <<0::32>>

       # Simulate a commit timestamp that exceeds both thresholds (70 seconds lag)
       timestamp =
         DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 70_000_000

-      commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>>
+      begin_data = <<?B, lsn::binary, timestamp::64, xid::binary>>

       write_message =
         <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
-          commit_data::binary>>
+          begin_data::binary>>

       assert {:noreply, [], _state} =
                TestReplicationConnection.handle_data(write_message, state)
@@ -528,19 +526,18 @@ defmodule Domain.Replication.ConnectionTest do
       server_wal_start = 123_456_789
       server_wal_end = 987_654_321
       server_system_clock = 1_234_567_890
-      flags = <<0>>
       lsn = <<0::32, 100::32>>
-      end_lsn = <<0::32, 200::32>>
+      xid = <<0::32>>

       # Simulate a commit timestamp with moderate lag (10 seconds)
       timestamp =
         DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000

-      commit_data = <<?C, flags::binary, lsn::binary, end_lsn::binary, timestamp::64>>
+      begin_data = <<?B, lsn::binary, timestamp::64, xid::binary>>

       write_message =
         <<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
-          commit_data::binary>>
+          begin_data::binary>>

       assert {:noreply, [], _state} =
                TestReplicationConnection.handle_data(write_message, state)
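
As a sanity check on what the connection sees (a hypothetical round trip, not code from this PR): decoding the microseconds-since-2000 value back into a `DateTime` and diffing against now recovers the simulated lag, which is what gets compared against `warning_threshold_ms` / `error_threshold_ms`.

```elixir
# Hypothetical round trip for the 70-second-lag test case above.
pg_epoch = ~U[2000-01-01 00:00:00Z]
encoded = DateTime.diff(DateTime.utc_now(), pg_epoch, :microsecond) - 70_000_000

commit_timestamp = DateTime.add(pg_epoch, encoded, :microsecond)
DateTime.diff(DateTime.utc_now(), commit_timestamp, :millisecond)
#=> roughly 70_000, past both the warning and error thresholds in this test
```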