Files
firezone/elixir/test.exs
Jamil 968db2ae39 feat(portal): Receive WAL events (#8909)
Firezone's control plane is a realtime, distributed system that relies
on a broadcast/subscribe system to function. In many cases, these events
are broadcasted whenever relevant data in the DB changes, such as an
actor losing access to a policy, a membership being deleted, and so
forth.

Today, this is handled in the application layer, typically happening at
the place where the relevant DB call is made (i.e. in an
`after_commit`). While this approach has worked thus far, it has several
issues:

1. We have no guarantee that the DB change will issue a broadcast. If
the application is deployed or the process crashes after the DB changes
are made but before the broadcast happens, we will have potentially
failed to update any connected clients or gateways with the changes.
2. We have no guarantee that the order of DB updates will be maintained
in order for broadcasts. In other words, app server A could win its DB
operation against app server B, but then proceed to lose being the first
to broadcast.
3. If the cluster is in a bad state where broadcasts may return an error
(i.e. https://github.com/firezone/firezone/issues/8660), we will never
retry the broadcast.

To fix the above issues, we introduce a WAL logical decoder that process
the event stream one message at a time and performs any needed work.
Serializability is guaranteed since we only process the WAL in a single,
cluster-global process, `ReplicationConnection`. Durability is also
guaranteed since we only ACK WAL segments after we've successfully
ingested the event.

This means we will only advance the position of our WAL stream after
successfully broadcasting the event.

This PR only introduces the WAL stream processing system but does not
introduce any changes to our current broadcasting behavior - that's
saved for another PR.
2025-04-29 23:53:06 -07:00

516 lines
19 KiB
Elixir

defmodule Domain.Events.ReplicationConnectionTest do
# Only one ReplicationConnection should be started in the cluster
use ExUnit.Case, async: false
alias Domain.Events.Decoder.Messages
alias Domain.Events.ReplicationConnection
# Used to test callbacks, not used for live connection
@mock_state %ReplicationConnection{
schema: "test_schema",
connection_opts: [],
step: :disconnected,
publication_name: "test_pub",
replication_slot_name: "test_slot",
output_plugin: "pgoutput",
proto_version: 1,
# Example, adjust if needed
table_subscriptions: ["accounts", "resources"],
relations: %{}
}
# Used to test live connection (Setup remains unchanged)
setup do
# Ensure Postgrex is started if your tests rely on it implicitly
{:ok, pid} = start_supervised(Domain.Events.ReplicationConnection)
{:ok, pid: pid}
end
describe "handle_connect/1 callback" do
test "handle_connect initiates publication check" do
state = @mock_state
expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
expected_next_state = %{state | step: :create_publication}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_connect(state)
end
end
describe "handle_result/2 callback" do
test "handle_result transitions from create_publication to create_replication_slot when publication exists" do
state = %{@mock_state | step: :create_publication}
# Mock a successful result for the SELECT query
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 1,
rows: [[1]]
}
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do
state = %{@mock_state | step: :create_replication_slot}
# Mock a successful result for the SELECT query
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 1,
rows: [[1]]
}
expected_query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
expected_next_state = %{state | step: :start_replication_slot}
expected_stream_query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
# Should be :streaming directly? Check impl.
expected_next_state_direct = %{state | step: :start_replication_slot}
# Let's assume it first goes to :start_replication_slot step, then handle_result for *that* step triggers START_REPLICATION
assert {:query, _query, ^expected_next_state_direct} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from start_replication_slot to streaming" do
state = %{@mock_state | step: :start_replication_slot}
# Mock a successful result for the CREATE_REPLICATION_SLOT or preceding step
result = %Postgrex.Result{
# Or whatever command led here
command: :create_replication_slot,
columns: nil,
num_rows: 0,
rows: nil
}
expected_stream_query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
expected_next_state = %{state | step: :streaming}
assert {:stream, ^expected_stream_query, [], ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates publication if it doesn't exist" do
state = %{@mock_state | step: :create_publication}
# Mock result indicating publication doesn't exist
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 0,
rows: []
}
# Combine schema and table names correctly
expected_tables =
state.table_subscriptions
|> Enum.map(fn table -> "#{state.schema}.#{table}" end)
|> Enum.join(",")
expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}"
# The original test expected the next step to be :check_replication_slot, let's keep that
expected_next_state = %{state | step: :check_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do
state = %{@mock_state | step: :check_replication_slot}
# Mock a successful result from the CREATE PUBLICATION command
result = %Postgrex.Result{
command: :create_publication,
columns: nil,
num_rows: 0,
rows: nil
}
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates replication slot if it doesn't exist" do
state = %{@mock_state | step: :create_replication_slot}
# Mock result indicating slot doesn't exist
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 0,
rows: []
}
expected_query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
end
# --- handle_data tests remain unchanged ---
# In-depth decoding tests are handled in Domain.Events.DecoderTest
describe "handle_data/2" do
test "handle_data handles KeepAlive with reply :now" do
state = %{@mock_state | step: :streaming}
wal_end = 12345
# Keepalive doesn't use this field meaningfully here
server_wal_start = 0
# Reply requested
reply_requested = 1
now_microseconds =
System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
# 100 milliseconds tolerance for clock check
grace_period_microseconds = 100_000
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
# Expected reply format: 'r', confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8, clock::64
# The actual implementation might construct the reply differently.
# This assertion needs to match the exact binary structure returned by handle_data.
# Let's assume the implementation sends back the received wal_end as confirmed LSNs,
# and the current time. The no_reply and high_priority flags might be 0.
assert {:reply, reply_binary, ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
# Deconstruct the reply to verify its parts
assert <<?r, confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8,
clock::64>> = reply_binary
assert confirmed_lsn == wal_end
# Or potentially server_wal_start? Check impl.
assert confirmed_lsn_commit == wal_end
assert no_reply == 0
assert high_priority == 0
assert now_microseconds <= clock < now_microseconds + grace_period_microseconds
end
test "handle_data handles KeepAlive with reply :later" do
state = %{@mock_state | step: :streaming}
wal_end = 54321
server_wal_start = 0
# No reply requested
reply_requested = 0
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
# When no reply is requested, it should return :noreply with no binary message
assert {:noreply, [], ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
end
test "handle_data handles Write message (XLogData)" do
state = %{@mock_state | step: :streaming}
server_wal_start = 123_456_789
# This is the LSN of the end of the WAL data in this message
server_wal_end = 987_654_321
# Timestamp in microseconds since PG epoch
server_system_clock = 1_234_567_890
# Example decoded message data (e.g., a BEGIN message binary)
# This data should be passed to handle_info via send(self(), decoded_msg)
message_binary =
<<"B", @lsn_binary || <<0::64>>::binary-8, @timestamp_int || 0::integer-64,
@xid || 0::integer-32>>
write_data =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
message_binary::binary>>
# handle_data for 'w' should decode the message_binary and send it to self()
# It returns {:noreply, [], state} because the reply/acknowledgement happens
# via the KeepAlive ('k') mechanism.
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state)
# Assert that the decoded message was sent to self()
# Note: This requires the test process to receive the message.
# You might need `allow_receive` or similar testing patterns if handle_data
# directly uses `send`. If it calls another function that sends, test that function.
# Let's assume handle_data directly sends for this example.
# Need some sample data defined earlier for the assertion
@lsn_binary <<0::integer-32, 23_785_280::integer-32>>
@timestamp_int 704_521_200_000
@xid 1234
@timestamp_decoded ~U[2022-04-29 12:20:00.000000Z]
@lsn_decoded {0, 23_785_280}
expected_decoded_msg = %Messages.Begin{
final_lsn: @lsn_decoded,
commit_timestamp: @timestamp_decoded,
xid: @xid
}
assert_receive(^expected_decoded_msg)
end
test "handle_data handles unknown message type" do
state = %{@mock_state | step: :streaming}
# Using 'q' as an example unknown type
unknown_data = <<?q, 1, 2, 3>>
# Expect it to ignore unknown types and return noreply
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state)
# Optionally, assert that a warning was logged if applicable
end
end
# --- handle_info tests are CORRECTED below ---
describe "handle_info/2" do
test "handle_info updates relations on Relation message" do
state = @mock_state
# Use the correct fields from Messages.Relation struct
relation_msg = %Messages.Relation{
id: 101,
namespace: "public",
name: "accounts",
# Added replica_identity
replica_identity: :default,
columns: [
%Messages.Relation.Column{
flags: [:key],
name: "id",
type: "int4",
type_modifier: -1
},
%Messages.Relation.Column{
flags: [],
name: "name",
type: "text",
type_modifier: -1
}
]
}
# The state should store the relevant parts of the relation message, keyed by ID
expected_relation_data = %{
namespace: "public",
name: "accounts",
replica_identity: :default,
columns: [
%Messages.Relation.Column{
flags: [:key],
name: "id",
type: "int4",
type_modifier: -1
},
%Messages.Relation.Column{
flags: [],
name: "name",
type: "text",
type_modifier: -1
}
]
}
expected_relations = %{101 => expected_relation_data}
expected_state = %{state | relations: expected_relations}
assert {:noreply, ^expected_state} = ReplicationConnection.handle_info(relation_msg, state)
end
test "handle_info returns noreply for Insert message" do
# Pre-populate state with relation info if the handler needs it
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct field: tuple_data (which is a tuple)
insert_msg = %Messages.Insert{relation_id: 101, tuple_data: {1, "Alice"}}
# handle_info likely broadcasts or processes the insert, but returns noreply
assert {:noreply, ^state} = ReplicationConnection.handle_info(insert_msg, state)
# Add assertions here if handle_info is expected to send messages or call other funcs
end
test "handle_info returns noreply for Update message" do
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct fields: relation_id, old_tuple_data, tuple_data, changed_key_tuple_data
update_msg = %Messages.Update{
relation_id: 101,
# Example: only old data provided
old_tuple_data: {1, "Alice"},
# Example: new data
tuple_data: {1, "Bob"},
# Example: key didn't change or wasn't provided
changed_key_tuple_data: nil
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(update_msg, state)
# Add assertions for side effects (broadcasts etc.) if needed
end
test "handle_info returns noreply for Delete message" do
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct fields: relation_id, old_tuple_data, changed_key_tuple_data
delete_msg = %Messages.Delete{
relation_id: 101,
# Example: old data provided
old_tuple_data: {1, "Bob"},
# Example: key data not provided
changed_key_tuple_data: nil
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(delete_msg, state)
# Add assertions for side effects if needed
end
test "handle_info ignores Begin message" do
state = @mock_state
# Use correct fields: final_lsn, commit_timestamp, xid
begin_msg = %Messages.Begin{
final_lsn: {0, 123},
commit_timestamp: ~U[2023-01-01 10:00:00Z],
xid: 789
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(begin_msg, state)
end
test "handle_info ignores Commit message" do
state = @mock_state
# Use correct fields: flags, lsn, end_lsn, commit_timestamp
commit_msg = %Messages.Commit{
flags: [],
lsn: {0, 123},
end_lsn: {0, 456},
commit_timestamp: ~U[2023-01-01 10:00:01Z]
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(commit_msg, state)
end
test "handle_info ignores Origin message" do
state = @mock_state
# Use correct fields: origin_commit_lsn, name
origin_msg = %Messages.Origin{origin_commit_lsn: {0, 1}, name: "origin_name"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(origin_msg, state)
end
test "handle_info ignores Truncate message" do
state = @mock_state
# Use correct fields: number_of_relations, options, truncated_relations
truncate_msg = %Messages.Truncate{
number_of_relations: 2,
options: [:cascade],
truncated_relations: [101, 102]
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(truncate_msg, state)
end
test "handle_info ignores Type message" do
state = @mock_state
# Use correct fields: id, namespace, name
type_msg = %Messages.Type{id: 23, namespace: "pg_catalog", name: "int4"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(type_msg, state)
end
test "handle_info returns noreply for Unsupported message" do
state = @mock_state
unsupported_msg = %Messages.Unsupported{data: <<1, 2, 3>>}
# We cannot easily verify Logger.warning was called without mocks/capture.
assert {:noreply, ^state} = ReplicationConnection.handle_info(unsupported_msg, state)
end
test "handle_info handles :shutdown message" do
state = @mock_state
# Expect :disconnect tuple based on common GenServer patterns for shutdown
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(:shutdown, state)
# Note: The original test asserted {:disconnect, :normal}. {:stop, :normal, state} is
# the standard GenServer return for a clean stop triggered by handle_info. Adjust
# if your implementation specifically returns :disconnect.
end
test "handle_info handles :DOWN message from monitored process" do
state = @mock_state
monitor_ref = make_ref()
# Example DOWN message structure
down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown}
# Expect the server to stop itself upon receiving DOWN for a critical process
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(down_msg, state)
# Again, adjust the expected return (:disconnect vs :stop) based on implementation.
end
test "handle_info ignores other messages" do
state = @mock_state
random_msg = {:some_other_info, "data"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state)
end
end
# --- Moved handle_disconnect test to its own describe block ---
describe "handle_disconnect/1" do
test "handle_disconnect resets step to :disconnected and logs warning" do
state = %{@mock_state | step: :streaming}
expected_state = %{state | step: :disconnected}
# Capture log to verify warning (requires ExUnit config)
log_output =
ExUnit.CaptureLog.capture_log(fn ->
assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state)
end)
assert log_output =~ "Replication connection disconnected."
# Or match the exact log message if needed
end
end
end