From 704ff9fd7abd4c94f66165cf24398e844bf41244 Mon Sep 17 00:00:00 2001 From: Jamil Date: Thu, 10 Jul 2025 12:50:00 -0700 Subject: [PATCH] fix(portal): send empty reply for incoming wal messages (#9821) In #9733, we changed the replies of the handle_data messages which seems to have caused Postgres to not respect our acknowledgements sent in the keepalive. To fix this, we revert to sending an empty message in response to write messages. --- .../lib/domain/replication/connection.ex | 18 +- .../domain/replication/connection_test.exs | 194 ++++++++++++++++++ 2 files changed, 203 insertions(+), 9 deletions(-) diff --git a/elixir/apps/domain/lib/domain/replication/connection.ex b/elixir/apps/domain/lib/domain/replication/connection.ex index f43ba34bb..2b5a6f885 100644 --- a/elixir/apps/domain/lib/domain/replication/connection.ex +++ b/elixir/apps/domain/lib/domain/replication/connection.ex @@ -422,7 +422,7 @@ defmodule Domain.Replication.Connection do state: inspect(state) ) - {:noreply, state} + {:noreply, [], state} end end end @@ -456,7 +456,7 @@ defmodule Domain.Replication.Connection do columns: columns } - {:noreply, %{state | relations: Map.put(state.relations, id, relation)}} + {:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}} end defp handle_write(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do @@ -481,7 +481,7 @@ defmodule Domain.Replication.Connection do state |> on_write(server_wal_end, op, table, old_data, data) |> maybe_flush() - |> then(&{:noreply, &1}) + |> then(&{:noreply, [], &1}) end defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state) @@ -506,7 +506,7 @@ defmodule Domain.Replication.Connection do send(self(), {:check_warning_threshold, lag_ms}) send(self(), {:check_error_threshold, lag_ms}) - {:noreply, state} + {:noreply, [], state} end end end @@ -517,19 +517,19 @@ defmodule Domain.Replication.Connection do # These messages are not relevant for our use case, so we ignore them. defp handle_write(%Decoder.Messages.Commit{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Origin{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Truncate{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Type{}, _server_wal_end, state) do - {:noreply, state} + {:noreply, [], state} end defp handle_write(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do @@ -538,7 +538,7 @@ defmodule Domain.Replication.Connection do counter: state.counter ) - {:noreply, state} + {:noreply, [], state} end end end diff --git a/elixir/apps/domain/test/domain/replication/connection_test.exs b/elixir/apps/domain/test/domain/replication/connection_test.exs index bf8cd61a8..19e026083 100644 --- a/elixir/apps/domain/test/domain/replication/connection_test.exs +++ b/elixir/apps/domain/test/domain/replication/connection_test.exs @@ -491,4 +491,198 @@ defmodule Domain.Replication.ConnectionTest do assert length(new_state.relations[1].columns) == 2 end end + + describe "handle_data/2" do + test "returns correct tuple format for unknown messages" do + state = %TestReplicationConnection{counter: 15} + + # Test with binary data that doesn't match WAL message patterns + unknown_data = <<255, 254, 253>> + + log = + capture_log(fn -> + result = TestReplicationConnection.handle_data(unknown_data, state) + + # Verify return format + assert match?({:noreply, [], %TestReplicationConnection{}}, result) + + {:noreply, reply_data, new_state} = result + assert reply_data == [] + # State should be unchanged + assert new_state == state + end) + + assert log =~ "Unknown WAL message received!" + end + + test "handle_data always returns 3-tuple with :noreply" do + state = %TestReplicationConnection{counter: 0} + + # Test various binary inputs to ensure consistent return patterns + test_inputs = [ + # Single byte + <<0>>, + # Multiple bytes + <<1, 2, 3>>, + # Empty binary + <<>>, + # High value bytes + <<255, 255>> + ] + + Enum.each(test_inputs, fn input -> + result = TestReplicationConnection.handle_data(input, state) + + # All handle_data calls should return 3-tuples starting with :noreply + assert match?({:noreply, _, _}, result) + + {tag, response, returned_state} = result + assert tag == :noreply + # Should be a list (empty for unknown messages) + assert is_list(response) + assert match?(%TestReplicationConnection{}, returned_state) + end) + end + + test "handle_data preserves state structure" do + complex_state = %TestReplicationConnection{ + counter: 42, + relations: %{1 => %{name: "test"}}, + flush_buffer: %{key: "value"}, + warning_threshold_exceeded?: true, + error_threshold_exceeded?: false + } + + unknown_data = <<99, 98, 97>> + + {:noreply, _response, new_state} = + TestReplicationConnection.handle_data(unknown_data, complex_state) + + # For unknown messages, state should be preserved exactly + assert new_state == complex_state + end + + test "handle_data logs unknown messages with context" do + state = %TestReplicationConnection{counter: 123} + unknown_data = <<1, 2, 3, 4, 5>> + + log = + capture_log(fn -> + TestReplicationConnection.handle_data(unknown_data, state) + end) + + # Should log the unknown message with data and state info + assert log =~ "Unknown WAL message received!" + assert log =~ "data=" + assert log =~ "state=" + end + + test "handle_data with empty binary" do + state = %TestReplicationConnection{counter: 0} + + {:noreply, response, new_state} = TestReplicationConnection.handle_data(<<>>, state) + + assert response == [] + assert new_state == state + end + + test "handle_data error handling doesn't crash" do + state = %TestReplicationConnection{counter: 999} + + # Test that malformed binary data doesn't crash the function + malformed_inputs = [ + <<0, 0, 0, 0, 0, 0, 0, 0, 255>>, + # Very large binary + <<1::size(1000)>>, + # Large binary + List.duplicate(<<255>>, 100) |> IO.iodata_to_binary() + ] + + Enum.each(malformed_inputs, fn input -> + # Should not raise an exception + result = TestReplicationConnection.handle_data(input, state) + assert match?({:noreply, [], _}, result) + end) + end + end + + describe "message processing components" do + test "on_write callback integration" do + state = + %TestReplicationConnection{} + |> Map.put(:operations, []) + + # Test our custom on_write implementation directly + result_state = + TestReplicationConnection.on_write( + state, + # lsn + 100, + # op + :insert, + # table + "users", + # old_data + nil, + # data + %{"id" => 1, "name" => "test"} + ) + + operations = Map.get(result_state, :operations, []) + assert length(operations) == 1 + + [operation] = operations + assert operation.lsn == 100 + assert operation.op == :insert + assert operation.table == "users" + assert operation.data == %{"id" => 1, "name" => "test"} + end + + test "on_flush callback integration" do + state = + %TestReplicationConnection{ + flush_buffer: %{1 => "data1", 2 => "data2"} + } + |> Map.put(:flush_count, 5) + + result_state = TestReplicationConnection.on_flush(state) + + # Our test implementation should increment flush count and clear buffer + assert Map.get(result_state, :flush_count) == 6 + assert result_state.flush_buffer == %{} + end + + test "state transformations preserve required fields" do + initial_state = %TestReplicationConnection{ + schema: "custom", + publication_name: "test_pub", + replication_slot_name: "test_slot", + counter: 50 + } + + # Test that our callbacks preserve the core state structure + after_write = + TestReplicationConnection.on_write( + initial_state, + 200, + :update, + "table", + %{}, + %{} + ) + + # Core fields should be preserved + assert after_write.schema == "custom" + assert after_write.publication_name == "test_pub" + assert after_write.replication_slot_name == "test_slot" + assert after_write.counter == 50 + + after_flush = TestReplicationConnection.on_flush(after_write) + + # Should still preserve core fields after flush + assert after_flush.schema == "custom" + assert after_flush.publication_name == "test_pub" + assert after_flush.replication_slot_name == "test_slot" + end + end end