fix(portal): send empty reply for incoming wal messages (#9821)

In #9733, we changed the replies of the handle_data messages which seems
to have caused Postgres to not respect our acknowledgements sent in the
keepalive.

To fix this, we revert to sending an empty message in response to write
messages.
This commit is contained in:
Jamil
2025-07-10 12:50:00 -07:00
committed by GitHub
parent 13c8c70750
commit 704ff9fd7a
2 changed files with 203 additions and 9 deletions

View File

@@ -422,7 +422,7 @@ defmodule Domain.Replication.Connection do
state: inspect(state)
)
{:noreply, state}
{:noreply, [], state}
end
end
end
@@ -456,7 +456,7 @@ defmodule Domain.Replication.Connection do
columns: columns
}
{:noreply, %{state | relations: Map.put(state.relations, id, relation)}}
{:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}}
end
defp handle_write(%Decoder.Messages.Insert{} = msg, server_wal_end, state) do
@@ -481,7 +481,7 @@ defmodule Domain.Replication.Connection do
state
|> on_write(server_wal_end, op, table, old_data, data)
|> maybe_flush()
|> then(&{:noreply, &1})
|> then(&{:noreply, [], &1})
end
defp maybe_flush(%{flush_buffer: buffer, flush_buffer_size: size} = state)
@@ -506,7 +506,7 @@ defmodule Domain.Replication.Connection do
send(self(), {:check_warning_threshold, lag_ms})
send(self(), {:check_error_threshold, lag_ms})
{:noreply, state}
{:noreply, [], state}
end
end
end
@@ -517,19 +517,19 @@ defmodule Domain.Replication.Connection do
# These messages are not relevant for our use case, so we ignore them.
defp handle_write(%Decoder.Messages.Commit{}, _server_wal_end, state) do
{:noreply, state}
{:noreply, [], state}
end
defp handle_write(%Decoder.Messages.Origin{}, _server_wal_end, state) do
{:noreply, state}
{:noreply, [], state}
end
defp handle_write(%Decoder.Messages.Truncate{}, _server_wal_end, state) do
{:noreply, state}
{:noreply, [], state}
end
defp handle_write(%Decoder.Messages.Type{}, _server_wal_end, state) do
{:noreply, state}
{:noreply, [], state}
end
defp handle_write(%Decoder.Messages.Unsupported{data: data}, _server_wal_end, state) do
@@ -538,7 +538,7 @@ defmodule Domain.Replication.Connection do
counter: state.counter
)
{:noreply, state}
{:noreply, [], state}
end
end
end

View File

@@ -491,4 +491,198 @@ defmodule Domain.Replication.ConnectionTest do
assert length(new_state.relations[1].columns) == 2
end
end
describe "handle_data/2" do
test "returns correct tuple format for unknown messages" do
state = %TestReplicationConnection{counter: 15}
# Test with binary data that doesn't match WAL message patterns
unknown_data = <<255, 254, 253>>
log =
capture_log(fn ->
result = TestReplicationConnection.handle_data(unknown_data, state)
# Verify return format
assert match?({:noreply, [], %TestReplicationConnection{}}, result)
{:noreply, reply_data, new_state} = result
assert reply_data == []
# State should be unchanged
assert new_state == state
end)
assert log =~ "Unknown WAL message received!"
end
test "handle_data always returns 3-tuple with :noreply" do
state = %TestReplicationConnection{counter: 0}
# Test various binary inputs to ensure consistent return patterns
test_inputs = [
# Single byte
<<0>>,
# Multiple bytes
<<1, 2, 3>>,
# Empty binary
<<>>,
# High value bytes
<<255, 255>>
]
Enum.each(test_inputs, fn input ->
result = TestReplicationConnection.handle_data(input, state)
# All handle_data calls should return 3-tuples starting with :noreply
assert match?({:noreply, _, _}, result)
{tag, response, returned_state} = result
assert tag == :noreply
# Should be a list (empty for unknown messages)
assert is_list(response)
assert match?(%TestReplicationConnection{}, returned_state)
end)
end
test "handle_data preserves state structure" do
complex_state = %TestReplicationConnection{
counter: 42,
relations: %{1 => %{name: "test"}},
flush_buffer: %{key: "value"},
warning_threshold_exceeded?: true,
error_threshold_exceeded?: false
}
unknown_data = <<99, 98, 97>>
{:noreply, _response, new_state} =
TestReplicationConnection.handle_data(unknown_data, complex_state)
# For unknown messages, state should be preserved exactly
assert new_state == complex_state
end
test "handle_data logs unknown messages with context" do
state = %TestReplicationConnection{counter: 123}
unknown_data = <<1, 2, 3, 4, 5>>
log =
capture_log(fn ->
TestReplicationConnection.handle_data(unknown_data, state)
end)
# Should log the unknown message with data and state info
assert log =~ "Unknown WAL message received!"
assert log =~ "data="
assert log =~ "state="
end
test "handle_data with empty binary" do
state = %TestReplicationConnection{counter: 0}
{:noreply, response, new_state} = TestReplicationConnection.handle_data(<<>>, state)
assert response == []
assert new_state == state
end
test "handle_data error handling doesn't crash" do
state = %TestReplicationConnection{counter: 999}
# Test that malformed binary data doesn't crash the function
malformed_inputs = [
<<0, 0, 0, 0, 0, 0, 0, 0, 255>>,
# Very large binary
<<1::size(1000)>>,
# Large binary
List.duplicate(<<255>>, 100) |> IO.iodata_to_binary()
]
Enum.each(malformed_inputs, fn input ->
# Should not raise an exception
result = TestReplicationConnection.handle_data(input, state)
assert match?({:noreply, [], _}, result)
end)
end
end
describe "message processing components" do
test "on_write callback integration" do
state =
%TestReplicationConnection{}
|> Map.put(:operations, [])
# Test our custom on_write implementation directly
result_state =
TestReplicationConnection.on_write(
state,
# lsn
100,
# op
:insert,
# table
"users",
# old_data
nil,
# data
%{"id" => 1, "name" => "test"}
)
operations = Map.get(result_state, :operations, [])
assert length(operations) == 1
[operation] = operations
assert operation.lsn == 100
assert operation.op == :insert
assert operation.table == "users"
assert operation.data == %{"id" => 1, "name" => "test"}
end
test "on_flush callback integration" do
state =
%TestReplicationConnection{
flush_buffer: %{1 => "data1", 2 => "data2"}
}
|> Map.put(:flush_count, 5)
result_state = TestReplicationConnection.on_flush(state)
# Our test implementation should increment flush count and clear buffer
assert Map.get(result_state, :flush_count) == 6
assert result_state.flush_buffer == %{}
end
test "state transformations preserve required fields" do
initial_state = %TestReplicationConnection{
schema: "custom",
publication_name: "test_pub",
replication_slot_name: "test_slot",
counter: 50
}
# Test that our callbacks preserve the core state structure
after_write =
TestReplicationConnection.on_write(
initial_state,
200,
:update,
"table",
%{},
%{}
)
# Core fields should be preserved
assert after_write.schema == "custom"
assert after_write.publication_name == "test_pub"
assert after_write.replication_slot_name == "test_slot"
assert after_write.counter == 50
after_flush = TestReplicationConnection.on_flush(after_write)
# Should still preserve core fields after flush
assert after_flush.schema == "custom"
assert after_flush.publication_name == "test_pub"
assert after_flush.replication_slot_name == "test_slot"
end
end
end