feat(portal): Receive WAL events (#8909)

Firezone's control plane is a realtime, distributed system that relies
on a broadcast/subscribe system to function. In many cases, events
are broadcast whenever relevant data in the DB changes, such as an
actor losing access to a policy, a membership being deleted, and so
forth.

Today, this is handled in the application layer, typically at the
place where the relevant DB call is made (e.g. in an
`after_commit`). While this approach has worked thus far, it has several
issues:

1. We have no guarantee that the DB change will issue a broadcast. If
the application is deployed or the process crashes after the DB changes
are made but before the broadcast happens, we may fail to update
connected clients or gateways with the changes.
2. We have no guarantee that broadcasts go out in the same order as the
DB updates they describe. In other words, app server A could win its DB
operation against app server B, but then lose the race to broadcast
first.
3. If the cluster is in a bad state where broadcasts may return an error
(e.g. https://github.com/firezone/firezone/issues/8660), we will never
retry the broadcast.

To fix the above issues, we introduce a WAL logical decoder that processes
the event stream one message at a time and performs any needed work.
Serializability is guaranteed since we only process the WAL in a single,
cluster-global process, `ReplicationConnection`. Durability is also
guaranteed since we only ACK WAL segments after we've successfully
ingested the event.

This means we will only advance the position of our WAL stream after
successfully broadcasting the event.
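
Conceptually, the contract looks like this (an illustrative sketch only,
not code from this diff; `broadcast_event/1` and `ack/1` are hypothetical
stand-ins for the PubSub fanout and the standby status update):

defmodule WalSketch do
  # Advance the acknowledged WAL position only after the event has been
  # handled. On failure we keep the old position, so the server retains
  # the segment on disk and redelivers it after reconnect.
  def handle_wal_message(msg, last_acked_lsn) do
    case broadcast_event(msg) do
      :ok -> ack(msg.lsn)
      {:error, _reason} -> last_acked_lsn
    end
  end

  defp broadcast_event(_msg), do: :ok # stand-in for the PubSub fanout
  defp ack(lsn), do: lsn              # stand-in for a standby status update
end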

This PR only introduces the WAL stream processing system; it does not
change any of our current broadcasting behavior - that's saved for
another PR.
Jamil authored 2025-04-29 23:53:06 -07:00, committed by GitHub
parent 2650d81444
commit 968db2ae39
64 changed files with 3439 additions and 32 deletions

View File

@@ -0,0 +1,40 @@
name: "Setup Postgres"
description: "Starts a Postgres container"
inputs:
version:
default: "latest"
description: "Postgres version"
required: false
port:
default: "5432"
description: "Port to expose"
required: false
username:
default: "postgres"
description: "Username"
required: false
password:
default: "postgres"
description: "Password"
required: false
options:
default: ""
description: "Additional options to pass to the container"
required: false
runs:
using: "composite"
steps:
- name: Start Postgres
id: start-postgres
shell: bash
run: |
docker run \
--name postgres \
--env POSTGRES_USER=${{ inputs.username }} \
--env POSTGRES_PASSWORD=${{ inputs.password }} \
--publish ${{ inputs.port }}:5432 \
--health-cmd pg_isready \
--health-interval 10s \
--health-timeout 5s \
--health-retries 5 \
--detach postgres:${{ inputs.version }} postgres -c "wal_level=logical"

View File

@@ -14,19 +14,11 @@ jobs:
MIX_ENV: test
POSTGRES_HOST: localhost
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
services:
postgres:
image: postgres:15
ports:
- 5432:5432
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready --health-interval 10s --health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: ./.github/actions/setup-postgres
with:
version: 15
- uses: ./.github/actions/setup-elixir
with:
mix_env: ${{ env.MIX_ENV }}
@@ -129,19 +121,11 @@ jobs:
MIX_ENV: dev
POSTGRES_HOST: localhost
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
services:
postgres:
image: postgres:15
ports:
- 5432:5432
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready --health-interval 10s --health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: ./.github/actions/setup-postgres
with:
version: 15
- uses: ./.github/actions/setup-elixir
with:
mix_env: ${{ env.MIX_ENV }}
@@ -184,16 +168,6 @@ jobs:
matrix:
MIX_TEST_PARTITION: [1]
services:
postgres:
image: postgres:15
ports:
- 5432:5432
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready --health-interval 10s --health-timeout 5s
--health-retries 5
vault:
image: vault:1.12.2
env:
@@ -204,6 +178,9 @@ jobs:
options: --cap-add=IPC_LOCK
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: ./.github/actions/setup-postgres
with:
version: 15
- uses: nanasess/setup-chromedriver@e93e57b843c0c92788f22483f1a31af8ee48db25 # v2.3.0
- run: |
export DISPLAY=:99

View File

@@ -80,6 +80,8 @@ services:
DATABASE_NAME: firezone_dev
DATABASE_USER: postgres
DATABASE_PASSWORD: postgres
DATABASE_REPLICATION_USER: postgres
DATABASE_REPLICATION_PASSWORD: postgres
# Auth
AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock"
# Secrets
@@ -152,6 +154,8 @@ services:
DATABASE_NAME: firezone_dev
DATABASE_USER: postgres
DATABASE_PASSWORD: postgres
DATABASE_REPLICATION_USER: postgres
DATABASE_REPLICATION_PASSWORD: postgres
# Auth
AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock"
# Secrets
@@ -216,6 +220,8 @@ services:
DATABASE_NAME: firezone_dev
DATABASE_USER: postgres
DATABASE_PASSWORD: postgres
DATABASE_REPLICATION_USER: postgres
DATABASE_REPLICATION_PASSWORD: postgres
# Auth
AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock"
# Secrets
@@ -284,6 +290,8 @@ services:
DATABASE_NAME: firezone_dev
DATABASE_USER: postgres
DATABASE_PASSWORD: postgres
DATABASE_REPLICATION_USER: postgres
DATABASE_REPLICATION_PASSWORD: postgres
# Auth
AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock"
# Secrets

View File

@@ -98,6 +98,7 @@ defmodule Domain.Accounts do
{:ok, account}
{:ok, account} ->
# TODO: WAL
:ok = Domain.Clients.disconnect_account_clients(account)
{:ok, account}
@@ -109,6 +110,7 @@ defmodule Domain.Accounts do
defp on_account_update(account, changeset) do
:ok = Billing.on_account_update(account, changeset)
# TODO: WAL
if Ecto.Changeset.changed?(changeset, :config) do
broadcast_config_update_to_account(account)
else
@@ -173,6 +175,7 @@ defmodule Domain.Accounts do
account_or_id |> account_topic() |> PubSub.unsubscribe()
end
# TODO: WAL
defp broadcast_config_update_to_account(%Account{} = account) do
broadcast_to_account(account.id, :config_changed)
end

View File

@@ -175,6 +175,7 @@ defmodule Domain.Actors do
case Repo.insert(changeset) do
{:ok, group} ->
# TODO: WAL
:ok = broadcast_group_memberships_events(group, changeset)
{:ok, group}
@@ -189,6 +190,7 @@ defmodule Domain.Actors do
case Repo.insert(changeset) do
{:ok, group} ->
# TODO: WAL
:ok = broadcast_group_memberships_events(group, changeset)
{:ok, group}
@@ -227,6 +229,7 @@ defmodule Domain.Actors do
|> Repo.preload(:memberships)
|> Group.Changeset.update(attrs)
end,
# TODO: WAL
after_commit: fn _actor, changeset -> broadcast_memberships_events(changeset) end
)
end
@@ -252,6 +255,7 @@ defmodule Domain.Actors do
{:ok, group} = Repo.update(changeset)
# TODO: WAL
:ok = broadcast_memberships_events(changeset)
group
@@ -274,6 +278,7 @@ defmodule Domain.Actors do
|> Membership.Query.returning_all()
|> Repo.delete_all()
# TODO: WAL
:ok = broadcast_membership_removal_events(memberships)
{:ok, group}
@@ -301,6 +306,7 @@ defmodule Domain.Actors do
|> Membership.Query.returning_all()
|> Repo.delete_all()
# TODO: WAL
:ok = broadcast_membership_removal_events(memberships)
with {:ok, groups} <- delete_groups(queryable, subject) do
@@ -339,6 +345,7 @@ defmodule Domain.Actors do
|> Membership.Query.returning_all()
|> Repo.delete_all()
# TODO: WAL
:ok = broadcast_membership_removal_events(memberships)
{:ok, groups}
@@ -522,6 +529,7 @@ defmodule Domain.Actors do
true -> :cant_remove_admin_type
end
end,
# TODO: WAL
after_commit: fn _actor, changeset -> broadcast_memberships_events(changeset) end
)
end
@@ -606,6 +614,7 @@ defmodule Domain.Actors do
|> Repo.delete_all()
{:ok, _groups} = update_dynamic_group_memberships(actor.account_id)
# TODO: WAL
:ok = broadcast_membership_removal_events(memberships)
{:ok, _tokens} = Tokens.delete_tokens_for(actor, subject)
@@ -676,6 +685,7 @@ defmodule Domain.Actors do
actor_or_id |> actor_memberships_topic() |> PubSub.unsubscribe()
end
# TODO: WAL
defp broadcast_memberships_events(changeset) do
if changeset.valid? and Ecto.Changeset.changed?(changeset, :memberships) do
case Ecto.Changeset.apply_action(changeset, :update) do

View File

@@ -28,11 +28,13 @@ defmodule Domain.Actors.Membership.Sync do
{:ok, inserted} <- insert_memberships(provider, insert) do
:ok =
Enum.each(insert, fn {group_id, actor_id} ->
# TODO: WAL
Actors.broadcast_membership_event(:create, actor_id, group_id)
end)
:ok =
Enum.each(delete, fn {group_id, actor_id} ->
# TODO: WAL
Actors.broadcast_membership_event(:delete, actor_id, group_id)
end)

View File

@@ -22,6 +22,13 @@ defmodule Domain.Application do
Domain.Repo,
Domain.PubSub,
# WAL replication
%{
id: Domain.Events.ReplicationConnection,
start: {Domain.Events.ReplicationConnection, :start_link, [replication_instance()]},
restart: :transient
},
# Infrastructure services
# Note: only one of platform adapters will be actually started.
Domain.GoogleCloudPlatform,
@@ -45,6 +52,11 @@ defmodule Domain.Application do
]
end
defp replication_instance do
config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
struct(Domain.Events.ReplicationConnection, config)
end
defp configure_logger do
# Attach Oban to the logger
Oban.Telemetry.attach_default_logger(encode: false, level: log_level())
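
`replication_instance/0` builds the struct from config under the
`Domain.Events.ReplicationConnection` key. A hypothetical runtime-config
sketch (field names mirror the struct defined in
`Domain.Events.ReplicationConnection` below; actual values are
environment-specific):

# Hypothetical sketch of config/runtime.exs for the replication connection.
config :domain, Domain.Events.ReplicationConnection,
  connection_opts: [
    hostname: "localhost",
    port: 5432,
    username: "postgres",  # DATABASE_REPLICATION_USER
    password: "postgres",  # DATABASE_REPLICATION_PASSWORD
    database: "firezone_dev",
    auto_reconnect: true
  ],
  table_subscriptions: ["accounts", "flows", "policies"]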

View File

@@ -188,6 +188,7 @@ defmodule Domain.Clients do
with: &Client.Changeset.update(&1, attrs),
preload: [:online?]
)
# TODO: WAL
|> case do
{:ok, client} ->
:ok = broadcast_to_client(client, :updated)
@@ -212,6 +213,7 @@ defmodule Domain.Clients do
|> case do
{:ok, client} ->
client = Repo.preload(client, [:verified_by_actor, :verified_by_identity])
# TODO: WAL
:ok = broadcast_to_client(client, :updated)
{:ok, client}
@@ -234,6 +236,7 @@ defmodule Domain.Clients do
|> case do
{:ok, client} ->
{:ok, _flows} = Flows.expire_flows_for(client)
# TODO: WAL
:ok = broadcast_to_client(client, :updated)
{:ok, client}
@@ -251,6 +254,7 @@ defmodule Domain.Clients do
with :ok <- authorize_actor_client_management(client.actor_id, subject) do
case delete_clients(queryable, subject) do
{:ok, [client]} ->
# TODO: WAL
:ok = disconnect_client(client)
{:ok, client}
@@ -268,6 +272,7 @@ defmodule Domain.Clients do
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission()),
{:ok, _clients} <- delete_clients(queryable, subject) do
# TODO: WAL
:ok = disconnect_actor_clients(actor)
:ok
end
@@ -280,6 +285,7 @@ defmodule Domain.Clients do
|> Client.Query.delete()
|> Repo.update_all([])
# TODO: WAL
:ok = Enum.each(clients, &disconnect_client/1)
{:ok, clients}
@@ -305,6 +311,7 @@ defmodule Domain.Clients do
{:ok, _} <-
Presence.track(self(), actor_clients_presence_topic(client.actor_id), client.id, %{}) do
:ok = PubSub.subscribe(client_topic(client))
# TODO: WAL
# :ok = PubSub.subscribe(actor_clients_topic(client.actor_id))
# :ok = PubSub.subscribe(identity_topic(client.actor_id))
:ok = PubSub.subscribe(account_clients_topic(client.account_id))
@@ -347,6 +354,7 @@ defmodule Domain.Clients do
PubSub.unsubscribe(actor_clients_presence_topic(actor_or_id))
end
# TODO: WAL
def broadcast_to_account_clients(account_or_id, payload) do
account_or_id
|> account_clients_topic()

View File

@@ -290,6 +290,16 @@ defmodule Domain.Config.Definitions do
"""
defconfig(:database_password, :string, default: nil, sensitive: true)
@doc """
Replication user used to connect to the replication slot.
"""
defconfig(:database_replication_user, :string, default: nil, sensitive: true)
@doc """
Replication password for the replication user.
"""
defconfig(:database_replication_password, :string, default: nil, sensitive: true)
@doc """
Size of the connection pool to the PostgreSQL database.
"""

View File

@@ -0,0 +1,370 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/decoder.ex
defmodule Domain.Events.Decoder do
@moduledoc """
Functions for decoding different types of logical replication messages.
"""
defmodule Messages do
@moduledoc """
Different types of logical replication messages from Postgres
"""
defmodule Begin do
@moduledoc """
Struct representing the BEGIN message in PostgreSQL's logical decoding output.
* `final_lsn` - The LSN of the commit that this transaction ended at.
* `commit_timestamp` - The timestamp of the commit that this transaction ended at.
* `xid` - The transaction ID of this transaction.
"""
defstruct [:final_lsn, :commit_timestamp, :xid]
end
defmodule Commit do
@moduledoc """
Struct representing the COMMIT message in PostgreSQL's logical decoding output.
* `flags` - Bitmask of flags associated with this commit.
* `lsn` - The LSN of the commit.
* `end_lsn` - The LSN of the next record in the WAL stream.
* `commit_timestamp` - The timestamp of the commit.
"""
defstruct [:flags, :lsn, :end_lsn, :commit_timestamp]
end
defmodule Origin do
@moduledoc """
Struct representing the ORIGIN message in PostgreSQL's logical decoding output.
* `origin_commit_lsn` - The LSN of the commit in the database that the change originated from.
* `name` - The name of the origin.
"""
defstruct [:origin_commit_lsn, :name]
end
defmodule Relation do
@moduledoc """
Struct representing the RELATION message in PostgreSQL's logical decoding output.
* `id` - The OID of the relation.
* `namespace` - The OID of the namespace that the relation belongs to.
* `name` - The name of the relation.
* `replica_identity` - The replica identity setting of the relation.
* `columns` - A list of columns in the relation.
"""
defstruct [:id, :namespace, :name, :replica_identity, :columns]
defmodule Column do
@moduledoc """
Struct representing a column in a relation.
* `flags` - Bitmask of flags associated with this column.
* `name` - The name of the column.
* `type` - The OID of the data type of the column.
* `type_modifier` - The type modifier of the column.
"""
defstruct [:flags, :name, :type, :type_modifier]
end
end
defmodule Insert do
@moduledoc """
Struct representing the INSERT message in PostgreSQL's logical decoding output.
* `relation_id` - The OID of the relation that the tuple was inserted into.
* `tuple_data` - The data of the inserted tuple.
"""
defstruct [:relation_id, :tuple_data]
end
defmodule Update do
@moduledoc """
Struct representing the UPDATE message in PostgreSQL's logical decoding output.
* `relation_id` - The OID of the relation that the tuple was updated in.
* `changed_key_tuple_data` - The data of the tuple with the old key values.
* `old_tuple_data` - The data of the tuple before the update.
* `tuple_data` - The data of the tuple after the update.
"""
defstruct [:relation_id, :changed_key_tuple_data, :old_tuple_data, :tuple_data]
end
defmodule Delete do
@moduledoc """
Struct representing the DELETE message in PostgreSQL's logical decoding output.
* `relation_id` - The OID of the relation that the tuple was deleted from.
* `changed_key_tuple_data` - The data of the tuple with the old key values.
* `old_tuple_data` - The data of the tuple before the delete.
"""
defstruct [:relation_id, :changed_key_tuple_data, :old_tuple_data]
end
defmodule Truncate do
@moduledoc """
Struct representing the TRUNCATE message in PostgreSQL's logical decoding output.
* `number_of_relations` - The number of truncated relations.
* `options` - Additional options provided when truncating the relations.
* `truncated_relations` - List of relations that have been truncated.
"""
defstruct [:number_of_relations, :options, :truncated_relations]
end
defmodule Type do
@moduledoc """
Struct representing the TYPE message in PostgreSQL's logical decoding output.
* `id` - The OID of the type.
* `namespace` - The namespace of the type.
* `name` - The name of the type.
"""
defstruct [:id, :namespace, :name]
end
defmodule Unsupported do
@moduledoc """
Struct representing an unsupported message in PostgreSQL's logical decoding output.
* `data` - The raw data of the unsupported message.
"""
defstruct [:data]
end
end
require Logger
@pg_epoch DateTime.from_iso8601("2000-01-01T00:00:00Z")
alias Messages.{
Begin,
Commit,
Origin,
Relation,
Relation.Column,
Insert,
Update,
Delete,
Truncate,
Type,
Unsupported
}
alias Domain.Events.OidDatabase
@doc """
Parses logical replication messages from Postgres
## Examples
iex> decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>)
%Domain.Events.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}}
"""
def decode_message(message) when is_binary(message) do
decode_message_impl(message)
end
defp decode_message_impl(<<"B", lsn::binary-8, timestamp::integer-64, xid::integer-32>>) do
%Begin{
final_lsn: decode_lsn(lsn),
commit_timestamp: pgtimestamp_to_timestamp(timestamp),
xid: xid
}
end
defp decode_message_impl(
<<"C", _flags::binary-1, lsn::binary-8, end_lsn::binary-8, timestamp::integer-64>>
) do
%Commit{
flags: [],
lsn: decode_lsn(lsn),
end_lsn: decode_lsn(end_lsn),
commit_timestamp: pgtimestamp_to_timestamp(timestamp)
}
end
# TODO: Verify this is correct with real data from Postgres
defp decode_message_impl(<<"O", lsn::binary-8, name::binary>>) do
%Origin{
origin_commit_lsn: decode_lsn(lsn),
name: name
}
end
defp decode_message_impl(<<"R", id::integer-32, rest::binary>>) do
[
namespace
| [name | [<<replica_identity::binary-1, _number_of_columns::integer-16, columns::binary>>]]
] = String.split(rest, <<0>>, parts: 3)
# TODO: Handle the case where pg_catalog is blank; we should still return the schema as pg_catalog
friendly_replica_identity =
case replica_identity do
"d" -> :default
"n" -> :nothing
"f" -> :all_columns
"i" -> :index
end
%Relation{
id: id,
namespace: namespace,
name: name,
replica_identity: friendly_replica_identity,
columns: decode_columns(columns)
}
end
defp decode_message_impl(
<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>
) do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
%Insert{
relation_id: relation_id,
tuple_data: decoded_tuple_data
}
end
defp decode_message_impl(
<<"U", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>
) do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
%Update{
relation_id: relation_id,
tuple_data: decoded_tuple_data
}
end
defp decode_message_impl(
<<"U", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16,
tuple_data::binary>>
)
when key_or_old == "O" or key_or_old == "K" do
{<<"N", new_number_of_columns::integer-16, new_tuple_binary::binary>>, old_decoded_tuple_data} =
decode_tuple_data(tuple_data, number_of_columns)
{<<>>, decoded_tuple_data} = decode_tuple_data(new_tuple_binary, new_number_of_columns)
base_update_msg = %Update{
relation_id: relation_id,
tuple_data: decoded_tuple_data
}
case key_or_old do
"K" -> Map.put(base_update_msg, :changed_key_tuple_data, old_decoded_tuple_data)
"O" -> Map.put(base_update_msg, :old_tuple_data, old_decoded_tuple_data)
end
end
defp decode_message_impl(
<<"D", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16,
tuple_data::binary>>
)
when key_or_old == "K" or key_or_old == "O" do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
base_delete_msg = %Delete{
relation_id: relation_id
}
case key_or_old do
"K" -> Map.put(base_delete_msg, :changed_key_tuple_data, decoded_tuple_data)
"O" -> Map.put(base_delete_msg, :old_tuple_data, decoded_tuple_data)
end
end
defp decode_message_impl(
<<"T", number_of_relations::integer-32, options::integer-8, column_ids::binary>>
) do
truncated_relations =
for relation_id_bin <- column_ids |> :binary.bin_to_list() |> Enum.chunk_every(4),
do: relation_id_bin |> :binary.list_to_bin() |> :binary.decode_unsigned()
decoded_options =
case options do
0 -> []
1 -> [:cascade]
2 -> [:restart_identity]
3 -> [:cascade, :restart_identity]
end
%Truncate{
number_of_relations: number_of_relations,
options: decoded_options,
truncated_relations: truncated_relations
}
end
defp decode_message_impl(<<"Y", data_type_id::integer-32, namespace_and_name::binary>>) do
[namespace, name_with_null] = :binary.split(namespace_and_name, <<0>>)
name = String.slice(name_with_null, 0..-2//1)
%Type{
id: data_type_id,
namespace: namespace,
name: name
}
end
defp decode_message_impl(binary), do: %Unsupported{data: binary}
defp decode_tuple_data(binary, columns_remaining, accumulator \\ [])
defp decode_tuple_data(remaining_binary, 0, accumulator) when is_binary(remaining_binary),
do: {remaining_binary, accumulator |> Enum.reverse() |> List.to_tuple()}
defp decode_tuple_data(<<"n", rest::binary>>, columns_remaining, accumulator),
do: decode_tuple_data(rest, columns_remaining - 1, [nil | accumulator])
defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, accumulator),
do: decode_tuple_data(rest, columns_remaining - 1, [:unchanged_toast | accumulator])
defp decode_tuple_data(
<<"t", column_length::integer-32, rest::binary>>,
columns_remaining,
accumulator
),
do:
decode_tuple_data(
:erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)}),
columns_remaining - 1,
[
:erlang.binary_part(rest, {0, column_length}) | accumulator
]
)
defp decode_columns(binary, accumulator \\ [])
defp decode_columns(<<>>, accumulator), do: Enum.reverse(accumulator)
defp decode_columns(<<flags::integer-8, rest::binary>>, accumulator) do
[name | [<<data_type_id::integer-32, type_modifier::integer-32, columns::binary>>]] =
String.split(rest, <<0>>, parts: 2)
decoded_flags =
case flags do
1 -> [:key]
_ -> []
end
decode_columns(columns, [
%Column{
name: name,
flags: decoded_flags,
type: OidDatabase.name_for_type_id(data_type_id),
# type: data_type_id,
type_modifier: type_modifier
}
| accumulator
])
end
defp pgtimestamp_to_timestamp(microsecond_offset) when is_integer(microsecond_offset) do
{:ok, epoch, 0} = @pg_epoch
DateTime.add(epoch, microsecond_offset, :microsecond)
end
defp decode_lsn(<<xlog_file::integer-32, xlog_offset::integer-32>>),
do: {xlog_file, xlog_offset}
end

View File

@@ -0,0 +1,325 @@
defmodule Domain.Events.Event do
alias Domain.Events.Decoder
alias Domain.Events.Hooks
require Logger
@doc """
Ingests a WAL write message from Postgres, transforms it into an event, and sends
it to the appropriate hook module for processing.
"""
def ingest(msg, relations) do
{op, old_tuple_data, tuple_data} = extract_msg_data(msg)
{:ok, relation} = Map.fetch(relations, msg.relation_id)
table = relation.name
old_data = zip(old_tuple_data, relation.columns)
data = zip(tuple_data, relation.columns)
process(op, table, old_data, data)
end
############
# accounts #
############
defp process(:insert, "accounts", _old_data, data) do
Hooks.Accounts.insert(data)
end
defp process(:update, "accounts", old_data, data) do
Hooks.Accounts.update(old_data, data)
end
defp process(:delete, "accounts", old_data, _data) do
Hooks.Accounts.delete(old_data)
end
###########################
# actor_group_memberships #
###########################
defp process(:insert, "actor_group_memberships", _old_data, data) do
Hooks.ActorGroupMemberships.insert(data)
end
defp process(:update, "actor_group_memberships", old_data, data) do
Hooks.ActorGroupMemberships.update(old_data, data)
end
defp process(:delete, "actor_group_memberships", old_data, _data) do
Hooks.ActorGroupMemberships.delete(old_data)
end
################
# actor_groups #
################
defp process(:insert, "actor_groups", _old_data, data) do
Hooks.ActorGroups.insert(data)
end
defp process(:update, "actor_groups", old_data, data) do
Hooks.ActorGroups.update(old_data, data)
end
defp process(:delete, "actor_groups", old_data, _data) do
Hooks.ActorGroups.delete(old_data)
end
##########
# actors #
##########
defp process(:insert, "actors", _old_data, data) do
Hooks.Actors.insert(data)
end
defp process(:update, "actors", old_data, data) do
Hooks.Actors.update(old_data, data)
end
defp process(:delete, "actors", old_data, _data) do
Hooks.Actors.delete(old_data)
end
###################
# auth_identities #
###################
defp process(:insert, "auth_identities", _old_data, data) do
Hooks.AuthIdentities.insert(data)
end
defp process(:update, "auth_identities", old_data, data) do
Hooks.AuthIdentities.update(old_data, data)
end
defp process(:delete, "auth_identities", old_data, _data) do
Hooks.AuthIdentities.delete(old_data)
end
##################
# auth_providers #
##################
defp process(:insert, "auth_providers", _old_data, data) do
Hooks.AuthProviders.insert(data)
end
defp process(:update, "auth_providers", old_data, data) do
Hooks.AuthProviders.update(old_data, data)
end
defp process(:delete, "auth_providers", old_data, _data) do
Hooks.AuthProviders.delete(old_data)
end
###########
# clients #
###########
defp process(:insert, "clients", _old_data, data) do
Hooks.Clients.insert(data)
end
defp process(:update, "clients", old_data, data) do
Hooks.Clients.update(old_data, data)
end
defp process(:delete, "clients", old_data, _data) do
Hooks.Clients.delete(old_data)
end
###################
# flow_activities #
###################
defp process(:insert, "flow_activities", _old_data, data) do
Hooks.FlowActivities.insert(data)
end
defp process(:update, "flow_activities", old_data, data) do
Hooks.FlowActivities.update(old_data, data)
end
defp process(:delete, "flow_activities", old_data, _data) do
Hooks.FlowActivities.delete(old_data)
end
#########
# flows #
#########
defp process(:insert, "flows", _old_data, data) do
Hooks.Flows.insert(data)
end
defp process(:update, "flows", old_data, data) do
Hooks.Flows.update(old_data, data)
end
defp process(:delete, "flows", old_data, _data) do
Hooks.Flows.delete(old_data)
end
##################
# gateway_groups #
##################
defp process(:insert, "gateway_groups", _old_data, data) do
Hooks.GatewayGroups.insert(data)
end
defp process(:update, "gateway_groups", old_data, data) do
Hooks.GatewayGroups.update(old_data, data)
end
defp process(:delete, "gateway_groups", old_data, _data) do
Hooks.GatewayGroups.delete(old_data)
end
############
# gateways #
############
defp process(:insert, "gateways", _old_data, data) do
Hooks.Gateways.insert(data)
end
defp process(:update, "gateways", old_data, data) do
Hooks.Gateways.update(old_data, data)
end
defp process(:delete, "gateways", old_data, _data) do
Hooks.Gateways.delete(old_data)
end
############
# policies #
############
defp process(:insert, "policies", _old_data, data) do
Hooks.Policies.insert(data)
end
defp process(:update, "policies", old_data, data) do
Hooks.Policies.update(old_data, data)
end
defp process(:delete, "policies", old_data, _data) do
Hooks.Policies.delete(old_data)
end
################
# relay_groups #
################
defp process(:insert, "relay_groups", _old_data, data) do
Hooks.RelayGroups.insert(data)
end
defp process(:update, "relay_groups", old_data, data) do
Hooks.RelayGroups.update(old_data, data)
end
defp process(:delete, "relay_groups", old_data, _data) do
Hooks.RelayGroups.delete(old_data)
end
##########
# relays #
##########
defp process(:insert, "relays", _old_data, data) do
Hooks.Relays.insert(data)
end
defp process(:update, "relays", old_data, data) do
Hooks.Relays.update(old_data, data)
end
defp process(:delete, "relays", old_data, _data) do
Hooks.Relays.delete(old_data)
end
########################
# resource_connections #
########################
defp process(:insert, "resource_connections", _old_data, data) do
Hooks.ResourceConnections.insert(data)
end
defp process(:update, "resource_connections", old_data, data) do
Hooks.ResourceConnections.update(old_data, data)
end
defp process(:delete, "resource_connections", old_data, _data) do
Hooks.ResourceConnections.delete(old_data)
end
#############
# resources #
#############
defp process(:insert, "resources", _old_data, data) do
Hooks.Resources.insert(data)
end
defp process(:update, "resources", old_data, data) do
Hooks.Resources.update(old_data, data)
end
defp process(:delete, "resources", old_data, _data) do
Hooks.Resources.delete(old_data)
end
##########
# tokens #
##########
defp process(:insert, "tokens", _old_data, data) do
Hooks.Tokens.insert(data)
end
defp process(:update, "tokens", old_data, data) do
Hooks.Tokens.update(old_data, data)
end
defp process(:delete, "tokens", old_data, _data) do
Hooks.Tokens.delete(old_data)
end
#############
# CATCH-ALL #
#############
defp process(op, table, _old_data, _data) do
Logger.warning("Unhandled event type!", op: op, table: table)
:ok
end
defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do
{:insert, nil, data}
end
defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: old, tuple_data: data}) do
{:update, old, data}
end
defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do
{:delete, old, nil}
end
defp zip(nil, _), do: nil
defp zip(tuple_data, columns) do
tuple_data
|> Tuple.to_list()
|> Enum.zip(columns)
|> Map.new(fn {value, column} -> {column.name, value} end)
end
end
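
To make the `zip/2` step concrete, here is a hypothetical iex session
(real column entries are `%Relation.Column{}` structs; plain maps are
used here for brevity):

iex> columns = [%{name: "id"}, %{name: "name"}]
iex> tuple_data = {"42", "Firezone"}
iex> tuple_data
...> |> Tuple.to_list()
...> |> Enum.zip(columns)
...> |> Map.new(fn {value, column} -> {column.name, value} end)
%{"id" => "42", "name" => "Firezone"}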

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Accounts do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.ActorGroupMemberships do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.ActorGroups do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Actors do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.AuthIdentities do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.AuthProviders do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Clients do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.FlowActivities do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Flows do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.GatewayGroups do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Gateways do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Policies do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.RelayGroups do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Relays do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.ResourceConnections do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Resources do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,13 @@
defmodule Domain.Events.Hooks.Tokens do
def insert(_data) do
:ok
end
def update(_old_data, _data) do
:ok
end
def delete(_old_data) do
:ok
end
end

View File

@@ -0,0 +1,163 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/oid_database.ex
defmodule Domain.Events.OidDatabase do
@moduledoc "This module maps a numeric PostgreSQL type ID to a descriptive string."
@doc """
Maps a numeric PostgreSQL type ID to a descriptive string.
## Examples
iex> name_for_type_id(1700)
"numeric"
iex> name_for_type_id(25)
"text"
iex> name_for_type_id(3802)
"jsonb"
"""
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def name_for_type_id(type_id) do
case type_id do
16 -> "bool"
17 -> "bytea"
18 -> "char"
19 -> "name"
20 -> "int8"
21 -> "int2"
22 -> "int2vector"
23 -> "int4"
24 -> "regproc"
25 -> "text"
26 -> "oid"
27 -> "tid"
28 -> "xid"
29 -> "cid"
30 -> "oidvector"
114 -> "json"
142 -> "xml"
143 -> "_xml"
194 -> "pg_node_tree"
199 -> "_json"
210 -> "smgr"
600 -> "point"
601 -> "lseg"
602 -> "path"
603 -> "box"
604 -> "polygon"
628 -> "line"
629 -> "_line"
650 -> "cidr"
651 -> "_cidr"
700 -> "float4"
701 -> "float8"
702 -> "abstime"
703 -> "reltime"
704 -> "tinterval"
718 -> "circle"
719 -> "_circle"
774 -> "macaddr8"
775 -> "_macaddr8"
790 -> "money"
791 -> "_money"
829 -> "macaddr"
869 -> "inet"
1000 -> "_bool"
1001 -> "_bytea"
1002 -> "_char"
1003 -> "_name"
1005 -> "_int2"
1006 -> "_int2vector"
1007 -> "_int4"
1008 -> "_regproc"
1009 -> "_text"
1010 -> "_tid"
1011 -> "_xid"
1012 -> "_cid"
1013 -> "_oidvector"
1014 -> "_bpchar"
1015 -> "_varchar"
1016 -> "_int8"
1017 -> "_point"
1018 -> "_lseg"
1019 -> "_path"
1020 -> "_box"
1021 -> "_float4"
1022 -> "_float8"
1023 -> "_abstime"
1024 -> "_reltime"
1025 -> "_tinterval"
1027 -> "_polygon"
1028 -> "_oid"
1033 -> "aclitem"
1034 -> "_aclitem"
1040 -> "_macaddr"
1041 -> "_inet"
1042 -> "bpchar"
1043 -> "varchar"
1082 -> "date"
1083 -> "time"
1114 -> "timestamp"
1115 -> "_timestamp"
1182 -> "_date"
1183 -> "_time"
1184 -> "timestamptz"
1185 -> "_timestamptz"
1186 -> "interval"
1187 -> "_interval"
1231 -> "_numeric"
1263 -> "_cstring"
1266 -> "timetz"
1270 -> "_timetz"
1560 -> "bit"
1561 -> "_bit"
1562 -> "varbit"
1563 -> "_varbit"
1700 -> "numeric"
1790 -> "refcursor"
2201 -> "_refcursor"
2202 -> "regprocedure"
2203 -> "regoper"
2204 -> "regoperator"
2205 -> "regclass"
2206 -> "regtype"
2207 -> "_regprocedure"
2208 -> "_regoper"
2209 -> "_regoperator"
2210 -> "_regclass"
2211 -> "_regtype"
2949 -> "_txid_snapshot"
2950 -> "uuid"
2951 -> "_uuid"
2970 -> "txid_snapshot"
3220 -> "pg_lsn"
3221 -> "_pg_lsn"
3361 -> "pg_ndistinct"
3402 -> "pg_dependencies"
3614 -> "tsvector"
3615 -> "tsquery"
3642 -> "gtsvector"
3643 -> "_tsvector"
3644 -> "_gtsvector"
3645 -> "_tsquery"
3734 -> "regconfig"
3735 -> "_regconfig"
3769 -> "regdictionary"
3770 -> "_regdictionary"
3802 -> "jsonb"
3807 -> "_jsonb"
3905 -> "_int4range"
3907 -> "_numrange"
3909 -> "_tsrange"
3911 -> "_tstzrange"
3913 -> "_daterange"
3927 -> "_int8range"
4089 -> "regnamespace"
4090 -> "_regnamespace"
4096 -> "regrole"
4097 -> "_regrole"
_ -> type_id
end
end
end

View File

@@ -0,0 +1,67 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/protocol.ex
defmodule Domain.Events.Protocol do
@moduledoc """
This module is responsible for parsing the Postgres WAL messages.
"""
alias Domain.Events.Protocol.Write
alias Domain.Events.Protocol.KeepAlive
defguard is_write(value) when binary_part(value, 0, 1) == <<?w>>
defguard is_keep_alive(value) when binary_part(value, 0, 1) == <<?k>>
def parse(
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message::binary>>
) do
%Write{
server_wal_start: server_wal_start,
server_wal_end: server_wal_end,
server_system_clock: server_system_clock,
message: message
}
end
def parse(<<?k, wal_end::64, clock::64, reply::8>>) do
reply =
case reply do
0 -> :later
1 -> :now
end
%KeepAlive{wal_end: wal_end, clock: clock, reply: reply}
end
@doc """
Message to send to the server to request a standby status update.
Check https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE for more information
"""
@spec standby_status(integer(), integer(), integer(), :now | :later, integer() | nil) :: [
binary()
]
def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, clock \\ nil)
def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, nil) do
standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, current_time())
end
def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, clock) do
reply =
case reply do
:now -> 1
:later -> 0
end
[
<<?r, last_wal_received::64, last_wal_flushed::64, last_wal_applied::64, clock::64,
reply::8>>
]
end
@doc """
Message to send to the server when no reply is needed, since the server can wait
"""
def hold, do: []
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
def current_time, do: System.os_time(:microsecond) - @epoch
end
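
As a usage illustration (a hedged sketch, not from this diff): replying to a
keepalive with a fully-processed WAL position might look like

iex> alias Domain.Events.Protocol
iex> lsn = 23_785_280
iex> [msg] = Protocol.standby_status(lsn, lsn, lsn, :now)
iex> <<?r, _rest::binary>> = msg

which produces the single `?r` (standby status update) frame that
`handle_data/2` in ReplicationConnection sends back to the server.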

View File

@@ -0,0 +1,24 @@
defmodule Domain.Events.Protocol.KeepAlive do
@moduledoc """
Primary keepalive message (B)
Byte1('k')
Identifies the message as a sender keepalive.
Int64
The current end of WAL on the server.
Int64
The server's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
Byte1
1 means that the client should reply to this message as soon as possible, to avoid a timeout disconnect. 0 otherwise.
The receiving process can send replies back to the sender at any time (as the payload of a CopyData message).
"""
@type t :: %__MODULE__{
wal_end: integer(),
clock: integer(),
reply: :now | :later
}
defstruct [:wal_end, :clock, :reply]
end

View File

@@ -0,0 +1,22 @@
defmodule Domain.Events.Protocol.Write do
@moduledoc """
XLogData (B)
Byte1('w')
Identifies the message as WAL data.
Int64
The starting point of the WAL data in this message.
Int64
The current end of WAL on the server.
Int64
The server's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
Byten
A section of the WAL data stream.
A single WAL record is never split across two XLogData messages. When a WAL record crosses a WAL page boundary, and is therefore already split using continuation records, it can be split at the page boundary. In other words, the first main WAL record and its continuation records can be sent in different XLogData messages.
"""
defstruct [:server_wal_start, :server_wal_end, :server_system_clock, :message]
end

View File

@@ -0,0 +1,312 @@
# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex
defmodule Domain.Events.ReplicationConnection do
@moduledoc """
Receives WAL events from PostgreSQL and broadcasts them where they need to go.
Generally, we only want to start one of these connections per cluster in order
to obtain a serial stream of the WAL. We can then fanout these events to the
appropriate consumers.
The ReplicationConnection is started with a durable slot so that whatever data we
fail to acknowledge is retained in the slot on the server's disk. The server will
then send us the data when we reconnect. This is important because we want to
ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy.
The WAL data we receive is sent only once a COMMIT completes on the server. So even though
COMMIT is one of the message types we receive here, we can safely ignore it and process
insert/update/delete messages one-by-one in this module as we receive them.
"""
use Postgrex.ReplicationConnection
require Logger
import Domain.Events.Protocol
import Domain.Events.Decoder
alias Domain.Events.Event
alias Domain.Events.Decoder
alias Domain.Events.Protocol.{KeepAlive, Write}
@type t :: %__MODULE__{
schema: String.t(),
connection_opts: Keyword.t(),
step:
:disconnected
| :check_publication
| :create_publication
| :check_replication_slot
| :create_slot
| :start_replication_slot
| :streaming,
publication_name: String.t(),
replication_slot_name: String.t(),
output_plugin: String.t(),
proto_version: integer(),
table_subscriptions: list(),
relations: map()
}
defstruct schema: "public",
connection_opts: [],
step: :disconnected,
publication_name: "events",
replication_slot_name: "events_slot",
output_plugin: "pgoutput",
proto_version: 1,
table_subscriptions: [],
relations: %{}
def start_link(%__MODULE__{} = instance) do
# Start only one ReplicationConnection in the cluster.
opts = instance.connection_opts ++ [name: {:global, __MODULE__}]
case Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts) do
{:ok, pid} ->
{:ok, pid}
{:error, {:already_started, pid}} ->
{:ok, pid}
error ->
Logger.error("Failed to start replication connection!",
error: inspect(error)
)
error
end
end
@impl true
def init(state) do
{:ok, state}
end
@doc """
Called when we make a successful connection to the PostgreSQL server.
"""
@impl true
def handle_connect(state) do
query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
{:query, query, %{state | step: :create_publication}}
end
@doc """
Generic callback that handles replies to the queries we send.
We use a simple state machine to issue queries one at a time to Postgres in order to:
1. Check if the publication exists
2. Create the publication if it doesn't exist
3. Check if the replication slot exists
4. Create the replication slot if it doesn't exist
5. Start replication and begin streaming from the slot
"""
@impl true
def handle_result(
[%Postgrex.Result{num_rows: 1}],
%__MODULE__{step: :create_publication} = state
) do
query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
{:query, query, %{state | step: :create_replication_slot}}
end
def handle_result(
[%Postgrex.Result{num_rows: 1}],
%__MODULE__{step: :create_replication_slot} = state
) do
{:query, "SELECT 1", %{state | step: :start_replication_slot}}
end
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
{:stream, query, [], %{state | step: :streaming}}
end
def handle_result(
[%Postgrex.Result{num_rows: 0}],
%__MODULE__{step: :create_publication} = state
) do
tables =
state.table_subscriptions
|> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end)
query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}"
{:query, query, %{state | step: :check_replication_slot}}
end
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do
query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
{:query, query, %{state | step: :create_replication_slot}}
end
def handle_result(
[%Postgrex.Result{num_rows: 0}],
%__MODULE__{step: :create_replication_slot} = state
) do
query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
{:query, query, %{state | step: :start_replication_slot}}
end
@doc """
Called when we receive a message from the PostgreSQL server.
We handle the following messages:
1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge
processed data by responding with the current WAL position.
2. Write: A message containing a WAL event - the actual data we are interested in.
3. Unknown: Any other message that we don't know how to handle - we log and ignore it.
For the KeepAlive message, we respond immediately with the current WAL position. Note: we receive
many more of these messages than one might expect, because the rate at which the server sends
them scales proportionally with the number of Write messages it sends.
For the Write message, we broadcast each message one-by-one as we receive it. This is important
because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL position,
we must have already processed all the messages up to that point.
"""
@impl true
def handle_data(data, state) when is_keep_alive(data) do
%KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
wal_end = wal_end + 1
message =
case reply do
:now -> standby_status(wal_end, wal_end, wal_end, reply)
:later -> hold()
end
{:noreply, message, state}
end
def handle_data(data, state) when is_write(data) do
%Write{message: message} = parse(data)
# TODO: Telemetry: Mark start
message
|> decode_message()
|> handle_message(state)
end
def handle_data(data, state) do
Logger.error("Unknown WAL message received!",
data: inspect(data),
state: inspect(state)
)
{:noreply, [], state}
end
# Handles messages received:
#
# 1. Insert/Update/Delete - send to Event.ingest/2 for further processing
# 2. Relation messages - store the relation data in our state so we can use it later
# to associate column names etc with the data we receive. In practice, we'll always
# see a Relation message before we see any data for that relation.
# 3. Begin/Commit/Origin/Truncate/Type - we ignore these messages for now
# 4. Graceful shutdown - we respond with {:disconnect, :normal} to
# indicate that we are shutting down gracefully and prevent auto reconnecting.
defp handle_message(
%Decoder.Messages.Relation{
id: id,
namespace: namespace,
name: name,
columns: columns
},
state
) do
relation = %{
namespace: namespace,
name: name,
columns: columns
}
{:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}}
end
defp handle_message(%Decoder.Messages.Insert{} = msg, state) do
:ok = Event.ingest(msg, state.relations)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Update{} = msg, state) do
:ok = Event.ingest(msg, state.relations)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Delete{} = msg, state) do
:ok = Event.ingest(msg, state.relations)
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Begin{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Commit{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Origin{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Truncate{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Type{}, state) do
{:noreply, [], state}
end
defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do
Logger.warning("Unsupported message received",
data: inspect(data),
state: inspect(state)
)
{:noreply, [], state}
end
@impl true
def handle_info(:shutdown, _), do: {:disconnect, :normal}
def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
def handle_info(_, state), do: {:noreply, state}
@doc """
Called when the connection is disconnected unexpectedly.
We log a warning and set the step to :disconnected; the underlying
Postgrex.ReplicationConnection will attempt to reconnect when auto_reconnect is enabled.
This will happen if:
1. Postgres is restarted such as during a maintenance window
2. The connection is closed by the server due to our failure to acknowledge
Keepalive messages in a timely manner
3. The connection is cut due to a network error
4. The ReplicationConnection process crashes or is killed abruptly for any reason
"""
@impl true
def handle_disconnect(state) do
Logger.warning("Replication connection disconnected",
state: inspect(state)
)
{:noreply, %{state | step: :disconnected}}
end
end

View File

@@ -242,6 +242,7 @@ defmodule Domain.Flows do
|> Flow.Query.expire()
|> Repo.update_all([])
# TODO: WAL
:ok =
Enum.each(flows, fn flow ->
:ok = broadcast_flow_expiration_event(flow)
@@ -263,6 +264,7 @@ defmodule Domain.Flows do
flow_or_id |> flow_topic() |> PubSub.subscribe()
end
# TODO: WAL
defp broadcast_flow_expiration_event(flow) do
flow
|> flow_topic()

View File

@@ -120,6 +120,7 @@ defmodule Domain.Gateways do
)
|> case do
{:ok, group} ->
# TODO: WAL
:ok = broadcast_to_group(group, :updated)
{:ok, group}
@@ -512,6 +513,7 @@ defmodule Domain.Gateways do
|> PubSub.unsubscribe()
end
# TODO: WAL
def broadcast_to_group(group_or_id, payload) do
group_or_id
|> group_topic()

View File

@@ -68,6 +68,7 @@ defmodule Domain.Policies do
with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do
Policy.Changeset.create(attrs, subject)
|> Repo.insert()
# TODO: WAL
|> case do
{:ok, policy} ->
:ok = broadcast_policy_events(:create, policy)
@@ -92,6 +93,7 @@ defmodule Domain.Policies do
|> Authorizer.for_subject(subject)
|> Repo.fetch_and_update_breakable(Policy.Query,
with: &Policy.Changeset.update(&1, attrs),
# TODO: WAL
after_update_commit: &broadcast_policy_events(:update, &1),
after_breaking_update_commit: fn updated_policy, _changeset ->
{:ok, _flows} = Flows.expire_flows_for(policy, subject)
@@ -109,6 +111,7 @@ defmodule Domain.Policies do
|> Authorizer.for_subject(subject)
|> Repo.fetch_and_update(Policy.Query,
with: &Policy.Changeset.disable(&1, subject),
# TODO: WAL
after_commit: &broadcast_policy_events(:disable, &1)
)
|> case do
@@ -129,6 +132,7 @@ defmodule Domain.Policies do
|> Authorizer.for_subject(subject)
|> Repo.fetch_and_update(Policy.Query,
with: &Policy.Changeset.enable/1,
# TODO: WAL
after_commit: &broadcast_policy_events(:enable, &1)
)
end
@@ -187,6 +191,7 @@ defmodule Domain.Policies do
|> Policy.Query.delete()
|> Repo.update_all([])
# TODO: WAL
:ok =
Enum.each(policies, fn policy ->
:ok = broadcast_policy_events(:delete, policy)
@@ -281,6 +286,7 @@ defmodule Domain.Policies do
actor_group_or_id |> actor_group_topic() |> PubSub.unsubscribe()
end
# TODO: WAL
defp broadcast_policy_events(action, %Policy{} = policy) do
payload = {:"#{action}_policy", policy.id}
:ok = broadcast_to_policy(policy, payload)

View File

@@ -232,6 +232,7 @@ defmodule Domain.Resources do
changeset = Resource.Changeset.create(subject.account, attrs, subject)
with {:ok, resource} <- Repo.insert(changeset) do
# TODO: WAL
:ok = broadcast_resource_events(:create, resource)
{:ok, resource}
end
@@ -253,6 +254,7 @@ defmodule Domain.Resources do
changeset = Resource.Changeset.create(account, attrs)
with {:ok, resource} <- Repo.insert(changeset) do
# TODO: WAL
:ok = broadcast_resource_events(:create, resource)
{:ok, resource}
end
@@ -280,6 +282,7 @@ defmodule Domain.Resources do
{:ok, _flows} = Flows.expire_flows_for(resource, subject)
end
# TODO: WAL
broadcast_resource_events(:update, resource)
end,
after_breaking_update_commit: fn updated_resource, _changeset ->
@@ -287,6 +290,7 @@ defmodule Domain.Resources do
# This is used to reset the resource on the client and gateway in case filters, conditions, etc are changed.
{:ok, _flows} = Flows.expire_flows_for(resource, subject)
# TODO: WAL
:ok = broadcast_resource_events(:delete, resource)
:ok = broadcast_resource_events(:create, updated_resource)
end
@@ -314,6 +318,7 @@ defmodule Domain.Resources do
)
|> case do
{:ok, resource} ->
# TODO: WAL
:ok = broadcast_resource_events(:delete, resource)
{:ok, _policies} = Policies.delete_policies_for(resource, subject)
{:ok, resource}
@@ -374,6 +379,7 @@ defmodule Domain.Resources do
account_or_id |> account_topic() |> PubSub.subscribe()
end
# TODO: WAL
defp broadcast_resource_events(action, %Resource{} = resource) do
payload = {:"#{action}_resource", resource.id}
:ok = broadcast_to_resource(resource, payload)

View File

@@ -290,11 +290,13 @@ defmodule Domain.Tokens do
|> Token.Query.delete()
|> Repo.update_all([])
# TODO: WAL
:ok = Enum.each(tokens, &broadcast_disconnect_message/1)
{:ok, tokens}
end
# TODO: WAL
defp broadcast_disconnect_message(%{type: :email}) do
:ok
end

View File

@@ -0,0 +1,29 @@
defmodule Domain.Repo.Migrations.SetTablesToReplicaIdentityFull do
use Ecto.Migration
@relations ~w[
accounts
actor_group_memberships
actor_groups
actors
auth_identities
auth_providers
clients
flow_activities
flows
gateway_groups
gateways
policies
relay_groups
relays
resource_connections
resources
tokens
]
def change do
for relation <- @relations do
execute("ALTER TABLE #{relation} REPLICA IDENTITY FULL")
end
end
end

View File

@@ -0,0 +1,39 @@
defmodule Domain.Repo.Migrations.AutomaticallySetReplicaIdentityFull do
use Ecto.Migration
# Creates a trigger that automatically sets the REPLICA IDENTITY to FULL for new tables. This is
# needed to ensure we can capture changes to a table in replication in order to reliably
# broadcast events.
def change do
execute(
"""
CREATE OR REPLACE FUNCTION set_replica_identity_full()
RETURNS EVENT_TRIGGER AS $$
DECLARE
rec RECORD;
BEGIN
FOR rec IN SELECT * FROM pg_event_trigger_ddl_commands() WHERE command_tag = 'CREATE TABLE'
LOOP
EXECUTE format('ALTER TABLE %s REPLICA IDENTITY FULL', rec.object_identity);
END LOOP;
END;
$$ LANGUAGE plpgsql;
""",
"""
DROP FUNCTION IF EXISTS set_replica_identity_full();
"""
)
execute(
"""
CREATE EVENT TRIGGER trigger_set_replica_identity
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE')
EXECUTE FUNCTION set_replica_identity_full();
""",
"""
DROP EVENT TRIGGER IF EXISTS trigger_set_replica_identity;
"""
)
end
end

View File

@@ -0,0 +1,470 @@
defmodule Domain.Events.DecoderTest do
use ExUnit.Case, async: true
alias Domain.Events.Decoder
alias Domain.Events.Decoder.Messages
@lsn_binary <<0::integer-32, 23_785_280::integer-32>>
@lsn_decoded {0, 23_785_280}
@timestamp_int 704_521_200_000
@timestamp_decoded ~U[2000-01-09 03:42:01.200000Z]
@xid 1234
@relation_id 16384
# Example OIDs for testing RELATION decoding
@oid_int4 23
@oid_text 25
@oid_numeric 1700
@oid_unknown 9999
describe "decode_message/1" do
test "decodes BEGIN message" do
# Construct binary message: 'B', final_lsn, commit_timestamp, xid
message = <<"B", @lsn_binary::binary, @timestamp_int::integer-64, @xid::integer-32>>
expected = %Messages.Begin{
final_lsn: @lsn_decoded,
commit_timestamp: @timestamp_decoded,
xid: @xid
}
assert Decoder.decode_message(message) == expected
end
test "decodes COMMIT message" do
# Construct binary message: 'C', flags (ignored), lsn, end_lsn, commit_timestamp
# Flags are currently ignored, represented as []
flags = <<0::integer-8>>
end_lsn_binary = <<0::integer-32, 23_785_300::integer-32>>
end_lsn_decoded = {0, 23_785_300}
message =
<<"C", flags::binary-1, @lsn_binary::binary, end_lsn_binary::binary,
@timestamp_int::integer-64>>
expected = %Messages.Commit{
flags: [],
lsn: @lsn_decoded,
end_lsn: end_lsn_decoded,
commit_timestamp: @timestamp_decoded
}
assert Decoder.decode_message(message) == expected
end
test "decodes ORIGIN message" do
# Construct binary message: 'O', origin_commit_lsn, name (null-terminated)
origin_name = "origin_node_1\0"
message = <<"O", @lsn_binary::binary, origin_name::binary>>
expected = %Messages.Origin{
origin_commit_lsn: @lsn_decoded,
# The decoder currently includes the null terminator from the split
name: "origin_node_1\0"
}
assert Decoder.decode_message(message) == expected
end
test "decodes RELATION message with known types" do
# Construct binary message: 'R', id, namespace\0, name\0, replica_identity, num_columns, columns_data
namespace = "public\0"
name = "users\0"
# full
replica_identity = "f"
num_columns = 3
# Column 1: flags=1 (key), name="id", type=23 (int4), modifier=-1
# Column 2: flags=0, name="email", type=25 (text), modifier=-1
# Column 3: flags=0, name="balance", type=1700 (numeric), modifier=131076 (e.g., NUMERIC(10,2))
col1_flags = <<1::integer-8>>
col1_name = "id\0"
col1_type = <<@oid_int4::integer-32>>
col1_mod = <<-1::integer-32>>
col2_flags = <<0::integer-8>>
col2_name = "email\0"
col2_type = <<@oid_text::integer-32>>
col2_mod = <<-1::integer-32>>
col3_flags = <<0::integer-8>>
col3_name = "balance\0"
col3_type = <<@oid_numeric::integer-32>>
col3_mod = <<131_076::integer-32>>
columns_binary =
<<col1_flags::binary-1, col1_name::binary, col1_type::binary-4, col1_mod::binary-4,
col2_flags::binary-1, col2_name::binary, col2_type::binary-4, col2_mod::binary-4,
col3_flags::binary-1, col3_name::binary, col3_type::binary-4, col3_mod::binary-4>>
message =
<<"R", @relation_id::integer-32, namespace::binary, name::binary,
replica_identity::binary-1, num_columns::integer-16, columns_binary::binary>>
# Expect the string names returned by the actual OidDatabase.name_for_type_id
expected = %Messages.Relation{
id: @relation_id,
namespace: "public",
name: "users",
# 'f' maps to :all_columns
replica_identity: :all_columns,
columns: [
%Messages.Relation.Column{
flags: [:key],
name: "id",
# OidDatabase.name_for_type_id(23)
type: "int4",
type_modifier: 4_294_967_295
},
%Messages.Relation.Column{
flags: [],
name: "email",
# OidDatabase.name_for_type_id(25)
type: "text",
type_modifier: 4_294_967_295
},
%Messages.Relation.Column{
flags: [],
name: "balance",
# OidDatabase.name_for_type_id(1700)
type: "numeric",
type_modifier: 131_076
}
]
}
assert Decoder.decode_message(message) == expected
end
test "decodes RELATION message with unknown type" do
# Construct binary message with an OID not listed in OidDatabase
namespace = "custom_schema\0"
name = "gadgets\0"
# index
replica_identity = "i"
num_columns = 1
# Column 1: flags=0, name="widget_type", type=9999 (unknown), modifier=-1
col1_flags = <<0::integer-8>>
col1_name = "widget_type\0"
col1_type = <<@oid_unknown::integer-32>>
col1_mod = <<-1::integer-32>>
columns_binary =
<<col1_flags::binary-1, col1_name::binary, col1_type::binary-4, col1_mod::binary-4>>
message =
<<"R", @relation_id::integer-32, namespace::binary, name::binary,
replica_identity::binary-1, num_columns::integer-16, columns_binary::binary>>
# Expect the raw OID itself, as per the fallback case in OidDatabase.name_for_type_id
expected = %Messages.Relation{
id: @relation_id,
namespace: "custom_schema",
name: "gadgets",
# 'i' maps to :index
replica_identity: :index,
columns: [
%Messages.Relation.Column{
flags: [],
name: "widget_type",
# OidDatabase.name_for_type_id(9999) returns 9999
type: @oid_unknown,
type_modifier: 4_294_967_295
}
]
}
assert Decoder.decode_message(message) == expected
end
test "decodes INSERT message" do
# Construct binary message: 'I', relation_id, 'N', num_columns, tuple_data
num_columns = 3
# Tuple data: 't', len1, val1, 'n', 't', len2, val2
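# Column tags: 't' = text value with a 4-byte length prefix, 'n' = NULL,
# 'u' = unchanged TOASTed value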
val1 = "hello world"
len1 = byte_size(val1)
val2 = "test"
len2 = byte_size(val2)
tuple_data_binary =
<<"t", len1::integer-32, val1::binary, "n", "t", len2::integer-32, val2::binary>>
message =
<<"I", @relation_id::integer-32, "N", num_columns::integer-16, tuple_data_binary::binary>>
expected = %Messages.Insert{
relation_id: @relation_id,
tuple_data: {val1, nil, val2}
}
assert Decoder.decode_message(message) == expected
end
test "decodes INSERT message with unchanged toast" do
# Construct binary message: 'I', relation_id, 'N', num_columns, tuple_data
num_columns = 1
# Tuple data: 'u'
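# 'u' means the TOASTed value was not modified, so Postgres omits the data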
tuple_data_binary = <<"u">>
message =
<<"I", @relation_id::integer-32, "N", num_columns::integer-16, tuple_data_binary::binary>>
expected = %Messages.Insert{
relation_id: @relation_id,
tuple_data: {:unchanged_toast}
}
assert Decoder.decode_message(message) == expected
end
test "decodes UPDATE message (simple - New tuple only)" do
# Construct binary message: 'U', relation_id, 'N', num_columns, tuple_data
num_columns = 1
val1 = "new value"
len1 = byte_size(val1)
tuple_data_binary = <<"t", len1::integer-32, val1::binary>>
message =
<<"U", @relation_id::integer-32, "N", num_columns::integer-16, tuple_data_binary::binary>>
expected = %Messages.Update{
relation_id: @relation_id,
# Default value when not present
changed_key_tuple_data: nil,
# Default value when not present
old_tuple_data: nil,
tuple_data: {val1}
}
assert Decoder.decode_message(message) == expected
end
test "decodes UPDATE message (with Old tuple)" do
# Construct binary message: 'U', relation_id, 'O', num_old_cols, old_tuple_data, 'N', num_new_cols, new_tuple_data
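# Submessage tags: 'K' = old key columns, 'O' = full old tuple (sent when the
# relation has REPLICA IDENTITY FULL), 'N' = new tuple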
num_old_cols = 2
old_val1 = "old value"
old_len1 = byte_size(old_val1)
# Old Tuple: text, null
old_tuple_binary = <<"t", old_len1::integer-32, old_val1::binary, "n">>
num_new_cols = 2
new_val1 = "new value"
new_len1 = byte_size(new_val1)
new_val2 = "another new"
new_len2 = byte_size(new_val2)
# New Tuple: text, text
new_tuple_binary =
<<"t", new_len1::integer-32, new_val1::binary, "t", new_len2::integer-32,
new_val2::binary>>
message =
<<"U", @relation_id::integer-32, "O", num_old_cols::integer-16, old_tuple_binary::binary,
"N", num_new_cols::integer-16, new_tuple_binary::binary>>
expected = %Messages.Update{
relation_id: @relation_id,
changed_key_tuple_data: nil,
old_tuple_data: {old_val1, nil},
tuple_data: {new_val1, new_val2}
}
assert Decoder.decode_message(message) == expected
end
test "decodes UPDATE message (with Key tuple)" do
# Construct binary message: 'U', relation_id, 'K', num_key_cols, key_tuple_data, 'N', num_new_cols, new_tuple_data
num_key_cols = 1
key_val = "key value"
key_len = byte_size(key_val)
key_tuple_binary = <<"t", key_len::integer-32, key_val::binary>>
num_new_cols = 2
new_val1 = "new value 1"
new_len1 = byte_size(new_val1)
# New Tuple: text, unchanged_toast
new_tuple_binary = <<"t", new_len1::integer-32, new_val1::binary, "u">>
message =
<<"U", @relation_id::integer-32, "K", num_key_cols::integer-16, key_tuple_binary::binary,
"N", num_new_cols::integer-16, new_tuple_binary::binary>>
expected = %Messages.Update{
relation_id: @relation_id,
changed_key_tuple_data: {key_val},
old_tuple_data: nil,
tuple_data: {new_val1, :unchanged_toast}
}
assert Decoder.decode_message(message) == expected
end
test "decodes DELETE message (with Old tuple)" do
# Construct binary message: 'D', relation_id, 'O', num_columns, tuple_data
num_columns = 2
val1 = "deleted value"
len1 = byte_size(val1)
# Data: text value, null
tuple_data_binary = <<"t", len1::integer-32, val1::binary, "n">>
message =
<<"D", @relation_id::integer-32, "O", num_columns::integer-16, tuple_data_binary::binary>>
expected = %Messages.Delete{
relation_id: @relation_id,
# Default value
changed_key_tuple_data: nil,
old_tuple_data: {val1, nil}
}
assert Decoder.decode_message(message) == expected
end
test "decodes DELETE message (with Key tuple)" do
# Construct binary message: 'D', relation_id, 'K', num_columns, tuple_data
num_columns = 1
val1 = "key value"
len1 = byte_size(val1)
tuple_data_binary = <<"t", len1::integer-32, val1::binary>>
message =
<<"D", @relation_id::integer-32, "K", num_columns::integer-16, tuple_data_binary::binary>>
expected = %Messages.Delete{
relation_id: @relation_id,
changed_key_tuple_data: {val1},
# Default value
old_tuple_data: nil
}
assert Decoder.decode_message(message) == expected
end
test "decodes TRUNCATE message with no options" do
# Construct binary message: 'T', num_relations, options, relation_ids
num_relations = 2
# No options
options = 0
rel_id1 = <<16384::integer-32>>
rel_id2 = <<16385::integer-32>>
relation_ids_binary = <<rel_id1::binary-4, rel_id2::binary-4>>
message =
<<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>>
expected = %Messages.Truncate{
number_of_relations: num_relations,
# Empty list for 0
options: [],
truncated_relations: [16384, 16385]
}
assert Decoder.decode_message(message) == expected
end
test "decodes TRUNCATE message with CASCADE option" do
# Construct binary message: 'T', num_relations, options, relation_ids
num_relations = 1
# CASCADE
options = 1
rel_id1 = <<16384::integer-32>>
relation_ids_binary = <<rel_id1::binary-4>>
message =
<<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>>
expected = %Messages.Truncate{
number_of_relations: num_relations,
options: [:cascade],
truncated_relations: [16384]
}
assert Decoder.decode_message(message) == expected
end
test "decodes TRUNCATE message with RESTART IDENTITY option" do
# Construct binary message: 'T', num_relations, options, relation_ids
num_relations = 1
# RESTART IDENTITY
options = 2
rel_id1 = <<16384::integer-32>>
relation_ids_binary = <<rel_id1::binary-4>>
message =
<<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>>
expected = %Messages.Truncate{
number_of_relations: num_relations,
options: [:restart_identity],
truncated_relations: [16384]
}
assert Decoder.decode_message(message) == expected
end
test "decodes TRUNCATE message with CASCADE and RESTART IDENTITY options" do
# Construct binary message: 'T', num_relations, options, relation_ids
num_relations = 3
# CASCADE | RESTART IDENTITY
options = 3
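# The options byte is a bitmask: 1 = CASCADE, 2 = RESTART IDENTITY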
rel_id1 = <<100::integer-32>>
rel_id2 = <<200::integer-32>>
rel_id3 = <<300::integer-32>>
relation_ids_binary = <<rel_id1::binary-4, rel_id2::binary-4, rel_id3::binary-4>>
message =
<<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>>
expected = %Messages.Truncate{
number_of_relations: num_relations,
options: [:cascade, :restart_identity],
truncated_relations: [100, 200, 300]
}
assert Decoder.decode_message(message) == expected
end
test "decodes TYPE message" do
# Construct binary message: 'Y', data_type_id, namespace\0, name\0
# Example OID for varchar
type_id = 1043
namespace = "pg_catalog\0"
name = "varchar\0"
message = <<"Y", type_id::integer-32, namespace::binary, name::binary>>
expected = %Messages.Type{
id: type_id,
namespace: "pg_catalog",
name: "varchar"
}
assert Decoder.decode_message(message) == expected
end
test "handles unsupported message type" do
# Use an arbitrary starting byte not handled ('X')
message = <<"X", 1, 2, 3, 4>>
expected = %Messages.Unsupported{
data: message
}
assert Decoder.decode_message(message) == expected
end
test "handles empty binary message" do
message = <<>>
expected = %Messages.Unsupported{data: <<>>}
assert Decoder.decode_message(message) == expected
end
test "handles message with only type byte" do
message = <<"B">>
expected = %Messages.Unsupported{
data: "B"
}
assert Decoder.decode_message(message) == expected
end
end
end

View File

@@ -0,0 +1,47 @@
defmodule Domain.Events.EventTest do
use ExUnit.Case, async: true
import Domain.Events.Event
alias Domain.Events.Decoder
setup do
config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
table_subscriptions = config[:table_subscriptions]
%{table_subscriptions: table_subscriptions}
end
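# ingest/2 resolves relation_id through the relations map and dispatches the
# decoded message to the hooks module configured for that table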
describe "ingest/2" do
test "returns :ok for insert on all configured table subscriptions", %{
table_subscriptions: table_subscriptions
} do
for table <- table_subscriptions do
relations = %{"1" => %{name: table, columns: []}}
msg = %Decoder.Messages.Insert{tuple_data: {}, relation_id: "1"}
assert :ok == ingest(msg, relations)
end
end
test "returns :ok for update on all configured table subscriptions", %{
table_subscriptions: table_subscriptions
} do
for table <- table_subscriptions do
relations = %{"1" => %{name: table, columns: []}}
msg = %Decoder.Messages.Update{old_tuple_data: {}, tuple_data: {}, relation_id: "1"}
assert :ok == ingest(msg, relations)
end
end
test "returns :ok for delete on all configured table subscriptions", %{
table_subscriptions: table_subscriptions
} do
for table <- table_subscriptions do
relations = %{"1" => %{name: table, columns: []}}
msg = %Decoder.Messages.Delete{old_tuple_data: {}, relation_id: "1"}
assert :ok == ingest(msg, relations)
end
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.AccountsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Accounts
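# The hooks are currently placeholders that return :ok for every event, so
# these tests assert only the insert/1, update/2, delete/1 contract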
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.ActorGroupMemberships
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.ActorGroupsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.ActorGroups
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.ActorsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Actors
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.AuthIdentitiesTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.AuthIdentities
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.AuthProvidersTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.AuthProviders
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.ClientsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Clients
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.FlowActivitiesTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.FlowActivities
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.FlowsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Flows
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.GatewayGroupsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.GatewayGroups
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.GatewaysTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Gateways
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.PoliciesTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Policies
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.RelayGroupsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.RelayGroups
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.RelaysTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Relays
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.ResourceConnectionsTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.ResourceConnections
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.ResourcesTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Resources
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,26 @@
defmodule Domain.Events.Hooks.TokensTest do
use ExUnit.Case, async: true
import Domain.Events.Hooks.Tokens
setup do
%{old_data: %{}, data: %{}}
end
describe "insert/1" do
test "returns :ok", %{data: data} do
assert :ok == insert(data)
end
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == update(old_data, data)
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == delete(data)
end
end
end

View File

@@ -0,0 +1,215 @@
defmodule Domain.Events.ReplicationConnectionTest do
# Only one ReplicationConnection should be started in the cluster
use ExUnit.Case, async: false
alias Domain.Events.ReplicationConnection
# Used to test callbacks, not used for live connection
@mock_state %ReplicationConnection{
schema: "test_schema",
connection_opts: [],
step: :disconnected,
publication_name: "test_pub",
replication_slot_name: "test_slot",
output_plugin: "pgoutput",
proto_version: 1,
table_subscriptions: ["accounts", "resources"],
relations: %{}
}
# Used to test live connection
setup_all do
config =
Application.fetch_env!(:domain, Domain.Events.ReplicationConnection)
instance = struct(Domain.Events.ReplicationConnection, config)
child_spec = %{
id: Domain.Events.ReplicationConnection,
start: {Domain.Events.ReplicationConnection, :start_link, [instance]},
restart: :transient
}
{:ok, pid} = start_supervised(child_spec)
{:ok, pid: pid}
end
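# The connection advances through a small state machine:
# :disconnected -> :create_publication -> :check_replication_slot ->
# :create_replication_slot -> :start_replication_slot -> :streaming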
describe "handle_connect/1 callback" do
test "handle_connect initiates publication check" do
state = @mock_state
expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
expected_next_state = %{state | step: :create_publication}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_connect(state)
end
end
describe "handle_result/2 callback" do
test "handle_result transitions from create_publication to create_replication_slot when publication exists" do
state = %{@mock_state | step: :create_publication}
result = [%Postgrex.Result{num_rows: 1}]
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do
state = %{@mock_state | step: :create_replication_slot}
result = [%Postgrex.Result{num_rows: 1}]
expected_query = "SELECT 1"
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from start_replication_slot to streaming" do
state = %{@mock_state | step: :start_replication_slot}
result = [%Postgrex.Result{num_rows: 1}]
expected_stream_query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
expected_next_state = %{state | step: :streaming}
assert {:stream, ^expected_stream_query, [], ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates publication if it doesn't exist" do
state = %{@mock_state | step: :create_publication}
result = [%Postgrex.Result{num_rows: 0}]
expected_tables = "test_schema.accounts,test_schema.resources"
expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}"
expected_next_state = %{state | step: :check_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do
state = %{@mock_state | step: :check_replication_slot}
result = [%Postgrex.Result{num_rows: 0}]
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates replication slot if it doesn't exist" do
state = %{@mock_state | step: :create_replication_slot}
result = [%Postgrex.Result{num_rows: 0}]
expected_query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
end
# In-depth decoding tests are handled in Domain.Events.DecoderTest
describe "handle_data/2" do
test "handle_data handles KeepAlive with reply :now" do
state = %{@mock_state | step: :streaming}
wal_end = 12345
now =
System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
# 100 milliseconds
grace_period = 100_000
keepalive_data = <<?k, wal_end::64, 0::64, 1>>
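# The expected reply is a Standby Status Update: 'r', then the written,
# flushed, and applied LSNs (wal_end + 1 acknowledges everything up to
# wal_end), the client clock in microseconds since the PG epoch, and a
# reply-requested flag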
assert {:noreply, reply, ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
assert [<<?r, 12346::64, 12346::64, 12346::64, clock::64, 1::8>>] = reply
assert now <= clock
assert clock < now + grace_period
end
test "handle_data handles KeepAlive with reply :later" do
state = %{@mock_state | step: :streaming}
wal_end = 54321
keepalive_data = <<?k, wal_end::64, 0::64, 0>>
expected_reply_message = []
assert {:noreply, ^expected_reply_message, ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
end
test "handle_data handles Write message" do
state = %{@mock_state | step: :streaming}
server_wal_start = 123_456_789
server_wal_end = 987_654_321
server_system_clock = 1_234_567_890
message = "Hello, world!"
write_data =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64, message::binary>>
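# 'w' (XLogData) frames wrap the logical messages; no immediate reply is
# expected since acknowledgements flow through keepalive responses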
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state)
end
test "handle_data handles unknown message" do
state = %{@mock_state | step: :streaming}
unknown_data = <<?q, 1, 2, 3>>
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state)
end
end
describe "handle_info/2" do
test "handle_info handles :shutdown message" do
state = @mock_state
assert {:disconnect, :normal} = ReplicationConnection.handle_info(:shutdown, state)
end
test "handle_info handles :DOWN message from monitored process" do
state = @mock_state
monitor_ref = make_ref()
down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown}
assert {:disconnect, :normal} = ReplicationConnection.handle_info(down_msg, state)
end
test "handle_info ignores other messages" do
state = @mock_state
random_msg = {:some_other_info, "data"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state)
end
end
describe "handle_disconnect/1" do
test "handle_disconnect resets step to :disconnected and logs warning" do
state = %{@mock_state | step: :streaming}
expected_state = %{state | step: :disconnected}
log_output =
ExUnit.CaptureLog.capture_log(fn ->
assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state)
end)
assert log_output =~ "Replication connection disconnected"
end
end
end

View File

@@ -30,6 +30,42 @@ config :domain, Domain.Repo,
migration_lock: :pg_advisory_lock,
start_apps_before_migration: [:ssl, :logger_json]
config :domain, Domain.Events.ReplicationConnection,
connection_opts: [
# Automatically reconnect if we lose connection.
auto_reconnect: true,
hostname: "localhost",
port: 5432,
ssl: false,
ssl_opts: [],
parameters: [],
username: "postgres",
database: "firezone_dev",
password: "postgres"
],
# When changing these, make sure to also:
# 1. Make appropriate changes to `Domain.Events.Event`
# 2. Add an appropriate `Domain.Events.Hooks` module
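#
# A hooks module exports insert/1, update/2, and delete/1; a minimal sketch
# (hypothetical module name):
#
#   defmodule Domain.Events.Hooks.Widgets do
#     def insert(_data), do: :ok
#     def update(_old_data, _data), do: :ok
#     def delete(_data), do: :ok
#   end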
table_subscriptions: ~w[
accounts
actor_group_memberships
actor_groups
actors
auth_identities
auth_providers
clients
flow_activities
flows
gateway_groups
gateways
policies
relay_groups
relays
resource_connections
resources
tokens
]
config :domain, Domain.Tokens,
key_base: "5OVYJ83AcoQcPmdKNksuBhJFBhjHD1uUa9mDOHV/6EIdBQ6pXksIhkVeWIzFk5S2",
salt: "t01wa0K4lUd7mKa0HAtZdE+jFOPDDej2"

View File

@@ -27,6 +27,20 @@ if config_env() == :prod do
else: [{:hostname, compile_config!(:database_host)}]
)
config :domain, Domain.Events.ReplicationConnection,
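# Note: the configured user must have the REPLICATION privilege to open a
# logical replication connection.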
connection_opts: [
# Automatically reconnect if we lose connection.
auto_reconnect: true,
hostname: compile_config!(:database_host),
port: compile_config!(:database_port),
ssl: compile_config!(:database_ssl_enabled),
ssl_opts: compile_config!(:database_ssl_opts),
parameters: compile_config!(:database_parameters),
username: compile_config!(:database_replication_user),
password: compile_config!(:database_replication_password),
database: compile_config!(:database_name)
]
config :domain, Domain.Tokens,
key_base: compile_config!(:tokens_key_base),
salt: compile_config!(:tokens_salt)

View File

@@ -18,6 +18,14 @@ config :domain, Domain.Repo,
pool: Ecto.Adapters.SQL.Sandbox,
queue_target: 1000
config :domain, Domain.Events.ReplicationConnection,
publication_name: "events_test",
replication_slot_name: "events_slot_test",
connection_opts: [
auto_reconnect: false,
database: "firezone_test#{partition_suffix}"
]
config :domain, Domain.Telemetry, enabled: false
config :domain, Domain.ConnectivityChecks, enabled: false

elixir/test.exs Normal file
View File

@@ -0,0 +1,515 @@
defmodule Domain.Events.ReplicationConnectionTest do
# Only one ReplicationConnection should be started in the cluster
use ExUnit.Case, async: false
alias Domain.Events.Decoder.Messages
alias Domain.Events.ReplicationConnection
# Used to test callbacks, not used for live connection
@mock_state %ReplicationConnection{
schema: "test_schema",
connection_opts: [],
step: :disconnected,
publication_name: "test_pub",
replication_slot_name: "test_slot",
output_plugin: "pgoutput",
proto_version: 1,
# Example table subscriptions used only by these callback tests
table_subscriptions: ["accounts", "resources"],
relations: %{}
}
# Used to test live connection
setup do
{:ok, pid} = start_supervised(Domain.Events.ReplicationConnection)
{:ok, pid: pid}
end
describe "handle_connect/1 callback" do
test "handle_connect initiates publication check" do
state = @mock_state
expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'"
expected_next_state = %{state | step: :create_publication}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_connect(state)
end
end
describe "handle_result/2 callback" do
test "handle_result transitions from create_publication to create_replication_slot when publication exists" do
state = %{@mock_state | step: :create_publication}
# Mock a successful result for the SELECT query
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 1,
rows: [[1]]
}
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do
state = %{@mock_state | step: :create_replication_slot}
# Mock a successful result for the SELECT query
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 1,
rows: [[1]]
}
# The slot already exists, so no CREATE_REPLICATION_SLOT is issued; the
# connection advances to :start_replication_slot, and handle_result for
# that step then issues START_REPLICATION.
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, _query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from start_replication_slot to streaming" do
state = %{@mock_state | step: :start_replication_slot}
# Mock a successful result for the CREATE_REPLICATION_SLOT command
result = %Postgrex.Result{
command: :create_replication_slot,
columns: nil,
num_rows: 0,
rows: nil
}
expected_stream_query =
"START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')"
expected_next_state = %{state | step: :streaming}
assert {:stream, ^expected_stream_query, [], ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates publication if it doesn't exist" do
state = %{@mock_state | step: :create_publication}
# Mock result indicating publication doesn't exist
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 0,
rows: []
}
# Combine schema and table names correctly
expected_tables =
state.table_subscriptions
|> Enum.map(fn table -> "#{state.schema}.#{table}" end)
|> Enum.join(",")
expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}"
# After creating the publication, the replication slot is checked next
expected_next_state = %{state | step: :check_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do
state = %{@mock_state | step: :check_replication_slot}
# Mock a successful result from the CREATE PUBLICATION command
result = %Postgrex.Result{
command: :create_publication,
columns: nil,
num_rows: 0,
rows: nil
}
expected_query =
"SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'"
expected_next_state = %{state | step: :create_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
test "handle_result creates replication slot if it doesn't exist" do
state = %{@mock_state | step: :create_replication_slot}
# Mock result indicating slot doesn't exist
result = %Postgrex.Result{
command: :select,
columns: ["?column?"],
num_rows: 0,
rows: []
}
expected_query =
"CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT"
expected_next_state = %{state | step: :start_replication_slot}
assert {:query, ^expected_query, ^expected_next_state} =
ReplicationConnection.handle_result(result, state)
end
end
# In-depth decoding tests are handled in Domain.Events.DecoderTest
describe "handle_data/2" do
test "handle_data handles KeepAlive with reply :now" do
state = %{@mock_state | step: :streaming}
wal_end = 12345
# Keepalive doesn't use this field meaningfully here
server_wal_start = 0
# Reply requested
reply_requested = 1
now_microseconds =
System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
# 100 milliseconds tolerance for clock check
grace_period_microseconds = 100_000
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
# The reply is a Standby Status Update: 'r', then the written, flushed, and
# applied LSNs (wal_end + 1 acknowledges everything up to wal_end), the
# client clock in microseconds since the PG epoch, and a reply flag.
assert {:noreply, [reply_binary], ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
# Deconstruct the reply to verify its parts
assert <<?r, written_lsn::64, flushed_lsn::64, applied_lsn::64, clock::64, 1::8>> =
reply_binary
assert written_lsn == wal_end + 1
assert flushed_lsn == wal_end + 1
assert applied_lsn == wal_end + 1
assert now_microseconds <= clock
assert clock < now_microseconds + grace_period_microseconds
end
test "handle_data handles KeepAlive with reply :later" do
state = %{@mock_state | step: :streaming}
wal_end = 54321
server_wal_start = 0
# No reply requested
reply_requested = 0
keepalive_data = <<?k, wal_end::64, server_wal_start::64, reply_requested::8>>
# When no reply is requested, it should return :noreply with no binary message
assert {:noreply, [], ^state} =
ReplicationConnection.handle_data(keepalive_data, state)
end
test "handle_data handles Write message (XLogData)" do
state = %{@mock_state | step: :streaming}
server_wal_start = 123_456_789
# This is the LSN of the end of the WAL data in this message
server_wal_end = 987_654_321
# Timestamp in microseconds since PG epoch
server_system_clock = 1_234_567_890
# Example decoded message data: a BEGIN message binary. LSNs are two 32-bit
# integers and timestamps are microseconds since the PG epoch (2000-01-01).
# These must be local variables: module attributes cannot be defined inside
# a test body.
lsn_binary = <<0::integer-32, 23_785_280::integer-32>>
timestamp_int = 704_521_200_000
xid = 1234
message_binary = <<"B", lsn_binary::binary-8, timestamp_int::integer-64, xid::integer-32>>
write_data =
<<?w, server_wal_start::64, server_wal_end::64, server_system_clock::64,
message_binary::binary>>
# handle_data for 'w' decodes message_binary and sends the decoded struct to
# self(). It returns {:noreply, [], state} because the acknowledgement
# happens via the KeepAlive ('k') mechanism.
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state)
# 704_521_200_000 microseconds after 2000-01-01 is 2000-01-09 03:42:01.2Z
expected_decoded_msg = %Messages.Begin{
final_lsn: {0, 23_785_280},
commit_timestamp: ~U[2000-01-09 03:42:01.200000Z],
xid: 1234
}
# Assert that the decoded message was sent to self()
assert_receive(^expected_decoded_msg)
end
test "handle_data handles unknown message type" do
state = %{@mock_state | step: :streaming}
# Using 'q' as an example unknown type
unknown_data = <<?q, 1, 2, 3>>
# Expect it to ignore unknown types and return noreply
assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state)
# Optionally, assert that a warning was logged if applicable
end
end
describe "handle_info/2" do
test "handle_info updates relations on Relation message" do
state = @mock_state
# Use the correct fields from Messages.Relation struct
relation_msg = %Messages.Relation{
id: 101,
namespace: "public",
name: "accounts",
# Added replica_identity
replica_identity: :default,
columns: [
%Messages.Relation.Column{
flags: [:key],
name: "id",
type: "int4",
type_modifier: -1
},
%Messages.Relation.Column{
flags: [],
name: "name",
type: "text",
type_modifier: -1
}
]
}
# The state should store the relevant parts of the relation message, keyed by ID
expected_relation_data = %{
namespace: "public",
name: "accounts",
replica_identity: :default,
columns: [
%Messages.Relation.Column{
flags: [:key],
name: "id",
type: "int4",
type_modifier: -1
},
%Messages.Relation.Column{
flags: [],
name: "name",
type: "text",
type_modifier: -1
}
]
}
expected_relations = %{101 => expected_relation_data}
expected_state = %{state | relations: expected_relations}
assert {:noreply, ^expected_state} = ReplicationConnection.handle_info(relation_msg, state)
end
test "handle_info returns noreply for Insert message" do
# Pre-populate state with relation info if the handler needs it
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct field: tuple_data (which is a tuple)
insert_msg = %Messages.Insert{relation_id: 101, tuple_data: {1, "Alice"}}
# handle_info likely broadcasts or processes the insert, but returns noreply
assert {:noreply, ^state} = ReplicationConnection.handle_info(insert_msg, state)
# Add assertions here if handle_info is expected to send messages or call other funcs
end
test "handle_info returns noreply for Update message" do
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct fields: relation_id, old_tuple_data, tuple_data, changed_key_tuple_data
update_msg = %Messages.Update{
relation_id: 101,
# Example: only old data provided
old_tuple_data: {1, "Alice"},
# Example: new data
tuple_data: {1, "Bob"},
# Example: key didn't change or wasn't provided
changed_key_tuple_data: nil
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(update_msg, state)
# Add assertions for side effects (broadcasts etc.) if needed
end
test "handle_info returns noreply for Delete message" do
state = %{
@mock_state
| relations: %{
101 => %{
name: "accounts",
namespace: "public",
columns: [
%Messages.Relation.Column{name: "id", type: "int4"},
%Messages.Relation.Column{name: "name", type: "text"}
]
}
}
}
# Use the correct fields: relation_id, old_tuple_data, changed_key_tuple_data
delete_msg = %Messages.Delete{
relation_id: 101,
# Example: old data provided
old_tuple_data: {1, "Bob"},
# Example: key data not provided
changed_key_tuple_data: nil
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(delete_msg, state)
# Add assertions for side effects if needed
end
test "handle_info ignores Begin message" do
state = @mock_state
# Use correct fields: final_lsn, commit_timestamp, xid
begin_msg = %Messages.Begin{
final_lsn: {0, 123},
commit_timestamp: ~U[2023-01-01 10:00:00Z],
xid: 789
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(begin_msg, state)
end
test "handle_info ignores Commit message" do
state = @mock_state
# Use correct fields: flags, lsn, end_lsn, commit_timestamp
commit_msg = %Messages.Commit{
flags: [],
lsn: {0, 123},
end_lsn: {0, 456},
commit_timestamp: ~U[2023-01-01 10:00:01Z]
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(commit_msg, state)
end
test "handle_info ignores Origin message" do
state = @mock_state
# Use correct fields: origin_commit_lsn, name
origin_msg = %Messages.Origin{origin_commit_lsn: {0, 1}, name: "origin_name"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(origin_msg, state)
end
test "handle_info ignores Truncate message" do
state = @mock_state
# Use correct fields: number_of_relations, options, truncated_relations
truncate_msg = %Messages.Truncate{
number_of_relations: 2,
options: [:cascade],
truncated_relations: [101, 102]
}
assert {:noreply, ^state} = ReplicationConnection.handle_info(truncate_msg, state)
end
test "handle_info ignores Type message" do
state = @mock_state
# Use correct fields: id, namespace, name
type_msg = %Messages.Type{id: 23, namespace: "pg_catalog", name: "int4"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(type_msg, state)
end
test "handle_info returns noreply for Unsupported message" do
state = @mock_state
unsupported_msg = %Messages.Unsupported{data: <<1, 2, 3>>}
# We cannot easily verify Logger.warning was called without mocks/capture.
assert {:noreply, ^state} = ReplicationConnection.handle_info(unsupported_msg, state)
end
test "handle_info handles :shutdown message" do
state = @mock_state
# Expect :disconnect tuple based on common GenServer patterns for shutdown
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(:shutdown, state)
# Note: The original test asserted {:disconnect, :normal}. {:stop, :normal, state} is
# the standard GenServer return for a clean stop triggered by handle_info. Adjust
# if your implementation specifically returns :disconnect.
end
test "handle_info handles :DOWN message from monitored process" do
state = @mock_state
monitor_ref = make_ref()
# Example DOWN message structure
down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown}
# Expect the server to stop itself upon receiving DOWN for a critical process
assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(down_msg, state)
# Again, adjust the expected return (:disconnect vs :stop) based on implementation.
end
test "handle_info ignores other messages" do
state = @mock_state
random_msg = {:some_other_info, "data"}
assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state)
end
end
describe "handle_disconnect/1" do
test "handle_disconnect resets step to :disconnected and logs warning" do
state = %{@mock_state | step: :streaming}
expected_state = %{state | step: :disconnected}
# Capture log to verify warning (requires ExUnit config)
log_output =
ExUnit.CaptureLog.capture_log(fn ->
assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state)
end)
assert log_output =~ "Replication connection disconnected."
# Or match the exact log message if needed
end
end
end