From 8d701efe4b817369129bee627996f27cb2f92e7e Mon Sep 17 00:00:00 2001 From: Jamil Date: Wed, 28 May 2025 11:23:48 -0700 Subject: [PATCH] refactor(portal): Move `config_changed` to WAL broadcaster (#9285) Now that the WAL consumer has been dry running in production for some time, we can begin moving events over to it. We start with a relatively simple case: the account `config_changed` event. Since side effects now happen decoupled from the actual record updates, testing is updated in this PR: - We don't expect broadcasts to happen in the `accounts_test.exs` - these context modules are now solely responsible for managing updates to records and will no longer need to worry about side effects (in the typical case) like subscribe and broadcast - The Event hooks module now contains all logic related to processing side effects for a particular account update. The net effect is that we now have dedicated module and tests for side effects, starting with `accounts`. Related: #6294 Related: #8187 --- elixir/apps/api/lib/api/client/channel.ex | 4 +- elixir/apps/domain/lib/domain/accounts.ex | 33 +-------------- elixir/apps/domain/lib/domain/actors.ex | 1 + .../lib/domain/events/hooks/accounts.ex | 21 +++++++++- .../apps/domain/test/domain/accounts_test.exs | 8 ---- .../domain/test/domain/events/event_test.exs | 22 ---------- .../domain/events/hooks/accounts_test.exs | 41 ++++++++++++++++++- .../apps/domain/test/domain/events_test.exs | 3 ++ 8 files changed, 65 insertions(+), 68 deletions(-) create mode 100644 elixir/apps/domain/test/domain/events_test.exs diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index dc82dc6f0..07a4bcb24 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -1,7 +1,7 @@ defmodule API.Client.Channel do use API, :channel alias API.Client.Views - alias Domain.{Accounts, Clients, Actors, Resources, Gateways, Relays, Policies, Flows} + alias Domain.{Accounts, Clients, Actors, Events, Resources, Gateways, Relays, Policies, Flows} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -127,7 +127,7 @@ defmodule API.Client.Channel do :ok = Clients.connect_client(socket.assigns.client) # Subscribe for account config updates - :ok = Accounts.subscribe_to_events_in_account(socket.assigns.client.account_id) + :ok = Events.Hooks.Accounts.subscribe(socket.assigns.client.account_id) # We subscribe for membership updates for all actor groups the client is a member of, :ok = Actors.subscribe_to_membership_updates_for_actor(socket.assigns.subject.actor) diff --git a/elixir/apps/domain/lib/domain/accounts.ex b/elixir/apps/domain/lib/domain/accounts.ex index 1be9a3e7f..89af8613a 100644 --- a/elixir/apps/domain/lib/domain/accounts.ex +++ b/elixir/apps/domain/lib/domain/accounts.ex @@ -1,6 +1,6 @@ defmodule Domain.Accounts do alias Web.Settings.Account - alias Domain.{Repo, Config, PubSub} + alias Domain.{Repo, Config} alias Domain.{Auth, Billing} alias Domain.Accounts.{Account, Features, Authorizer} @@ -109,13 +109,6 @@ defmodule Domain.Accounts do defp on_account_update(account, changeset) do :ok = Billing.on_account_update(account, changeset) - - # TODO: WAL - if Ecto.Changeset.changed?(changeset, :config) do - broadcast_config_update_to_account(account) - else - :ok - end end for feature <- Features.__schema__(:fields) do @@ -161,28 +154,4 @@ defmodule Domain.Accounts do def type(%Account{}) do "Starter" end - - ### PubSub - - defp account_topic(%Account{} = account), do: account_topic(account.id) - defp account_topic(account_id), do: "accounts:#{account_id}" - - def subscribe_to_events_in_account(account_or_id) do - account_or_id |> account_topic() |> PubSub.subscribe() - end - - def unsubscribe_from_events_in_account(account_or_id) do - account_or_id |> account_topic() |> PubSub.unsubscribe() - end - - # TODO: WAL - defp broadcast_config_update_to_account(%Account{} = account) do - broadcast_to_account(account.id, :config_changed) - end - - defp broadcast_to_account(account_or_id, payload) do - account_or_id - |> account_topic() - |> PubSub.broadcast(payload) - end end diff --git a/elixir/apps/domain/lib/domain/actors.ex b/elixir/apps/domain/lib/domain/actors.ex index c945f5506..164a0980d 100644 --- a/elixir/apps/domain/lib/domain/actors.ex +++ b/elixir/apps/domain/lib/domain/actors.ex @@ -754,6 +754,7 @@ defmodule Domain.Actors do end def broadcast_membership_event(action, actor_id, group_id) do + # TODO: WAL - this is an n+1 problem :ok = Policies.broadcast_access_events_for(action, actor_id, group_id) actor_id diff --git a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex index 8b483ffdf..7a39df97b 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex @@ -1,13 +1,30 @@ defmodule Domain.Events.Hooks.Accounts do + alias Domain.PubSub + require Logger + def on_insert(_data) do :ok end - def on_update(_old_data, _data) do - :ok + def on_update(%{"config" => old_config}, %{"config" => config, "id" => account_id}) do + if old_config != config do + broadcast(account_id, :config_changed) + else + :ok + end end def on_delete(_old_data) do :ok end + + def subscribe(account_id) do + PubSub.subscribe("accounts:#{account_id}") + end + + # No unsubscribe needed - account deletions destroy any subscribed entities + + defp broadcast(account_id, event) do + PubSub.broadcast("accounts:#{account_id}", event) + end end diff --git a/elixir/apps/domain/test/domain/accounts_test.exs b/elixir/apps/domain/test/domain/accounts_test.exs index 241337f78..b03031dc6 100644 --- a/elixir/apps/domain/test/domain/accounts_test.exs +++ b/elixir/apps/domain/test/domain/accounts_test.exs @@ -400,8 +400,6 @@ defmodule Domain.AccountsTest do } } - :ok = subscribe_to_events_in_account(account) - assert {:ok, account} = update_account(account, attrs, subject) assert account.name == attrs.name @@ -424,8 +422,6 @@ defmodule Domain.AccountsTest do address: "8.8.8.8" } ] - - assert_receive :config_changed end test "returns an error when trying to update other account", %{ @@ -719,8 +715,6 @@ defmodule Domain.AccountsTest do } } - :ok = subscribe_to_events_in_account(account) - assert {:ok, account} = update_account_by_id(account.id, attrs) assert account.name == attrs.name @@ -750,8 +744,6 @@ defmodule Domain.AccountsTest do address: "8.8.8.8" } ] - - assert_receive :config_changed end test "broadcasts disconnect message for the clients when account is disabled", %{ diff --git a/elixir/apps/domain/test/domain/events/event_test.exs b/elixir/apps/domain/test/domain/events/event_test.exs index a83e7bfec..946fb7e3c 100644 --- a/elixir/apps/domain/test/domain/events/event_test.exs +++ b/elixir/apps/domain/test/domain/events/event_test.exs @@ -21,27 +21,5 @@ defmodule Domain.Events.EventTest do assert :ok == ingest(msg, relations) end end - - test "returns :ok for update on all configured table subscriptions", %{ - table_subscriptions: table_subscriptions - } do - for table <- table_subscriptions do - relations = %{"1" => %{name: table, columns: []}} - msg = %Decoder.Messages.Update{old_tuple_data: {}, tuple_data: {}, relation_id: "1"} - - assert :ok == ingest(msg, relations) - end - end - - test "returns :ok for delete on all configured table subscriptions", %{ - table_subscriptions: table_subscriptions - } do - for table <- table_subscriptions do - relations = %{"1" => %{name: table, columns: []}} - msg = %Decoder.Messages.Delete{old_tuple_data: {}, relation_id: "1"} - - assert :ok == ingest(msg, relations) - end - end end end diff --git a/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs index 63c68d629..6d9176dbc 100644 --- a/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs @@ -1,5 +1,5 @@ defmodule Domain.Events.Hooks.AccountsTest do - use ExUnit.Case, async: true + use Domain.DataCase, async: true import Domain.Events.Hooks.Accounts setup do @@ -13,8 +13,45 @@ defmodule Domain.Events.Hooks.AccountsTest do end describe "update/2" do - test "returns :ok", %{old_data: old_data, data: data} do + test "sends :config_changed if config changes" do + account = Fixtures.Accounts.create_account() + + :ok = subscribe(account.id) + + old_data = %{ + "id" => account.id, + "config" => %{"search_domain" => "old_value", "clients_upstream_dns" => []} + } + + data = %{ + "id" => account.id, + "config" => %{ + "search_domain" => "new_value", + "clients_upstream_dns" => [%{"protocol" => "ip_port", "address" => "8.8.8.8"}] + } + } + assert :ok == on_update(old_data, data) + assert_receive :config_changed + end + + test "does not send :config_changed if config does not change" do + account = Fixtures.Accounts.create_account() + + :ok = subscribe(account.id) + + old_data = %{ + "id" => account.id, + "config" => %{"search_domain" => "old_value", "clients_upstream_dns" => []} + } + + data = %{ + "id" => account.id, + "config" => %{"search_domain" => "old_value", "clients_upstream_dns" => []} + } + + assert :ok == on_update(old_data, data) + refute_receive :config_changed end end diff --git a/elixir/apps/domain/test/domain/events_test.exs b/elixir/apps/domain/test/domain/events_test.exs new file mode 100644 index 000000000..abc156ef8 --- /dev/null +++ b/elixir/apps/domain/test/domain/events_test.exs @@ -0,0 +1,3 @@ +defmodule Domain.EventsTest do + # TODO: Add integration tests to ensure side effects trigger broadcasts in general +end