diff --git a/elixir/README.md b/elixir/README.md index 880667f8e..acd55272e 100644 --- a/elixir/README.md +++ b/elixir/README.md @@ -192,7 +192,7 @@ executing this example: ```elixir [gateway | _rest_gateways] = Domain.Repo.all(Domain.Gateways.Gateway) -:ok = Domain.Gateways.connect_gateway(gateway) +:ok = Events.Hooks.Gateways.connect(gateway) [relay | _rest_relays] = Domain.Repo.all(Domain.Relays.Relay) relay_secret = Domain.Crypto.random_token() diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index dd9d389af..4f45fb9c5 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -89,8 +89,8 @@ defmodule API.Client.Channel do |> Enum.flat_map(& &1.gateway_groups) |> Enum.uniq() |> Enum.each(fn gateway_group -> - :ok = Gateways.unsubscribe_from_group_updates(gateway_group) - :ok = Gateways.subscribe_to_group_updates(gateway_group) + :ok = Events.Hooks.GatewayGroups.unsubscribe(gateway_group.id) + :ok = Events.Hooks.GatewayGroups.subscribe(gateway_group.id) end) # Return all connected relays for the account @@ -638,8 +638,8 @@ defmodule API.Client.Channel do ice_credentials = generate_ice_credentials(socket.assigns.client, gateway) :ok = - Gateways.broadcast_to_gateway( - gateway, + Events.Hooks.Gateways.broadcast( + gateway.id, {:authorize_flow, {self(), socket_ref(socket)}, %{ client_id: socket.assigns.client.id, @@ -781,8 +781,8 @@ defmodule API.Client.Channel do opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() :ok = - Gateways.broadcast_to_gateway( - gateway, + Events.Hooks.Gateways.broadcast( + gateway.id, {:allow_access, {self(), socket_ref(socket)}, %{ client_id: socket.assigns.client.id, @@ -843,8 +843,8 @@ defmodule API.Client.Channel do opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() :ok = - Gateways.broadcast_to_gateway( - gateway, + Events.Hooks.Gateways.broadcast( + gateway.id, {:request_connection, {self(), socket_ref(socket)}, %{ client_id: socket.assigns.client.id, @@ -890,7 +890,7 @@ defmodule API.Client.Channel do :ok = Enum.each(gateway_ids, fn gateway_id -> - Gateways.broadcast_to_gateway( + Events.Hooks.Gateways.broadcast( gateway_id, {:ice_candidates, socket.assigns.client.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} @@ -915,7 +915,7 @@ defmodule API.Client.Channel do :ok = Enum.each(gateway_ids, fn gateway_id -> - Gateways.broadcast_to_gateway( + Events.Hooks.Gateways.broadcast( gateway_id, {:invalidate_ice_candidates, socket.assigns.client.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index d6bfe8fcd..2764abf52 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views - alias Domain.{Clients, Events, Resources, Relays, Gateways, Flows} + alias Domain.{Clients, Events, Resources, Relays, Flows} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -36,7 +36,7 @@ defmodule API.Gateway.Channel do OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) OpenTelemetry.Tracer.with_span "gateway.after_join" do - :ok = Gateways.connect_gateway(socket.assigns.gateway) + :ok = Events.Hooks.Gateways.connect(socket.assigns.gateway) config = Domain.Config.fetch_env!(:domain, Domain.Gateways) ipv4_masquerade_enabled? = Keyword.fetch!(config, :gateway_ipv4_masquerade) diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index 4a6fdeb00..8e53add3d 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -1093,7 +1093,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -1140,7 +1140,7 @@ defmodule API.Client.ChannelTest do "connected_gateway_ids" => [] } - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) push(socket, "create_flow", attrs) # assert_reply ref, :error, %{reason: :not_found} @@ -1160,7 +1160,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) push(socket, "create_flow", %{ "resource_id" => resource.id, @@ -1200,7 +1200,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1255,7 +1255,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1305,7 +1305,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1409,7 +1409,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1455,7 +1455,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) {:ok, _reply, socket} = API.Client.Socket @@ -1490,7 +1490,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) push(socket, "create_flow", %{ "resource_id" => resource.id, @@ -1527,7 +1527,7 @@ defmodule API.Client.ChannelTest do group: gateway_group ) - :ok = Domain.Gateways.connect_gateway(gateway1) + :ok = Events.Hooks.Gateways.connect(gateway1) gateway2 = Fixtures.Gateways.create_gateway( @@ -1535,7 +1535,7 @@ defmodule API.Client.ChannelTest do group: gateway_group ) - :ok = Domain.Gateways.connect_gateway(gateway2) + :ok = Events.Hooks.Gateways.connect(gateway2) push(socket, "create_flow", %{ "resource_id" => resource.id, @@ -1586,7 +1586,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) attrs = %{ "resource_id" => resource.id @@ -1602,7 +1602,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) assert_reply ref, :error, %{reason: :offline} @@ -1629,7 +1629,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) resource_id = resource.id @@ -1651,7 +1651,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => dns_resource.id}) assert_reply ref, :error, %{reason: :offline} @@ -1698,7 +1698,7 @@ defmodule API.Client.ChannelTest do resource: resource ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) resource_id = resource.id @@ -1718,7 +1718,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1763,7 +1763,7 @@ defmodule API.Client.ChannelTest do } ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) resource_id = resource.id @@ -1783,7 +1783,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1835,7 +1835,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1877,7 +1877,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) {:ok, _reply, socket} = API.Client.Socket @@ -1904,7 +1904,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1941,7 +1941,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -1986,7 +1986,7 @@ defmodule API.Client.ChannelTest do "payload" => "DNS_Q" } - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "reuse_connection", attrs) @@ -2003,7 +2003,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2040,7 +2040,7 @@ defmodule API.Client.ChannelTest do resource_id = resource.id client_id = client.id - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2100,7 +2100,7 @@ defmodule API.Client.ChannelTest do }) |> subscribe_and_join(API.Client.Channel, "client") - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token)) push(socket, "reuse_connection", %{ @@ -2144,7 +2144,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2164,7 +2164,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2211,7 +2211,7 @@ defmodule API.Client.ChannelTest do "client_preshared_key" => "PSK" } - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) ref = push(socket, "request_connection", attrs) @@ -2248,7 +2248,7 @@ defmodule API.Client.ChannelTest do resource_id = resource.id client_id = client.id - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) attrs = %{ @@ -2311,7 +2311,7 @@ defmodule API.Client.ChannelTest do }) |> subscribe_and_join(API.Client.Channel, "client") - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token)) push(socket, "request_connection", %{ @@ -2353,7 +2353,7 @@ defmodule API.Client.ChannelTest do "gateway_ids" => [gateway.id] } - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "broadcast_ice_candidates", attrs) @@ -2391,7 +2391,7 @@ defmodule API.Client.ChannelTest do "gateway_ids" => [gateway.id] } - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "broadcast_invalidated_ice_candidates", attrs) diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index b5579bee5..d00cafd02 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -53,7 +53,9 @@ defmodule API.Gateway.ChannelTest do describe "join/3" do test "tracks presence after join", %{account: account, gateway: gateway} do presence = - Domain.Gateways.Presence.list(Domain.Gateways.account_gateways_presence_topic(account)) + account.id + |> Events.Hooks.Accounts.gateways_presence_topic() + |> Domain.Gateways.Presence.list() assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, gateway.id) assert is_number(online_at) diff --git a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex index feaa6adde..158087afa 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex @@ -43,16 +43,26 @@ defmodule Domain.Events.Hooks.Accounts do |> PubSub.subscribe() end + def subscribe_to_gateways_presence(account_id) do + account_id + |> gateways_presence_topic() + |> PubSub.subscribe() + end + # No unsubscribe needed - account deletions destroy any subscribed entities - def clients_presence_topic(account_or_id) do - "presences:#{clients_topic(account_or_id)}" + def clients_presence_topic(account_id) do + "presences:#{clients_topic(account_id)}" end def clients_topic(account_id) do "account_clients:#{account_id}" end + def gateways_presence_topic(account_id) do + "presences:account_gateways:#{account_id}" + end + defp topic(account_id) do "accounts:#{account_id}" end diff --git a/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex index b375077dd..861dcde05 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex @@ -1,4 +1,6 @@ defmodule Domain.Events.Hooks.GatewayGroups do + alias Domain.PubSub + def on_insert(_data) do :ok end @@ -10,4 +12,30 @@ defmodule Domain.Events.Hooks.GatewayGroups do def on_delete(_old_data) do :ok end + + def subscribe(gateway_group_id) do + gateway_group_id + |> topic() + |> PubSub.subscribe() + end + + def subscribe_to_presence(gateway_group_id) do + gateway_group_id + |> presence_topic() + |> PubSub.subscribe() + end + + def unsubscribe(gateway_group_id) do + gateway_group_id + |> topic() + |> PubSub.unsubscribe() + end + + def presence_topic(gateway_group_id) do + "presences:#{topic(gateway_group_id)}" + end + + defp topic(gateway_group_id) do + "group_gateways:#{gateway_group_id}" + end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/gateways.ex b/elixir/apps/domain/lib/domain/events/hooks/gateways.ex index d4edb44fc..6c55cd40c 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/gateways.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/gateways.ex @@ -1,13 +1,61 @@ defmodule Domain.Events.Hooks.Gateways do + alias Domain.PubSub + alias Domain.Gateways + alias Domain.Events + def on_insert(_data) do :ok end + # Soft-delete + def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data) + when not is_nil(deleted_at) do + on_delete(old_data) + end + + # Regular update def on_update(_old_data, _data) do :ok end - def on_delete(_old_data) do - :ok + def on_delete(%{"id" => gateway_id} = _old_data) do + disconnect(gateway_id) + end + + def connect(%Gateways.Gateway{} = gateway) do + with {:ok, _} <- + Gateways.Presence.track( + self(), + Events.Hooks.GatewayGroups.presence_topic(gateway.group_id), + gateway.id, + %{} + ), + {:ok, _} <- + Gateways.Presence.track( + self(), + Events.Hooks.Accounts.gateways_presence_topic(gateway.account_id), + gateway.id, + %{ + online_at: System.system_time(:second) + } + ) do + :ok = PubSub.subscribe(topic(gateway.id)) + :ok + end + end + + def broadcast(gateway_id, payload) do + gateway_id + |> topic() + |> PubSub.broadcast(payload) + end + + defp disconnect(gateway_id) do + gateway_id + |> broadcast("disconnect") + end + + defp topic(gateway_id) do + "gateways:#{gateway_id}" end end diff --git a/elixir/apps/domain/lib/domain/gateways.ex b/elixir/apps/domain/lib/domain/gateways.ex index 25220c9b3..9d61bbbbc 100644 --- a/elixir/apps/domain/lib/domain/gateways.ex +++ b/elixir/apps/domain/lib/domain/gateways.ex @@ -1,7 +1,7 @@ defmodule Domain.Gateways do use Supervisor alias Domain.Accounts.Account - alias Domain.{Repo, Auth, Geo, PubSub} + alias Domain.{Repo, Auth, Events, Geo} alias Domain.{Accounts, Resources, Tokens, Billing} alias Domain.Gateways.{Authorizer, Gateway, Group, Presence} @@ -118,15 +118,6 @@ defmodule Domain.Gateways do |> Group.Changeset.update(attrs, subject) end ) - |> case do - {:ok, group} -> - # TODO: WAL - :ok = broadcast_to_group(group, :updated) - {:ok, group} - - {:error, reason} -> - {:error, reason} - end end end @@ -137,6 +128,7 @@ defmodule Domain.Gateways do |> Authorizer.for_subject(subject) |> Repo.fetch_and_update(Group.Query, with: fn group -> + # Token deletion will disconnect gateways {:ok, _tokens} = Tokens.delete_tokens_for(group, subject) {:ok, _count} = Resources.delete_connections_for(group, subject) @@ -146,8 +138,7 @@ defmodule Domain.Gateways do |> Repo.update_all(set: [deleted_at: DateTime.utc_now()]) Group.Changeset.delete(group) - end, - after_commit: &disconnect_gateways_in_group/1 + end ) end end @@ -230,7 +221,7 @@ defmodule Domain.Gateways do @doc false def preload_gateways_presence([gateway]) do gateway.account_id - |> account_gateways_presence_topic() + |> Events.Hooks.Accounts.gateways_presence_topic() |> Presence.get_by_key(gateway.id) |> case do [] -> %{gateway | online?: false} @@ -247,7 +238,9 @@ defmodule Domain.Gateways do |> Enum.reject(&is_nil/1) |> Enum.uniq() |> Enum.reduce(%{}, fn account_id, acc -> - connected_gateways = account_id |> account_gateways_presence_topic() |> Presence.list() + connected_gateways = + account_id |> Events.Hooks.Accounts.gateways_presence_topic() |> Presence.list() + Map.merge(acc, connected_gateways) end) @@ -258,7 +251,7 @@ defmodule Domain.Gateways do def all_online_gateway_ids_by_group_id!(group_id) do group_id - |> group_gateways_presence_topic() + |> Events.Hooks.GatewayGroups.presence_topic() |> Presence.list() |> Map.keys() end @@ -273,7 +266,7 @@ defmodule Domain.Gateways do connected_gateway_ids = resource.account_id - |> account_gateways_presence_topic() + |> Events.Hooks.Accounts.gateways_presence_topic() |> Presence.list() |> Map.keys() @@ -296,7 +289,7 @@ defmodule Domain.Gateways do def gateway_can_connect_to_resource?(%Gateway{} = gateway, %Resources.Resource{} = resource) do connected_gateway_ids = resource.account_id - |> account_gateways_presence_topic() + |> Events.Hooks.Accounts.gateways_presence_topic() |> Presence.list() |> Map.keys() @@ -367,7 +360,6 @@ defmodule Domain.Gateways do |> Authorizer.for_subject(subject) |> Repo.fetch_and_update(Gateway.Query, with: &Gateway.Changeset.delete/1, - after_commit: &disconnect_gateway/1, preload: [:online?] ) end @@ -419,30 +411,6 @@ defmodule Domain.Gateways do end end - def connect_gateway(%Gateway{} = gateway) do - with {:ok, _} <- - Presence.track( - self(), - group_gateways_presence_topic(gateway.group_id), - gateway.id, - %{} - ), - {:ok, _} <- - Presence.track( - self(), - account_gateways_presence_topic(gateway.account_id), - gateway.id, - %{ - online_at: System.system_time(:second) - } - ) do - :ok = PubSub.subscribe(gateway_topic(gateway)) - :ok = PubSub.subscribe(group_gateways_topic(gateway.group_id)) - :ok = PubSub.subscribe(account_gateways_topic(gateway.account_id)) - :ok - end - end - def gateway_outdated?(gateway) do latest_release = Domain.ComponentVersions.gateway_version() @@ -451,102 +419,4 @@ defmodule Domain.Gateways do _ -> false end end - - ### Presence - - def account_gateways_presence_topic(account_or_id), - do: "presences:#{account_gateways_topic(account_or_id)}" - - defp group_gateways_presence_topic(group_or_id), - do: "presences:#{group_gateways_topic(group_or_id)}" - - ### PubSub - - defp gateway_topic(%Gateway{} = gateway), do: gateway_topic(gateway.id) - defp gateway_topic(gateway_id), do: "gateways:#{gateway_id}" - - defp account_gateways_topic(%Accounts.Account{} = account), - do: account_gateways_topic(account.id) - - defp account_gateways_topic(account_id), - do: "account_gateways:#{account_id}" - - defp group_gateways_topic(%Group{} = group), do: group_gateways_topic(group.id) - defp group_gateways_topic(group_id), do: "group_gateways:#{group_id}" - - defp group_topic(%Group{} = group), do: group_topic(group.id) - defp group_topic(group_id), do: "group:#{group_id}" - - def subscribe_to_group_updates(group_or_id) do - group_or_id - |> group_topic() - |> PubSub.subscribe() - end - - def unsubscribe_from_group_updates(group_or_id) do - group_or_id - |> group_topic() - |> PubSub.unsubscribe() - end - - def subscribe_to_gateways_presence_in_account(%Accounts.Account{} = account) do - account - |> account_gateways_presence_topic() - |> PubSub.subscribe() - end - - def unsubscribe_from_gateways_presence_in_account(%Accounts.Account{} = account) do - account - |> account_gateways_presence_topic() - |> PubSub.unsubscribe() - end - - def subscribe_to_gateways_presence_in_group(group_or_id) do - group_or_id - |> group_gateways_presence_topic() - |> PubSub.subscribe() - end - - def unsubscribe_from_gateways_presence_in_group(group_or_id) do - group_or_id - |> group_gateways_presence_topic() - |> PubSub.unsubscribe() - end - - # TODO: WAL - def broadcast_to_group(group_or_id, payload) do - group_or_id - |> group_topic() - |> PubSub.broadcast(payload) - end - - def broadcast_to_gateway(gateway_or_id, payload) do - gateway_or_id - |> gateway_topic() - |> PubSub.broadcast(payload) - end - - defp broadcast_to_gateways_in_group(group_or_id, payload) do - group_or_id - |> group_gateways_topic() - |> PubSub.broadcast(payload) - end - - defp broadcast_to_gateways_in_account(account_or_id, payload) do - account_or_id - |> account_gateways_topic() - |> PubSub.broadcast(payload) - end - - def disconnect_gateway(gateway_or_id) do - broadcast_to_gateway(gateway_or_id, "disconnect") - end - - def disconnect_gateways_in_group(group_or_id) do - broadcast_to_gateways_in_group(group_or_id, "disconnect") - end - - def disconnect_gateways_in_account(account_or_id) do - broadcast_to_gateways_in_account(account_or_id, "disconnect") - end end diff --git a/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs b/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs index 281d52626..b09825810 100644 --- a/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs @@ -1,5 +1,5 @@ defmodule Domain.Events.Hooks.GatewaysTest do - use ExUnit.Case, async: true + use Domain.DataCase, async: true import Domain.Events.Hooks.Gateways setup do @@ -13,14 +13,41 @@ defmodule Domain.Events.Hooks.GatewaysTest do end describe "update/2" do - test "returns :ok", %{old_data: old_data, data: data} do - assert :ok == on_update(old_data, data) + test "soft-delete broadcasts disconnect" do + gateway = Fixtures.Gateways.create_gateway() + + old_data = %{"id" => gateway.id, "deleted_at" => nil} + data = %{"id" => gateway.id, "deleted_at" => "2023-10-01T00:00:00Z"} + + :ok = connect(gateway) + :ok = on_update(old_data, data) + + assert_receive "disconnect" + end + + test "regular update does not broadcast disconnect" do + gateway = Fixtures.Gateways.create_gateway() + + old_data = %{"id" => gateway.id} + data = %{"id" => gateway.id, "name" => "New Gateway Name"} + + :ok = connect(gateway) + :ok = on_update(old_data, data) + + refute_receive "disconnect" end end describe "delete/1" do - test "returns :ok", %{data: data} do - assert :ok == on_delete(data) + test "delete broadcasts disconnect" do + gateway = Fixtures.Gateways.create_gateway() + + old_data = %{"id" => gateway.id} + + :ok = connect(gateway) + :ok = on_delete(old_data) + + assert_receive "disconnect" end end end diff --git a/elixir/apps/domain/test/domain/gateways_test.exs b/elixir/apps/domain/test/domain/gateways_test.exs index a71f7f60a..453b9d3c6 100644 --- a/elixir/apps/domain/test/domain/gateways_test.exs +++ b/elixir/apps/domain/test/domain/gateways_test.exs @@ -1,7 +1,7 @@ defmodule Domain.GatewaysTest do use Domain.DataCase, async: true import Domain.Gateways - alias Domain.{Gateways, Tokens, Resources} + alias Domain.{Events, Gateways, Tokens, Resources} setup do account = Fixtures.Accounts.create_account() @@ -299,19 +299,6 @@ defmodule Domain.GatewaysTest do assert group.name == "foo" end - test "broadcasts an update event", %{account: account, subject: subject} do - group = Fixtures.Gateways.create_group(account: account) - :ok = subscribe_to_group_updates(group) - - attrs = %{ - name: "foo" - } - - assert {:ok, _group} = update_group(group, attrs, subject) - - assert_receive :updated - end - test "returns error when subject has no permission to manage groups", %{ account: account, subject: subject @@ -635,7 +622,7 @@ defmodule Domain.GatewaysTest do } do offline_gateway = Fixtures.Gateways.create_gateway(account: account) online_gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = connect_gateway(online_gateway) + :ok = Events.Hooks.Gateways.connect(online_gateway) Fixtures.Gateways.create_gateway() assert {:ok, gateways, _metadata} = list_gateways(subject, preload: :online?) @@ -702,7 +689,7 @@ defmodule Domain.GatewaysTest do connections: [%{gateway_group_id: gateway.group_id}] ) - assert connect_gateway(gateway) == :ok + assert Events.Hooks.Gateways.connect(gateway) == :ok assert {:ok, [connected_gateway]} = all_connected_gateways_for_resource(resource, subject) assert connected_gateway.id == gateway.id @@ -715,7 +702,7 @@ defmodule Domain.GatewaysTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - assert connect_gateway(gateway) == :ok + assert Events.Hooks.Gateways.connect(gateway) == :ok assert all_connected_gateways_for_resource(resource, subject) == {:ok, []} end @@ -724,7 +711,7 @@ defmodule Domain.GatewaysTest do describe "gateway_can_connect_to_resource?/2" do test "returns true when gateway can connect to resource", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) resource = Fixtures.Resources.create_resource( @@ -737,7 +724,7 @@ defmodule Domain.GatewaysTest do test "returns false when gateway cannot connect to resource", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) resource = Fixtures.Resources.create_resource(account: account) @@ -1219,55 +1206,18 @@ defmodule Domain.GatewaysTest do test "does not allow duplicate presence", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - assert connect_gateway(gateway) == :ok - assert {:error, {:already_tracked, _pid, _topic, _key}} = connect_gateway(gateway) + assert Events.Hooks.Gateways.connect(gateway) == :ok + + assert {:error, {:already_tracked, _pid, _topic, _key}} = + Events.Hooks.Gateways.connect(gateway) end test "tracks gateway presence for account", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - assert connect_gateway(gateway) == :ok + assert Events.Hooks.Gateways.connect(gateway) == :ok gateway = fetch_gateway_by_id!(gateway.id, preload: [:online?]) assert gateway.online? == true end - - test "tracks gateway presence for actor", %{account: account} do - gateway = Fixtures.Gateways.create_gateway(account: account) - assert connect_gateway(gateway) == :ok - - assert broadcast_to_gateway(gateway, "test") == :ok - - assert_receive "test" - end - - test "subscribes to gateway events", %{account: account} do - gateway = Fixtures.Gateways.create_gateway(account: account) - assert connect_gateway(gateway) == :ok - - assert disconnect_gateway(gateway) == :ok - - assert_receive "disconnect" - end - - test "subscribes to gateway group events", %{account: account} do - group = Fixtures.Gateways.create_group(account: account) - gateway = Fixtures.Gateways.create_gateway(account: account, group: group) - - assert connect_gateway(gateway) == :ok - - assert disconnect_gateways_in_group(group) == :ok - - assert_receive "disconnect" - end - - test "subscribes to account events", %{account: account} do - gateway = Fixtures.Gateways.create_gateway(account: account) - - assert connect_gateway(gateway) == :ok - - assert disconnect_gateways_in_account(account) == :ok - - assert_receive "disconnect" - end end end diff --git a/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs b/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs index 62d9ba58c..0bea5610c 100644 --- a/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs +++ b/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs @@ -1,7 +1,8 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do - alias Domain.ComponentVersions use Domain.DataCase, async: true import Domain.Notifications.Jobs.OutdatedGateways + alias Domain.ComponentVersions + alias Domain.Events describe "execute/1" do setup do @@ -41,8 +42,8 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do new_config = update_component_versions_config(gateway: new_version) Domain.Config.put_env_override(ComponentVersions, new_config) - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(gateway_group) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} assert execute(%{}) == :ok @@ -65,8 +66,8 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do new_config = update_component_versions_config(gateway: version) Domain.Config.put_env_override(ComponentVersions, new_config) - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(gateway_group) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} assert execute(%{}) == :ok diff --git a/elixir/apps/web/lib/web/live/gateways/show.ex b/elixir/apps/web/lib/web/live/gateways/show.ex index 70a0b441b..3a608817e 100644 --- a/elixir/apps/web/lib/web/live/gateways/show.ex +++ b/elixir/apps/web/lib/web/live/gateways/show.ex @@ -1,12 +1,13 @@ defmodule Web.Gateways.Show do use Web, :live_view alias Domain.Gateways + alias Domain.Events def mount(%{"id" => id}, _session, socket) do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(id, socket.assigns.subject, preload: [:group, :online?]) do if connected?(socket) do - :ok = Gateways.subscribe_to_gateways_presence_in_group(gateway.group) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group_id) end socket = diff --git a/elixir/apps/web/lib/web/live/sites/gateways/index.ex b/elixir/apps/web/lib/web/live/sites/gateways/index.ex index c425f897b..8990216fc 100644 --- a/elixir/apps/web/lib/web/live/sites/gateways/index.ex +++ b/elixir/apps/web/lib/web/live/sites/gateways/index.ex @@ -1,11 +1,12 @@ defmodule Web.Sites.Gateways.Index do use Web, :live_view alias Domain.Gateways + alias Domain.Events def mount(%{"id" => id}, _session, socket) do with {:ok, group} <- Gateways.fetch_group_by_id(id, socket.assigns.subject) do if connected?(socket) do - :ok = Gateways.subscribe_to_gateways_presence_in_group(group) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) end socket = diff --git a/elixir/apps/web/lib/web/live/sites/index.ex b/elixir/apps/web/lib/web/live/sites/index.ex index 8983316b2..aa1e00a7c 100644 --- a/elixir/apps/web/lib/web/live/sites/index.ex +++ b/elixir/apps/web/lib/web/live/sites/index.ex @@ -1,11 +1,12 @@ defmodule Web.Sites.Index do use Web, :live_view alias Domain.Gateways + alias Domain.Events require Logger def mount(_params, _session, socket) do if connected?(socket) do - :ok = Gateways.subscribe_to_gateways_presence_in_account(socket.assigns.account) + :ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(socket.assigns.account.id) end {:ok, managed_groups, _metadata} = diff --git a/elixir/apps/web/lib/web/live/sites/new_token.ex b/elixir/apps/web/lib/web/live/sites/new_token.ex index c82225c2f..6fd3c83b2 100644 --- a/elixir/apps/web/lib/web/live/sites/new_token.ex +++ b/elixir/apps/web/lib/web/live/sites/new_token.ex @@ -1,6 +1,7 @@ defmodule Web.Sites.NewToken do use Web, :live_view alias Domain.Gateways + alias Domain.Events def mount(%{"id" => id}, _session, socket) do with {:ok, group} <- @@ -12,7 +13,7 @@ defmodule Web.Sites.NewToken do {group, token, env} = if connected?(socket) do {:ok, token, encoded_token} = Gateways.create_token(group, %{}, socket.assigns.subject) - :ok = Gateways.subscribe_to_gateways_presence_in_group(group) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) {group, token, env(encoded_token)} else {group, nil, nil} diff --git a/elixir/apps/web/lib/web/live/sites/show.ex b/elixir/apps/web/lib/web/live/sites/show.ex index f863a2d8e..e10821167 100644 --- a/elixir/apps/web/lib/web/live/sites/show.ex +++ b/elixir/apps/web/lib/web/live/sites/show.ex @@ -1,12 +1,12 @@ defmodule Web.Sites.Show do use Web, :live_view - alias Domain.{Gateways, Resources, Policies, Flows, Tokens} + alias Domain.{Gateways, Resources, Policies, Events, Flows, Tokens} def mount(%{"id" => id}, _session, socket) do with {:ok, group} <- Gateways.fetch_group_by_id(id, socket.assigns.subject) do if connected?(socket) do - :ok = Gateways.subscribe_to_gateways_presence_in_group(group) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) end socket = diff --git a/elixir/apps/web/test/web/live/gateways/show_test.exs b/elixir/apps/web/test/web/live/gateways/show_test.exs index 8a7f86366..6c09a8801 100644 --- a/elixir/apps/web/test/web/live/gateways/show_test.exs +++ b/elixir/apps/web/test/web/live/gateways/show_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Gateways.ShowTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -102,7 +103,7 @@ defmodule Web.Live.Gateways.ShowTest do identity: identity, conn: conn } do - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) {:ok, lv, _html} = conn @@ -149,8 +150,8 @@ defmodule Web.Live.Gateways.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/gateways/#{gateway}") - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(gateway.group_id) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %{topic: "presences:group_gateways:" <> _} table = diff --git a/elixir/apps/web/test/web/live/sites/gateways/index_test.exs b/elixir/apps/web/test/web/live/sites/gateways/index_test.exs index b8157ba85..e9a3a6262 100644 --- a/elixir/apps/web/test/web/live/sites/gateways/index_test.exs +++ b/elixir/apps/web/test/web/live/sites/gateways/index_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Sites.Gateways.IndexTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -95,8 +96,8 @@ defmodule Web.Live.Sites.Gateways.IndexTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}/gateways") - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn -> diff --git a/elixir/apps/web/test/web/live/sites/index_test.exs b/elixir/apps/web/test/web/live/sites/index_test.exs index 6970f155c..ef0c7c636 100644 --- a/elixir/apps/web/test/web/live/sites/index_test.exs +++ b/elixir/apps/web/test/web/live/sites/index_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Sites.IndexTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -97,8 +98,8 @@ defmodule Web.Live.Sites.IndexTest do |> live(~p"/#{account}/sites") Domain.Config.put_env_override(:test_pid, self()) - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_account(account) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _} assert_receive {:live_table_reloaded, "groups"}, 250 @@ -142,7 +143,7 @@ defmodule Web.Live.Sites.IndexTest do group = Fixtures.Gateways.create_internet_group(account: account) gateway = Fixtures.Gateways.create_gateway(account: account, group: group) Domain.Config.put_env_override(:test_pid, self()) - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_account(account) + :ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id) {:ok, lv, _html} = conn @@ -155,7 +156,7 @@ defmodule Web.Live.Sites.IndexTest do assert has_element?(lv, "#internet-site-banner a[href='/#{account.slug}/sites/#{group.id}']") - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _} assert_receive {:live_table_reloaded, "groups"}, 250 assert lv |> element("#internet-site-banner") |> render() =~ "Online" diff --git a/elixir/apps/web/test/web/live/sites/new_token_test.exs b/elixir/apps/web/test/web/live/sites/new_token_test.exs index f2d661a8e..cf087e69a 100644 --- a/elixir/apps/web/test/web/live/sites/new_token_test.exs +++ b/elixir/apps/web/test/web/live/sites/new_token_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Sites.NewTokenTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -36,11 +37,11 @@ defmodule Web.Live.Sites.NewTokenTest do |> List.last() |> String.trim(""") - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) context = Fixtures.Auth.build_context(type: :gateway_group) assert {:ok, group, token} = Domain.Gateways.authenticate(token, context) gateway = Fixtures.Gateways.create_gateway(account: account, group: group, token: token) - Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _group_id} diff --git a/elixir/apps/web/test/web/live/sites/show_test.exs b/elixir/apps/web/test/web/live/sites/show_test.exs index bac82c2c8..e55fca907 100644 --- a/elixir/apps/web/test/web/live/sites/show_test.exs +++ b/elixir/apps/web/test/web/live/sites/show_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Sites.ShowTest do use Web.ConnCase, async: true + alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -142,7 +143,7 @@ defmodule Web.Live.Sites.ShowTest do gateway: gateway, conn: conn } do - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Fixtures.Gateways.create_gateway(account: account, group: group) {:ok, lv, _html} = @@ -179,8 +180,8 @@ defmodule Web.Live.Sites.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}") - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn -> @@ -377,7 +378,7 @@ defmodule Web.Live.Sites.ShowTest do gateway: gateway, conn: conn } do - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Fixtures.Gateways.create_gateway(account: account, group: group) {:ok, lv, _html} = @@ -413,8 +414,8 @@ defmodule Web.Live.Sites.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}") - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn -> @@ -600,7 +601,7 @@ defmodule Web.Live.Sites.ShowTest do gateway: gateway, conn: conn } do - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.Gateways.connect(gateway) Fixtures.Gateways.create_gateway(account: account, group: group) {:ok, lv, _html} = @@ -636,8 +637,8 @@ defmodule Web.Live.Sites.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}") - :ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group) - :ok = Domain.Gateways.connect_gateway(gateway) + :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Events.Hooks.Gateways.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn ->