diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex index bd5fae52e..e1401dd95 100644 --- a/elixir/apps/domain/lib/domain/events/replication_connection.ex +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -2,11 +2,11 @@ defmodule Domain.Events.ReplicationConnection do alias Domain.Events.Hooks use Domain.Replication.Connection, - # Allow up to 5 seconds of lag before alerting - warning_threshold_ms: 5_000, + # Allow up to 60 seconds of lag before alerting + warning_threshold_ms: 60 * 1_000, - # Allow up to 5 minutes of lag before bypassing hooks - error_threshold_ms: 300_000 + # Allow up to 30 minutes of lag before bypassing hooks + error_threshold_ms: 30 * 60 * 1_000 require Logger diff --git a/elixir/apps/domain/lib/domain/replication/connection.ex b/elixir/apps/domain/lib/domain/replication/connection.ex index 124ccbd0c..e404c6ed6 100644 --- a/elixir/apps/domain/lib/domain/replication/connection.ex +++ b/elixir/apps/domain/lib/domain/replication/connection.ex @@ -17,7 +17,7 @@ defmodule Domain.Replication.Connection do defmodule MyApp.ReplicationConnection do use Domain.Replication.Connection, warning_threshold_ms: 30_000, - error_threshold_ms: 60_000, + error_threshold_ms: 60 * 1_000 end ## Options @@ -465,12 +465,8 @@ defmodule Domain.Replication.Connection do # Extract transaction and ignored message handlers defp transaction_handlers do quote do - defp handle_message(%Decoder.Messages.Begin{} = msg, server_wal_end, state) do - {:noreply, [], state} - end - defp handle_message( - %Decoder.Messages.Commit{commit_timestamp: commit_timestamp} = msg, + %Decoder.Messages.Begin{commit_timestamp: commit_timestamp} = msg, _server_wal_end, state ) do @@ -482,6 +478,14 @@ defmodule Domain.Replication.Connection do {:noreply, [], state} end + + defp handle_message( + %Decoder.Messages.Commit{commit_timestamp: commit_timestamp}, + _server_wal_end, + state + ) do + {:noreply, [], state} + end end end diff --git a/elixir/apps/domain/test/domain/replication/connection_test.exs b/elixir/apps/domain/test/domain/replication/connection_test.exs index 5b00e9bf2..6e216541b 100644 --- a/elixir/apps/domain/test/domain/replication/connection_test.exs +++ b/elixir/apps/domain/test/domain/replication/connection_test.exs @@ -331,19 +331,18 @@ defmodule Domain.Replication.ConnectionTest do server_wal_start = 123_456_789 server_wal_end = 987_654_321 server_system_clock = 1_234_567_890 - flags = <<0>> lsn = <<0::32, 100::32>> - end_lsn = <<0::32, 200::32>> + xid = <<0::32>> # Simulate a commit timestamp that exceeds the threshold timestamp = DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000 - commit_data = <> + begin_data = <> write_message = <> + begin_data::binary>> assert {:noreply, [], _state} = TestReplicationConnection.handle_data(write_message, state) @@ -360,18 +359,18 @@ defmodule Domain.Replication.ConnectionTest do server_wal_start = 123_456_789 server_wal_end = 987_654_321 server_system_clock = 1_234_567_890 - flags = <<0>> lsn = <<0::32, 100::32>> - end_lsn = <<0::32, 200::32>> + xid = <<0::32>> + # Simulate a commit timestamp that is within the threshold timestamp = DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 1_000_000 - commit_data = <> + begin_data = <> write_message = <> + begin_data::binary>> assert {:noreply, [], _state} = TestReplicationConnection.handle_data(write_message, state) @@ -481,7 +480,7 @@ defmodule Domain.Replication.ConnectionTest do end end - describe "commit message lag tracking with error threshold" do + describe "BEGIN message lag tracking with error threshold" do test "sends both check_warning_threshold and check_error_threshold messages" do state = %{mock_state() | step: :streaming} @@ -491,19 +490,18 @@ defmodule Domain.Replication.ConnectionTest do server_wal_start = 123_456_789 server_wal_end = 987_654_321 server_system_clock = 1_234_567_890 - flags = <<0>> lsn = <<0::32, 100::32>> - end_lsn = <<0::32, 200::32>> + xid = <<0::32>> # Simulate a commit timestamp that exceeds both thresholds (70 seconds lag) timestamp = DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 70_000_000 - commit_data = <> + begin_data = <> write_message = <> + begin_data::binary>> assert {:noreply, [], _state} = TestReplicationConnection.handle_data(write_message, state) @@ -528,19 +526,18 @@ defmodule Domain.Replication.ConnectionTest do server_wal_start = 123_456_789 server_wal_end = 987_654_321 server_system_clock = 1_234_567_890 - flags = <<0>> lsn = <<0::32, 100::32>> - end_lsn = <<0::32, 200::32>> + xid = <<0::32>> # Simulate a commit timestamp with moderate lag (10 seconds) timestamp = DateTime.diff(DateTime.utc_now(), ~U[2000-01-01 00:00:00Z], :microsecond) - 10_000_000 - commit_data = <> + begin_data = <> write_message = <> + begin_data::binary>> assert {:noreply, [], _state} = TestReplicationConnection.handle_data(write_message, state)