From 968db2ae39f87fc38633cf6dea5ee89476b2516b Mon Sep 17 00:00:00 2001 From: Jamil Date: Tue, 29 Apr 2025 23:53:06 -0700 Subject: [PATCH] feat(portal): Receive WAL events (#8909) Firezone's control plane is a realtime, distributed system that relies on a broadcast/subscribe system to function. In many cases, these events are broadcasted whenever relevant data in the DB changes, such as an actor losing access to a policy, a membership being deleted, and so forth. Today, this is handled in the application layer, typically happening at the place where the relevant DB call is made (i.e. in an `after_commit`). While this approach has worked thus far, it has several issues: 1. We have no guarantee that the DB change will issue a broadcast. If the application is deployed or the process crashes after the DB changes are made but before the broadcast happens, we will have potentially failed to update any connected clients or gateways with the changes. 2. We have no guarantee that the order of DB updates will be maintained in order for broadcasts. In other words, app server A could win its DB operation against app server B, but then proceed to lose being the first to broadcast. 3. If the cluster is in a bad state where broadcasts may return an error (i.e. https://github.com/firezone/firezone/issues/8660), we will never retry the broadcast. To fix the above issues, we introduce a WAL logical decoder that process the event stream one message at a time and performs any needed work. Serializability is guaranteed since we only process the WAL in a single, cluster-global process, `ReplicationConnection`. Durability is also guaranteed since we only ACK WAL segments after we've successfully ingested the event. This means we will only advance the position of our WAL stream after successfully broadcasting the event. This PR only introduces the WAL stream processing system but does not introduce any changes to our current broadcasting behavior - that's saved for another PR. --- .github/actions/setup-postgres/action.yml | 40 ++ .github/workflows/_elixir.yml | 41 +- docker-compose.yml | 8 + elixir/apps/domain/lib/domain/accounts.ex | 3 + elixir/apps/domain/lib/domain/actors.ex | 10 + .../lib/domain/actors/membership/sync.ex | 2 + elixir/apps/domain/lib/domain/application.ex | 12 + elixir/apps/domain/lib/domain/clients.ex | 8 + .../domain/lib/domain/config/definitions.ex | 10 + .../apps/domain/lib/domain/events/decoder.ex | 370 +++++++++++++ elixir/apps/domain/lib/domain/events/event.ex | 325 +++++++++++ .../lib/domain/events/hooks/accounts.ex | 13 + .../events/hooks/actor_group_memberships.ex | 13 + .../lib/domain/events/hooks/actor_groups.ex | 13 + .../domain/lib/domain/events/hooks/actors.ex | 13 + .../domain/events/hooks/auth_identities.ex | 13 + .../lib/domain/events/hooks/auth_providers.ex | 13 + .../domain/lib/domain/events/hooks/clients.ex | 13 + .../domain/events/hooks/flow_activities.ex | 13 + .../domain/lib/domain/events/hooks/flows.ex | 13 + .../lib/domain/events/hooks/gateway_groups.ex | 13 + .../lib/domain/events/hooks/gateways.ex | 13 + .../lib/domain/events/hooks/policies.ex | 13 + .../lib/domain/events/hooks/relay_groups.ex | 13 + .../domain/lib/domain/events/hooks/relays.ex | 13 + .../events/hooks/resource_connections.ex | 13 + .../lib/domain/events/hooks/resources.ex | 13 + .../domain/lib/domain/events/hooks/tokens.ex | 13 + .../domain/lib/domain/events/oid_database.ex | 163 ++++++ .../apps/domain/lib/domain/events/protocol.ex | 67 +++ .../lib/domain/events/protocol/keep_alive.ex | 24 + .../lib/domain/events/protocol/write.ex | 22 + .../domain/events/replication_connection.ex | 312 +++++++++++ elixir/apps/domain/lib/domain/flows.ex | 2 + elixir/apps/domain/lib/domain/gateways.ex | 2 + elixir/apps/domain/lib/domain/policies.ex | 6 + elixir/apps/domain/lib/domain/resources.ex | 6 + elixir/apps/domain/lib/domain/tokens.ex | 2 + ...04_set_tables_to_replica_identity_full.exs | 29 + ...utomatically_set_replica_identity_full.exs | 39 ++ .../test/domain/events/decoder_test.exs | 470 ++++++++++++++++ .../domain/test/domain/events/event_test.exs | 47 ++ .../domain/events/hooks/accounts_test.exs | 26 + .../hooks/actor_group_memberships_test.exs | 26 + .../domain/events/hooks/actor_groups_test.exs | 26 + .../test/domain/events/hooks/actors_test.exs | 26 + .../events/hooks/auth_identities_test.exs | 26 + .../events/hooks/auth_providers_test.exs | 26 + .../test/domain/events/hooks/clients_test.exs | 26 + .../events/hooks/flow_activities_test.exs | 26 + .../test/domain/events/hooks/flows_test.exs | 26 + .../events/hooks/gateway_groups_test.exs | 26 + .../domain/events/hooks/gateways_test.exs | 26 + .../domain/events/hooks/policies_test.exs | 26 + .../domain/events/hooks/relay_groups_test.exs | 26 + .../test/domain/events/hooks/relays_test.exs | 26 + .../hooks/resource_connections_test.exs | 26 + .../domain/events/hooks/resources_test.exs | 26 + .../test/domain/events/hooks/tokens_test.exs | 26 + .../events/replication_connection_test.exs | 215 ++++++++ elixir/config/config.exs | 36 ++ elixir/config/runtime.exs | 14 + elixir/config/test.exs | 8 + elixir/test.exs | 515 ++++++++++++++++++ 64 files changed, 3439 insertions(+), 32 deletions(-) create mode 100644 .github/actions/setup-postgres/action.yml create mode 100644 elixir/apps/domain/lib/domain/events/decoder.ex create mode 100644 elixir/apps/domain/lib/domain/events/event.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/accounts.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/actors.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/clients.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/flows.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/gateways.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/policies.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/relays.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/resources.ex create mode 100644 elixir/apps/domain/lib/domain/events/hooks/tokens.ex create mode 100644 elixir/apps/domain/lib/domain/events/oid_database.ex create mode 100644 elixir/apps/domain/lib/domain/events/protocol.ex create mode 100644 elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex create mode 100644 elixir/apps/domain/lib/domain/events/protocol/write.ex create mode 100644 elixir/apps/domain/lib/domain/events/replication_connection.ex create mode 100644 elixir/apps/domain/priv/repo/migrations/20250426183104_set_tables_to_replica_identity_full.exs create mode 100644 elixir/apps/domain/priv/repo/migrations/20250428102100_automatically_set_replica_identity_full.exs create mode 100644 elixir/apps/domain/test/domain/events/decoder_test.exs create mode 100644 elixir/apps/domain/test/domain/events/event_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/accounts_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/actor_groups_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/actors_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/auth_identities_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/auth_providers_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/clients_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/flow_activities_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/flows_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/gateway_groups_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/gateways_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/policies_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/relay_groups_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/relays_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/resources_test.exs create mode 100644 elixir/apps/domain/test/domain/events/hooks/tokens_test.exs create mode 100644 elixir/apps/domain/test/domain/events/replication_connection_test.exs create mode 100644 elixir/test.exs diff --git a/.github/actions/setup-postgres/action.yml b/.github/actions/setup-postgres/action.yml new file mode 100644 index 000000000..f8e6ba601 --- /dev/null +++ b/.github/actions/setup-postgres/action.yml @@ -0,0 +1,40 @@ +name: "Setup Postgres" +description: "Starts a Postgres container" +inputs: + version: + default: "latest" + description: "Postgres version" + required: false + port: + default: "5432" + description: "Port to expose" + required: false + username: + default: "postgres" + description: "Username" + required: false + password: + default: "postgres" + description: "Password" + required: false + options: + default: "" + description: "Additional options to pass to the container" + required: false +runs: + using: "composite" + steps: + - name: Start Postgres + id: start-postgres + shell: bash + run: | + docker run \ + --name postgres \ + --env POSTGRES_USER=${{ inputs.username }} \ + --env POSTGRES_PASSWORD=${{ inputs.password }} \ + --publish ${{ inputs.port }}:5432 \ + --health-cmd pg_isready \ + --health-interval 10s \ + --health-timeout 5s \ + --health-retries 5 \ + --detach postgres:${{ inputs.version }} postgres -c "wal_level=logical" diff --git a/.github/workflows/_elixir.yml b/.github/workflows/_elixir.yml index 0c1267a10..fb20633b9 100644 --- a/.github/workflows/_elixir.yml +++ b/.github/workflows/_elixir.yml @@ -14,19 +14,11 @@ jobs: MIX_ENV: test POSTGRES_HOST: localhost GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - services: - postgres: - image: postgres:15 - ports: - - 5432:5432 - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - options: >- - --health-cmd pg_isready --health-interval 10s --health-timeout 5s - --health-retries 5 steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: ./.github/actions/setup-postgres + with: + version: 15 - uses: ./.github/actions/setup-elixir with: mix_env: ${{ env.MIX_ENV }} @@ -129,19 +121,11 @@ jobs: MIX_ENV: dev POSTGRES_HOST: localhost GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - services: - postgres: - image: postgres:15 - ports: - - 5432:5432 - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - options: >- - --health-cmd pg_isready --health-interval 10s --health-timeout 5s - --health-retries 5 steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: ./.github/actions/setup-postgres + with: + version: 15 - uses: ./.github/actions/setup-elixir with: mix_env: ${{ env.MIX_ENV }} @@ -184,16 +168,6 @@ jobs: matrix: MIX_TEST_PARTITION: [1] services: - postgres: - image: postgres:15 - ports: - - 5432:5432 - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - options: >- - --health-cmd pg_isready --health-interval 10s --health-timeout 5s - --health-retries 5 vault: image: vault:1.12.2 env: @@ -204,6 +178,9 @@ jobs: options: --cap-add=IPC_LOCK steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: ./.github/actions/setup-postgres + with: + version: 15 - uses: nanasess/setup-chromedriver@e93e57b843c0c92788f22483f1a31af8ee48db25 # v2.3.0 - run: | export DISPLAY=:99 diff --git a/docker-compose.yml b/docker-compose.yml index 89eb5b1b3..4c90ae176 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -80,6 +80,8 @@ services: DATABASE_NAME: firezone_dev DATABASE_USER: postgres DATABASE_PASSWORD: postgres + DATABASE_REPLICATION_USER: postgres + DATABASE_REPLICATION_PASSWORD: postgres # Auth AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock" # Secrets @@ -152,6 +154,8 @@ services: DATABASE_NAME: firezone_dev DATABASE_USER: postgres DATABASE_PASSWORD: postgres + DATABASE_REPLICATION_USER: postgres + DATABASE_REPLICATION_PASSWORD: postgres # Auth AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock" # Secrets @@ -216,6 +220,8 @@ services: DATABASE_NAME: firezone_dev DATABASE_USER: postgres DATABASE_PASSWORD: postgres + DATABASE_REPLICATION_USER: postgres + DATABASE_REPLICATION_PASSWORD: postgres # Auth AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock" # Secrets @@ -284,6 +290,8 @@ services: DATABASE_NAME: firezone_dev DATABASE_USER: postgres DATABASE_PASSWORD: postgres + DATABASE_REPLICATION_USER: postgres + DATABASE_REPLICATION_PASSWORD: postgres # Auth AUTH_PROVIDER_ADAPTERS: "email,openid_connect,userpass,token,google_workspace,microsoft_entra,okta,jumpcloud,mock" # Secrets diff --git a/elixir/apps/domain/lib/domain/accounts.ex b/elixir/apps/domain/lib/domain/accounts.ex index 6c361a354..1be9a3e7f 100644 --- a/elixir/apps/domain/lib/domain/accounts.ex +++ b/elixir/apps/domain/lib/domain/accounts.ex @@ -98,6 +98,7 @@ defmodule Domain.Accounts do {:ok, account} {:ok, account} -> + # TODO: WAL :ok = Domain.Clients.disconnect_account_clients(account) {:ok, account} @@ -109,6 +110,7 @@ defmodule Domain.Accounts do defp on_account_update(account, changeset) do :ok = Billing.on_account_update(account, changeset) + # TODO: WAL if Ecto.Changeset.changed?(changeset, :config) do broadcast_config_update_to_account(account) else @@ -173,6 +175,7 @@ defmodule Domain.Accounts do account_or_id |> account_topic() |> PubSub.unsubscribe() end + # TODO: WAL defp broadcast_config_update_to_account(%Account{} = account) do broadcast_to_account(account.id, :config_changed) end diff --git a/elixir/apps/domain/lib/domain/actors.ex b/elixir/apps/domain/lib/domain/actors.ex index 37044eaef..c945f5506 100644 --- a/elixir/apps/domain/lib/domain/actors.ex +++ b/elixir/apps/domain/lib/domain/actors.ex @@ -175,6 +175,7 @@ defmodule Domain.Actors do case Repo.insert(changeset) do {:ok, group} -> + # TODO: WAL :ok = broadcast_group_memberships_events(group, changeset) {:ok, group} @@ -189,6 +190,7 @@ defmodule Domain.Actors do case Repo.insert(changeset) do {:ok, group} -> + # TODO: WAL :ok = broadcast_group_memberships_events(group, changeset) {:ok, group} @@ -227,6 +229,7 @@ defmodule Domain.Actors do |> Repo.preload(:memberships) |> Group.Changeset.update(attrs) end, + # TODO: WAL after_commit: fn _actor, changeset -> broadcast_memberships_events(changeset) end ) end @@ -252,6 +255,7 @@ defmodule Domain.Actors do {:ok, group} = Repo.update(changeset) + # TODO: WAL :ok = broadcast_memberships_events(changeset) group @@ -274,6 +278,7 @@ defmodule Domain.Actors do |> Membership.Query.returning_all() |> Repo.delete_all() + # TODO: WAL :ok = broadcast_membership_removal_events(memberships) {:ok, group} @@ -301,6 +306,7 @@ defmodule Domain.Actors do |> Membership.Query.returning_all() |> Repo.delete_all() + # TODO: WAL :ok = broadcast_membership_removal_events(memberships) with {:ok, groups} <- delete_groups(queryable, subject) do @@ -339,6 +345,7 @@ defmodule Domain.Actors do |> Membership.Query.returning_all() |> Repo.delete_all() + # TODO: WAL :ok = broadcast_membership_removal_events(memberships) {:ok, groups} @@ -522,6 +529,7 @@ defmodule Domain.Actors do true -> :cant_remove_admin_type end end, + # TODO: WAL after_commit: fn _actor, changeset -> broadcast_memberships_events(changeset) end ) end @@ -606,6 +614,7 @@ defmodule Domain.Actors do |> Repo.delete_all() {:ok, _groups} = update_dynamic_group_memberships(actor.account_id) + # TODO: WAL :ok = broadcast_membership_removal_events(memberships) {:ok, _tokens} = Tokens.delete_tokens_for(actor, subject) @@ -676,6 +685,7 @@ defmodule Domain.Actors do actor_or_id |> actor_memberships_topic() |> PubSub.unsubscribe() end + # TODO: WAL defp broadcast_memberships_events(changeset) do if changeset.valid? and Ecto.Changeset.changed?(changeset, :memberships) do case Ecto.Changeset.apply_action(changeset, :update) do diff --git a/elixir/apps/domain/lib/domain/actors/membership/sync.ex b/elixir/apps/domain/lib/domain/actors/membership/sync.ex index 7e162acc7..238bc7977 100644 --- a/elixir/apps/domain/lib/domain/actors/membership/sync.ex +++ b/elixir/apps/domain/lib/domain/actors/membership/sync.ex @@ -28,11 +28,13 @@ defmodule Domain.Actors.Membership.Sync do {:ok, inserted} <- insert_memberships(provider, insert) do :ok = Enum.each(insert, fn {group_id, actor_id} -> + # TODO: WAL Actors.broadcast_membership_event(:create, actor_id, group_id) end) :ok = Enum.each(delete, fn {group_id, actor_id} -> + # TODO: WAL Actors.broadcast_membership_event(:delete, actor_id, group_id) end) diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex index a7cc93f39..5dc366c09 100644 --- a/elixir/apps/domain/lib/domain/application.ex +++ b/elixir/apps/domain/lib/domain/application.ex @@ -22,6 +22,13 @@ defmodule Domain.Application do Domain.Repo, Domain.PubSub, + # WAL replication + %{ + id: Domain.Events.ReplicationConnection, + start: {Domain.Events.ReplicationConnection, :start_link, [replication_instance()]}, + restart: :transient + }, + # Infrastructure services # Note: only one of platform adapters will be actually started. Domain.GoogleCloudPlatform, @@ -45,6 +52,11 @@ defmodule Domain.Application do ] end + defp replication_instance do + config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) + struct(Domain.Events.ReplicationConnection, config) + end + defp configure_logger do # Attach Oban to the logger Oban.Telemetry.attach_default_logger(encode: false, level: log_level()) diff --git a/elixir/apps/domain/lib/domain/clients.ex b/elixir/apps/domain/lib/domain/clients.ex index cb9c03dc0..cbd74b739 100644 --- a/elixir/apps/domain/lib/domain/clients.ex +++ b/elixir/apps/domain/lib/domain/clients.ex @@ -188,6 +188,7 @@ defmodule Domain.Clients do with: &Client.Changeset.update(&1, attrs), preload: [:online?] ) + # TODO: WAL |> case do {:ok, client} -> :ok = broadcast_to_client(client, :updated) @@ -212,6 +213,7 @@ defmodule Domain.Clients do |> case do {:ok, client} -> client = Repo.preload(client, [:verified_by_actor, :verified_by_identity]) + # TODO: WAL :ok = broadcast_to_client(client, :updated) {:ok, client} @@ -234,6 +236,7 @@ defmodule Domain.Clients do |> case do {:ok, client} -> {:ok, _flows} = Flows.expire_flows_for(client) + # TODO: WAL :ok = broadcast_to_client(client, :updated) {:ok, client} @@ -251,6 +254,7 @@ defmodule Domain.Clients do with :ok <- authorize_actor_client_management(client.actor_id, subject) do case delete_clients(queryable, subject) do {:ok, [client]} -> + # TODO: WAL :ok = disconnect_client(client) {:ok, client} @@ -268,6 +272,7 @@ defmodule Domain.Clients do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission()), {:ok, _clients} <- delete_clients(queryable, subject) do + # TODO: WAL :ok = disconnect_actor_clients(actor) :ok end @@ -280,6 +285,7 @@ defmodule Domain.Clients do |> Client.Query.delete() |> Repo.update_all([]) + # TODO: WAL :ok = Enum.each(clients, &disconnect_client/1) {:ok, clients} @@ -305,6 +311,7 @@ defmodule Domain.Clients do {:ok, _} <- Presence.track(self(), actor_clients_presence_topic(client.actor_id), client.id, %{}) do :ok = PubSub.subscribe(client_topic(client)) + # TODO: WAL # :ok = PubSub.subscribe(actor_clients_topic(client.actor_id)) # :ok = PubSub.subscribe(identity_topic(client.actor_id)) :ok = PubSub.subscribe(account_clients_topic(client.account_id)) @@ -347,6 +354,7 @@ defmodule Domain.Clients do PubSub.unsubscribe(actor_clients_presence_topic(actor_or_id)) end + # TODO: WAL def broadcast_to_account_clients(account_or_id, payload) do account_or_id |> account_clients_topic() diff --git a/elixir/apps/domain/lib/domain/config/definitions.ex b/elixir/apps/domain/lib/domain/config/definitions.ex index cef78e186..1264e0411 100644 --- a/elixir/apps/domain/lib/domain/config/definitions.ex +++ b/elixir/apps/domain/lib/domain/config/definitions.ex @@ -290,6 +290,16 @@ defmodule Domain.Config.Definitions do """ defconfig(:database_password, :string, default: nil, sensitive: true) + @doc """ + Replication user that will be used to connect to replication slots. + """ + defconfig(:database_replication_user, :string, default: nil, sensitive: true) + + @doc """ + Replication password for the replication user. + """ + defconfig(:database_replication_password, :string, default: nil, sensitive: true) + @doc """ Size of the connection pool to the PostgreSQL database. """ diff --git a/elixir/apps/domain/lib/domain/events/decoder.ex b/elixir/apps/domain/lib/domain/events/decoder.ex new file mode 100644 index 000000000..f4ac1038e --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/decoder.ex @@ -0,0 +1,370 @@ +# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/decoder.ex +defmodule Domain.Events.Decoder do + @moduledoc """ + Functions for decoding different types of logical replication messages. + """ + defmodule Messages do + @moduledoc """ + Different types of logical replication messages from Postgres + """ + defmodule Begin do + @moduledoc """ + Struct representing the BEGIN message in PostgreSQL's logical decoding output. + + * `final_lsn` - The LSN of the commit that this transaction ended at. + * `commit_timestamp` - The timestamp of the commit that this transaction ended at. + * `xid` - The transaction ID of this transaction. + """ + defstruct [:final_lsn, :commit_timestamp, :xid] + end + + defmodule Commit do + @moduledoc """ + Struct representing the COMMIT message in PostgreSQL's logical decoding output. + + * `flags` - Bitmask of flags associated with this commit. + * `lsn` - The LSN of the commit. + * `end_lsn` - The LSN of the next record in the WAL stream. + * `commit_timestamp` - The timestamp of the commit. + """ + defstruct [:flags, :lsn, :end_lsn, :commit_timestamp] + end + + defmodule Origin do + @moduledoc """ + Struct representing the ORIGIN message in PostgreSQL's logical decoding output. + + * `origin_commit_lsn` - The LSN of the commit in the database that the change originated from. + * `name` - The name of the origin. + """ + defstruct [:origin_commit_lsn, :name] + end + + defmodule Relation do + @moduledoc """ + Struct representing the RELATION message in PostgreSQL's logical decoding output. + + * `id` - The OID of the relation. + * `namespace` - The OID of the namespace that the relation belongs to. + * `name` - The name of the relation. + * `replica_identity` - The replica identity setting of the relation. + * `columns` - A list of columns in the relation. + """ + defstruct [:id, :namespace, :name, :replica_identity, :columns] + + defmodule Column do + @moduledoc """ + Struct representing a column in a relation. + + * `flags` - Bitmask of flags associated with this column. + * `name` - The name of the column. + * `type` - The OID of the data type of the column. + * `type_modifier` - The type modifier of the column. + """ + defstruct [:flags, :name, :type, :type_modifier] + end + end + + defmodule Insert do + @moduledoc """ + Struct representing the INSERT message in PostgreSQL's logical decoding output. + + * `relation_id` - The OID of the relation that the tuple was inserted into. + * `tuple_data` - The data of the inserted tuple. + """ + defstruct [:relation_id, :tuple_data] + end + + defmodule Update do + @moduledoc """ + Struct representing the UPDATE message in PostgreSQL's logical decoding output. + + * `relation_id` - The OID of the relation that the tuple was updated in. + * `changed_key_tuple_data` - The data of the tuple with the old key values. + * `old_tuple_data` - The data of the tuple before the update. + * `tuple_data` - The data of the tuple after the update. + """ + defstruct [:relation_id, :changed_key_tuple_data, :old_tuple_data, :tuple_data] + end + + defmodule Delete do + @moduledoc """ + Struct representing the DELETE message in PostgreSQL's logical decoding output. + + * `relation_id` - The OID of the relation that the tuple was deleted from. + * `changed_key_tuple_data` - The data of the tuple with the old key values. + * `old_tuple_data` - The data of the tuple before the delete. + """ + defstruct [:relation_id, :changed_key_tuple_data, :old_tuple_data] + end + + defmodule Truncate do + @moduledoc """ + Struct representing the TRUNCATE message in PostgreSQL's logical decoding output. + + * `number_of_relations` - The number of truncated relations. + * `options` - Additional options provided when truncating the relations. + * `truncated_relations` - List of relations that have been truncated. + """ + defstruct [:number_of_relations, :options, :truncated_relations] + end + + defmodule Type do + @moduledoc """ + Struct representing the TYPE message in PostgreSQL's logical decoding output. + + * `id` - The OID of the type. + * `namespace` - The namespace of the type. + * `name` - The name of the type. + """ + defstruct [:id, :namespace, :name] + end + + defmodule Unsupported do + @moduledoc """ + Struct representing an unsupported message in PostgreSQL's logical decoding output. + + * `data` - The raw data of the unsupported message. + """ + defstruct [:data] + end + end + + require Logger + + @pg_epoch DateTime.from_iso8601("2000-01-01T00:00:00Z") + + alias Messages.{ + Begin, + Commit, + Origin, + Relation, + Relation.Column, + Insert, + Update, + Delete, + Truncate, + Type, + Unsupported + } + + alias Domain.Events.OidDatabase + + @doc """ + Parses logical replication messages from Postgres + + ## Examples + + iex> decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>) + %Domain.Events.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}} + + """ + def decode_message(message) when is_binary(message) do + decode_message_impl(message) + end + + defp decode_message_impl(<<"B", lsn::binary-8, timestamp::integer-64, xid::integer-32>>) do + %Begin{ + final_lsn: decode_lsn(lsn), + commit_timestamp: pgtimestamp_to_timestamp(timestamp), + xid: xid + } + end + + defp decode_message_impl( + <<"C", _flags::binary-1, lsn::binary-8, end_lsn::binary-8, timestamp::integer-64>> + ) do + %Commit{ + flags: [], + lsn: decode_lsn(lsn), + end_lsn: decode_lsn(end_lsn), + commit_timestamp: pgtimestamp_to_timestamp(timestamp) + } + end + + # TODO: Verify this is correct with real data from Postgres + defp decode_message_impl(<<"O", lsn::binary-8, name::binary>>) do + %Origin{ + origin_commit_lsn: decode_lsn(lsn), + name: name + } + end + + defp decode_message_impl(<<"R", id::integer-32, rest::binary>>) do + [ + namespace + | [name | [<>]] + ] = String.split(rest, <<0>>, parts: 3) + + # TODO: Handle case where pg_catalog is blank, we should still return the schema as pg_catalog + friendly_replica_identity = + case replica_identity do + "d" -> :default + "n" -> :nothing + "f" -> :all_columns + "i" -> :index + end + + %Relation{ + id: id, + namespace: namespace, + name: name, + replica_identity: friendly_replica_identity, + columns: decode_columns(columns) + } + end + + defp decode_message_impl( + <<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>> + ) do + {<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns) + + %Insert{ + relation_id: relation_id, + tuple_data: decoded_tuple_data + } + end + + defp decode_message_impl( + <<"U", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>> + ) do + {<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns) + + %Update{ + relation_id: relation_id, + tuple_data: decoded_tuple_data + } + end + + defp decode_message_impl( + <<"U", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16, + tuple_data::binary>> + ) + when key_or_old == "O" or key_or_old == "K" do + {<<"N", new_number_of_columns::integer-16, new_tuple_binary::binary>>, old_decoded_tuple_data} = + decode_tuple_data(tuple_data, number_of_columns) + + {<<>>, decoded_tuple_data} = decode_tuple_data(new_tuple_binary, new_number_of_columns) + + base_update_msg = %Update{ + relation_id: relation_id, + tuple_data: decoded_tuple_data + } + + case key_or_old do + "K" -> Map.put(base_update_msg, :changed_key_tuple_data, old_decoded_tuple_data) + "O" -> Map.put(base_update_msg, :old_tuple_data, old_decoded_tuple_data) + end + end + + defp decode_message_impl( + <<"D", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16, + tuple_data::binary>> + ) + when key_or_old == "K" or key_or_old == "O" do + {<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns) + + base_delete_msg = %Delete{ + relation_id: relation_id + } + + case key_or_old do + "K" -> Map.put(base_delete_msg, :changed_key_tuple_data, decoded_tuple_data) + "O" -> Map.put(base_delete_msg, :old_tuple_data, decoded_tuple_data) + end + end + + defp decode_message_impl( + <<"T", number_of_relations::integer-32, options::integer-8, column_ids::binary>> + ) do + truncated_relations = + for relation_id_bin <- column_ids |> :binary.bin_to_list() |> Enum.chunk_every(4), + do: relation_id_bin |> :binary.list_to_bin() |> :binary.decode_unsigned() + + decoded_options = + case options do + 0 -> [] + 1 -> [:cascade] + 2 -> [:restart_identity] + 3 -> [:cascade, :restart_identity] + end + + %Truncate{ + number_of_relations: number_of_relations, + options: decoded_options, + truncated_relations: truncated_relations + } + end + + defp decode_message_impl(<<"Y", data_type_id::integer-32, namespace_and_name::binary>>) do + [namespace, name_with_null] = :binary.split(namespace_and_name, <<0>>) + name = String.slice(name_with_null, 0..-2//1) + + %Type{ + id: data_type_id, + namespace: namespace, + name: name + } + end + + defp decode_message_impl(binary), do: %Unsupported{data: binary} + + defp decode_tuple_data(binary, columns_remaining, accumulator \\ []) + + defp decode_tuple_data(remaining_binary, 0, accumulator) when is_binary(remaining_binary), + do: {remaining_binary, accumulator |> Enum.reverse() |> List.to_tuple()} + + defp decode_tuple_data(<<"n", rest::binary>>, columns_remaining, accumulator), + do: decode_tuple_data(rest, columns_remaining - 1, [nil | accumulator]) + + defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, accumulator), + do: decode_tuple_data(rest, columns_remaining - 1, [:unchanged_toast | accumulator]) + + defp decode_tuple_data( + <<"t", column_length::integer-32, rest::binary>>, + columns_remaining, + accumulator + ), + do: + decode_tuple_data( + :erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)}), + columns_remaining - 1, + [ + :erlang.binary_part(rest, {0, column_length}) | accumulator + ] + ) + + defp decode_columns(binary, accumulator \\ []) + defp decode_columns(<<>>, accumulator), do: Enum.reverse(accumulator) + + defp decode_columns(<>, accumulator) do + [name | [<>]] = + String.split(rest, <<0>>, parts: 2) + + decoded_flags = + case flags do + 1 -> [:key] + _ -> [] + end + + decode_columns(columns, [ + %Column{ + name: name, + flags: decoded_flags, + type: OidDatabase.name_for_type_id(data_type_id), + # type: data_type_id, + type_modifier: type_modifier + } + | accumulator + ]) + end + + defp pgtimestamp_to_timestamp(microsecond_offset) when is_integer(microsecond_offset) do + {:ok, epoch, 0} = @pg_epoch + + DateTime.add(epoch, microsecond_offset, :microsecond) + end + + defp decode_lsn(<>), + do: {xlog_file, xlog_offset} +end diff --git a/elixir/apps/domain/lib/domain/events/event.ex b/elixir/apps/domain/lib/domain/events/event.ex new file mode 100644 index 000000000..ea40e82ef --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/event.ex @@ -0,0 +1,325 @@ +defmodule Domain.Events.Event do + alias Domain.Events.Decoder + alias Domain.Events.Hooks + + require Logger + + @doc """ + Ingests a WAL write message from Postgres, transforms it into an event, and sends + it to the appropriate hook module for processing. + """ + def ingest(msg, relations) do + {op, tuple_data, old_tuple_data} = extract_msg_data(msg) + {:ok, relation} = Map.fetch(relations, msg.relation_id) + + table = relation.name + old_data = zip(old_tuple_data, relation.columns) + data = zip(tuple_data, relation.columns) + + process(op, table, old_data, data) + end + + ############ + # accounts # + ############ + + defp process(:insert, "accounts", _old_data, data) do + Hooks.Accounts.insert(data) + end + + defp process(:update, "accounts", old_data, data) do + Hooks.Accounts.update(old_data, data) + end + + defp process(:delete, "accounts", old_data, _data) do + Hooks.Accounts.delete(old_data) + end + + ########################### + # actor_group_memberships # + ########################### + + defp process(:insert, "actor_group_memberships", _old_data, data) do + Hooks.ActorGroupMemberships.insert(data) + end + + defp process(:update, "actor_group_memberships", old_data, data) do + Hooks.ActorGroupMemberships.update(old_data, data) + end + + defp process(:delete, "actor_group_memberships", old_data, _data) do + Hooks.ActorGroupMemberships.delete(old_data) + end + + ################ + # actor_groups # + ################ + + defp process(:insert, "actor_groups", _old_data, data) do + Hooks.ActorGroups.insert(data) + end + + defp process(:update, "actor_groups", old_data, data) do + Hooks.ActorGroups.update(old_data, data) + end + + defp process(:delete, "actor_groups", old_data, _data) do + Hooks.ActorGroups.delete(old_data) + end + + ########## + # actors # + ########## + + defp process(:insert, "actors", _old_data, data) do + Hooks.Actors.insert(data) + end + + defp process(:update, "actors", old_data, data) do + Hooks.Actors.update(old_data, data) + end + + defp process(:delete, "actors", old_data, _data) do + Hooks.Actors.delete(old_data) + end + + ################### + # auth_identities # + ################### + + defp process(:insert, "auth_identities", _old_data, data) do + Hooks.AuthIdentities.insert(data) + end + + defp process(:update, "auth_identities", old_data, data) do + Hooks.AuthIdentities.update(old_data, data) + end + + defp process(:delete, "auth_identities", old_data, _data) do + Hooks.AuthIdentities.delete(old_data) + end + + ################## + # auth_providers # + ################## + + defp process(:insert, "auth_providers", _old_data, data) do + Hooks.AuthProviders.insert(data) + end + + defp process(:update, "auth_providers", old_data, data) do + Hooks.AuthProviders.update(old_data, data) + end + + defp process(:delete, "auth_providers", old_data, _data) do + Hooks.AuthProviders.delete(old_data) + end + + ########### + # clients # + ########### + + defp process(:insert, "clients", _old_data, data) do + Hooks.Clients.insert(data) + end + + defp process(:update, "clients", old_data, data) do + Hooks.Clients.update(old_data, data) + end + + defp process(:delete, "clients", old_data, _data) do + Hooks.Clients.delete(old_data) + end + + ################### + # flow_activities # + ################### + + defp process(:insert, "flow_activities", _old_data, data) do + Hooks.FlowActivities.insert(data) + end + + defp process(:update, "flow_activities", old_data, data) do + Hooks.FlowActivities.update(old_data, data) + end + + defp process(:delete, "flow_activities", old_data, _data) do + Hooks.FlowActivities.delete(old_data) + end + + ######### + # flows # + ######### + + defp process(:insert, "flows", _old_data, data) do + Hooks.Flows.insert(data) + end + + defp process(:update, "flows", old_data, data) do + Hooks.Flows.update(old_data, data) + end + + defp process(:delete, "flows", old_data, _data) do + Hooks.Flows.delete(old_data) + end + + ################## + # gateway_groups # + ################## + + defp process(:insert, "gateway_groups", _old_data, data) do + Hooks.GatewayGroups.insert(data) + end + + defp process(:update, "gateway_groups", old_data, data) do + Hooks.GatewayGroups.update(old_data, data) + end + + defp process(:delete, "gateway_groups", old_data, _data) do + Hooks.GatewayGroups.delete(old_data) + end + + ############ + # gateways # + ############ + + defp process(:insert, "gateways", _old_data, data) do + Hooks.Gateways.insert(data) + end + + defp process(:update, "gateways", old_data, data) do + Hooks.Gateways.update(old_data, data) + end + + defp process(:delete, "gateways", old_data, _data) do + Hooks.Gateways.delete(old_data) + end + + ############ + # policies # + ############ + + defp process(:insert, "policies", _old_data, data) do + Hooks.Policies.insert(data) + end + + defp process(:update, "policies", old_data, data) do + Hooks.Policies.update(old_data, data) + end + + defp process(:delete, "policies", old_data, _data) do + Hooks.Policies.delete(old_data) + end + + ################ + # relay_groups # + ################ + + defp process(:insert, "relay_groups", _old_data, data) do + Hooks.RelayGroups.insert(data) + end + + defp process(:update, "relay_groups", old_data, data) do + Hooks.RelayGroups.update(old_data, data) + end + + defp process(:delete, "relay_groups", old_data, _data) do + Hooks.RelayGroups.delete(old_data) + end + + ########## + # relays # + ########## + + defp process(:insert, "relays", _old_data, data) do + Hooks.Relays.insert(data) + end + + defp process(:update, "relays", old_data, data) do + Hooks.Relays.update(old_data, data) + end + + defp process(:delete, "relays", old_data, _data) do + Hooks.Relays.delete(old_data) + end + + ######################## + # resource_connections # + ######################## + + defp process(:insert, "resource_connections", _old_data, data) do + Hooks.ResourceConnections.insert(data) + end + + defp process(:update, "resource_connections", old_data, data) do + Hooks.ResourceConnections.update(old_data, data) + end + + defp process(:delete, "resource_connections", old_data, _data) do + Hooks.ResourceConnections.delete(old_data) + end + + ############# + # resources # + ############# + + defp process(:insert, "resources", _old_data, data) do + Hooks.Resources.insert(data) + end + + defp process(:update, "resources", old_data, data) do + Hooks.Resources.update(old_data, data) + end + + defp process(:delete, "resources", old_data, _data) do + Hooks.Resources.delete(old_data) + end + + ########## + # tokens # + ########## + + defp process(:insert, "tokens", _old_data, data) do + Hooks.Tokens.insert(data) + end + + defp process(:update, "tokens", old_data, data) do + Hooks.Tokens.update(old_data, data) + end + + defp process(:delete, "tokens", old_data, _data) do + Hooks.Tokens.delete(old_data) + end + + ############# + # CATCH-ALL # + ############# + + defp process(op, table, _old_data, _data) do + Logger.warning("Unhandled event type!", op: op, table: table) + + :ok + end + + defp extract_msg_data(%Decoder.Messages.Insert{tuple_data: data}) do + {:insert, nil, data} + end + + defp extract_msg_data(%Decoder.Messages.Update{old_tuple_data: data, tuple_data: old}) do + {:update, old, data} + end + + defp extract_msg_data(%Decoder.Messages.Delete{old_tuple_data: old}) do + {:delete, old, nil} + end + + defp zip(nil, _), do: nil + + defp zip(tuple_data, columns) do + tuple_data + |> Tuple.to_list() + |> Enum.zip(columns) + |> Map.new(fn {value, column} -> {column.name, value} end) + |> Enum.into(%{}) + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex new file mode 100644 index 000000000..928e786dc --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Accounts do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex new file mode 100644 index 000000000..c17a49bc4 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.ActorGroupMemberships do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex new file mode 100644 index 000000000..55f89e906 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.ActorGroups do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actors.ex b/elixir/apps/domain/lib/domain/events/hooks/actors.ex new file mode 100644 index 000000000..382472390 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/actors.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Actors do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex b/elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex new file mode 100644 index 000000000..75b222291 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.AuthIdentities do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex b/elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex new file mode 100644 index 000000000..20c2dddf8 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.AuthProviders do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/clients.ex b/elixir/apps/domain/lib/domain/events/hooks/clients.ex new file mode 100644 index 000000000..6d4ba02cd --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/clients.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Clients do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex b/elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex new file mode 100644 index 000000000..6d6a1d1ac --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.FlowActivities do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex new file mode 100644 index 000000000..3cf6162ae --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/flows.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Flows do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex new file mode 100644 index 000000000..663bf79bf --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.GatewayGroups do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/gateways.ex b/elixir/apps/domain/lib/domain/events/hooks/gateways.ex new file mode 100644 index 000000000..59c6e7bda --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/gateways.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Gateways do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/policies.ex b/elixir/apps/domain/lib/domain/events/hooks/policies.ex new file mode 100644 index 000000000..0ad2320db --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/policies.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Policies do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex new file mode 100644 index 000000000..b78aeaa75 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.RelayGroups do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/relays.ex b/elixir/apps/domain/lib/domain/events/hooks/relays.ex new file mode 100644 index 000000000..aeb0fb80a --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/relays.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Relays do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex new file mode 100644 index 000000000..30aeca7dd --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.ResourceConnections do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/resources.ex b/elixir/apps/domain/lib/domain/events/hooks/resources.ex new file mode 100644 index 000000000..a5bbe07a4 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/resources.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Resources do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/tokens.ex b/elixir/apps/domain/lib/domain/events/hooks/tokens.ex new file mode 100644 index 000000000..480bc6189 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks/tokens.ex @@ -0,0 +1,13 @@ +defmodule Domain.Events.Hooks.Tokens do + def insert(_data) do + :ok + end + + def update(_old_data, _data) do + :ok + end + + def delete(_old_data) do + :ok + end +end diff --git a/elixir/apps/domain/lib/domain/events/oid_database.ex b/elixir/apps/domain/lib/domain/events/oid_database.ex new file mode 100644 index 000000000..c1626385f --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/oid_database.ex @@ -0,0 +1,163 @@ +# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/oid_database.ex +defmodule Domain.Events.OidDatabase do + @moduledoc "This module maps a numeric PostgreSQL type ID to a descriptive string." + + @doc """ + Maps a numeric PostgreSQL type ID to a descriptive string. + + ## Examples + + iex> name_for_type_id(1700) + "numeric" + + iex> name_for_type_id(25) + "text" + + iex> name_for_type_id(3802) + "jsonb" + + """ + # credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity + def name_for_type_id(type_id) do + case type_id do + 16 -> "bool" + 17 -> "bytea" + 18 -> "char" + 19 -> "name" + 20 -> "int8" + 21 -> "int2" + 22 -> "int2vector" + 23 -> "int4" + 24 -> "regproc" + 25 -> "text" + 26 -> "oid" + 27 -> "tid" + 28 -> "xid" + 29 -> "cid" + 30 -> "oidvector" + 114 -> "json" + 142 -> "xml" + 143 -> "_xml" + 194 -> "pg_node_tree" + 199 -> "_json" + 210 -> "smgr" + 600 -> "point" + 601 -> "lseg" + 602 -> "path" + 603 -> "box" + 604 -> "polygon" + 628 -> "line" + 629 -> "_line" + 650 -> "cidr" + 651 -> "_cidr" + 700 -> "float4" + 701 -> "float8" + 702 -> "abstime" + 703 -> "reltime" + 704 -> "tinterval" + 718 -> "circle" + 719 -> "_circle" + 774 -> "macaddr8" + 775 -> "_macaddr8" + 790 -> "money" + 791 -> "_money" + 829 -> "macaddr" + 869 -> "inet" + 1000 -> "_bool" + 1001 -> "_bytea" + 1002 -> "_char" + 1003 -> "_name" + 1005 -> "_int2" + 1006 -> "_int2vector" + 1007 -> "_int4" + 1008 -> "_regproc" + 1009 -> "_text" + 1010 -> "_tid" + 1011 -> "_xid" + 1012 -> "_cid" + 1013 -> "_oidvector" + 1014 -> "_bpchar" + 1015 -> "_varchar" + 1016 -> "_int8" + 1017 -> "_point" + 1018 -> "_lseg" + 1019 -> "_path" + 1020 -> "_box" + 1021 -> "_float4" + 1022 -> "_float8" + 1023 -> "_abstime" + 1024 -> "_reltime" + 1025 -> "_tinterval" + 1027 -> "_polygon" + 1028 -> "_oid" + 1033 -> "aclitem" + 1034 -> "_aclitem" + 1040 -> "_macaddr" + 1041 -> "_inet" + 1042 -> "bpchar" + 1043 -> "varchar" + 1082 -> "date" + 1083 -> "time" + 1114 -> "timestamp" + 1115 -> "_timestamp" + 1182 -> "_date" + 1183 -> "_time" + 1184 -> "timestamptz" + 1185 -> "_timestamptz" + 1186 -> "interval" + 1187 -> "_interval" + 1231 -> "_numeric" + 1263 -> "_cstring" + 1266 -> "timetz" + 1270 -> "_timetz" + 1560 -> "bit" + 1561 -> "_bit" + 1562 -> "varbit" + 1563 -> "_varbit" + 1700 -> "numeric" + 1790 -> "refcursor" + 2201 -> "_refcursor" + 2202 -> "regprocedure" + 2203 -> "regoper" + 2204 -> "regoperator" + 2205 -> "regclass" + 2206 -> "regtype" + 2207 -> "_regprocedure" + 2208 -> "_regoper" + 2209 -> "_regoperator" + 2210 -> "_regclass" + 2211 -> "_regtype" + 2949 -> "_txid_snapshot" + 2950 -> "uuid" + 2951 -> "_uuid" + 2970 -> "txid_snapshot" + 3220 -> "pg_lsn" + 3221 -> "_pg_lsn" + 3361 -> "pg_ndistinct" + 3402 -> "pg_dependencies" + 3614 -> "tsvector" + 3615 -> "tsquery" + 3642 -> "gtsvector" + 3643 -> "_tsvector" + 3644 -> "_gtsvector" + 3645 -> "_tsquery" + 3734 -> "regconfig" + 3735 -> "_regconfig" + 3769 -> "regdictionary" + 3770 -> "_regdictionary" + 3802 -> "jsonb" + 3807 -> "_jsonb" + 3905 -> "_int4range" + 3907 -> "_numrange" + 3909 -> "_tsrange" + 3911 -> "_tstzrange" + 3913 -> "_daterange" + 3927 -> "_int8range" + 4089 -> "regnamespace" + 4090 -> "_regnamespace" + 4096 -> "regrole" + 4097 -> "_regrole" + _ -> type_id + end + end +end diff --git a/elixir/apps/domain/lib/domain/events/protocol.ex b/elixir/apps/domain/lib/domain/events/protocol.ex new file mode 100644 index 000000000..6b3dd68b6 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/protocol.ex @@ -0,0 +1,67 @@ +# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/adapters/postgres/protocol.ex +defmodule Domain.Events.Protocol do + @moduledoc """ + This module is responsible for parsing the Postgres WAL messages. + """ + alias Domain.Events.Protocol.Write + alias Domain.Events.Protocol.KeepAlive + + defguard is_write(value) when binary_part(value, 0, 1) == <> + defguard is_keep_alive(value) when binary_part(value, 0, 1) == <> + + def parse( + <> + ) do + %Write{ + server_wal_start: server_wal_start, + server_wal_end: server_wal_end, + server_system_clock: server_system_clock, + message: message + } + end + + def parse(<>) do + reply = + case reply do + 0 -> :later + 1 -> :now + end + + %KeepAlive{wal_end: wal_end, clock: clock, reply: reply} + end + + @doc """ + Message to send to the server to request a standby status update. + + Check https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE for more information + """ + @spec standby_status(integer(), integer(), integer(), :now | :later, integer() | nil) :: [ + binary() + ] + def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, clock \\ nil) + + def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, nil) do + standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, current_time()) + end + + def standby_status(last_wal_received, last_wal_flushed, last_wal_applied, reply, clock) do + reply = + case reply do + :now -> 1 + :later -> 0 + end + + [ + <> + ] + end + + @doc """ + Message to send the server to not do any operation since the server can wait + """ + def hold, do: [] + + @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + def current_time, do: System.os_time(:microsecond) - @epoch +end diff --git a/elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex b/elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex new file mode 100644 index 000000000..21e2cf978 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/protocol/keep_alive.ex @@ -0,0 +1,24 @@ +defmodule Domain.Events.Protocol.KeepAlive do + @moduledoc """ + Primary keepalive message (B) + Byte1('k') + Identifies the message as a sender keepalive. + + Int64 + The current end of WAL on the server. + + Int64 + The server's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. + + Byte1 + 1 means that the client should reply to this message as soon as possible, to avoid a timeout disconnect. 0 otherwise. + + The receiving process can send replies back to the sender at any time, using one of the following message formats (also in the payload of a CopyData message): + """ + @type t :: %__MODULE__{ + wal_end: integer(), + clock: integer(), + reply: :now | :await + } + defstruct [:wal_end, :clock, :reply] +end diff --git a/elixir/apps/domain/lib/domain/events/protocol/write.ex b/elixir/apps/domain/lib/domain/events/protocol/write.ex new file mode 100644 index 000000000..9af5f146b --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/protocol/write.ex @@ -0,0 +1,22 @@ +defmodule Domain.Events.Protocol.Write do + @moduledoc """ + XLogData (B) + Byte1('w') + Identifies the message as WAL data. + + Int64 + The starting point of the WAL data in this message. + + Int64 + The current end of WAL on the server. + + Int64 + The server's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. + + Byten + A section of the WAL data stream. + + A single WAL record is never split across two XLogData messages. When a WAL record crosses a WAL page boundary, and is therefore already split using continuation records, it can be split at the page boundary. In other words, the first main WAL record and its continuation records can be sent in different XLogData messages. + """ + defstruct [:server_wal_start, :server_wal_end, :server_system_clock, :message] +end diff --git a/elixir/apps/domain/lib/domain/events/replication_connection.ex b/elixir/apps/domain/lib/domain/events/replication_connection.ex new file mode 100644 index 000000000..4e180c89e --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/replication_connection.ex @@ -0,0 +1,312 @@ +# CREDIT: https://github.com/supabase/realtime/blob/main/lib/realtime/tenants/replication_connection.ex +defmodule Domain.Events.ReplicationConnection do + @moduledoc """ + Receives WAL events from PostgreSQL and broadcasts them where they need to go. + + Generally, we only want to start one of these connections per cluster in order + to obtain a serial stream of the WAL. We can then fanout these events to the + appropriate consumers. + + The ReplicationConnection is started with a durable slot so that whatever data we + fail to acknowledge is retained in the slot on the server's disk. The server will + then send us the data when we reconnect. This is important because we want to + ensure that we don't lose any WAL data if we disconnect or crash, such as during a deploy. + + The WAL data we receive is sent only once a COMMIT completes on the server. So even though + COMMIT is one of the message types we receive here, we can safely ignore it and process + insert/update/delete messages one-by-one in this module as we receive them. + """ + use Postgrex.ReplicationConnection + require Logger + + import Domain.Events.Protocol + import Domain.Events.Decoder + + alias Domain.Events.Event + alias Domain.Events.Decoder + alias Domain.Events.Protocol.{KeepAlive, Write} + + @type t :: %__MODULE__{ + schema: String.t(), + connection_opts: Keyword.t(), + step: + :disconnected + | :check_publication + | :create_publication + | :check_replication_slot + | :create_slot + | :start_replication_slot + | :streaming, + publication_name: String.t(), + replication_slot_name: String.t(), + output_plugin: String.t(), + proto_version: integer(), + table_subscriptions: list(), + relations: map() + } + defstruct schema: "public", + connection_opts: [], + step: :disconnected, + publication_name: "events", + replication_slot_name: "events_slot", + output_plugin: "pgoutput", + proto_version: 1, + table_subscriptions: [], + relations: %{} + + def start_link(%__MODULE__{} = instance) do + # Start only one ReplicationConnection in the cluster. + opts = instance.connection_opts ++ [name: {:global, __MODULE__}] + + case(Postgrex.ReplicationConnection.start_link(__MODULE__, instance, opts)) do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + {:ok, pid} + + error -> + Logger.error("Failed to start replication connection!", + error: inspect(error) + ) + + error + end + end + + @impl true + def init(state) do + {:ok, state} + end + + @doc """ + Called when we make a successful connection to the PostgreSQL server. + """ + + @impl true + def handle_connect(state) do + query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" + {:query, query, %{state | step: :create_publication}} + end + + @doc """ + Generic callback that handles replies to the queries we send. + + We use a simple state machine to issue queries one at a time to Postgres in order to: + + 1. Check if the publication exists + 2. Check if the replication slot exists + 3. Create the publication if it doesn't exist + 4. Create the replication slot if it doesn't exist + 5. Start the replication slot + 6. Start streaming data from the replication slot + """ + + @impl true + + def handle_result( + [%Postgrex.Result{num_rows: 1}], + %__MODULE__{step: :create_publication} = state + ) do + query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + {:query, query, %{state | step: :create_replication_slot}} + end + + def handle_result( + [%Postgrex.Result{num_rows: 1}], + %__MODULE__{step: :create_replication_slot} = state + ) do + {:query, "SELECT 1", %{state | step: :start_replication_slot}} + end + + def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do + query = + "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" + + {:stream, query, [], %{state | step: :streaming}} + end + + def handle_result( + [%Postgrex.Result{num_rows: 0}], + %__MODULE__{step: :create_publication} = state + ) do + tables = + state.table_subscriptions + |> Enum.map_join(",", fn table -> "#{state.schema}.#{table}" end) + + query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{tables}" + {:query, query, %{state | step: :check_replication_slot}} + end + + def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_replication_slot} = state) do + query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + {:query, query, %{state | step: :create_replication_slot}} + end + + def handle_result( + [%Postgrex.Result{num_rows: 0}], + %__MODULE__{step: :create_replication_slot} = state + ) do + query = + "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" + + {:query, query, %{state | step: :start_replication_slot}} + end + + @doc """ + Called when we receive a message from the PostgreSQL server. + + We handle the following messages: + 1. KeepAlive: A message sent by PostgreSQL to keep the connection alive and also acknowledge + processed data by responding with the current WAL position. + 2. Write: A message containing a WAL event - the actual data we are interested in. + 3. Unknown: Any other message that we don't know how to handle - we log and ignore it. + + For the KeepAlive message, we respond immediately with the current WAL position. Note: it is expected + that we receive many more of these messages than expected. That is because the rate at which the server + sends these messages scales proportionally to the number of Write messages it sends. + + For the Write message, we send broadcast for each message one-by-one as we receive it. This is important + because the WAL stream from Postgres is ordered; if we reply to a Keepalive advancing the WAL position, + we should have already processed all the messages up to that point. + """ + + @impl true + + def handle_data(data, state) when is_keep_alive(data) do + %KeepAlive{reply: reply, wal_end: wal_end} = parse(data) + + wal_end = wal_end + 1 + + message = + case reply do + :now -> standby_status(wal_end, wal_end, wal_end, reply) + :later -> hold() + end + + {:noreply, message, state} + end + + def handle_data(data, state) when is_write(data) do + %Write{message: message} = parse(data) + + # TODO: Telemetry: Mark start + message + |> decode_message() + |> handle_message(state) + end + + def handle_data(data, state) do + Logger.error("Unknown WAL message received!", + data: inspect(data), + state: inspect(state) + ) + + {:noreply, [], state} + end + + # Handles messages received: + # + # 1. Insert/Update/Delete - send to Event.ingest/2 for further processing + # 2. Relation messages - store the relation data in our state so we can use it later + # to associate column names etc with the data we receive. In practice, we'll always + # see a Relation message before we see any data for that relation. + # 3. Begin/Commit/Origin/Truncate/Type - we ignore these messages for now + # 4. Graceful shutdown - we respond with {:disconnect, :normal} to + # indicate that we are shutting down gracefully and prevent auto reconnecting. + defp handle_message( + %Decoder.Messages.Relation{ + id: id, + namespace: namespace, + name: name, + columns: columns + }, + state + ) do + relation = %{ + namespace: namespace, + name: name, + columns: columns + } + + {:noreply, [], %{state | relations: Map.put(state.relations, id, relation)}} + end + + defp handle_message(%Decoder.Messages.Insert{} = msg, state) do + :ok = Event.ingest(msg, state.relations) + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Update{} = msg, state) do + :ok = Event.ingest(msg, state.relations) + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Delete{} = msg, state) do + :ok = Event.ingest(msg, state.relations) + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Begin{}, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Commit{}, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Origin{}, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Truncate{}, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Type{}, state) do + {:noreply, [], state} + end + + defp handle_message(%Decoder.Messages.Unsupported{data: data}, state) do + Logger.warning("Unsupported message received", + data: inspect(data), + state: inspect(state) + ) + + {:noreply, [], state} + end + + @impl true + + def handle_info(:shutdown, _), do: {:disconnect, :normal} + def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal} + + def handle_info(_, state), do: {:noreply, state} + + @doc """ + Called when the connection is disconnected unexpectedly. + + We log the error and set the state to :disconnected, which will cause the + ReplicationConnection to attempt to reconnect when auto_reconnect is enabled. + + This will happen if: + 1. Postgres is restarted such as during a maintenance window + 2. The connection is closed by the server due to our failure to acknowledge + Keepalive messages in a timely manner + 3. The connection is cut due to a network error + 4. The ReplicationConnection process crashes or is killed abruptly for any reason + """ + + @impl true + def handle_disconnect(state) do + Logger.warning("Replication connection disconnected", + state: inspect(state) + ) + + {:noreply, %{state | step: :disconnected}} + end +end diff --git a/elixir/apps/domain/lib/domain/flows.ex b/elixir/apps/domain/lib/domain/flows.ex index 09800a94a..d944cb3b7 100644 --- a/elixir/apps/domain/lib/domain/flows.ex +++ b/elixir/apps/domain/lib/domain/flows.ex @@ -242,6 +242,7 @@ defmodule Domain.Flows do |> Flow.Query.expire() |> Repo.update_all([]) + # TODO: WAL :ok = Enum.each(flows, fn flow -> :ok = broadcast_flow_expiration_event(flow) @@ -263,6 +264,7 @@ defmodule Domain.Flows do flow_or_id |> flow_topic() |> PubSub.subscribe() end + # TODO: WAL defp broadcast_flow_expiration_event(flow) do flow |> flow_topic() diff --git a/elixir/apps/domain/lib/domain/gateways.ex b/elixir/apps/domain/lib/domain/gateways.ex index ad9df9ad7..25220c9b3 100644 --- a/elixir/apps/domain/lib/domain/gateways.ex +++ b/elixir/apps/domain/lib/domain/gateways.ex @@ -120,6 +120,7 @@ defmodule Domain.Gateways do ) |> case do {:ok, group} -> + # TODO: WAL :ok = broadcast_to_group(group, :updated) {:ok, group} @@ -512,6 +513,7 @@ defmodule Domain.Gateways do |> PubSub.unsubscribe() end + # TODO: WAL def broadcast_to_group(group_or_id, payload) do group_or_id |> group_topic() diff --git a/elixir/apps/domain/lib/domain/policies.ex b/elixir/apps/domain/lib/domain/policies.ex index 2dce8304b..852cfd26d 100644 --- a/elixir/apps/domain/lib/domain/policies.ex +++ b/elixir/apps/domain/lib/domain/policies.ex @@ -68,6 +68,7 @@ defmodule Domain.Policies do with :ok <- Auth.ensure_has_permissions(subject, required_permissions) do Policy.Changeset.create(attrs, subject) |> Repo.insert() + # TODO: WAL |> case do {:ok, policy} -> :ok = broadcast_policy_events(:create, policy) @@ -92,6 +93,7 @@ defmodule Domain.Policies do |> Authorizer.for_subject(subject) |> Repo.fetch_and_update_breakable(Policy.Query, with: &Policy.Changeset.update(&1, attrs), + # TODO: WAL after_update_commit: &broadcast_policy_events(:update, &1), after_breaking_update_commit: fn updated_policy, _changeset -> {:ok, _flows} = Flows.expire_flows_for(policy, subject) @@ -109,6 +111,7 @@ defmodule Domain.Policies do |> Authorizer.for_subject(subject) |> Repo.fetch_and_update(Policy.Query, with: &Policy.Changeset.disable(&1, subject), + # TODO: WAL after_commit: &broadcast_policy_events(:disable, &1) ) |> case do @@ -129,6 +132,7 @@ defmodule Domain.Policies do |> Authorizer.for_subject(subject) |> Repo.fetch_and_update(Policy.Query, with: &Policy.Changeset.enable/1, + # TODO: WAL after_commit: &broadcast_policy_events(:enable, &1) ) end @@ -187,6 +191,7 @@ defmodule Domain.Policies do |> Policy.Query.delete() |> Repo.update_all([]) + # TODO: WAL :ok = Enum.each(policies, fn policy -> :ok = broadcast_policy_events(:delete, policy) @@ -281,6 +286,7 @@ defmodule Domain.Policies do actor_group_or_id |> actor_group_topic() |> PubSub.unsubscribe() end + # TODO: WAL defp broadcast_policy_events(action, %Policy{} = policy) do payload = {:"#{action}_policy", policy.id} :ok = broadcast_to_policy(policy, payload) diff --git a/elixir/apps/domain/lib/domain/resources.ex b/elixir/apps/domain/lib/domain/resources.ex index ff34c4d58..05cc5cf9d 100644 --- a/elixir/apps/domain/lib/domain/resources.ex +++ b/elixir/apps/domain/lib/domain/resources.ex @@ -232,6 +232,7 @@ defmodule Domain.Resources do changeset = Resource.Changeset.create(subject.account, attrs, subject) with {:ok, resource} <- Repo.insert(changeset) do + # TODO: WAL :ok = broadcast_resource_events(:create, resource) {:ok, resource} end @@ -253,6 +254,7 @@ defmodule Domain.Resources do changeset = Resource.Changeset.create(account, attrs) with {:ok, resource} <- Repo.insert(changeset) do + # TODO: WAL :ok = broadcast_resource_events(:create, resource) {:ok, resource} end @@ -280,6 +282,7 @@ defmodule Domain.Resources do {:ok, _flows} = Flows.expire_flows_for(resource, subject) end + # TODO: WAL broadcast_resource_events(:update, resource) end, after_breaking_update_commit: fn updated_resource, _changeset -> @@ -287,6 +290,7 @@ defmodule Domain.Resources do # This is used to reset the resource on the client and gateway in case filters, conditions, etc are changed. {:ok, _flows} = Flows.expire_flows_for(resource, subject) + # TODO: WAL :ok = broadcast_resource_events(:delete, resource) :ok = broadcast_resource_events(:create, updated_resource) end @@ -314,6 +318,7 @@ defmodule Domain.Resources do ) |> case do {:ok, resource} -> + # TODO: WAL :ok = broadcast_resource_events(:delete, resource) {:ok, _policies} = Policies.delete_policies_for(resource, subject) {:ok, resource} @@ -374,6 +379,7 @@ defmodule Domain.Resources do account_or_id |> account_topic() |> PubSub.subscribe() end + # TODO: WAL defp broadcast_resource_events(action, %Resource{} = resource) do payload = {:"#{action}_resource", resource.id} :ok = broadcast_to_resource(resource, payload) diff --git a/elixir/apps/domain/lib/domain/tokens.ex b/elixir/apps/domain/lib/domain/tokens.ex index 73fb3637e..894c7ee02 100644 --- a/elixir/apps/domain/lib/domain/tokens.ex +++ b/elixir/apps/domain/lib/domain/tokens.ex @@ -290,11 +290,13 @@ defmodule Domain.Tokens do |> Token.Query.delete() |> Repo.update_all([]) + # TODO: WAL :ok = Enum.each(tokens, &broadcast_disconnect_message/1) {:ok, tokens} end + # TODO: WAL defp broadcast_disconnect_message(%{type: :email}) do :ok end diff --git a/elixir/apps/domain/priv/repo/migrations/20250426183104_set_tables_to_replica_identity_full.exs b/elixir/apps/domain/priv/repo/migrations/20250426183104_set_tables_to_replica_identity_full.exs new file mode 100644 index 000000000..61b316a4c --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250426183104_set_tables_to_replica_identity_full.exs @@ -0,0 +1,29 @@ +defmodule Domain.Repo.Migrations.SetTablesToReplicaIdentityFull do + use Ecto.Migration + + @relations ~w[ + accounts + actor_group_memberships + actor_groups + actors + auth_identities + auth_providers + clients + flow_activities + flows + gateway_groups + gateways + policies + relay_groups + relays + resource_connections + resources + tokens + ] + + def change do + for relation <- @relations do + execute("ALTER TABLE #{relation} REPLICA IDENTITY FULL") + end + end +end diff --git a/elixir/apps/domain/priv/repo/migrations/20250428102100_automatically_set_replica_identity_full.exs b/elixir/apps/domain/priv/repo/migrations/20250428102100_automatically_set_replica_identity_full.exs new file mode 100644 index 000000000..b3b8d91ef --- /dev/null +++ b/elixir/apps/domain/priv/repo/migrations/20250428102100_automatically_set_replica_identity_full.exs @@ -0,0 +1,39 @@ +defmodule Domain.Repo.Migrations.AutomaticallySetReplicaIdentityFull do + use Ecto.Migration + + # Creates a trigger that automatically sets the REPLICA IDENTITY to FULL for new tables. This is + # needed to ensure we can capture changes to a table in replication in order to reliably + # broadcast events. + def change do + execute( + """ + CREATE OR REPLACE FUNCTION set_replica_identity_full() + RETURNS EVENT_TRIGGER AS $$ + DECLARE + rec RECORD; + BEGIN + FOR rec IN SELECT * FROM pg_event_trigger_ddl_commands() WHERE command_tag = 'CREATE TABLE' + LOOP + EXECUTE format('ALTER TABLE %s REPLICA IDENTITY FULL', rec.object_identity); + END LOOP; + END; + $$ LANGUAGE plpgsql; + """, + """ + DROP FUNCTION IF EXISTS set_replica_identity_full(); + """ + ) + + execute( + """ + CREATE EVENT TRIGGER trigger_set_replica_identity + ON ddl_command_end + WHEN TAG IN ('CREATE TABLE') + EXECUTE FUNCTION set_replica_identity_full(); + """, + """ + DROP EVENT TRIGGER IF EXISTS trigger_set_replica_identity; + """ + ) + end +end diff --git a/elixir/apps/domain/test/domain/events/decoder_test.exs b/elixir/apps/domain/test/domain/events/decoder_test.exs new file mode 100644 index 000000000..085b0cb8f --- /dev/null +++ b/elixir/apps/domain/test/domain/events/decoder_test.exs @@ -0,0 +1,470 @@ +defmodule Domain.Events.DecoderTest do + use ExUnit.Case, async: true + + alias Domain.Events.Decoder + alias Domain.Events.Decoder.Messages + + @lsn_binary <<0::integer-32, 23_785_280::integer-32>> + @lsn_decoded {0, 23_785_280} + + @timestamp_int 704_521_200_000 + @timestamp_decoded ~U[2000-01-09 03:42:01.200000Z] + + @xid 1234 + @relation_id 16384 + + # Example OIDs for testing RELATION decoding + @oid_int4 23 + @oid_text 25 + @oid_numeric 1700 + @oid_unknown 9999 + + describe "decode_message/1" do + test "decodes BEGIN message" do + # Construct binary message: 'B', final_lsn, commit_timestamp, xid + message = <<"B", @lsn_binary::binary, @timestamp_int::integer-64, @xid::integer-32>> + + expected = %Messages.Begin{ + final_lsn: @lsn_decoded, + commit_timestamp: @timestamp_decoded, + xid: @xid + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes COMMIT message" do + # Construct binary message: 'C', flags (ignored), lsn, end_lsn, commit_timestamp + # Flags are currently ignored, represented as [] + flags = <<0::integer-8>> + end_lsn_binary = <<0::integer-32, 23_785_300::integer-32>> + end_lsn_decoded = {0, 23_785_300} + + message = + <<"C", flags::binary-1, @lsn_binary::binary, end_lsn_binary::binary, + @timestamp_int::integer-64>> + + expected = %Messages.Commit{ + flags: [], + lsn: @lsn_decoded, + end_lsn: end_lsn_decoded, + commit_timestamp: @timestamp_decoded + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes ORIGIN message" do + # Construct binary message: 'O', origin_commit_lsn, name (null-terminated) + origin_name = "origin_node_1\0" + message = <<"O", @lsn_binary::binary, origin_name::binary>> + + expected = %Messages.Origin{ + origin_commit_lsn: @lsn_decoded, + # The decoder currently includes the null terminator from the split + name: "origin_node_1\0" + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes RELATION message with known types" do + # Construct binary message: 'R', id, namespace\0, name\0, replica_identity, num_columns, columns_data + namespace = "public\0" + name = "users\0" + # full + replica_identity = "f" + num_columns = 3 + # Column 1: flags=1 (key), name="id", type=23 (int4), modifier=-1 + # Column 2: flags=0, name="email", type=25 (text), modifier=-1 + # Column 3: flags=0, name="balance", type=1700 (numeric), modifier=131076 (e.g., NUMERIC(10,2)) + col1_flags = <<1::integer-8>> + col1_name = "id\0" + col1_type = <<@oid_int4::integer-32>> + col1_mod = <<-1::integer-32>> + col2_flags = <<0::integer-8>> + col2_name = "email\0" + col2_type = <<@oid_text::integer-32>> + col2_mod = <<-1::integer-32>> + col3_flags = <<0::integer-8>> + col3_name = "balance\0" + col3_type = <<@oid_numeric::integer-32>> + col3_mod = <<131_076::integer-32>> + + columns_binary = + <> + + message = + <<"R", @relation_id::integer-32, namespace::binary, name::binary, + replica_identity::binary-1, num_columns::integer-16, columns_binary::binary>> + + # Expect the string names returned by the actual OidDatabase.name_for_type_id + expected = %Messages.Relation{ + id: @relation_id, + namespace: "public", + name: "users", + # 'f' maps to :all_columns + replica_identity: :all_columns, + columns: [ + %Messages.Relation.Column{ + flags: [:key], + name: "id", + # OidDatabase.name_for_type_id(23) + type: "int4", + type_modifier: 4_294_967_295 + }, + %Messages.Relation.Column{ + flags: [], + name: "email", + # OidDatabase.name_for_type_id(25) + type: "text", + type_modifier: 4_294_967_295 + }, + %Messages.Relation.Column{ + flags: [], + name: "balance", + # OidDatabase.name_for_type_id(1700) + type: "numeric", + type_modifier: 131_076 + } + ] + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes RELATION message with unknown type" do + # Construct binary message with an OID not listed in OidDatabase + namespace = "custom_schema\0" + name = "gadgets\0" + # index + replica_identity = "i" + num_columns = 1 + # Column 1: flags=0, name="widget_type", type=9999 (unknown), modifier=-1 + col1_flags = <<0::integer-8>> + col1_name = "widget_type\0" + col1_type = <<@oid_unknown::integer-32>> + col1_mod = <<-1::integer-32>> + + columns_binary = + <> + + message = + <<"R", @relation_id::integer-32, namespace::binary, name::binary, + replica_identity::binary-1, num_columns::integer-16, columns_binary::binary>> + + # Expect the raw OID itself, as per the fallback case in OidDatabase.name_for_type_id + expected = %Messages.Relation{ + id: @relation_id, + namespace: "custom_schema", + name: "gadgets", + # 'i' maps to :index + replica_identity: :index, + columns: [ + %Messages.Relation.Column{ + flags: [], + name: "widget_type", + # OidDatabase.name_for_type_id(9999) returns 9999 + type: @oid_unknown, + type_modifier: 4_294_967_295 + } + ] + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes INSERT message" do + # Construct binary message: 'I', relation_id, 'N', num_columns, tuple_data + num_columns = 3 + # Tuple data: 't', len1, val1, 'n', 't', len2, val2 + val1 = "hello world" + len1 = byte_size(val1) + val2 = "test" + len2 = byte_size(val2) + + tuple_data_binary = + <<"t", len1::integer-32, val1::binary, "n", "t", len2::integer-32, val2::binary>> + + message = + <<"I", @relation_id::integer-32, "N", num_columns::integer-16, tuple_data_binary::binary>> + + expected = %Messages.Insert{ + relation_id: @relation_id, + tuple_data: {val1, nil, val2} + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes INSERT message with unchanged toast" do + # Construct binary message: 'I', relation_id, 'N', num_columns, tuple_data + num_columns = 1 + # Tuple data: 'u' + tuple_data_binary = <<"u">> + + message = + <<"I", @relation_id::integer-32, "N", num_columns::integer-16, tuple_data_binary::binary>> + + expected = %Messages.Insert{ + relation_id: @relation_id, + tuple_data: {:unchanged_toast} + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes UPDATE message (simple - New tuple only)" do + # Construct binary message: 'U', relation_id, 'N', num_columns, tuple_data + num_columns = 1 + val1 = "new value" + len1 = byte_size(val1) + tuple_data_binary = <<"t", len1::integer-32, val1::binary>> + + message = + <<"U", @relation_id::integer-32, "N", num_columns::integer-16, tuple_data_binary::binary>> + + expected = %Messages.Update{ + relation_id: @relation_id, + # Default value when not present + changed_key_tuple_data: nil, + # Default value when not present + old_tuple_data: nil, + tuple_data: {val1} + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes UPDATE message (with Old tuple)" do + # Construct binary message: 'U', relation_id, 'O', num_old_cols, old_tuple_data, 'N', num_new_cols, new_tuple_data + num_old_cols = 2 + old_val1 = "old value" + old_len1 = byte_size(old_val1) + # Old Tuple: text, null + old_tuple_binary = <<"t", old_len1::integer-32, old_val1::binary, "n">> + + num_new_cols = 2 + new_val1 = "new value" + new_len1 = byte_size(new_val1) + new_val2 = "another new" + new_len2 = byte_size(new_val2) + # New Tuple: text, text + new_tuple_binary = + <<"t", new_len1::integer-32, new_val1::binary, "t", new_len2::integer-32, + new_val2::binary>> + + message = + <<"U", @relation_id::integer-32, "O", num_old_cols::integer-16, old_tuple_binary::binary, + "N", num_new_cols::integer-16, new_tuple_binary::binary>> + + expected = %Messages.Update{ + relation_id: @relation_id, + changed_key_tuple_data: nil, + old_tuple_data: {old_val1, nil}, + tuple_data: {new_val1, new_val2} + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes UPDATE message (with Key tuple)" do + # Construct binary message: 'U', relation_id, 'K', num_key_cols, key_tuple_data, 'N', num_new_cols, new_tuple_data + num_key_cols = 1 + key_val = "key value" + key_len = byte_size(key_val) + key_tuple_binary = <<"t", key_len::integer-32, key_val::binary>> + + num_new_cols = 2 + new_val1 = "new value 1" + new_len1 = byte_size(new_val1) + # New Tuple: text, unchanged_toast + new_tuple_binary = <<"t", new_len1::integer-32, new_val1::binary, "u">> + + message = + <<"U", @relation_id::integer-32, "K", num_key_cols::integer-16, key_tuple_binary::binary, + "N", num_new_cols::integer-16, new_tuple_binary::binary>> + + expected = %Messages.Update{ + relation_id: @relation_id, + changed_key_tuple_data: {key_val}, + old_tuple_data: nil, + tuple_data: {new_val1, :unchanged_toast} + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes DELETE message (with Old tuple)" do + # Construct binary message: 'D', relation_id, 'O', num_columns, tuple_data + num_columns = 2 + val1 = "deleted value" + len1 = byte_size(val1) + # Data: text value, null + tuple_data_binary = <<"t", len1::integer-32, val1::binary, "n">> + + message = + <<"D", @relation_id::integer-32, "O", num_columns::integer-16, tuple_data_binary::binary>> + + expected = %Messages.Delete{ + relation_id: @relation_id, + # Default value + changed_key_tuple_data: nil, + old_tuple_data: {val1, nil} + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes DELETE message (with Key tuple)" do + # Construct binary message: 'D', relation_id, 'K', num_columns, tuple_data + num_columns = 1 + val1 = "key value" + len1 = byte_size(val1) + tuple_data_binary = <<"t", len1::integer-32, val1::binary>> + + message = + <<"D", @relation_id::integer-32, "K", num_columns::integer-16, tuple_data_binary::binary>> + + expected = %Messages.Delete{ + relation_id: @relation_id, + changed_key_tuple_data: {val1}, + # Default value + old_tuple_data: nil + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes TRUNCATE message with no options" do + # Construct binary message: 'T', num_relations, options, relation_ids + num_relations = 2 + # No options + options = 0 + rel_id1 = <<16384::integer-32>> + rel_id2 = <<16385::integer-32>> + relation_ids_binary = <> + + message = + <<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>> + + expected = %Messages.Truncate{ + number_of_relations: num_relations, + # Empty list for 0 + options: [], + truncated_relations: [16384, 16385] + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes TRUNCATE message with CASCADE option" do + # Construct binary message: 'T', num_relations, options, relation_ids + num_relations = 1 + # CASCADE + options = 1 + rel_id1 = <<16384::integer-32>> + relation_ids_binary = <> + + message = + <<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>> + + expected = %Messages.Truncate{ + number_of_relations: num_relations, + options: [:cascade], + truncated_relations: [16384] + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes TRUNCATE message with RESTART IDENTITY option" do + # Construct binary message: 'T', num_relations, options, relation_ids + num_relations = 1 + # RESTART IDENTITY + options = 2 + rel_id1 = <<16384::integer-32>> + relation_ids_binary = <> + + message = + <<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>> + + expected = %Messages.Truncate{ + number_of_relations: num_relations, + options: [:restart_identity], + truncated_relations: [16384] + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes TRUNCATE message with CASCADE and RESTART IDENTITY options" do + # Construct binary message: 'T', num_relations, options, relation_ids + num_relations = 3 + # CASCADE | RESTART IDENTITY + options = 3 + rel_id1 = <<100::integer-32>> + rel_id2 = <<200::integer-32>> + rel_id3 = <<300::integer-32>> + relation_ids_binary = <> + + message = + <<"T", num_relations::integer-32, options::integer-8, relation_ids_binary::binary>> + + expected = %Messages.Truncate{ + number_of_relations: num_relations, + options: [:cascade, :restart_identity], + truncated_relations: [100, 200, 300] + } + + assert Decoder.decode_message(message) == expected + end + + test "decodes TYPE message" do + # Construct binary message: 'Y', data_type_id, namespace\0, name\0 + # Example OID for varchar + type_id = 1043 + namespace = "pg_catalog\0" + name = "varchar\0" + + message = <<"Y", type_id::integer-32, namespace::binary, name::binary>> + + expected = %Messages.Type{ + id: type_id, + namespace: "pg_catalog", + name: "varchar" + } + + assert Decoder.decode_message(message) == expected + end + + test "handles unsupported message type" do + # Use an arbitrary starting byte not handled ('X') + message = <<"X", 1, 2, 3, 4>> + + expected = %Messages.Unsupported{ + data: message + } + + assert Decoder.decode_message(message) == expected + end + + test "handles empty binary message" do + message = <<>> + expected = %Messages.Unsupported{data: <<>>} + assert Decoder.decode_message(message) == expected + end + + test "handles message with only type byte" do + message = <<"B">> + + expected = %Messages.Unsupported{ + data: "B" + } + + assert Decoder.decode_message(message) == expected + end + end +end diff --git a/elixir/apps/domain/test/domain/events/event_test.exs b/elixir/apps/domain/test/domain/events/event_test.exs new file mode 100644 index 000000000..a83e7bfec --- /dev/null +++ b/elixir/apps/domain/test/domain/events/event_test.exs @@ -0,0 +1,47 @@ +defmodule Domain.Events.EventTest do + use ExUnit.Case, async: true + import Domain.Events.Event + alias Domain.Events.Decoder + + setup do + config = Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) + table_subscriptions = config[:table_subscriptions] + + %{table_subscriptions: table_subscriptions} + end + + describe "ingest/2" do + test "returns :ok for insert on all configured table subscriptions", %{ + table_subscriptions: table_subscriptions + } do + for table <- table_subscriptions do + relations = %{"1" => %{name: table, columns: []}} + msg = %Decoder.Messages.Insert{tuple_data: {}, relation_id: "1"} + + assert :ok == ingest(msg, relations) + end + end + + test "returns :ok for update on all configured table subscriptions", %{ + table_subscriptions: table_subscriptions + } do + for table <- table_subscriptions do + relations = %{"1" => %{name: table, columns: []}} + msg = %Decoder.Messages.Update{old_tuple_data: {}, tuple_data: {}, relation_id: "1"} + + assert :ok == ingest(msg, relations) + end + end + + test "returns :ok for delete on all configured table subscriptions", %{ + table_subscriptions: table_subscriptions + } do + for table <- table_subscriptions do + relations = %{"1" => %{name: table, columns: []}} + msg = %Decoder.Messages.Delete{old_tuple_data: {}, relation_id: "1"} + + assert :ok == ingest(msg, relations) + end + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs new file mode 100644 index 000000000..9aae1c071 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.AccountsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Accounts + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs b/elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs new file mode 100644 index 000000000..24a7dff92 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.ActorGroupMemberships + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/actor_groups_test.exs b/elixir/apps/domain/test/domain/events/hooks/actor_groups_test.exs new file mode 100644 index 000000000..7d4c48eea --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/actor_groups_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.ActorGroupsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.ActorGroups + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/actors_test.exs b/elixir/apps/domain/test/domain/events/hooks/actors_test.exs new file mode 100644 index 000000000..d6571fb95 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/actors_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.ActorsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Actors + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/auth_identities_test.exs b/elixir/apps/domain/test/domain/events/hooks/auth_identities_test.exs new file mode 100644 index 000000000..cbef39fa9 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/auth_identities_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.AuthIdentitiesTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.AuthIdentities + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/auth_providers_test.exs b/elixir/apps/domain/test/domain/events/hooks/auth_providers_test.exs new file mode 100644 index 000000000..c9f3808bc --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/auth_providers_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.AuthProvidersTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.AuthProviders + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/clients_test.exs b/elixir/apps/domain/test/domain/events/hooks/clients_test.exs new file mode 100644 index 000000000..19beaee56 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/clients_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.ClientsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Clients + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/flow_activities_test.exs b/elixir/apps/domain/test/domain/events/hooks/flow_activities_test.exs new file mode 100644 index 000000000..d7db59537 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/flow_activities_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.FlowActivitiesTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.FlowActivities + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs new file mode 100644 index 000000000..3e1467f1a --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.FlowsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Flows + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/gateway_groups_test.exs b/elixir/apps/domain/test/domain/events/hooks/gateway_groups_test.exs new file mode 100644 index 000000000..a039e9a5e --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/gateway_groups_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.GatewayGroupsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.GatewayGroups + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs b/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs new file mode 100644 index 000000000..22cca15bf --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.GatewaysTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Gateways + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs new file mode 100644 index 000000000..52792c4ec --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.PoliciesTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Policies + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/relay_groups_test.exs b/elixir/apps/domain/test/domain/events/hooks/relay_groups_test.exs new file mode 100644 index 000000000..b2ad51816 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/relay_groups_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.RelayGroupsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.RelayGroups + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/relays_test.exs b/elixir/apps/domain/test/domain/events/hooks/relays_test.exs new file mode 100644 index 000000000..1d5c75f78 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/relays_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.RelaysTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Relays + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs b/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs new file mode 100644 index 000000000..aa05dca39 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/resource_connections_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.ResourceConnectionsTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.ResourceConnections + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs new file mode 100644 index 000000000..689625ad8 --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.ResourcesTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Resources + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/hooks/tokens_test.exs b/elixir/apps/domain/test/domain/events/hooks/tokens_test.exs new file mode 100644 index 000000000..2a6b726db --- /dev/null +++ b/elixir/apps/domain/test/domain/events/hooks/tokens_test.exs @@ -0,0 +1,26 @@ +defmodule Domain.Events.Hooks.TokensTest do + use ExUnit.Case, async: true + import Domain.Events.Hooks.Tokens + + setup do + %{old_data: %{}, data: %{}} + end + + describe "insert/1" do + test "returns :ok", %{data: data} do + assert :ok == insert(data) + end + end + + describe "update/2" do + test "returns :ok", %{old_data: old_data, data: data} do + assert :ok == update(old_data, data) + end + end + + describe "delete/1" do + test "returns :ok", %{data: data} do + assert :ok == delete(data) + end + end +end diff --git a/elixir/apps/domain/test/domain/events/replication_connection_test.exs b/elixir/apps/domain/test/domain/events/replication_connection_test.exs new file mode 100644 index 000000000..d74bf08ee --- /dev/null +++ b/elixir/apps/domain/test/domain/events/replication_connection_test.exs @@ -0,0 +1,215 @@ +defmodule Domain.Events.ReplicationConnectionTest do + # Only one ReplicationConnection should be started in the cluster + use ExUnit.Case, async: false + + alias Domain.Events.ReplicationConnection + + # Used to test callbacks, not used for live connection + @mock_state %ReplicationConnection{ + schema: "test_schema", + connection_opts: [], + step: :disconnected, + publication_name: "test_pub", + replication_slot_name: "test_slot", + output_plugin: "pgoutput", + proto_version: 1, + table_subscriptions: ["accounts", "resources"], + relations: %{} + } + + # Used to test live connection + setup_all do + config = + Application.fetch_env!(:domain, Domain.Events.ReplicationConnection) + + instance = struct(Domain.Events.ReplicationConnection, config) + + child_spec = %{ + id: Domain.Events.ReplicationConnection, + start: {Domain.Events.ReplicationConnection, :start_link, [instance]}, + restart: :transient + } + + {:ok, pid} = start_supervised(child_spec) + + {:ok, pid: pid} + end + + describe "handle_connect/1 callback" do + test "handle_connect initiates publication check" do + state = @mock_state + expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" + expected_next_state = %{state | step: :create_publication} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_connect(state) + end + end + + describe "handle_result/2 callback" do + test "handle_result transitions from create_publication to create_replication_slot when publication exists" do + state = %{@mock_state | step: :create_publication} + result = [%Postgrex.Result{num_rows: 1}] + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do + state = %{@mock_state | step: :create_replication_slot} + result = [%Postgrex.Result{num_rows: 1}] + + expected_query = "SELECT 1" + expected_next_state = %{state | step: :start_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from start_replication_slot to streaming" do + state = %{@mock_state | step: :start_replication_slot} + result = [%Postgrex.Result{num_rows: 1}] + + expected_stream_query = + "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" + + expected_next_state = %{state | step: :streaming} + + assert {:stream, ^expected_stream_query, [], ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result creates publication if it doesn't exist" do + state = %{@mock_state | step: :create_publication} + result = [%Postgrex.Result{num_rows: 0}] + + expected_tables = "test_schema.accounts,test_schema.resources" + expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" + expected_next_state = %{state | step: :check_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do + state = %{@mock_state | step: :check_replication_slot} + result = [%Postgrex.Result{num_rows: 0}] + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result creates replication slot if it doesn't exist" do + state = %{@mock_state | step: :create_replication_slot} + result = [%Postgrex.Result{num_rows: 0}] + + expected_query = + "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" + + expected_next_state = %{state | step: :start_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + end + + # In-depth decoding tests are handled in Domain.Events.DecoderTest + describe "handle_data/2" do + test "handle_data handles KeepAlive with reply :now" do + state = %{@mock_state | step: :streaming} + wal_end = 12345 + + now = + System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + + # 100 milliseconds + grace_period = 100_000 + keepalive_data = <> + + assert {:noreply, reply, ^state} = + ReplicationConnection.handle_data(keepalive_data, state) + + assert [<>] = reply + + assert now <= clock + assert clock < now + grace_period + end + + test "handle_data handles KeepAlive with reply :later" do + state = %{@mock_state | step: :streaming} + wal_end = 54321 + + keepalive_data = <> + expected_reply_message = [] + + assert {:noreply, ^expected_reply_message, ^state} = + ReplicationConnection.handle_data(keepalive_data, state) + end + + test "handle_data handles Write message" do + state = %{@mock_state | step: :streaming} + server_wal_start = 123_456_789 + server_wal_end = 987_654_321 + server_system_clock = 1_234_567_890 + message = "Hello, world!" + + write_data = + <> + + assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state) + end + + test "handle_data handles unknown message" do + state = %{@mock_state | step: :streaming} + unknown_data = <> + + assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state) + end + end + + describe "handle_info/2" do + test "handle_info handles :shutdown message" do + state = @mock_state + assert {:disconnect, :normal} = ReplicationConnection.handle_info(:shutdown, state) + end + + test "handle_info handles :DOWN message from monitored process" do + state = @mock_state + monitor_ref = make_ref() + down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} + + assert {:disconnect, :normal} = ReplicationConnection.handle_info(down_msg, state) + end + + test "handle_info ignores other messages" do + state = @mock_state + random_msg = {:some_other_info, "data"} + + assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state) + end + end + + describe "handle_disconnect/1" do + test "handle_disconnect resets step to :disconnected and logs warning" do + state = %{@mock_state | step: :streaming} + expected_state = %{state | step: :disconnected} + + log_output = + ExUnit.CaptureLog.capture_log(fn -> + assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state) + end) + + assert log_output =~ "Replication connection disconnected" + end + end +end diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 74f3da02c..6f89d94df 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -30,6 +30,42 @@ config :domain, Domain.Repo, migration_lock: :pg_advisory_lock, start_apps_before_migration: [:ssl, :logger_json] +config :domain, Domain.Events.ReplicationConnection, + connection_opts: [ + # Automatically reconnect if we lose connection. + auto_reconnect: true, + hostname: "localhost", + port: 5432, + ssl: false, + ssl_opts: [], + parameters: [], + username: "postgres", + database: "firezone_dev", + password: "postgres" + ], + # When changing these, make sure to also: + # 1. Make appropriate changes to `Domain.Events.Event` + # 2. Add an appropriate `Domain.Events.Hooks` module + table_subscriptions: ~w[ + accounts + actor_group_memberships + actor_groups + actors + auth_identities + auth_providers + clients + flow_activities + flows + gateway_groups + gateways + policies + relay_groups + relays + resource_connections + resources + tokens + ] + config :domain, Domain.Tokens, key_base: "5OVYJ83AcoQcPmdKNksuBhJFBhjHD1uUa9mDOHV/6EIdBQ6pXksIhkVeWIzFk5S2", salt: "t01wa0K4lUd7mKa0HAtZdE+jFOPDDej2" diff --git a/elixir/config/runtime.exs b/elixir/config/runtime.exs index 582196343..ffc1e4a7d 100644 --- a/elixir/config/runtime.exs +++ b/elixir/config/runtime.exs @@ -27,6 +27,20 @@ if config_env() == :prod do else: [{:hostname, compile_config!(:database_host)}] ) + config :domain, Domain.Events.ReplicationConnection, + connection_opts: [ + # Automatically reconnect if we lose connection. + auto_reconnect: true, + hostname: compile_config!(:database_host), + port: compile_config!(:database_port), + ssl: compile_config!(:database_ssl_enabled), + ssl_opts: compile_config!(:database_ssl_opts), + parameters: compile_config!(:database_parameters), + username: compile_config!(:database_replication_user), + password: compile_config!(:database_replication_password), + database: compile_config!(:database_name) + ] + config :domain, Domain.Tokens, key_base: compile_config!(:tokens_key_base), salt: compile_config!(:tokens_salt) diff --git a/elixir/config/test.exs b/elixir/config/test.exs index 459ed023e..af6a2b598 100644 --- a/elixir/config/test.exs +++ b/elixir/config/test.exs @@ -18,6 +18,14 @@ config :domain, Domain.Repo, pool: Ecto.Adapters.SQL.Sandbox, queue_target: 1000 +config :domain, Domain.Events.ReplicationConnection, + publication_name: "events_test", + replication_slot_name: "events_slot_test", + connection_opts: [ + auto_reconnect: false, + database: "firezone_test#{partition_suffix}" + ] + config :domain, Domain.Telemetry, enabled: false config :domain, Domain.ConnectivityChecks, enabled: false diff --git a/elixir/test.exs b/elixir/test.exs new file mode 100644 index 000000000..41ca8bbbf --- /dev/null +++ b/elixir/test.exs @@ -0,0 +1,515 @@ +defmodule Domain.Events.ReplicationConnectionTest do + # Only one ReplicationConnection should be started in the cluster + use ExUnit.Case, async: false + + alias Domain.Events.Decoder.Messages + alias Domain.Events.ReplicationConnection + + # Used to test callbacks, not used for live connection + @mock_state %ReplicationConnection{ + schema: "test_schema", + connection_opts: [], + step: :disconnected, + publication_name: "test_pub", + replication_slot_name: "test_slot", + output_plugin: "pgoutput", + proto_version: 1, + # Example, adjust if needed + table_subscriptions: ["accounts", "resources"], + relations: %{} + } + + # Used to test live connection (Setup remains unchanged) + setup do + # Ensure Postgrex is started if your tests rely on it implicitly + {:ok, pid} = start_supervised(Domain.Events.ReplicationConnection) + + {:ok, pid: pid} + end + + describe "handle_connect/1 callback" do + test "handle_connect initiates publication check" do + state = @mock_state + expected_query = "SELECT 1 FROM pg_publication WHERE pubname = '#{state.publication_name}'" + expected_next_state = %{state | step: :create_publication} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_connect(state) + end + end + + describe "handle_result/2 callback" do + test "handle_result transitions from create_publication to create_replication_slot when publication exists" do + state = %{@mock_state | step: :create_publication} + # Mock a successful result for the SELECT query + result = %Postgrex.Result{ + command: :select, + columns: ["?column?"], + num_rows: 1, + rows: [[1]] + } + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from create_replication_slot to start_replication_slot when slot exists" do + state = %{@mock_state | step: :create_replication_slot} + # Mock a successful result for the SELECT query + result = %Postgrex.Result{ + command: :select, + columns: ["?column?"], + num_rows: 1, + rows: [[1]] + } + + expected_query = + "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" + + expected_next_state = %{state | step: :start_replication_slot} + + expected_stream_query = + "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" + + # Should be :streaming directly? Check impl. + expected_next_state_direct = %{state | step: :start_replication_slot} + + # Let's assume it first goes to :start_replication_slot step, then handle_result for *that* step triggers START_REPLICATION + assert {:query, _query, ^expected_next_state_direct} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from start_replication_slot to streaming" do + state = %{@mock_state | step: :start_replication_slot} + # Mock a successful result for the CREATE_REPLICATION_SLOT or preceding step + result = %Postgrex.Result{ + # Or whatever command led here + command: :create_replication_slot, + columns: nil, + num_rows: 0, + rows: nil + } + + expected_stream_query = + "START_REPLICATION SLOT \"#{state.replication_slot_name}\" LOGICAL 0/0 (proto_version '#{state.proto_version}', publication_names '#{state.publication_name}')" + + expected_next_state = %{state | step: :streaming} + + assert {:stream, ^expected_stream_query, [], ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result creates publication if it doesn't exist" do + state = %{@mock_state | step: :create_publication} + # Mock result indicating publication doesn't exist + result = %Postgrex.Result{ + command: :select, + columns: ["?column?"], + num_rows: 0, + rows: [] + } + + # Combine schema and table names correctly + expected_tables = + state.table_subscriptions + |> Enum.map(fn table -> "#{state.schema}.#{table}" end) + |> Enum.join(",") + + expected_query = "CREATE PUBLICATION #{state.publication_name} FOR TABLE #{expected_tables}" + # The original test expected the next step to be :check_replication_slot, let's keep that + expected_next_state = %{state | step: :check_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result transitions from check_replication_slot to create_replication_slot after creating publication" do + state = %{@mock_state | step: :check_replication_slot} + # Mock a successful result from the CREATE PUBLICATION command + result = %Postgrex.Result{ + command: :create_publication, + columns: nil, + num_rows: 0, + rows: nil + } + + expected_query = + "SELECT 1 FROM pg_replication_slots WHERE slot_name = '#{state.replication_slot_name}'" + + expected_next_state = %{state | step: :create_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + + test "handle_result creates replication slot if it doesn't exist" do + state = %{@mock_state | step: :create_replication_slot} + # Mock result indicating slot doesn't exist + result = %Postgrex.Result{ + command: :select, + columns: ["?column?"], + num_rows: 0, + rows: [] + } + + expected_query = + "CREATE_REPLICATION_SLOT #{state.replication_slot_name} LOGICAL #{state.output_plugin} NOEXPORT_SNAPSHOT" + + expected_next_state = %{state | step: :start_replication_slot} + + assert {:query, ^expected_query, ^expected_next_state} = + ReplicationConnection.handle_result(result, state) + end + end + + # --- handle_data tests remain unchanged --- + # In-depth decoding tests are handled in Domain.Events.DecoderTest + describe "handle_data/2" do + test "handle_data handles KeepAlive with reply :now" do + state = %{@mock_state | step: :streaming} + wal_end = 12345 + # Keepalive doesn't use this field meaningfully here + server_wal_start = 0 + # Reply requested + reply_requested = 1 + + now_microseconds = + System.os_time(:microsecond) - DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) + + # 100 milliseconds tolerance for clock check + grace_period_microseconds = 100_000 + + keepalive_data = <> + + # Expected reply format: 'r', confirmed_lsn::64, confirmed_lsn_commit::64, no_reply::8, high_priority::8, clock::64 + # The actual implementation might construct the reply differently. + # This assertion needs to match the exact binary structure returned by handle_data. + # Let's assume the implementation sends back the received wal_end as confirmed LSNs, + # and the current time. The no_reply and high_priority flags might be 0. + assert {:reply, reply_binary, ^state} = + ReplicationConnection.handle_data(keepalive_data, state) + + # Deconstruct the reply to verify its parts + assert <> = reply_binary + + assert confirmed_lsn == wal_end + # Or potentially server_wal_start? Check impl. + assert confirmed_lsn_commit == wal_end + assert no_reply == 0 + assert high_priority == 0 + assert now_microseconds <= clock < now_microseconds + grace_period_microseconds + end + + test "handle_data handles KeepAlive with reply :later" do + state = %{@mock_state | step: :streaming} + wal_end = 54321 + server_wal_start = 0 + # No reply requested + reply_requested = 0 + + keepalive_data = <> + + # When no reply is requested, it should return :noreply with no binary message + assert {:noreply, [], ^state} = + ReplicationConnection.handle_data(keepalive_data, state) + end + + test "handle_data handles Write message (XLogData)" do + state = %{@mock_state | step: :streaming} + server_wal_start = 123_456_789 + # This is the LSN of the end of the WAL data in this message + server_wal_end = 987_654_321 + # Timestamp in microseconds since PG epoch + server_system_clock = 1_234_567_890 + # Example decoded message data (e.g., a BEGIN message binary) + # This data should be passed to handle_info via send(self(), decoded_msg) + message_binary = + <<"B", @lsn_binary || <<0::64>>::binary-8, @timestamp_int || 0::integer-64, + @xid || 0::integer-32>> + + write_data = + <> + + # handle_data for 'w' should decode the message_binary and send it to self() + # It returns {:noreply, [], state} because the reply/acknowledgement happens + # via the KeepAlive ('k') mechanism. + assert {:noreply, [], ^state} = ReplicationConnection.handle_data(write_data, state) + + # Assert that the decoded message was sent to self() + # Note: This requires the test process to receive the message. + # You might need `allow_receive` or similar testing patterns if handle_data + # directly uses `send`. If it calls another function that sends, test that function. + # Let's assume handle_data directly sends for this example. + # Need some sample data defined earlier for the assertion + @lsn_binary <<0::integer-32, 23_785_280::integer-32>> + @timestamp_int 704_521_200_000 + @xid 1234 + @timestamp_decoded ~U[2022-04-29 12:20:00.000000Z] + @lsn_decoded {0, 23_785_280} + + expected_decoded_msg = %Messages.Begin{ + final_lsn: @lsn_decoded, + commit_timestamp: @timestamp_decoded, + xid: @xid + } + + assert_receive(^expected_decoded_msg) + end + + test "handle_data handles unknown message type" do + state = %{@mock_state | step: :streaming} + # Using 'q' as an example unknown type + unknown_data = <> + + # Expect it to ignore unknown types and return noreply + assert {:noreply, [], ^state} = ReplicationConnection.handle_data(unknown_data, state) + # Optionally, assert that a warning was logged if applicable + end + end + + # --- handle_info tests are CORRECTED below --- + describe "handle_info/2" do + test "handle_info updates relations on Relation message" do + state = @mock_state + + # Use the correct fields from Messages.Relation struct + relation_msg = %Messages.Relation{ + id: 101, + namespace: "public", + name: "accounts", + # Added replica_identity + replica_identity: :default, + columns: [ + %Messages.Relation.Column{ + flags: [:key], + name: "id", + type: "int4", + type_modifier: -1 + }, + %Messages.Relation.Column{ + flags: [], + name: "name", + type: "text", + type_modifier: -1 + } + ] + } + + # The state should store the relevant parts of the relation message, keyed by ID + expected_relation_data = %{ + namespace: "public", + name: "accounts", + replica_identity: :default, + columns: [ + %Messages.Relation.Column{ + flags: [:key], + name: "id", + type: "int4", + type_modifier: -1 + }, + %Messages.Relation.Column{ + flags: [], + name: "name", + type: "text", + type_modifier: -1 + } + ] + } + + expected_relations = %{101 => expected_relation_data} + expected_state = %{state | relations: expected_relations} + + assert {:noreply, ^expected_state} = ReplicationConnection.handle_info(relation_msg, state) + end + + test "handle_info returns noreply for Insert message" do + # Pre-populate state with relation info if the handler needs it + state = %{ + @mock_state + | relations: %{ + 101 => %{ + name: "accounts", + namespace: "public", + columns: [ + %Messages.Relation.Column{name: "id", type: "int4"}, + %Messages.Relation.Column{name: "name", type: "text"} + ] + } + } + } + + # Use the correct field: tuple_data (which is a tuple) + insert_msg = %Messages.Insert{relation_id: 101, tuple_data: {1, "Alice"}} + + # handle_info likely broadcasts or processes the insert, but returns noreply + assert {:noreply, ^state} = ReplicationConnection.handle_info(insert_msg, state) + # Add assertions here if handle_info is expected to send messages or call other funcs + end + + test "handle_info returns noreply for Update message" do + state = %{ + @mock_state + | relations: %{ + 101 => %{ + name: "accounts", + namespace: "public", + columns: [ + %Messages.Relation.Column{name: "id", type: "int4"}, + %Messages.Relation.Column{name: "name", type: "text"} + ] + } + } + } + + # Use the correct fields: relation_id, old_tuple_data, tuple_data, changed_key_tuple_data + update_msg = %Messages.Update{ + relation_id: 101, + # Example: only old data provided + old_tuple_data: {1, "Alice"}, + # Example: new data + tuple_data: {1, "Bob"}, + # Example: key didn't change or wasn't provided + changed_key_tuple_data: nil + } + + assert {:noreply, ^state} = ReplicationConnection.handle_info(update_msg, state) + # Add assertions for side effects (broadcasts etc.) if needed + end + + test "handle_info returns noreply for Delete message" do + state = %{ + @mock_state + | relations: %{ + 101 => %{ + name: "accounts", + namespace: "public", + columns: [ + %Messages.Relation.Column{name: "id", type: "int4"}, + %Messages.Relation.Column{name: "name", type: "text"} + ] + } + } + } + + # Use the correct fields: relation_id, old_tuple_data, changed_key_tuple_data + delete_msg = %Messages.Delete{ + relation_id: 101, + # Example: old data provided + old_tuple_data: {1, "Bob"}, + # Example: key data not provided + changed_key_tuple_data: nil + } + + assert {:noreply, ^state} = ReplicationConnection.handle_info(delete_msg, state) + # Add assertions for side effects if needed + end + + test "handle_info ignores Begin message" do + state = @mock_state + # Use correct fields: final_lsn, commit_timestamp, xid + begin_msg = %Messages.Begin{ + final_lsn: {0, 123}, + commit_timestamp: ~U[2023-01-01 10:00:00Z], + xid: 789 + } + + assert {:noreply, ^state} = ReplicationConnection.handle_info(begin_msg, state) + end + + test "handle_info ignores Commit message" do + state = @mock_state + # Use correct fields: flags, lsn, end_lsn, commit_timestamp + commit_msg = %Messages.Commit{ + flags: [], + lsn: {0, 123}, + end_lsn: {0, 456}, + commit_timestamp: ~U[2023-01-01 10:00:01Z] + } + + assert {:noreply, ^state} = ReplicationConnection.handle_info(commit_msg, state) + end + + test "handle_info ignores Origin message" do + state = @mock_state + # Use correct fields: origin_commit_lsn, name + origin_msg = %Messages.Origin{origin_commit_lsn: {0, 1}, name: "origin_name"} + assert {:noreply, ^state} = ReplicationConnection.handle_info(origin_msg, state) + end + + test "handle_info ignores Truncate message" do + state = @mock_state + # Use correct fields: number_of_relations, options, truncated_relations + truncate_msg = %Messages.Truncate{ + number_of_relations: 2, + options: [:cascade], + truncated_relations: [101, 102] + } + + assert {:noreply, ^state} = ReplicationConnection.handle_info(truncate_msg, state) + end + + test "handle_info ignores Type message" do + state = @mock_state + # Use correct fields: id, namespace, name + type_msg = %Messages.Type{id: 23, namespace: "pg_catalog", name: "int4"} + assert {:noreply, ^state} = ReplicationConnection.handle_info(type_msg, state) + end + + test "handle_info returns noreply for Unsupported message" do + state = @mock_state + unsupported_msg = %Messages.Unsupported{data: <<1, 2, 3>>} + # We cannot easily verify Logger.warning was called without mocks/capture. + assert {:noreply, ^state} = ReplicationConnection.handle_info(unsupported_msg, state) + end + + test "handle_info handles :shutdown message" do + state = @mock_state + # Expect :disconnect tuple based on common GenServer patterns for shutdown + assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(:shutdown, state) + # Note: The original test asserted {:disconnect, :normal}. {:stop, :normal, state} is + # the standard GenServer return for a clean stop triggered by handle_info. Adjust + # if your implementation specifically returns :disconnect. + end + + test "handle_info handles :DOWN message from monitored process" do + state = @mock_state + monitor_ref = make_ref() + # Example DOWN message structure + down_msg = {:DOWN, monitor_ref, :process, :some_pid, :shutdown} + + # Expect the server to stop itself upon receiving DOWN for a critical process + assert {:stop, :normal, ^state} = ReplicationConnection.handle_info(down_msg, state) + # Again, adjust the expected return (:disconnect vs :stop) based on implementation. + end + + test "handle_info ignores other messages" do + state = @mock_state + random_msg = {:some_other_info, "data"} + assert {:noreply, ^state} = ReplicationConnection.handle_info(random_msg, state) + end + end + + # --- Moved handle_disconnect test to its own describe block --- + describe "handle_disconnect/1" do + test "handle_disconnect resets step to :disconnected and logs warning" do + state = %{@mock_state | step: :streaming} + expected_state = %{state | step: :disconnected} + + # Capture log to verify warning (requires ExUnit config) + log_output = + ExUnit.CaptureLog.capture_log(fn -> + assert {:noreply, ^expected_state} = ReplicationConnection.handle_disconnect(state) + end) + + assert log_output =~ "Replication connection disconnected." + # Or match the exact log message if needed + end + end +end