diff --git a/elixir/apps/domain/lib/domain/replication/connection.ex b/elixir/apps/domain/lib/domain/replication/connection.ex index f43ba34bb..2b5a6f885 100644 --- a/elixir/apps/domain/lib/domain/replication/connection.ex +++ b/elixir/apps/domain/lib/domain/replication/connection.ex @@ -422,7 +422,7 @@ defmodule Domain.Replication.Connection do state: inspect(state) ) - {:noreply, state} + {:noreply, [], state} end end end @@ -456,7 +456,7 @@ defmodule Domain.Replication.Connection do columns: columns } - {:noreply, %{state | relations: Map.put(state.relations, id, relation)}} + {:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}} end defp handle_write(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do @@ -481,7 +481,7 @@ defmodule Domain.Replication.Connection do state |> on_write(server_wal_end, op, table, old_data, data) |> maybe_flush() - |> then(&{:noreply, &1}) + |> then(&{:noreply, [], &1}) end defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state) @@ -506,7 +506,7 @@ defmodule Domain.Replication.Connection do send(self(), {:check_warning_threshold, lag_ms}) send(self(), {:check_error_threshold, lag_ms}) - {:noreply, state} + {:noreply, [], state} end end end @@ -517,19 +517,19 @@ defmodule Domain.Replication.Connection do # These messages are not relevant for our use case, so we ignore them. defp handle_write(%Decoder.Messages.Commit{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Origin{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Truncate{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Type{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do @@ -538,7 +538,7 @@ defmodule Domain.Replication.Connection do counter: state.counter ) - {:noreply, state} + {:noreply, [], state} end end end diff --git a/elixir/apps/domain/test/domain/replication/connection_test.exs b/elixir/apps/domain/test/domain/replication/connection_test.exs index bf8cd61a8..19e026083 100644 --- a/elixir/apps/domain/test/domain/replication/connection_test.exs +++ b/elixir/apps/domain/test/domain/replication/connection_test.exs @@ -491,4 +491,198 @@ defmodule Domain.Replication.ConnectionTest do assert length(new_state.relations[1].columns) == 2 end end + + describe "handle_data/2" do + test "returns correct tuple format for unknown messages" do + state = %TestReplicationConnection{counter: 15} + + # Test with binary data that doesn't match WAL message patterns + unknown_data = <<255, 254, 253>> + + log = + capture_log(fn -> + result = TestReplicationConnection.handle_data(unknown_data, state) + + # Verify return format + assert match?({:noreply, [], %TestReplicationConnection{}}, result) + + {:noreply, reply_data, new_state} = result + assert reply_data == [] + # State should be unchanged + assert new_state == state + end) + + assert log =~ "Unknown WAL message received!" + end + + test "handle_data always returns 3-tuple with :noreply" do + state = %TestReplicationConnection{counter: 0} + + # Test various binary inputs to ensure consistent return patterns + test_inputs = [ + # Single byte + <<0>>, + # Multiple bytes + <<1, 2, 3>>, + # Empty binary + <<>>, + # High value bytes + <<255, 255>> + ] + + Enum.each(test_inputs, fn input -> + result = TestReplicationConnection.handle_data(input, state) + + # All handle_data calls should return 3-tuples starting with :noreply + assert match?({:noreply, _, _}, result) + + {tag, response, returned_state} = result + assert tag == :noreply + # Should be a list (empty for unknown messages) + assert is_list(response) + assert match?(%TestReplicationConnection{}, returned_state) + end) + end + + test "handle_data preserves state structure" do + complex_state = %TestReplicationConnection{ + counter: 42, + relations: %{1 => %{name: "test"}}, + flush_buffer: %{key: "value"}, + warning_threshold_exceeded?: true, + error_threshold_exceeded?: false + } + + unknown_data = <<99, 98, 97>> + + {:noreply, _response, new_state} = + TestReplicationConnection.handle_data(unknown_data, complex_state) + + # For unknown messages, state should be preserved exactly + assert new_state == complex_state + end + + test "handle_data logs unknown messages with context" do + state = %TestReplicationConnection{counter: 123} + unknown_data = <<1, 2, 3, 4, 5>> + + log = + capture_log(fn -> + TestReplicationConnection.handle_data(unknown_data, state) + end) + + # Should log the unknown message with data and state info + assert log =~ "Unknown WAL message received!" + assert log =~ "data=" + assert log =~ "state=" + end + + test "handle_data with empty binary" do + state = %TestReplicationConnection{counter: 0} + + {:noreply, response, new_state} = TestReplicationConnection.handle_data(<<>>, state) + + assert response == [] + assert new_state == state + end + + test "handle_data error handling doesn't crash" do + state = %TestReplicationConnection{counter: 999} + + # Test that malformed binary data doesn't crash the function + malformed_inputs = [ + <<0, 0, 0, 0, 0, 0, 0, 0, 255>>, + # Very large binary + <<1::size(1000)>>, + # Large binary + List.duplicate(<<255>>, 100) |> IO.iodata_to_binary() + ] + + Enum.each(malformed_inputs, fn input -> + # Should not raise an exception + result = TestReplicationConnection.handle_data(input, state) + assert match?({:noreply, [], _}, result) + end) + end + end + + describe "message processing components" do + test "on_write callback integration" do + state = + %TestReplicationConnection{} + |> Map.put(:operations, []) + + # Test our custom on_write implementation directly + result_state = + TestReplicationConnection.on_write( + state, + # lsn + 100, + # op + :insert, + # table + "users", + # old_data + nil, + # data + %{"id" => 1, "name" => "test"} + ) + + operations = Map.get(result_state, :operations, []) + assert length(operations) == 1 + + [operation] = operations + assert operation.lsn == 100 + assert operation.op == :insert + assert operation.table == "users" + assert operation.data == %{"id" => 1, "name" => "test"} + end + + test "on_flush callback integration" do + state = + %TestReplicationConnection{ + flush_buffer: %{1 => "data1", 2 => "data2"} + } + |> Map.put(:flush_count, 5) + + result_state = TestReplicationConnection.on_flush(state) + + # Our test implementation should increment flush count and clear buffer + assert Map.get(result_state, :flush_count) == 6 + assert result_state.flush_buffer == %{} + end + + test "state transformations preserve required fields" do + initial_state = %TestReplicationConnection{ + schema: "custom", + publication_name: "test_pub", + replication_slot_name: "test_slot", + counter: 50 + } + + # Test that our callbacks preserve the core state structure + after_write = + TestReplicationConnection.on_write( + initial_state, + 200, + :update, + "table", + %{}, + %{} + ) + + # Core fields should be preserved + assert after_write.schema == "custom" + assert after_write.publication_name == "test_pub" + assert after_write.replication_slot_name == "test_slot" + assert after_write.counter == 50 + + after_flush = TestReplicationConnection.on_flush(after_write) + + # Should still preserve core fields after flush + assert after_flush.schema == "custom" + assert after_flush.publication_name == "test_pub" + assert after_flush.replication_slot_name == "test_slot" + end + end end