mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
Firezone's control plane is a realtime, distributed system that relies on a broadcast/subscribe system to function. In many cases, these events are broadcasted whenever relevant data in the DB changes, such as an actor losing access to a policy, a membership being deleted, and so forth. Today, this is handled in the application layer, typically happening at the place where the relevant DB call is made (i.e. in an `after_commit`). While this approach has worked thus far, it has several issues: 1. We have no guarantee that the DB change will issue a broadcast. If the application is deployed or the process crashes after the DB changes are made but before the broadcast happens, we will have potentially failed to update any connected clients or gateways with the changes. 2. We have no guarantee that the order of DB updates will be maintained in order for broadcasts. In other words, app server A could win its DB operation against app server B, but then proceed to lose being the first to broadcast. 3. If the cluster is in a bad state where broadcasts may return an error (i.e. https://github.com/firezone/firezone/issues/8660), we will never retry the broadcast. To fix the above issues, we introduce a WAL logical decoder that process the event stream one message at a time and performs any needed work. Serializability is guaranteed since we only process the WAL in a single, cluster-global process, `ReplicationConnection`. Durability is also guaranteed since we only ACK WAL segments after we've successfully ingested the event. This means we will only advance the position of our WAL stream after successfully broadcasting the event. This PR only introduces the WAL stream processing system but does not introduce any changes to our current broadcasting behavior - that's saved for another PR.
516 lines
19 KiB
Elixir
516 lines
19 KiB
Elixir
defmodule Domain.Events.ReplicationConnectionTest do
|
|
# Only one ReplicationConnection should be started in the cluster
|
|
use ExUnit.Case, async: false
|
|
|
|
alias Domain.Events.Decoder.Messages
|
|
alias Domain.Events.ReplicationConnection
|
|
|
|
# Used to test callbacks, not used for live connection
|
|
@mock_state %ReplicationConnection{
|
|
schema: "test_schema",
|
|
connection_opts: [],
|
|
step: :disconnected,
|
|
publication_name: "test_pub",
|
|
replication_slot_name: "test_slot",
|
|
output_plugin: "pgoutput",
|
|
proto_version: 1,
|
|
# Example, adjust if needed
|
|
table_subscriptions: ["accounts", "resources"],
|
|
relations: %{}
|
|
}
|
|
|
|
# Used to test live connection (Setup remains unchanged)
|
|
setup do
|
|
# Ensure Postgrex is started if your tests rely on it implicitly
|
|
{:ok, pid} = start_supervised(Domain.Events.ReplicationConnection)
|
|
|
|
{:ok, pid: pid}
|
|
end
|
|
|
|
describe "handle_connect/1 callback" do
|
|
test "handle_connect initiates publication check" do
|
|
state = @mock_state
|
|
expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
|
|
expected_next_state = %{state | step: :create_publication}
|
|
|
|
assert {:query, ^expected_query, ^expected_next_state} =
|
|
ReplicationConnection.handle_connect(state)
|
|
end
|
|
end
|
|
|
|
describe "handle_result/2 callback" do
|
|
test "handle_result transitions from create_publication to create_replication_slot when publication exists" do
|
|
state = %{@mock_state | step: :create_publication}
|
|
# Mock a successful result for the SELECT query
|
|
result = %Postgrex.Result{
|
|
command: :select,
|
|
columns: ["?column?"],
|
|
num_rows: 1,
|
|
rows: [[1]]
|
|
}
|
|
|
|
expected_query =
|
|
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
|
|
|
|
expected_next_state = %{state | step: :create_replication_slot}
|
|
|
|
assert {:query, ^expected_query, ^expected_next_state} =
|
|
ReplicationConnection.handle_result(result, state)
|
|
end
|
|
|
|
test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do
|
|
state = %{@mock_state | step: :create_replication_slot}
|
|
# Mock a successful result for the SELECT query
|
|
result = %Postgrex.Result{
|
|
command: :select,
|
|
columns: ["?column?"],
|
|
num_rows: 1,
|
|
rows: [[1]]
|
|
}
|
|
|
|
expected_query =
|
|
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
|
|
|
|
expected_next_state = %{state | step: :start_replication_slot}
|
|
|
|
expected_stream_query =
|
|
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
|
|
|
|
# Should be :streaming directly? Check impl.
|
|
expected_next_state_direct = %{state | step: :start_replication_slot}
|
|
|
|
# Let's assume it first goes to :start_replication_slot step, then handle_result for *that* step triggers START_REPLICATION
|
|
assert {:query, _query, ^expected_next_state_direct} =
|
|
ReplicationConnection.handle_result(result, state)
|
|
end
|
|
|
|
test "handle_result transitions from start_replication_slot to streaming" do
|
|
state = %{@mock_state | step: :start_replication_slot}
|
|
# Mock a successful result for the CREATE_REPLICATION_SLOT or preceding step
|
|
result = %Postgrex.Result{
|
|
# Or whatever command led here
|
|
command: :create_replication_slot,
|
|
columns: nil,
|
|
num_rows: 0,
|
|
rows: nil
|
|
}
|
|
|
|
expected_stream_query =
|
|
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
|
|
|
|
expected_next_state = %{state | step: :streaming}
|
|
|
|
assert {:stream, ^expected_stream_query, [], ^expected_next_state} =
|
|
ReplicationConnection.handle_result(result, state)
|
|
end
|
|
|
|
test "handle_result creates publication if it doesn't exist" do
|
|
state = %{@mock_state | step: :create_publication}
|
|
# Mock result indicating publication doesn't exist
|
|
result = %Postgrex.Result{
|
|
command: :select,
|
|
columns: ["?column?"],
|
|
num_rows: 0,
|
|
rows: []
|
|
}
|
|
|
|
# Combine schema and table names correctly
|
|
expected_tables =
|
|
state.table_subscriptions
|
|
|> Enum.map(fn table -> "#{state.schema}.#{table}" end)
|
|
|> Enum.join(",")
|
|
|
|
expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}"
|
|
# The original test expected the next step to be :check_replication_slot, let's keep that
|
|
expected_next_state = %{state | step: :check_replication_slot}
|
|
|
|
assert {:query, ^expected_query, ^expected_next_state} =
|
|
ReplicationConnection.handle_result(result, state)
|
|
end
|
|
|
|
test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do
|
|
state = %{@mock_state | step: :check_replication_slot}
|
|
# Mock a successful result from the CREATE PUBLICATION command
|
|
result = %Postgrex.Result{
|
|
command: :create_publication,
|
|
columns: nil,
|
|
num_rows: 0,
|
|
rows: nil
|
|
}
|
|
|
|
expected_query =
|
|
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
|
|
|
|
expected_next_state = %{state | step: :create_replication_slot}
|
|
|
|
assert {:query, ^expected_query, ^expected_next_state} =
|
|
ReplicationConnection.handle_result(result, state)
|
|
end
|
|
|
|
test "handle_result creates replication slot if it doesn't exist" do
|
|
state = %{@mock_state | step: :create_replication_slot}
|
|
# Mock result indicating slot doesn't exist
|
|
result = %Postgrex.Result{
|
|
command: :select,
|
|
columns: ["?column?"],
|
|
num_rows: 0,
|
|
rows: []
|
|
}
|
|
|
|
expected_query =
|
|
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
|
|
|
|
expected_next_state = %{state | step: :start_replication_slot}
|
|
|
|
assert {:query, ^expected_query, ^expected_next_state} =
|
|
ReplicationConnection.handle_result(result, state)
|
|
end
|
|
end
|
|
|
|
# --- handle_data tests remain unchanged ---
|
|
# In-depth decoding tests are handled in Domain.Events.DecoderTest
|
|
describe "handle_data/2" do
|
|
test "handle_data handles KeepAlive with reply :now" do
|
|
state = %{@mock_state | step: :streaming}
|
|
wal_end = 12345
|
|
# Keepalive doesn't use this field meaningfully here
|
|
server_wal_start = 0
|
|
# Reply requested
|
|
reply_requested = 1
|
|
|
|
now_microseconds =
|
|
System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
|
|
|
|
# 100 milliseconds tolerance for clock check
|
|
grace_period_microseconds = 100_000
|
|
|
|
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
|
|
|
|
# Expected reply format: 'r', confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8, clock::64
|
|
# The actual implementation might construct the reply differently.
|
|
# This assertion needs to match the exact binary structure returned by handle_data.
|
|
# Let's assume the implementation sends back the received wal_end as confirmed LSNs,
|
|
# and the current time. The no_reply and high_priority flags might be 0.
|
|
assert {:reply, reply_binary, ^state} =
|
|
ReplicationConnection.handle_data(keepalive_data, state)
|
|
|
|
# Deconstruct the reply to verify its parts
|
|
assert <<?r, confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8,
|
|
clock::64>> = reply_binary
|
|
|
|
assert confirmed_lsn == wal_end
|
|
# Or potentially server_wal_start? Check impl.
|
|
assert confirmed_lsn_commit == wal_end
|
|
assert no_reply == 0
|
|
assert high_priority == 0
|
|
assert now_microseconds <= clock < now_microseconds + grace_period_microseconds
|
|
end
|
|
|
|
test "handle_data handles KeepAlive with reply :later" do
|
|
state = %{@mock_state | step: :streaming}
|
|
wal_end = 54321
|
|
server_wal_start = 0
|
|
# No reply requested
|
|
reply_requested = 0
|
|
|
|
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
|
|
|
|
# When no reply is requested, it should return :noreply with no binary message
|
|
assert {:noreply, [], ^state} =
|
|
ReplicationConnection.handle_data(keepalive_data, state)
|
|
end
|
|
|
|
test "handle_data handles Write message (XLogData)" do
|
|
state = %{@mock_state | step: :streaming}
|
|
server_wal_start = 123_456_789
|
|
# This is the LSN of the end of the WAL data in this message
|
|
server_wal_end = 987_654_321
|
|
# Timestamp in microseconds since PG epoch
|
|
server_system_clock = 1_234_567_890
|
|
# Example decoded message data (e.g., a BEGIN message binary)
|
|
# This data should be passed to handle_info via send(self(), decoded_msg)
|
|
message_binary =
|
|
<<"B", @lsn_binary || <<0::64>>::binary-8, @timestamp_int || 0::integer-64,
|
|
@xid || 0::integer-32>>
|
|
|
|
write_data =
|
|
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
|
|
message_binary::binary>>
|
|
|
|
# handle_data for 'w' should decode the message_binary and send it to self()
|
|
# It returns {:noreply, [], state} because the reply/acknowledgement happens
|
|
# via the KeepAlive ('k') mechanism.
|
|
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state)
|
|
|
|
# Assert that the decoded message was sent to self()
|
|
# Note: This requires the test process to receive the message.
|
|
# You might need `allow_receive` or similar testing patterns if handle_data
|
|
# directly uses `send`. If it calls another function that sends, test that function.
|
|
# Let's assume handle_data directly sends for this example.
|
|
# Need some sample data defined earlier for the assertion
|
|
@lsn_binary <<0::integer-32, 23_785_280::integer-32>>
|
|
@timestamp_int 704_521_200_000
|
|
@xid 1234
|
|
@timestamp_decoded ~U[2022-04-29 12:20:00.000000Z]
|
|
@lsn_decoded {0, 23_785_280}
|
|
|
|
expected_decoded_msg = %Messages.Begin{
|
|
final_lsn: @lsn_decoded,
|
|
commit_timestamp: @timestamp_decoded,
|
|
xid: @xid
|
|
}
|
|
|
|
assert_receive(^expected_decoded_msg)
|
|
end
|
|
|
|
test "handle_data handles unknown message type" do
|
|
state = %{@mock_state | step: :streaming}
|
|
# Using 'q' as an example unknown type
|
|
unknown_data = <<?q, 1, 2, 3>>
|
|
|
|
# Expect it to ignore unknown types and return noreply
|
|
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state)
|
|
# Optionally, assert that a warning was logged if applicable
|
|
end
|
|
end
|
|
|
|
# --- handle_info tests are CORRECTED below ---
|
|
describe "handle_info/2" do
|
|
test "handle_info updates relations on Relation message" do
|
|
state = @mock_state
|
|
|
|
# Use the correct fields from Messages.Relation struct
|
|
relation_msg = %Messages.Relation{
|
|
id: 101,
|
|
namespace: "public",
|
|
name: "accounts",
|
|
# Added replica_identity
|
|
replica_identity: :default,
|
|
columns: [
|
|
%Messages.Relation.Column{
|
|
flags: [:key],
|
|
name: "id",
|
|
type: "int4",
|
|
type_modifier: -1
|
|
},
|
|
%Messages.Relation.Column{
|
|
flags: [],
|
|
name: "name",
|
|
type: "text",
|
|
type_modifier: -1
|
|
}
|
|
]
|
|
}
|
|
|
|
# The state should store the relevant parts of the relation message, keyed by ID
|
|
expected_relation_data = %{
|
|
namespace: "public",
|
|
name: "accounts",
|
|
replica_identity: :default,
|
|
columns: [
|
|
%Messages.Relation.Column{
|
|
flags: [:key],
|
|
name: "id",
|
|
type: "int4",
|
|
type_modifier: -1
|
|
},
|
|
%Messages.Relation.Column{
|
|
flags: [],
|
|
name: "name",
|
|
type: "text",
|
|
type_modifier: -1
|
|
}
|
|
]
|
|
}
|
|
|
|
expected_relations = %{101 => expected_relation_data}
|
|
expected_state = %{state | relations: expected_relations}
|
|
|
|
assert {:noreply, ^expected_state} = ReplicationConnection.handle_info(relation_msg, state)
|
|
end
|
|
|
|
test "handle_info returns noreply for Insert message" do
|
|
# Pre-populate state with relation info if the handler needs it
|
|
state = %{
|
|
@mock_state
|
|
| relations: %{
|
|
101 => %{
|
|
name: "accounts",
|
|
namespace: "public",
|
|
columns: [
|
|
%Messages.Relation.Column{name: "id", type: "int4"},
|
|
%Messages.Relation.Column{name: "name", type: "text"}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
|
|
# Use the correct field: tuple_data (which is a tuple)
|
|
insert_msg = %Messages.Insert{relation_id: 101, tuple_data: {1, "Alice"}}
|
|
|
|
# handle_info likely broadcasts or processes the insert, but returns noreply
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(insert_msg, state)
|
|
# Add assertions here if handle_info is expected to send messages or call other funcs
|
|
end
|
|
|
|
test "handle_info returns noreply for Update message" do
|
|
state = %{
|
|
@mock_state
|
|
| relations: %{
|
|
101 => %{
|
|
name: "accounts",
|
|
namespace: "public",
|
|
columns: [
|
|
%Messages.Relation.Column{name: "id", type: "int4"},
|
|
%Messages.Relation.Column{name: "name", type: "text"}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
|
|
# Use the correct fields: relation_id, old_tuple_data, tuple_data, changed_key_tuple_data
|
|
update_msg = %Messages.Update{
|
|
relation_id: 101,
|
|
# Example: only old data provided
|
|
old_tuple_data: {1, "Alice"},
|
|
# Example: new data
|
|
tuple_data: {1, "Bob"},
|
|
# Example: key didn't change or wasn't provided
|
|
changed_key_tuple_data: nil
|
|
}
|
|
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(update_msg, state)
|
|
# Add assertions for side effects (broadcasts etc.) if needed
|
|
end
|
|
|
|
test "handle_info returns noreply for Delete message" do
|
|
state = %{
|
|
@mock_state
|
|
| relations: %{
|
|
101 => %{
|
|
name: "accounts",
|
|
namespace: "public",
|
|
columns: [
|
|
%Messages.Relation.Column{name: "id", type: "int4"},
|
|
%Messages.Relation.Column{name: "name", type: "text"}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
|
|
# Use the correct fields: relation_id, old_tuple_data, changed_key_tuple_data
|
|
delete_msg = %Messages.Delete{
|
|
relation_id: 101,
|
|
# Example: old data provided
|
|
old_tuple_data: {1, "Bob"},
|
|
# Example: key data not provided
|
|
changed_key_tuple_data: nil
|
|
}
|
|
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(delete_msg, state)
|
|
# Add assertions for side effects if needed
|
|
end
|
|
|
|
test "handle_info ignores Begin message" do
|
|
state = @mock_state
|
|
# Use correct fields: final_lsn, commit_timestamp, xid
|
|
begin_msg = %Messages.Begin{
|
|
final_lsn: {0, 123},
|
|
commit_timestamp: ~U[2023-01-01 10:00:00Z],
|
|
xid: 789
|
|
}
|
|
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(begin_msg, state)
|
|
end
|
|
|
|
test "handle_info ignores Commit message" do
|
|
state = @mock_state
|
|
# Use correct fields: flags, lsn, end_lsn, commit_timestamp
|
|
commit_msg = %Messages.Commit{
|
|
flags: [],
|
|
lsn: {0, 123},
|
|
end_lsn: {0, 456},
|
|
commit_timestamp: ~U[2023-01-01 10:00:01Z]
|
|
}
|
|
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(commit_msg, state)
|
|
end
|
|
|
|
test "handle_info ignores Origin message" do
|
|
state = @mock_state
|
|
# Use correct fields: origin_commit_lsn, name
|
|
origin_msg = %Messages.Origin{origin_commit_lsn: {0, 1}, name: "origin_name"}
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(origin_msg, state)
|
|
end
|
|
|
|
test "handle_info ignores Truncate message" do
|
|
state = @mock_state
|
|
# Use correct fields: number_of_relations, options, truncated_relations
|
|
truncate_msg = %Messages.Truncate{
|
|
number_of_relations: 2,
|
|
options: [:cascade],
|
|
truncated_relations: [101, 102]
|
|
}
|
|
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(truncate_msg, state)
|
|
end
|
|
|
|
test "handle_info ignores Type message" do
|
|
state = @mock_state
|
|
# Use correct fields: id, namespace, name
|
|
type_msg = %Messages.Type{id: 23, namespace: "pg_catalog", name: "int4"}
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(type_msg, state)
|
|
end
|
|
|
|
test "handle_info returns noreply for Unsupported message" do
|
|
state = @mock_state
|
|
unsupported_msg = %Messages.Unsupported{data: <<1, 2, 3>>}
|
|
# We cannot easily verify Logger.warning was called without mocks/capture.
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(unsupported_msg, state)
|
|
end
|
|
|
|
test "handle_info handles :shutdown message" do
|
|
state = @mock_state
|
|
# Expect :disconnect tuple based on common GenServer patterns for shutdown
|
|
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(:shutdown, state)
|
|
# Note: The original test asserted {:disconnect, :normal}. {:stop, :normal, state} is
|
|
# the standard GenServer return for a clean stop triggered by handle_info. Adjust
|
|
# if your implementation specifically returns :disconnect.
|
|
end
|
|
|
|
test "handle_info handles :DOWN message from monitored process" do
|
|
state = @mock_state
|
|
monitor_ref = make_ref()
|
|
# Example DOWN message structure
|
|
down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown}
|
|
|
|
# Expect the server to stop itself upon receiving DOWN for a critical process
|
|
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(down_msg, state)
|
|
# Again, adjust the expected return (:disconnect vs :stop) based on implementation.
|
|
end
|
|
|
|
test "handle_info ignores other messages" do
|
|
state = @mock_state
|
|
random_msg = {:some_other_info, "data"}
|
|
assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state)
|
|
end
|
|
end
|
|
|
|
# --- Moved handle_disconnect test to its own describe block ---
|
|
describe "handle_disconnect/1" do
|
|
test "handle_disconnect resets step to :disconnected and logs warning" do
|
|
state = %{@mock_state | step: :streaming}
|
|
expected_state = %{state | step: :disconnected}
|
|
|
|
# Capture log to verify warning (requires ExUnit config)
|
|
log_output =
|
|
ExUnit.CaptureLog.capture_log(fn ->
|
|
assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state)
|
|
end)
|
|
|
|
assert log_output =~ "Replication connection disconnected."
|
|
# Or match the exact log message if needed
|
|
end
|
|
end
|
|
end
|