diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index 07a4bcb24..dd9d389af 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -124,7 +124,7 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) OpenTelemetry.Tracer.with_span "client.after_join" do - :ok = Clients.connect_client(socket.assigns.client) + :ok = Events.Hooks.Clients.connect(socket.assigns.client) # Subscribe for account config updates :ok = Events.Hooks.Accounts.subscribe(socket.assigns.client.account_id) diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index 8f92b3a8e..d6bfe8fcd 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -599,7 +599,7 @@ defmodule API.Gateway.Channel do :ok = Enum.each(client_ids, fn client_id -> - Clients.broadcast_to_client( + Events.Hooks.Clients.broadcast( client_id, {:ice_candidates, socket.assigns.gateway.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} @@ -624,7 +624,7 @@ defmodule API.Gateway.Channel do :ok = Enum.each(client_ids, fn client_id -> - Clients.broadcast_to_client( + Events.Hooks.Clients.broadcast( client_id, {:invalidate_ice_candidates, socket.assigns.gateway.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index c587f5121..4a6fdeb00 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -1,5 +1,6 @@ defmodule API.Client.ChannelTest do use API.ChannelCase, async: true + alias Domain.Events setup do account = @@ -167,7 +168,7 @@ defmodule API.Client.ChannelTest do describe "join/3" do test "tracks presence after join", %{account: account, client: client} do presence = - Domain.Clients.Presence.list(Domain.Clients.account_clients_presence_topic(account)) + Domain.Clients.Presence.list(Events.Hooks.Accounts.clients_presence_topic(account.id)) assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, client.id) assert is_number(online_at) @@ -499,7 +500,7 @@ defmodule API.Client.ChannelTest do } do assert_push "init", %{} Process.flag(:trap_exit, true) - Domain.Clients.broadcast_to_client(client, :token_expired) + Events.Hooks.Clients.broadcast(client.id, :token_expired) assert_push "disconnect", %{reason: :token_expired}, 250 end diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 525ee7f35..b5579bee5 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -1,5 +1,6 @@ defmodule API.Gateway.ChannelTest do use API.ChannelCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -1094,7 +1095,7 @@ defmodule API.Gateway.ChannelTest do "client_ids" => [client.id] } - :ok = Domain.Clients.connect_client(client) + :ok = Events.Hooks.Clients.connect(client) Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id)) push(socket, "broadcast_ice_candidates", attrs) @@ -1132,7 +1133,7 @@ defmodule API.Gateway.ChannelTest do "client_ids" => [client.id] } - :ok = Domain.Clients.connect_client(client) + :ok = Events.Hooks.Clients.connect(client) Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id)) push(socket, "broadcast_invalidated_ice_candidates", attrs) diff --git a/elixir/apps/domain/lib/domain/accounts.ex b/elixir/apps/domain/lib/domain/accounts.ex index 89af8613a..382934976 100644 --- a/elixir/apps/domain/lib/domain/accounts.ex +++ b/elixir/apps/domain/lib/domain/accounts.ex @@ -93,18 +93,6 @@ defmodule Domain.Accounts do with: &Account.Changeset.update(&1, attrs), after_commit: &on_account_update/2 ) - |> case do - {:ok, %{disabled_at: nil} = account} -> - {:ok, account} - - {:ok, account} -> - # TODO: WAL - :ok = Domain.Clients.disconnect_account_clients(account) - {:ok, account} - - {:error, reason} -> - {:error, reason} - end end defp on_account_update(account, changeset) do diff --git a/elixir/apps/domain/lib/domain/clients.ex b/elixir/apps/domain/lib/domain/clients.ex index cbd74b739..8d7c5eefb 100644 --- a/elixir/apps/domain/lib/domain/clients.ex +++ b/elixir/apps/domain/lib/domain/clients.ex @@ -1,7 +1,7 @@ defmodule Domain.Clients do use Supervisor - alias Domain.{Repo, Auth, PubSub} - alias Domain.{Accounts, Actors, Flows} + alias Domain.{Repo, Auth} + alias Domain.{Accounts, Actors, Events, Flows} alias Domain.Clients.{Client, Authorizer, Presence} require Ecto.Query @@ -107,7 +107,7 @@ defmodule Domain.Clients do @doc false def preload_clients_presence([client]) do client.account_id - |> account_clients_presence_topic() + |> Events.Hooks.Accounts.clients_presence_topic() |> Presence.get_by_key(client.id) |> case do [] -> %{client | online?: false} @@ -135,7 +135,7 @@ defmodule Domain.Clients do def online_client_ids(account_id) do account_id - |> account_clients_presence_topic() + |> Events.Hooks.Accounts.clients_presence_topic() |> Presence.list() |> Map.keys() end @@ -188,15 +188,6 @@ defmodule Domain.Clients do with: &Client.Changeset.update(&1, attrs), preload: [:online?] ) - # TODO: WAL - |> case do - {:ok, client} -> - :ok = broadcast_to_client(client, :updated) - {:ok, client} - - {:error, reason} -> - {:error, reason} - end end end @@ -213,8 +204,6 @@ defmodule Domain.Clients do |> case do {:ok, client} -> client = Repo.preload(client, [:verified_by_actor, :verified_by_identity]) - # TODO: WAL - :ok = broadcast_to_client(client, :updated) {:ok, client} {:error, reason} -> @@ -236,8 +225,6 @@ defmodule Domain.Clients do |> case do {:ok, client} -> {:ok, _flows} = Flows.expire_flows_for(client) - # TODO: WAL - :ok = broadcast_to_client(client, :updated) {:ok, client} {:error, reason} -> @@ -254,8 +241,6 @@ defmodule Domain.Clients do with :ok <- authorize_actor_client_management(client.actor_id, subject) do case delete_clients(queryable, subject) do {:ok, [client]} -> - # TODO: WAL - :ok = disconnect_client(client) {:ok, client} {:ok, []} -> @@ -270,10 +255,8 @@ defmodule Domain.Clients do |> Client.Query.by_actor_id(actor.id) |> Client.Query.by_account_id(actor.account_id) - with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission()), - {:ok, _clients} <- delete_clients(queryable, subject) do - # TODO: WAL - :ok = disconnect_actor_clients(actor) + with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission()) do + {:ok, _clients} = delete_clients(queryable, subject) :ok end end @@ -285,9 +268,6 @@ defmodule Domain.Clients do |> Client.Query.delete() |> Repo.update_all([]) - # TODO: WAL - :ok = Enum.each(clients, &disconnect_client/1) - {:ok, clients} end @@ -302,86 +282,4 @@ defmodule Domain.Clients do defp authorize_actor_client_management(_actor_id, %Auth.Subject{} = subject) do Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission()) end - - def connect_client(%Client{} = client) do - with {:ok, _} <- - Presence.track(self(), account_clients_presence_topic(client.account_id), client.id, %{ - online_at: System.system_time(:second) - }), - {:ok, _} <- - Presence.track(self(), actor_clients_presence_topic(client.actor_id), client.id, %{}) do - :ok = PubSub.subscribe(client_topic(client)) - # TODO: WAL - # :ok = PubSub.subscribe(actor_clients_topic(client.actor_id)) - # :ok = PubSub.subscribe(identity_topic(client.actor_id)) - :ok = PubSub.subscribe(account_clients_topic(client.account_id)) - :ok - end - end - - ### Presence - - def account_clients_presence_topic(account_or_id), - do: "presences:#{account_clients_topic(account_or_id)}" - - defp actor_clients_presence_topic(actor_or_id), - do: "presences:#{actor_clients_topic(actor_or_id)}" - - ### PubSub - - defp client_topic(%Client{} = client), do: client_topic(client.id) - defp client_topic(client_id), do: "clients:#{client_id}" - - defp account_clients_topic(%Accounts.Account{} = account), do: account_clients_topic(account.id) - defp account_clients_topic(account_id), do: "account_clients:#{account_id}" - - defp actor_clients_topic(%Actors.Actor{} = actor), do: actor_clients_topic(actor.id) - defp actor_clients_topic(actor_id), do: "actor_clients:#{actor_id}" - - def subscribe_to_clients_presence_in_account(account_or_id) do - PubSub.subscribe(account_clients_presence_topic(account_or_id)) - end - - def unsubscribe_from_clients_presence_in_account(account_or_id) do - PubSub.unsubscribe(account_clients_presence_topic(account_or_id)) - end - - def subscribe_to_clients_presence_for_actor(actor_or_id) do - PubSub.subscribe(actor_clients_presence_topic(actor_or_id)) - end - - def unsubscribe_from_clients_presence_for_actor(actor_or_id) do - PubSub.unsubscribe(actor_clients_presence_topic(actor_or_id)) - end - - # TODO: WAL - def broadcast_to_account_clients(account_or_id, payload) do - account_or_id - |> account_clients_topic() - |> PubSub.broadcast(payload) - end - - def broadcast_to_client(client_or_id, payload) do - client_or_id - |> client_topic() - |> PubSub.broadcast(payload) - end - - defp broadcast_to_actor_clients(actor_or_id, payload) do - actor_or_id - |> actor_clients_topic() - |> PubSub.broadcast(payload) - end - - def disconnect_client(client_or_id) do - broadcast_to_client(client_or_id, "disconnect") - end - - def disconnect_actor_clients(actor_or_id) do - broadcast_to_actor_clients(actor_or_id, "disconnect") - end - - def disconnect_account_clients(account_or_id) do - broadcast_to_account_clients(account_or_id, "disconnect") - end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex index 7a39df97b..feaa6adde 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex @@ -6,6 +6,15 @@ defmodule Domain.Events.Hooks.Accounts do :ok end + # Account disabled - disconnect clients + def on_update( + %{"disabled_at" => nil} = _old_data, + %{"disabled_at" => disabled_at, "id" => account_id} = _data + ) + when not is_nil(disabled_at) do + disconnect_clients(account_id) + end + def on_update(%{"config" => old_config}, %{"config" => config, "id" => account_id}) do if old_config != config do broadcast(account_id, :config_changed) @@ -22,9 +31,41 @@ defmodule Domain.Events.Hooks.Accounts do PubSub.subscribe("accounts:#{account_id}") end + def subscribe_to_clients(account_id) do + account_id + |> clients_topic() + |> PubSub.subscribe() + end + + def subscribe_to_clients_presence(account_id) do + account_id + |> clients_presence_topic() + |> PubSub.subscribe() + end + # No unsubscribe needed - account deletions destroy any subscribed entities + def clients_presence_topic(account_or_id) do + "presences:#{clients_topic(account_or_id)}" + end + + def clients_topic(account_id) do + "account_clients:#{account_id}" + end + + defp topic(account_id) do + "accounts:#{account_id}" + end + defp broadcast(account_id, event) do - PubSub.broadcast("accounts:#{account_id}", event) + account_id + |> topic() + |> PubSub.broadcast(event) + end + + defp disconnect_clients(account_id) do + account_id + |> clients_topic() + |> PubSub.broadcast("disconnect") end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actors.ex b/elixir/apps/domain/lib/domain/events/hooks/actors.ex index 63aa8f68f..48e7dc931 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actors.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actors.ex @@ -1,4 +1,6 @@ defmodule Domain.Events.Hooks.Actors do + alias Domain.PubSub + def on_insert(_data) do :ok end @@ -10,4 +12,18 @@ defmodule Domain.Events.Hooks.Actors do def on_delete(_old_data) do :ok end + + def subscribe_to_clients_presence(actor_id) do + actor_id + |> clients_presence_topic() + |> PubSub.subscribe() + end + + def clients_presence_topic(actor_id) do + "presences:#{clients_topic(actor_id)}" + end + + defp clients_topic(actor_id) do + "actor_clients:#{actor_id}" + end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/clients.ex b/elixir/apps/domain/lib/domain/events/hooks/clients.ex index 9710e53d5..912487837 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/clients.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/clients.ex @@ -1,13 +1,61 @@ defmodule Domain.Events.Hooks.Clients do + alias Domain.PubSub + alias Domain.Clients.{Client, Presence} + alias Domain.Events + def on_insert(_data) do :ok end - def on_update(_old_data, _data) do - :ok + # Soft-delete + def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data) + when not is_nil(deleted_at) do + on_delete(old_data) end - def on_delete(_old_data) do - :ok + # Regular update + def on_update(_old_data, %{"id" => client_id} = _data) do + broadcast(client_id, :updated) + end + + def on_delete(%{"id" => client_id} = _old_data) do + disconnect_client(client_id) + end + + def connect(%Client{} = client) do + with {:ok, _} <- + Presence.track( + self(), + Events.Hooks.Accounts.clients_presence_topic(client.account_id), + client.id, + %{ + online_at: System.system_time(:second) + } + ), + {:ok, _} <- + Presence.track( + self(), + Events.Hooks.Actors.clients_presence_topic(client.actor_id), + client.id, + %{} + ) do + :ok = PubSub.subscribe(topic(client.id)) + :ok = Events.Hooks.Accounts.subscribe_to_clients(client.account_id) + :ok + end + end + + ### PubSub + + def broadcast(client_id, payload) do + client_id + |> topic() + |> PubSub.broadcast(payload) + end + + defp topic(client_id), do: "clients:#{client_id}" + + defp disconnect_client(client_id) do + broadcast(client_id, "disconnect") end end diff --git a/elixir/apps/domain/test/domain/accounts_test.exs b/elixir/apps/domain/test/domain/accounts_test.exs index b03031dc6..96f42a43c 100644 --- a/elixir/apps/domain/test/domain/accounts_test.exs +++ b/elixir/apps/domain/test/domain/accounts_test.exs @@ -746,18 +746,14 @@ defmodule Domain.AccountsTest do ] end - test "broadcasts disconnect message for the clients when account is disabled", %{ + test "allows disabling an account", %{ account: account } do attrs = %{ disabled_at: DateTime.utc_now() } - :ok = Domain.PubSub.subscribe("account_clients:#{account.id}") - assert {:ok, _account} = update_account_by_id(account.id, attrs) - - assert_receive "disconnect" end end diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index 835b14b5f..777eac0d0 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -4,6 +4,7 @@ defmodule Domain.ActorsTest do alias Domain.Auth alias Domain.Clients alias Domain.Actors + alias Domain.Events describe "fetch_groups_count_grouped_by_provider_id/1" do test "returns empty map when there are no groups" do @@ -631,7 +632,7 @@ defmodule Domain.ActorsTest do } do actor = Fixtures.Actors.create_actor(account: account) client = Fixtures.Clients.create_client(account: account, actor: actor) - Domain.Clients.connect_client(client) + Events.Hooks.Clients.connect(client) assert {:ok, peek} = peek_actor_clients([actor], 3, subject) assert [%Clients.Client{} = client] = peek[actor.id].items diff --git a/elixir/apps/domain/test/domain/clients_test.exs b/elixir/apps/domain/test/domain/clients_test.exs index 47ec95c78..a60502ae8 100644 --- a/elixir/apps/domain/test/domain/clients_test.exs +++ b/elixir/apps/domain/test/domain/clients_test.exs @@ -1,7 +1,7 @@ defmodule Domain.ClientsTest do use Domain.DataCase, async: true import Domain.Clients - alias Domain.Clients + alias Domain.{Clients, Events} setup do account = Fixtures.Accounts.create_account() @@ -117,7 +117,7 @@ defmodule Domain.ClientsTest do assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?]) assert client.online? == false - assert connect_client(client) == :ok + assert Events.Hooks.Clients.connect(client) == :ok assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?]) assert client.online? == true end @@ -222,7 +222,7 @@ defmodule Domain.ClientsTest do assert client = fetch_client_by_id!(client.id, preload: [:online?]) assert client.online? == false - assert connect_client(client) == :ok + assert Events.Hooks.Clients.connect(client) == :ok assert client = fetch_client_by_id!(client.id, preload: [:online?]) assert client.online? == true end @@ -281,7 +281,7 @@ defmodule Domain.ClientsTest do assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?]) assert client.online? == false - assert connect_client(client) == :ok + assert Events.Hooks.Clients.connect(client) == :ok assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?]) assert client.online? == true end @@ -911,15 +911,12 @@ defmodule Domain.ClientsTest do unprivileged_subject: subject } do client = Fixtures.Clients.create_client(actor: actor) - :ok = Domain.PubSub.subscribe("clients:#{client.id}") attrs = %{name: "new name"} assert {:ok, client} = update_client(client, attrs, subject) assert client.name == attrs.name - - assert_receive :updated end test "does not allow unprivileged actor to update other actors clients", %{ @@ -1013,7 +1010,6 @@ defmodule Domain.ClientsTest do describe "verify_client/2" do test "allows admin actor to verify clients", %{admin_actor: actor, admin_subject: subject} do client = Fixtures.Clients.create_client(actor: actor) - :ok = Domain.PubSub.subscribe("clients:#{client.id}") assert {:ok, client} = verify_client(client, subject) assert client.verified_at @@ -1021,8 +1017,6 @@ defmodule Domain.ClientsTest do assert client.verified_by_actor_id == subject.actor.id assert client.verified_by_identity_id == subject.identity.id - assert_receive :updated - assert {:ok, double_verified_client} = verify_client(client, subject) assert double_verified_client.verified_at == client.verified_at end @@ -1053,7 +1047,6 @@ defmodule Domain.ClientsTest do admin_subject: subject } do client = Fixtures.Clients.create_client(actor: actor) - :ok = Domain.PubSub.subscribe("clients:#{client.id}") assert {:ok, client} = verify_client(client, subject) assert {:ok, client} = remove_client_verification(client, subject) @@ -1062,8 +1055,6 @@ defmodule Domain.ClientsTest do assert is_nil(client.verified_by) assert is_nil(client.verified_by_actor_id) assert is_nil(client.verified_by_identity_id) - - assert_receive :updated end test "expires flows for the unverified client", %{ @@ -1243,45 +1234,4 @@ defmodule Domain.ClientsTest do missing_permissions: [Clients.Authorizer.manage_clients_permission()]}} end end - - describe "connect_client/1" do - test "tracks client presence for account", %{account: account} do - client = Fixtures.Clients.create_client(account: account) - assert connect_client(client) == :ok - - client = fetch_client_by_id!(client.id, preload: [:online?]) - assert client.online? == true - end - - test "tracks client presence for actor", %{account: account} do - actor = Fixtures.Actors.create_actor(account: account) - client = Fixtures.Clients.create_client(account: account, actor: actor) - assert connect_client(client) == :ok - - assert broadcast_to_client(client, "test") == :ok - - assert_receive "test" - end - - test "subscribes to client events", %{account: account} do - actor = Fixtures.Actors.create_actor(account: account) - client = Fixtures.Clients.create_client(account: account, actor: actor) - assert connect_client(client) == :ok - - assert disconnect_client(client) == :ok - - assert_receive "disconnect" - end - - test "subscribes to account events", %{account: account} do - actor = Fixtures.Actors.create_actor(account: account) - client = Fixtures.Clients.create_client(account: account, actor: actor) - - assert connect_client(client) == :ok - - assert disconnect_account_clients(account) == :ok - - assert_receive "disconnect" - end - end end diff --git a/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs index 6d9176dbc..b5fe5e4f2 100644 --- a/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs @@ -53,6 +53,19 @@ defmodule Domain.Events.Hooks.AccountsTest do assert :ok == on_update(old_data, data) refute_receive :config_changed end + + test "sends disconnect to clients if account is disabled" do + account_id = Fixtures.Accounts.create_account().id + + old_data = %{"id" => account_id, "disabled_at" => nil} + data = %{"id" => account_id, "disabled_at" => DateTime.utc_now()} + + :ok = subscribe_to_clients(account_id) + + assert :ok == on_update(old_data, data) + + assert_receive "disconnect" + end end describe "delete/1" do diff --git a/elixir/apps/domain/test/domain/events/hooks/clients_test.exs b/elixir/apps/domain/test/domain/events/hooks/clients_test.exs index 28a7c9995..58c796252 100644 --- a/elixir/apps/domain/test/domain/events/hooks/clients_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/clients_test.exs @@ -1,5 +1,5 @@ defmodule Domain.Events.Hooks.ClientsTest do - use ExUnit.Case, async: true + use Domain.DataCase, async: true import Domain.Events.Hooks.Clients setup do @@ -13,14 +13,77 @@ defmodule Domain.Events.Hooks.ClientsTest do end describe "update/2" do - test "returns :ok", %{old_data: old_data, data: data} do + test "soft-delete broadcasts disconnect" do + client = Fixtures.Clients.create_client() + :ok = connect(client) + + old_data = %{"id" => client.id, "deleted_at" => nil} + data = %{"id" => client.id, "deleted_at" => DateTime.utc_now()} + assert :ok == on_update(old_data, data) + + assert_receive "disconnect" + refute_receive :updated + end + + test "update broadcasts :update" do + client = Fixtures.Clients.create_client() + :ok = connect(client) + + old_data = %{"id" => client.id, "name" => "Old Client"} + data = %{"id" => client.id, "name" => "Updated Client"} + + assert :ok == on_update(old_data, data) + + assert_receive :updated + refute_receive "disconnect" end end describe "delete/1" do - test "returns :ok", %{data: data} do - assert :ok == on_delete(data) + test "broadcasts disconnect" do + client = Fixtures.Clients.create_client() + :ok = connect(client) + + old_data = %{"id" => client.id} + + assert :ok == on_delete(old_data) + + assert_receive "disconnect" + refute_receive :updated + end + end + + describe "connect/1" do + test "tracks client presence and subscribes to topics" do + client = Fixtures.Clients.create_client() + assert :ok == connect(client) + + assert client.account_id + |> Domain.Events.Hooks.Accounts.clients_presence_topic() + |> Domain.Clients.Presence.get_by_key(client.id) + + assert client.actor_id + |> Domain.Events.Hooks.Actors.clients_presence_topic() + |> Domain.Clients.Presence.get_by_key(client.id) + + Domain.PubSub.broadcast( + Domain.Events.Hooks.Accounts.clients_topic(client.account_id), + :test_event + ) + + assert_receive :test_event + end + end + + describe "broadcast/2" do + test "broadcasts payload to client topic" do + client = Fixtures.Clients.create_client() + :ok = connect(client) + + assert :ok == broadcast(client.id, :updated) + + assert_receive :updated end end end diff --git a/elixir/apps/web/lib/web/live/actors/show.ex b/elixir/apps/web/lib/web/live/actors/show.ex index 768121130..2c4778646 100644 --- a/elixir/apps/web/lib/web/live/actors/show.ex +++ b/elixir/apps/web/lib/web/live/actors/show.ex @@ -2,7 +2,7 @@ defmodule Web.Actors.Show do use Web, :live_view import Web.Actors.Components import Web.Clients.Components - alias Domain.{Accounts, Auth, Tokens, Flows, Clients} + alias Domain.{Accounts, Auth, Events, Tokens, Flows, Clients} alias Domain.Actors def mount(%{"id" => id}, _session, socket) do @@ -10,7 +10,9 @@ defmodule Web.Actors.Show do Actors.fetch_actor_by_id(id, socket.assigns.subject, preload: [:identities, :last_seen_at, groups: [:provider]] ) do - :ok = Clients.subscribe_to_clients_presence_for_actor(actor) + if connected?(socket) do + :ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id) + end available_providers = Auth.all_active_providers_for_account!(socket.assigns.account) diff --git a/elixir/apps/web/lib/web/live/clients/index.ex b/elixir/apps/web/lib/web/live/clients/index.ex index 401fe1626..db4a2e241 100644 --- a/elixir/apps/web/lib/web/live/clients/index.ex +++ b/elixir/apps/web/lib/web/live/clients/index.ex @@ -3,10 +3,11 @@ defmodule Web.Clients.Index do import Web.Actors.Components import Web.Clients.Components alias Domain.Clients + alias Domain.Events def mount(_params, _session, socket) do if connected?(socket) do - :ok = Clients.subscribe_to_clients_presence_in_account(socket.assigns.subject.account) + :ok = Events.Hooks.Accounts.subscribe_to_clients_presence(socket.assigns.subject.account.id) end socket = diff --git a/elixir/apps/web/lib/web/live/clients/show.ex b/elixir/apps/web/lib/web/live/clients/show.ex index f94da017a..ac98e0fe2 100644 --- a/elixir/apps/web/lib/web/live/clients/show.ex +++ b/elixir/apps/web/lib/web/live/clients/show.ex @@ -2,7 +2,7 @@ defmodule Web.Clients.Show do use Web, :live_view import Web.Policies.Components import Web.Clients.Components - alias Domain.{Accounts, Clients, Flows} + alias Domain.{Accounts, Clients, Events, Flows} def mount(%{"id" => id}, _session, socket) do with {:ok, client} <- @@ -16,7 +16,7 @@ defmodule Web.Clients.Show do ] ) do if connected?(socket) do - :ok = Clients.subscribe_to_clients_presence_for_actor(client.actor) + :ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id) end socket = diff --git a/elixir/apps/web/test/web/live/actors/index_test.exs b/elixir/apps/web/test/web/live/actors/index_test.exs index 839aaf288..2c09d1d8d 100644 --- a/elixir/apps/web/test/web/live/actors/index_test.exs +++ b/elixir/apps/web/test/web/live/actors/index_test.exs @@ -60,7 +60,7 @@ defmodule Web.Live.Actors.IndexTest do admin_actor = Fixtures.Actors.create_actor(account: account, type: :account_admin_user) Fixtures.Actors.create_membership(account: account, actor: admin_actor) client = Fixtures.Clients.create_client(account: account, actor: admin_actor) - Domain.Clients.connect_client(client) + Domain.Events.Hooks.Clients.connect(client) admin_actor = Repo.preload(admin_actor, identities: [:provider], groups: []) user_actor = Fixtures.Actors.create_actor(account: account, type: :account_user) diff --git a/elixir/apps/web/test/web/live/actors/show_test.exs b/elixir/apps/web/test/web/live/actors/show_test.exs index af5e67a5f..e39297c44 100644 --- a/elixir/apps/web/test/web/live/actors/show_test.exs +++ b/elixir/apps/web/test/web/live/actors/show_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Actors.ShowTest do use Web.ConnCase, async: true + alias Domain.Events test "redirects to sign in page for unauthorized user", %{conn: conn} do account = Fixtures.Accounts.create_account() @@ -91,8 +92,8 @@ defmodule Web.Live.Actors.ShowTest do |> live(~p"/#{account}/actors/#{actor}") Domain.Config.put_env_override(:test_pid, self()) - Domain.Clients.subscribe_to_clients_presence_for_actor(actor) - assert Domain.Clients.connect_client(client) == :ok + :ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id) + assert Events.Hooks.Clients.connect(client) == :ok assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _} assert_receive {:live_table_reloaded, "clients"}, 500 diff --git a/elixir/apps/web/test/web/live/clients/index_test.exs b/elixir/apps/web/test/web/live/clients/index_test.exs index 7639bddf7..8c5f3766c 100644 --- a/elixir/apps/web/test/web/live/clients/index_test.exs +++ b/elixir/apps/web/test/web/live/clients/index_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Clients.IndexTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -59,7 +60,7 @@ defmodule Web.Live.Clients.IndexTest do online_client = Fixtures.Clients.create_client(account: account) offline_client = Fixtures.Clients.create_client(account: account) - :ok = Domain.Clients.connect_client(online_client) + :ok = Events.Hooks.Clients.connect(online_client) {:ok, lv, _html} = conn @@ -102,8 +103,8 @@ defmodule Web.Live.Clients.IndexTest do |> live(~p"/#{account}/clients") Domain.Config.put_env_override(:test_pid, self()) - Domain.Clients.subscribe_to_clients_presence_for_actor(actor) - assert Domain.Clients.connect_client(client) == :ok + :ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id) + assert Events.Hooks.Clients.connect(client) == :ok assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _} assert_receive {:live_table_reloaded, "clients"}, 250 diff --git a/elixir/apps/web/test/web/live/clients/show_test.exs b/elixir/apps/web/test/web/live/clients/show_test.exs index b1a60b53a..61253ddf9 100644 --- a/elixir/apps/web/test/web/live/clients/show_test.exs +++ b/elixir/apps/web/test/web/live/clients/show_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Clients.ShowTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -116,7 +117,7 @@ defmodule Web.Live.Clients.ShowTest do identity: identity, conn: conn } do - :ok = Domain.Clients.connect_client(client) + :ok = Events.Hooks.Clients.connect(client) {:ok, lv, _html} = conn @@ -144,8 +145,8 @@ defmodule Web.Live.Clients.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/clients/#{client}") - Domain.Clients.subscribe_to_clients_presence_for_actor(actor) - assert Domain.Clients.connect_client(client) == :ok + Events.Hooks.Actors.subscribe_to_clients_presence(actor.id) + assert Events.Hooks.Clients.connect(client) == :ok assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _} wait_for(fn ->