refactor(portal): Move config_changed to WAL broadcaster (#9285)

Now that the WAL consumer has been dry running in production for some
time, we can begin moving events over to it.

We start with a relatively simple case: the account `config_changed`
event.

Since side effects now happen decoupled from the actual record updates,
testing is updated in this PR:

- We don't expect broadcasts to happen in the `accounts_test.exs` -
these context modules are now solely responsible for managing updates to
records and will no longer need to worry about side effects (in the
typical case) like subscribe and broadcast
- The Event hooks module now contains all logic related to processing
side effects for a particular account update.

The net effect is that we now have dedicated module and tests for side
effects, starting with `accounts`.

Related: #6294 
Related: #8187
This commit is contained in:
Jamil
2025-05-28 11:23:48 -07:00
committed by GitHub
parent 1358da189d
commit 8d701efe4b
8 changed files with 65 additions and 68 deletions

View File

@@ -1,7 +1,7 @@
defmodule API.Client.Channel do
use API, :channel
alias API.Client.Views
alias Domain.{Accounts, Clients, Actors, Resources, Gateways, Relays, Policies, Flows}
alias Domain.{Accounts, Clients, Actors, Events, Resources, Gateways, Relays, Policies, Flows}
alias Domain.Relays.Presence.Debouncer
require Logger
require OpenTelemetry.Tracer
@@ -127,7 +127,7 @@ defmodule API.Client.Channel do
:ok = Clients.connect_client(socket.assigns.client)
# Subscribe for account config updates
:ok = Accounts.subscribe_to_events_in_account(socket.assigns.client.account_id)
:ok = Events.Hooks.Accounts.subscribe(socket.assigns.client.account_id)
# We subscribe for membership updates for all actor groups the client is a member of,
:ok = Actors.subscribe_to_membership_updates_for_actor(socket.assigns.subject.actor)

View File

@@ -1,6 +1,6 @@
defmodule Domain.Accounts do
alias Web.Settings.Account
alias Domain.{Repo, Config, PubSub}
alias Domain.{Repo, Config}
alias Domain.{Auth, Billing}
alias Domain.Accounts.{Account, Features, Authorizer}
@@ -109,13 +109,6 @@ defmodule Domain.Accounts do
defp on_account_update(account, changeset) do
:ok = Billing.on_account_update(account, changeset)
# TODO: WAL
if Ecto.Changeset.changed?(changeset, :config) do
broadcast_config_update_to_account(account)
else
:ok
end
end
for feature <- Features.__schema__(:fields) do
@@ -161,28 +154,4 @@ defmodule Domain.Accounts do
def type(%Account{}) do
"Starter"
end
### PubSub
defp account_topic(%Account{} = account), do: account_topic(account.id)
defp account_topic(account_id), do: "accounts:#{account_id}"
def subscribe_to_events_in_account(account_or_id) do
account_or_id |> account_topic() |> PubSub.subscribe()
end
def unsubscribe_from_events_in_account(account_or_id) do
account_or_id |> account_topic() |> PubSub.unsubscribe()
end
# TODO: WAL
defp broadcast_config_update_to_account(%Account{} = account) do
broadcast_to_account(account.id, :config_changed)
end
defp broadcast_to_account(account_or_id, payload) do
account_or_id
|> account_topic()
|> PubSub.broadcast(payload)
end
end

View File

@@ -754,6 +754,7 @@ defmodule Domain.Actors do
end
def broadcast_membership_event(action, actor_id, group_id) do
# TODO: WAL - this is an n+1 problem
:ok = Policies.broadcast_access_events_for(action, actor_id, group_id)
actor_id

View File

@@ -1,13 +1,30 @@
defmodule Domain.Events.Hooks.Accounts do
alias Domain.PubSub
require Logger
def on_insert(_data) do
:ok
end
def on_update(_old_data, _data) do
:ok
def on_update(%{"config" => old_config}, %{"config" => config, "id" => account_id}) do
if old_config != config do
broadcast(account_id, :config_changed)
else
:ok
end
end
def on_delete(_old_data) do
:ok
end
def subscribe(account_id) do
PubSub.subscribe("accounts:#{account_id}")
end
# No unsubscribe needed - account deletions destroy any subscribed entities
defp broadcast(account_id, event) do
PubSub.broadcast("accounts:#{account_id}", event)
end
end

View File

@@ -400,8 +400,6 @@ defmodule Domain.AccountsTest do
}
}
:ok = subscribe_to_events_in_account(account)
assert {:ok, account} = update_account(account, attrs, subject)
assert account.name == attrs.name
@@ -424,8 +422,6 @@ defmodule Domain.AccountsTest do
address: "8.8.8.8"
}
]
assert_receive :config_changed
end
test "returns an error when trying to update other account", %{
@@ -719,8 +715,6 @@ defmodule Domain.AccountsTest do
}
}
:ok = subscribe_to_events_in_account(account)
assert {:ok, account} = update_account_by_id(account.id, attrs)
assert account.name == attrs.name
@@ -750,8 +744,6 @@ defmodule Domain.AccountsTest do
address: "8.8.8.8"
}
]
assert_receive :config_changed
end
test "broadcasts disconnect message for the clients when account is disabled", %{

View File

@@ -21,27 +21,5 @@ defmodule Domain.Events.EventTest do
assert :ok == ingest(msg, relations)
end
end
test "returns :ok for update on all configured table subscriptions", %{
table_subscriptions: table_subscriptions
} do
for table <- table_subscriptions do
relations = %{"1" => %{name: table, columns: []}}
msg = %Decoder.Messages.Update{old_tuple_data: {}, tuple_data: {}, relation_id: "1"}
assert :ok == ingest(msg, relations)
end
end
test "returns :ok for delete on all configured table subscriptions", %{
table_subscriptions: table_subscriptions
} do
for table <- table_subscriptions do
relations = %{"1" => %{name: table, columns: []}}
msg = %Decoder.Messages.Delete{old_tuple_data: {}, relation_id: "1"}
assert :ok == ingest(msg, relations)
end
end
end
end

View File

@@ -1,5 +1,5 @@
defmodule Domain.Events.Hooks.AccountsTest do
use ExUnit.Case, async: true
use Domain.DataCase, async: true
import Domain.Events.Hooks.Accounts
setup do
@@ -13,8 +13,45 @@ defmodule Domain.Events.Hooks.AccountsTest do
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
test "sends :config_changed if config changes" do
account = Fixtures.Accounts.create_account()
:ok = subscribe(account.id)
old_data = %{
"id" => account.id,
"config" => %{"search_domain" => "old_value", "clients_upstream_dns" => []}
}
data = %{
"id" => account.id,
"config" => %{
"search_domain" => "new_value",
"clients_upstream_dns" => [%{"protocol" => "ip_port", "address" => "8.8.8.8"}]
}
}
assert :ok == on_update(old_data, data)
assert_receive :config_changed
end
test "does not send :config_changed if config does not change" do
account = Fixtures.Accounts.create_account()
:ok = subscribe(account.id)
old_data = %{
"id" => account.id,
"config" => %{"search_domain" => "old_value", "clients_upstream_dns" => []}
}
data = %{
"id" => account.id,
"config" => %{"search_domain" => "old_value", "clients_upstream_dns" => []}
}
assert :ok == on_update(old_data, data)
refute_receive :config_changed
end
end

View File

@@ -0,0 +1,3 @@
defmodule Domain.EventsTest do
# TODO: Add integration tests to ensure side effects trigger broadcasts in general
end