mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
refactor(portal): Move client updates to WAL broadcaster (#9288)
Client updates are next on the path to moving more side effects to the
WAL broadcaster. This one has the following notable changes:
- ~~The `actor_clients` pubsub topic were only used to broadcast removal
of clients belonging to an actor; these are no longer needed since we
handle this in the individual removal event~~ EDIT: only the presence is
kept
- The `account_clients:{account_id}` pubsub and presence topic
definition has been moved to `Events.Hooks.Accounts` because these are
broadcasted using the account_id field based on account changes, and
have nothing to do with the client lifecycle
Related: #6294
Related: #8187
This commit is contained in:
@@ -124,7 +124,7 @@ defmodule API.Client.Channel do
|
||||
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
|
||||
|
||||
OpenTelemetry.Tracer.with_span "client.after_join" do
|
||||
:ok = Clients.connect_client(socket.assigns.client)
|
||||
:ok = Events.Hooks.Clients.connect(socket.assigns.client)
|
||||
|
||||
# Subscribe for account config updates
|
||||
:ok = Events.Hooks.Accounts.subscribe(socket.assigns.client.account_id)
|
||||
|
||||
@@ -599,7 +599,7 @@ defmodule API.Gateway.Channel do
|
||||
|
||||
:ok =
|
||||
Enum.each(client_ids, fn client_id ->
|
||||
Clients.broadcast_to_client(
|
||||
Events.Hooks.Clients.broadcast(
|
||||
client_id,
|
||||
{:ice_candidates, socket.assigns.gateway.id, candidates,
|
||||
{opentelemetry_ctx, opentelemetry_span_ctx}}
|
||||
@@ -624,7 +624,7 @@ defmodule API.Gateway.Channel do
|
||||
|
||||
:ok =
|
||||
Enum.each(client_ids, fn client_id ->
|
||||
Clients.broadcast_to_client(
|
||||
Events.Hooks.Clients.broadcast(
|
||||
client_id,
|
||||
{:invalidate_ice_candidates, socket.assigns.gateway.id, candidates,
|
||||
{opentelemetry_ctx, opentelemetry_span_ctx}}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
defmodule API.Client.ChannelTest do
|
||||
use API.ChannelCase, async: true
|
||||
alias Domain.Events
|
||||
|
||||
setup do
|
||||
account =
|
||||
@@ -167,7 +168,7 @@ defmodule API.Client.ChannelTest do
|
||||
describe "join/3" do
|
||||
test "tracks presence after join", %{account: account, client: client} do
|
||||
presence =
|
||||
Domain.Clients.Presence.list(Domain.Clients.account_clients_presence_topic(account))
|
||||
Domain.Clients.Presence.list(Events.Hooks.Accounts.clients_presence_topic(account.id))
|
||||
|
||||
assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, client.id)
|
||||
assert is_number(online_at)
|
||||
@@ -499,7 +500,7 @@ defmodule API.Client.ChannelTest do
|
||||
} do
|
||||
assert_push "init", %{}
|
||||
Process.flag(:trap_exit, true)
|
||||
Domain.Clients.broadcast_to_client(client, :token_expired)
|
||||
Events.Hooks.Clients.broadcast(client.id, :token_expired)
|
||||
assert_push "disconnect", %{reason: :token_expired}, 250
|
||||
end
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
defmodule API.Gateway.ChannelTest do
|
||||
use API.ChannelCase, async: true
|
||||
alias Domain.Events
|
||||
|
||||
setup do
|
||||
account = Fixtures.Accounts.create_account()
|
||||
@@ -1094,7 +1095,7 @@ defmodule API.Gateway.ChannelTest do
|
||||
"client_ids" => [client.id]
|
||||
}
|
||||
|
||||
:ok = Domain.Clients.connect_client(client)
|
||||
:ok = Events.Hooks.Clients.connect(client)
|
||||
Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id))
|
||||
|
||||
push(socket, "broadcast_ice_candidates", attrs)
|
||||
@@ -1132,7 +1133,7 @@ defmodule API.Gateway.ChannelTest do
|
||||
"client_ids" => [client.id]
|
||||
}
|
||||
|
||||
:ok = Domain.Clients.connect_client(client)
|
||||
:ok = Events.Hooks.Clients.connect(client)
|
||||
Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id))
|
||||
|
||||
push(socket, "broadcast_invalidated_ice_candidates", attrs)
|
||||
|
||||
@@ -93,18 +93,6 @@ defmodule Domain.Accounts do
|
||||
with: &Account.Changeset.update(&1, attrs),
|
||||
after_commit: &on_account_update/2
|
||||
)
|
||||
|> case do
|
||||
{:ok, %{disabled_at: nil} = account} ->
|
||||
{:ok, account}
|
||||
|
||||
{:ok, account} ->
|
||||
# TODO: WAL
|
||||
:ok = Domain.Clients.disconnect_account_clients(account)
|
||||
{:ok, account}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp on_account_update(account, changeset) do
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
defmodule Domain.Clients do
|
||||
use Supervisor
|
||||
alias Domain.{Repo, Auth, PubSub}
|
||||
alias Domain.{Accounts, Actors, Flows}
|
||||
alias Domain.{Repo, Auth}
|
||||
alias Domain.{Accounts, Actors, Events, Flows}
|
||||
alias Domain.Clients.{Client, Authorizer, Presence}
|
||||
require Ecto.Query
|
||||
|
||||
@@ -107,7 +107,7 @@ defmodule Domain.Clients do
|
||||
@doc false
|
||||
def preload_clients_presence([client]) do
|
||||
client.account_id
|
||||
|> account_clients_presence_topic()
|
||||
|> Events.Hooks.Accounts.clients_presence_topic()
|
||||
|> Presence.get_by_key(client.id)
|
||||
|> case do
|
||||
[] -> %{client | online?: false}
|
||||
@@ -135,7 +135,7 @@ defmodule Domain.Clients do
|
||||
|
||||
def online_client_ids(account_id) do
|
||||
account_id
|
||||
|> account_clients_presence_topic()
|
||||
|> Events.Hooks.Accounts.clients_presence_topic()
|
||||
|> Presence.list()
|
||||
|> Map.keys()
|
||||
end
|
||||
@@ -188,15 +188,6 @@ defmodule Domain.Clients do
|
||||
with: &Client.Changeset.update(&1, attrs),
|
||||
preload: [:online?]
|
||||
)
|
||||
# TODO: WAL
|
||||
|> case do
|
||||
{:ok, client} ->
|
||||
:ok = broadcast_to_client(client, :updated)
|
||||
{:ok, client}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -213,8 +204,6 @@ defmodule Domain.Clients do
|
||||
|> case do
|
||||
{:ok, client} ->
|
||||
client = Repo.preload(client, [:verified_by_actor, :verified_by_identity])
|
||||
# TODO: WAL
|
||||
:ok = broadcast_to_client(client, :updated)
|
||||
{:ok, client}
|
||||
|
||||
{:error, reason} ->
|
||||
@@ -236,8 +225,6 @@ defmodule Domain.Clients do
|
||||
|> case do
|
||||
{:ok, client} ->
|
||||
{:ok, _flows} = Flows.expire_flows_for(client)
|
||||
# TODO: WAL
|
||||
:ok = broadcast_to_client(client, :updated)
|
||||
{:ok, client}
|
||||
|
||||
{:error, reason} ->
|
||||
@@ -254,8 +241,6 @@ defmodule Domain.Clients do
|
||||
with :ok <- authorize_actor_client_management(client.actor_id, subject) do
|
||||
case delete_clients(queryable, subject) do
|
||||
{:ok, [client]} ->
|
||||
# TODO: WAL
|
||||
:ok = disconnect_client(client)
|
||||
{:ok, client}
|
||||
|
||||
{:ok, []} ->
|
||||
@@ -270,10 +255,8 @@ defmodule Domain.Clients do
|
||||
|> Client.Query.by_actor_id(actor.id)
|
||||
|> Client.Query.by_account_id(actor.account_id)
|
||||
|
||||
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission()),
|
||||
{:ok, _clients} <- delete_clients(queryable, subject) do
|
||||
# TODO: WAL
|
||||
:ok = disconnect_actor_clients(actor)
|
||||
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission()) do
|
||||
{:ok, _clients} = delete_clients(queryable, subject)
|
||||
:ok
|
||||
end
|
||||
end
|
||||
@@ -285,9 +268,6 @@ defmodule Domain.Clients do
|
||||
|> Client.Query.delete()
|
||||
|> Repo.update_all([])
|
||||
|
||||
# TODO: WAL
|
||||
:ok = Enum.each(clients, &disconnect_client/1)
|
||||
|
||||
{:ok, clients}
|
||||
end
|
||||
|
||||
@@ -302,86 +282,4 @@ defmodule Domain.Clients do
|
||||
defp authorize_actor_client_management(_actor_id, %Auth.Subject{} = subject) do
|
||||
Auth.ensure_has_permissions(subject, Authorizer.manage_clients_permission())
|
||||
end
|
||||
|
||||
def connect_client(%Client{} = client) do
|
||||
with {:ok, _} <-
|
||||
Presence.track(self(), account_clients_presence_topic(client.account_id), client.id, %{
|
||||
online_at: System.system_time(:second)
|
||||
}),
|
||||
{:ok, _} <-
|
||||
Presence.track(self(), actor_clients_presence_topic(client.actor_id), client.id, %{}) do
|
||||
:ok = PubSub.subscribe(client_topic(client))
|
||||
# TODO: WAL
|
||||
# :ok = PubSub.subscribe(actor_clients_topic(client.actor_id))
|
||||
# :ok = PubSub.subscribe(identity_topic(client.actor_id))
|
||||
:ok = PubSub.subscribe(account_clients_topic(client.account_id))
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
### Presence
|
||||
|
||||
def account_clients_presence_topic(account_or_id),
|
||||
do: "presences:#{account_clients_topic(account_or_id)}"
|
||||
|
||||
defp actor_clients_presence_topic(actor_or_id),
|
||||
do: "presences:#{actor_clients_topic(actor_or_id)}"
|
||||
|
||||
### PubSub
|
||||
|
||||
defp client_topic(%Client{} = client), do: client_topic(client.id)
|
||||
defp client_topic(client_id), do: "clients:#{client_id}"
|
||||
|
||||
defp account_clients_topic(%Accounts.Account{} = account), do: account_clients_topic(account.id)
|
||||
defp account_clients_topic(account_id), do: "account_clients:#{account_id}"
|
||||
|
||||
defp actor_clients_topic(%Actors.Actor{} = actor), do: actor_clients_topic(actor.id)
|
||||
defp actor_clients_topic(actor_id), do: "actor_clients:#{actor_id}"
|
||||
|
||||
def subscribe_to_clients_presence_in_account(account_or_id) do
|
||||
PubSub.subscribe(account_clients_presence_topic(account_or_id))
|
||||
end
|
||||
|
||||
def unsubscribe_from_clients_presence_in_account(account_or_id) do
|
||||
PubSub.unsubscribe(account_clients_presence_topic(account_or_id))
|
||||
end
|
||||
|
||||
def subscribe_to_clients_presence_for_actor(actor_or_id) do
|
||||
PubSub.subscribe(actor_clients_presence_topic(actor_or_id))
|
||||
end
|
||||
|
||||
def unsubscribe_from_clients_presence_for_actor(actor_or_id) do
|
||||
PubSub.unsubscribe(actor_clients_presence_topic(actor_or_id))
|
||||
end
|
||||
|
||||
# TODO: WAL
|
||||
def broadcast_to_account_clients(account_or_id, payload) do
|
||||
account_or_id
|
||||
|> account_clients_topic()
|
||||
|> PubSub.broadcast(payload)
|
||||
end
|
||||
|
||||
def broadcast_to_client(client_or_id, payload) do
|
||||
client_or_id
|
||||
|> client_topic()
|
||||
|> PubSub.broadcast(payload)
|
||||
end
|
||||
|
||||
defp broadcast_to_actor_clients(actor_or_id, payload) do
|
||||
actor_or_id
|
||||
|> actor_clients_topic()
|
||||
|> PubSub.broadcast(payload)
|
||||
end
|
||||
|
||||
def disconnect_client(client_or_id) do
|
||||
broadcast_to_client(client_or_id, "disconnect")
|
||||
end
|
||||
|
||||
def disconnect_actor_clients(actor_or_id) do
|
||||
broadcast_to_actor_clients(actor_or_id, "disconnect")
|
||||
end
|
||||
|
||||
def disconnect_account_clients(account_or_id) do
|
||||
broadcast_to_account_clients(account_or_id, "disconnect")
|
||||
end
|
||||
end
|
||||
|
||||
@@ -6,6 +6,15 @@ defmodule Domain.Events.Hooks.Accounts do
|
||||
:ok
|
||||
end
|
||||
|
||||
# Account disabled - disconnect clients
|
||||
def on_update(
|
||||
%{"disabled_at" => nil} = _old_data,
|
||||
%{"disabled_at" => disabled_at, "id" => account_id} = _data
|
||||
)
|
||||
when not is_nil(disabled_at) do
|
||||
disconnect_clients(account_id)
|
||||
end
|
||||
|
||||
def on_update(%{"config" => old_config}, %{"config" => config, "id" => account_id}) do
|
||||
if old_config != config do
|
||||
broadcast(account_id, :config_changed)
|
||||
@@ -22,9 +31,41 @@ defmodule Domain.Events.Hooks.Accounts do
|
||||
PubSub.subscribe("accounts:#{account_id}")
|
||||
end
|
||||
|
||||
def subscribe_to_clients(account_id) do
|
||||
account_id
|
||||
|> clients_topic()
|
||||
|> PubSub.subscribe()
|
||||
end
|
||||
|
||||
def subscribe_to_clients_presence(account_id) do
|
||||
account_id
|
||||
|> clients_presence_topic()
|
||||
|> PubSub.subscribe()
|
||||
end
|
||||
|
||||
# No unsubscribe needed - account deletions destroy any subscribed entities
|
||||
|
||||
def clients_presence_topic(account_or_id) do
|
||||
"presences:#{clients_topic(account_or_id)}"
|
||||
end
|
||||
|
||||
def clients_topic(account_id) do
|
||||
"account_clients:#{account_id}"
|
||||
end
|
||||
|
||||
defp topic(account_id) do
|
||||
"accounts:#{account_id}"
|
||||
end
|
||||
|
||||
defp broadcast(account_id, event) do
|
||||
PubSub.broadcast("accounts:#{account_id}", event)
|
||||
account_id
|
||||
|> topic()
|
||||
|> PubSub.broadcast(event)
|
||||
end
|
||||
|
||||
defp disconnect_clients(account_id) do
|
||||
account_id
|
||||
|> clients_topic()
|
||||
|> PubSub.broadcast("disconnect")
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
defmodule Domain.Events.Hooks.Actors do
|
||||
alias Domain.PubSub
|
||||
|
||||
def on_insert(_data) do
|
||||
:ok
|
||||
end
|
||||
@@ -10,4 +12,18 @@ defmodule Domain.Events.Hooks.Actors do
|
||||
def on_delete(_old_data) do
|
||||
:ok
|
||||
end
|
||||
|
||||
def subscribe_to_clients_presence(actor_id) do
|
||||
actor_id
|
||||
|> clients_presence_topic()
|
||||
|> PubSub.subscribe()
|
||||
end
|
||||
|
||||
def clients_presence_topic(actor_id) do
|
||||
"presences:#{clients_topic(actor_id)}"
|
||||
end
|
||||
|
||||
defp clients_topic(actor_id) do
|
||||
"actor_clients:#{actor_id}"
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,13 +1,61 @@
|
||||
defmodule Domain.Events.Hooks.Clients do
|
||||
alias Domain.PubSub
|
||||
alias Domain.Clients.{Client, Presence}
|
||||
alias Domain.Events
|
||||
|
||||
def on_insert(_data) do
|
||||
:ok
|
||||
end
|
||||
|
||||
def on_update(_old_data, _data) do
|
||||
:ok
|
||||
# Soft-delete
|
||||
def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data)
|
||||
when not is_nil(deleted_at) do
|
||||
on_delete(old_data)
|
||||
end
|
||||
|
||||
def on_delete(_old_data) do
|
||||
:ok
|
||||
# Regular update
|
||||
def on_update(_old_data, %{"id" => client_id} = _data) do
|
||||
broadcast(client_id, :updated)
|
||||
end
|
||||
|
||||
def on_delete(%{"id" => client_id} = _old_data) do
|
||||
disconnect_client(client_id)
|
||||
end
|
||||
|
||||
def connect(%Client{} = client) do
|
||||
with {:ok, _} <-
|
||||
Presence.track(
|
||||
self(),
|
||||
Events.Hooks.Accounts.clients_presence_topic(client.account_id),
|
||||
client.id,
|
||||
%{
|
||||
online_at: System.system_time(:second)
|
||||
}
|
||||
),
|
||||
{:ok, _} <-
|
||||
Presence.track(
|
||||
self(),
|
||||
Events.Hooks.Actors.clients_presence_topic(client.actor_id),
|
||||
client.id,
|
||||
%{}
|
||||
) do
|
||||
:ok = PubSub.subscribe(topic(client.id))
|
||||
:ok = Events.Hooks.Accounts.subscribe_to_clients(client.account_id)
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
### PubSub
|
||||
|
||||
def broadcast(client_id, payload) do
|
||||
client_id
|
||||
|> topic()
|
||||
|> PubSub.broadcast(payload)
|
||||
end
|
||||
|
||||
defp topic(client_id), do: "clients:#{client_id}"
|
||||
|
||||
defp disconnect_client(client_id) do
|
||||
broadcast(client_id, "disconnect")
|
||||
end
|
||||
end
|
||||
|
||||
@@ -746,18 +746,14 @@ defmodule Domain.AccountsTest do
|
||||
]
|
||||
end
|
||||
|
||||
test "broadcasts disconnect message for the clients when account is disabled", %{
|
||||
test "allows disabling an account", %{
|
||||
account: account
|
||||
} do
|
||||
attrs = %{
|
||||
disabled_at: DateTime.utc_now()
|
||||
}
|
||||
|
||||
:ok = Domain.PubSub.subscribe("account_clients:#{account.id}")
|
||||
|
||||
assert {:ok, _account} = update_account_by_id(account.id, attrs)
|
||||
|
||||
assert_receive "disconnect"
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ defmodule Domain.ActorsTest do
|
||||
alias Domain.Auth
|
||||
alias Domain.Clients
|
||||
alias Domain.Actors
|
||||
alias Domain.Events
|
||||
|
||||
describe "fetch_groups_count_grouped_by_provider_id/1" do
|
||||
test "returns empty map when there are no groups" do
|
||||
@@ -631,7 +632,7 @@ defmodule Domain.ActorsTest do
|
||||
} do
|
||||
actor = Fixtures.Actors.create_actor(account: account)
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
Domain.Clients.connect_client(client)
|
||||
Events.Hooks.Clients.connect(client)
|
||||
|
||||
assert {:ok, peek} = peek_actor_clients([actor], 3, subject)
|
||||
assert [%Clients.Client{} = client] = peek[actor.id].items
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
defmodule Domain.ClientsTest do
|
||||
use Domain.DataCase, async: true
|
||||
import Domain.Clients
|
||||
alias Domain.Clients
|
||||
alias Domain.{Clients, Events}
|
||||
|
||||
setup do
|
||||
account = Fixtures.Accounts.create_account()
|
||||
@@ -117,7 +117,7 @@ defmodule Domain.ClientsTest do
|
||||
assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?])
|
||||
assert client.online? == false
|
||||
|
||||
assert connect_client(client) == :ok
|
||||
assert Events.Hooks.Clients.connect(client) == :ok
|
||||
assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?])
|
||||
assert client.online? == true
|
||||
end
|
||||
@@ -222,7 +222,7 @@ defmodule Domain.ClientsTest do
|
||||
assert client = fetch_client_by_id!(client.id, preload: [:online?])
|
||||
assert client.online? == false
|
||||
|
||||
assert connect_client(client) == :ok
|
||||
assert Events.Hooks.Clients.connect(client) == :ok
|
||||
assert client = fetch_client_by_id!(client.id, preload: [:online?])
|
||||
assert client.online? == true
|
||||
end
|
||||
@@ -281,7 +281,7 @@ defmodule Domain.ClientsTest do
|
||||
assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?])
|
||||
assert client.online? == false
|
||||
|
||||
assert connect_client(client) == :ok
|
||||
assert Events.Hooks.Clients.connect(client) == :ok
|
||||
assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?])
|
||||
assert client.online? == true
|
||||
end
|
||||
@@ -911,15 +911,12 @@ defmodule Domain.ClientsTest do
|
||||
unprivileged_subject: subject
|
||||
} do
|
||||
client = Fixtures.Clients.create_client(actor: actor)
|
||||
:ok = Domain.PubSub.subscribe("clients:#{client.id}")
|
||||
|
||||
attrs = %{name: "new name"}
|
||||
|
||||
assert {:ok, client} = update_client(client, attrs, subject)
|
||||
|
||||
assert client.name == attrs.name
|
||||
|
||||
assert_receive :updated
|
||||
end
|
||||
|
||||
test "does not allow unprivileged actor to update other actors clients", %{
|
||||
@@ -1013,7 +1010,6 @@ defmodule Domain.ClientsTest do
|
||||
describe "verify_client/2" do
|
||||
test "allows admin actor to verify clients", %{admin_actor: actor, admin_subject: subject} do
|
||||
client = Fixtures.Clients.create_client(actor: actor)
|
||||
:ok = Domain.PubSub.subscribe("clients:#{client.id}")
|
||||
|
||||
assert {:ok, client} = verify_client(client, subject)
|
||||
assert client.verified_at
|
||||
@@ -1021,8 +1017,6 @@ defmodule Domain.ClientsTest do
|
||||
assert client.verified_by_actor_id == subject.actor.id
|
||||
assert client.verified_by_identity_id == subject.identity.id
|
||||
|
||||
assert_receive :updated
|
||||
|
||||
assert {:ok, double_verified_client} = verify_client(client, subject)
|
||||
assert double_verified_client.verified_at == client.verified_at
|
||||
end
|
||||
@@ -1053,7 +1047,6 @@ defmodule Domain.ClientsTest do
|
||||
admin_subject: subject
|
||||
} do
|
||||
client = Fixtures.Clients.create_client(actor: actor)
|
||||
:ok = Domain.PubSub.subscribe("clients:#{client.id}")
|
||||
|
||||
assert {:ok, client} = verify_client(client, subject)
|
||||
assert {:ok, client} = remove_client_verification(client, subject)
|
||||
@@ -1062,8 +1055,6 @@ defmodule Domain.ClientsTest do
|
||||
assert is_nil(client.verified_by)
|
||||
assert is_nil(client.verified_by_actor_id)
|
||||
assert is_nil(client.verified_by_identity_id)
|
||||
|
||||
assert_receive :updated
|
||||
end
|
||||
|
||||
test "expires flows for the unverified client", %{
|
||||
@@ -1243,45 +1234,4 @@ defmodule Domain.ClientsTest do
|
||||
missing_permissions: [Clients.Authorizer.manage_clients_permission()]}}
|
||||
end
|
||||
end
|
||||
|
||||
describe "connect_client/1" do
|
||||
test "tracks client presence for account", %{account: account} do
|
||||
client = Fixtures.Clients.create_client(account: account)
|
||||
assert connect_client(client) == :ok
|
||||
|
||||
client = fetch_client_by_id!(client.id, preload: [:online?])
|
||||
assert client.online? == true
|
||||
end
|
||||
|
||||
test "tracks client presence for actor", %{account: account} do
|
||||
actor = Fixtures.Actors.create_actor(account: account)
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
assert connect_client(client) == :ok
|
||||
|
||||
assert broadcast_to_client(client, "test") == :ok
|
||||
|
||||
assert_receive "test"
|
||||
end
|
||||
|
||||
test "subscribes to client events", %{account: account} do
|
||||
actor = Fixtures.Actors.create_actor(account: account)
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
assert connect_client(client) == :ok
|
||||
|
||||
assert disconnect_client(client) == :ok
|
||||
|
||||
assert_receive "disconnect"
|
||||
end
|
||||
|
||||
test "subscribes to account events", %{account: account} do
|
||||
actor = Fixtures.Actors.create_actor(account: account)
|
||||
client = Fixtures.Clients.create_client(account: account, actor: actor)
|
||||
|
||||
assert connect_client(client) == :ok
|
||||
|
||||
assert disconnect_account_clients(account) == :ok
|
||||
|
||||
assert_receive "disconnect"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -53,6 +53,19 @@ defmodule Domain.Events.Hooks.AccountsTest do
|
||||
assert :ok == on_update(old_data, data)
|
||||
refute_receive :config_changed
|
||||
end
|
||||
|
||||
test "sends disconnect to clients if account is disabled" do
|
||||
account_id = Fixtures.Accounts.create_account().id
|
||||
|
||||
old_data = %{"id" => account_id, "disabled_at" => nil}
|
||||
data = %{"id" => account_id, "disabled_at" => DateTime.utc_now()}
|
||||
|
||||
:ok = subscribe_to_clients(account_id)
|
||||
|
||||
assert :ok == on_update(old_data, data)
|
||||
|
||||
assert_receive "disconnect"
|
||||
end
|
||||
end
|
||||
|
||||
describe "delete/1" do
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
defmodule Domain.Events.Hooks.ClientsTest do
|
||||
use ExUnit.Case, async: true
|
||||
use Domain.DataCase, async: true
|
||||
import Domain.Events.Hooks.Clients
|
||||
|
||||
setup do
|
||||
@@ -13,14 +13,77 @@ defmodule Domain.Events.Hooks.ClientsTest do
|
||||
end
|
||||
|
||||
describe "update/2" do
|
||||
test "returns :ok", %{old_data: old_data, data: data} do
|
||||
test "soft-delete broadcasts disconnect" do
|
||||
client = Fixtures.Clients.create_client()
|
||||
:ok = connect(client)
|
||||
|
||||
old_data = %{"id" => client.id, "deleted_at" => nil}
|
||||
data = %{"id" => client.id, "deleted_at" => DateTime.utc_now()}
|
||||
|
||||
assert :ok == on_update(old_data, data)
|
||||
|
||||
assert_receive "disconnect"
|
||||
refute_receive :updated
|
||||
end
|
||||
|
||||
test "update broadcasts :update" do
|
||||
client = Fixtures.Clients.create_client()
|
||||
:ok = connect(client)
|
||||
|
||||
old_data = %{"id" => client.id, "name" => "Old Client"}
|
||||
data = %{"id" => client.id, "name" => "Updated Client"}
|
||||
|
||||
assert :ok == on_update(old_data, data)
|
||||
|
||||
assert_receive :updated
|
||||
refute_receive "disconnect"
|
||||
end
|
||||
end
|
||||
|
||||
describe "delete/1" do
|
||||
test "returns :ok", %{data: data} do
|
||||
assert :ok == on_delete(data)
|
||||
test "broadcasts disconnect" do
|
||||
client = Fixtures.Clients.create_client()
|
||||
:ok = connect(client)
|
||||
|
||||
old_data = %{"id" => client.id}
|
||||
|
||||
assert :ok == on_delete(old_data)
|
||||
|
||||
assert_receive "disconnect"
|
||||
refute_receive :updated
|
||||
end
|
||||
end
|
||||
|
||||
describe "connect/1" do
|
||||
test "tracks client presence and subscribes to topics" do
|
||||
client = Fixtures.Clients.create_client()
|
||||
assert :ok == connect(client)
|
||||
|
||||
assert client.account_id
|
||||
|> Domain.Events.Hooks.Accounts.clients_presence_topic()
|
||||
|> Domain.Clients.Presence.get_by_key(client.id)
|
||||
|
||||
assert client.actor_id
|
||||
|> Domain.Events.Hooks.Actors.clients_presence_topic()
|
||||
|> Domain.Clients.Presence.get_by_key(client.id)
|
||||
|
||||
Domain.PubSub.broadcast(
|
||||
Domain.Events.Hooks.Accounts.clients_topic(client.account_id),
|
||||
:test_event
|
||||
)
|
||||
|
||||
assert_receive :test_event
|
||||
end
|
||||
end
|
||||
|
||||
describe "broadcast/2" do
|
||||
test "broadcasts payload to client topic" do
|
||||
client = Fixtures.Clients.create_client()
|
||||
:ok = connect(client)
|
||||
|
||||
assert :ok == broadcast(client.id, :updated)
|
||||
|
||||
assert_receive :updated
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -2,7 +2,7 @@ defmodule Web.Actors.Show do
|
||||
use Web, :live_view
|
||||
import Web.Actors.Components
|
||||
import Web.Clients.Components
|
||||
alias Domain.{Accounts, Auth, Tokens, Flows, Clients}
|
||||
alias Domain.{Accounts, Auth, Events, Tokens, Flows, Clients}
|
||||
alias Domain.Actors
|
||||
|
||||
def mount(%{"id" => id}, _session, socket) do
|
||||
@@ -10,7 +10,9 @@ defmodule Web.Actors.Show do
|
||||
Actors.fetch_actor_by_id(id, socket.assigns.subject,
|
||||
preload: [:identities, :last_seen_at, groups: [:provider]]
|
||||
) do
|
||||
:ok = Clients.subscribe_to_clients_presence_for_actor(actor)
|
||||
if connected?(socket) do
|
||||
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id)
|
||||
end
|
||||
|
||||
available_providers =
|
||||
Auth.all_active_providers_for_account!(socket.assigns.account)
|
||||
|
||||
@@ -3,10 +3,11 @@ defmodule Web.Clients.Index do
|
||||
import Web.Actors.Components
|
||||
import Web.Clients.Components
|
||||
alias Domain.Clients
|
||||
alias Domain.Events
|
||||
|
||||
def mount(_params, _session, socket) do
|
||||
if connected?(socket) do
|
||||
:ok = Clients.subscribe_to_clients_presence_in_account(socket.assigns.subject.account)
|
||||
:ok = Events.Hooks.Accounts.subscribe_to_clients_presence(socket.assigns.subject.account.id)
|
||||
end
|
||||
|
||||
socket =
|
||||
|
||||
@@ -2,7 +2,7 @@ defmodule Web.Clients.Show do
|
||||
use Web, :live_view
|
||||
import Web.Policies.Components
|
||||
import Web.Clients.Components
|
||||
alias Domain.{Accounts, Clients, Flows}
|
||||
alias Domain.{Accounts, Clients, Events, Flows}
|
||||
|
||||
def mount(%{"id" => id}, _session, socket) do
|
||||
with {:ok, client} <-
|
||||
@@ -16,7 +16,7 @@ defmodule Web.Clients.Show do
|
||||
]
|
||||
) do
|
||||
if connected?(socket) do
|
||||
:ok = Clients.subscribe_to_clients_presence_for_actor(client.actor)
|
||||
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id)
|
||||
end
|
||||
|
||||
socket =
|
||||
|
||||
@@ -60,7 +60,7 @@ defmodule Web.Live.Actors.IndexTest do
|
||||
admin_actor = Fixtures.Actors.create_actor(account: account, type: :account_admin_user)
|
||||
Fixtures.Actors.create_membership(account: account, actor: admin_actor)
|
||||
client = Fixtures.Clients.create_client(account: account, actor: admin_actor)
|
||||
Domain.Clients.connect_client(client)
|
||||
Domain.Events.Hooks.Clients.connect(client)
|
||||
admin_actor = Repo.preload(admin_actor, identities: [:provider], groups: [])
|
||||
|
||||
user_actor = Fixtures.Actors.create_actor(account: account, type: :account_user)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
defmodule Web.Live.Actors.ShowTest do
|
||||
use Web.ConnCase, async: true
|
||||
alias Domain.Events
|
||||
|
||||
test "redirects to sign in page for unauthorized user", %{conn: conn} do
|
||||
account = Fixtures.Accounts.create_account()
|
||||
@@ -91,8 +92,8 @@ defmodule Web.Live.Actors.ShowTest do
|
||||
|> live(~p"/#{account}/actors/#{actor}")
|
||||
|
||||
Domain.Config.put_env_override(:test_pid, self())
|
||||
Domain.Clients.subscribe_to_clients_presence_for_actor(actor)
|
||||
assert Domain.Clients.connect_client(client) == :ok
|
||||
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id)
|
||||
assert Events.Hooks.Clients.connect(client) == :ok
|
||||
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _}
|
||||
assert_receive {:live_table_reloaded, "clients"}, 500
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
defmodule Web.Live.Clients.IndexTest do
|
||||
use Web.ConnCase, async: true
|
||||
alias Domain.Events
|
||||
|
||||
setup do
|
||||
account = Fixtures.Accounts.create_account()
|
||||
@@ -59,7 +60,7 @@ defmodule Web.Live.Clients.IndexTest do
|
||||
online_client = Fixtures.Clients.create_client(account: account)
|
||||
offline_client = Fixtures.Clients.create_client(account: account)
|
||||
|
||||
:ok = Domain.Clients.connect_client(online_client)
|
||||
:ok = Events.Hooks.Clients.connect(online_client)
|
||||
|
||||
{:ok, lv, _html} =
|
||||
conn
|
||||
@@ -102,8 +103,8 @@ defmodule Web.Live.Clients.IndexTest do
|
||||
|> live(~p"/#{account}/clients")
|
||||
|
||||
Domain.Config.put_env_override(:test_pid, self())
|
||||
Domain.Clients.subscribe_to_clients_presence_for_actor(actor)
|
||||
assert Domain.Clients.connect_client(client) == :ok
|
||||
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id)
|
||||
assert Events.Hooks.Clients.connect(client) == :ok
|
||||
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _}
|
||||
assert_receive {:live_table_reloaded, "clients"}, 250
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
defmodule Web.Live.Clients.ShowTest do
|
||||
use Web.ConnCase, async: true
|
||||
alias Domain.Events
|
||||
|
||||
setup do
|
||||
account = Fixtures.Accounts.create_account()
|
||||
@@ -116,7 +117,7 @@ defmodule Web.Live.Clients.ShowTest do
|
||||
identity: identity,
|
||||
conn: conn
|
||||
} do
|
||||
:ok = Domain.Clients.connect_client(client)
|
||||
:ok = Events.Hooks.Clients.connect(client)
|
||||
|
||||
{:ok, lv, _html} =
|
||||
conn
|
||||
@@ -144,8 +145,8 @@ defmodule Web.Live.Clients.ShowTest do
|
||||
|> authorize_conn(identity)
|
||||
|> live(~p"/#{account}/clients/#{client}")
|
||||
|
||||
Domain.Clients.subscribe_to_clients_presence_for_actor(actor)
|
||||
assert Domain.Clients.connect_client(client) == :ok
|
||||
Events.Hooks.Actors.subscribe_to_clients_presence(actor.id)
|
||||
assert Events.Hooks.Clients.connect(client) == :ok
|
||||
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _}
|
||||
|
||||
wait_for(fn ->
|
||||
|
||||
Reference in New Issue
Block a user