refactor(portal): move gateway events to WAL (#9299)

This PR moves Gateway events to be triggered by the WAL broadcaster.
Some things of note that are cleaned up:

- The gateway `:update` event was never received anywhere (but in a
test) and so has been removed
- The account topic has been removed as it was also never acted upon
anywhere. Presence yes, but topic no
- The group topic has also been removed as it was only used to receive
broadcasted disconnects when a group is deleted, but this was already
handled by the token deletion and so is redundant.
This commit is contained in:
Jamil
2025-06-01 09:40:28 -07:00
committed by GitHub
parent bfca4e8411
commit 73c3e2d87b
22 changed files with 234 additions and 289 deletions

View File

@@ -192,7 +192,7 @@ executing this example:
```elixir
[gateway | _rest_gateways] = Domain.Repo.all(Domain.Gateways.Gateway)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
[relay | _rest_relays] = Domain.Repo.all(Domain.Relays.Relay)
relay_secret = Domain.Crypto.random_token()

View File

@@ -89,8 +89,8 @@ defmodule API.Client.Channel do
|> Enum.flat_map(& &1.gateway_groups)
|> Enum.uniq()
|> Enum.each(fn gateway_group ->
:ok = Gateways.unsubscribe_from_group_updates(gateway_group)
:ok = Gateways.subscribe_to_group_updates(gateway_group)
:ok = Events.Hooks.GatewayGroups.unsubscribe(gateway_group.id)
:ok = Events.Hooks.GatewayGroups.subscribe(gateway_group.id)
end)
# Return all connected relays for the account
@@ -638,8 +638,8 @@ defmodule API.Client.Channel do
ice_credentials = generate_ice_credentials(socket.assigns.client, gateway)
:ok =
Gateways.broadcast_to_gateway(
gateway,
Events.Hooks.Gateways.broadcast(
gateway.id,
{:authorize_flow, {self(), socket_ref(socket)},
%{
client_id: socket.assigns.client.id,
@@ -781,8 +781,8 @@ defmodule API.Client.Channel do
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
:ok =
Gateways.broadcast_to_gateway(
gateway,
Events.Hooks.Gateways.broadcast(
gateway.id,
{:allow_access, {self(), socket_ref(socket)},
%{
client_id: socket.assigns.client.id,
@@ -843,8 +843,8 @@ defmodule API.Client.Channel do
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
:ok =
Gateways.broadcast_to_gateway(
gateway,
Events.Hooks.Gateways.broadcast(
gateway.id,
{:request_connection, {self(), socket_ref(socket)},
%{
client_id: socket.assigns.client.id,
@@ -890,7 +890,7 @@ defmodule API.Client.Channel do
:ok =
Enum.each(gateway_ids, fn gateway_id ->
Gateways.broadcast_to_gateway(
Events.Hooks.Gateways.broadcast(
gateway_id,
{:ice_candidates, socket.assigns.client.id, candidates,
{opentelemetry_ctx, opentelemetry_span_ctx}}
@@ -915,7 +915,7 @@ defmodule API.Client.Channel do
:ok =
Enum.each(gateway_ids, fn gateway_id ->
Gateways.broadcast_to_gateway(
Events.Hooks.Gateways.broadcast(
gateway_id,
{:invalidate_ice_candidates, socket.assigns.client.id, candidates,
{opentelemetry_ctx, opentelemetry_span_ctx}}

View File

@@ -1,7 +1,7 @@
defmodule API.Gateway.Channel do
use API, :channel
alias API.Gateway.Views
alias Domain.{Clients, Events, Resources, Relays, Gateways, Flows}
alias Domain.{Clients, Events, Resources, Relays, Flows}
alias Domain.Relays.Presence.Debouncer
require Logger
require OpenTelemetry.Tracer
@@ -36,7 +36,7 @@ defmodule API.Gateway.Channel do
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "gateway.after_join" do
:ok = Gateways.connect_gateway(socket.assigns.gateway)
:ok = Events.Hooks.Gateways.connect(socket.assigns.gateway)
config = Domain.Config.fetch_env!(:domain, Domain.Gateways)
ipv4_masquerade_enabled? = Keyword.fetch!(config, :gateway_ipv4_masquerade)

View File

@@ -1093,7 +1093,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -1140,7 +1140,7 @@ defmodule API.Client.ChannelTest do
"connected_gateway_ids" => []
}
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
push(socket, "create_flow", attrs)
# assert_reply ref, :error, %{reason: :not_found}
@@ -1160,7 +1160,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
push(socket, "create_flow", %{
"resource_id" => resource.id,
@@ -1200,7 +1200,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1255,7 +1255,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1305,7 +1305,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1409,7 +1409,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1455,7 +1455,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
{:ok, _reply, socket} =
API.Client.Socket
@@ -1490,7 +1490,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
push(socket, "create_flow", %{
"resource_id" => resource.id,
@@ -1527,7 +1527,7 @@ defmodule API.Client.ChannelTest do
group: gateway_group
)
:ok = Domain.Gateways.connect_gateway(gateway1)
:ok = Events.Hooks.Gateways.connect(gateway1)
gateway2 =
Fixtures.Gateways.create_gateway(
@@ -1535,7 +1535,7 @@ defmodule API.Client.ChannelTest do
group: gateway_group
)
:ok = Domain.Gateways.connect_gateway(gateway2)
:ok = Events.Hooks.Gateways.connect(gateway2)
push(socket, "create_flow", %{
"resource_id" => resource.id,
@@ -1586,7 +1586,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
attrs = %{
"resource_id" => resource.id
@@ -1602,7 +1602,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
assert_reply ref, :error, %{reason: :offline}
@@ -1629,7 +1629,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
resource_id = resource.id
@@ -1651,7 +1651,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => dns_resource.id})
assert_reply ref, :error, %{reason: :offline}
@@ -1698,7 +1698,7 @@ defmodule API.Client.ChannelTest do
resource: resource
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
resource_id = resource.id
@@ -1718,7 +1718,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1763,7 +1763,7 @@ defmodule API.Client.ChannelTest do
}
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
resource_id = resource.id
@@ -1783,7 +1783,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1835,7 +1835,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1877,7 +1877,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
{:ok, _reply, socket} =
API.Client.Socket
@@ -1904,7 +1904,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1941,7 +1941,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -1986,7 +1986,7 @@ defmodule API.Client.ChannelTest do
"payload" => "DNS_Q"
}
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "reuse_connection", attrs)
@@ -2003,7 +2003,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2040,7 +2040,7 @@ defmodule API.Client.ChannelTest do
resource_id = resource.id
client_id = client.id
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2100,7 +2100,7 @@ defmodule API.Client.ChannelTest do
})
|> subscribe_and_join(API.Client.Channel, "client")
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token))
push(socket, "reuse_connection", %{
@@ -2144,7 +2144,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2164,7 +2164,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2211,7 +2211,7 @@ defmodule API.Client.ChannelTest do
"client_preshared_key" => "PSK"
}
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
ref = push(socket, "request_connection", attrs)
@@ -2248,7 +2248,7 @@ defmodule API.Client.ChannelTest do
resource_id = resource.id
client_id = client.id
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
attrs = %{
@@ -2311,7 +2311,7 @@ defmodule API.Client.ChannelTest do
})
|> subscribe_and_join(API.Client.Channel, "client")
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token))
push(socket, "request_connection", %{
@@ -2353,7 +2353,7 @@ defmodule API.Client.ChannelTest do
"gateway_ids" => [gateway.id]
}
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "broadcast_ice_candidates", attrs)
@@ -2391,7 +2391,7 @@ defmodule API.Client.ChannelTest do
"gateway_ids" => [gateway.id]
}
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "broadcast_invalidated_ice_candidates", attrs)

View File

@@ -53,7 +53,9 @@ defmodule API.Gateway.ChannelTest do
describe "join/3" do
test "tracks presence after join", %{account: account, gateway: gateway} do
presence =
Domain.Gateways.Presence.list(Domain.Gateways.account_gateways_presence_topic(account))
account.id
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Domain.Gateways.Presence.list()
assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, gateway.id)
assert is_number(online_at)

View File

@@ -43,16 +43,26 @@ defmodule Domain.Events.Hooks.Accounts do
|> PubSub.subscribe()
end
def subscribe_to_gateways_presence(account_id) do
account_id
|> gateways_presence_topic()
|> PubSub.subscribe()
end
# No unsubscribe needed - account deletions destroy any subscribed entities
def clients_presence_topic(account_or_id) do
"presences:#{clients_topic(account_or_id)}"
def clients_presence_topic(account_id) do
"presences:#{clients_topic(account_id)}"
end
def clients_topic(account_id) do
"account_clients:#{account_id}"
end
def gateways_presence_topic(account_id) do
"presences:account_gateways:#{account_id}"
end
defp topic(account_id) do
"accounts:#{account_id}"
end

View File

@@ -1,4 +1,6 @@
defmodule Domain.Events.Hooks.GatewayGroups do
alias Domain.PubSub
def on_insert(_data) do
:ok
end
@@ -10,4 +12,30 @@ defmodule Domain.Events.Hooks.GatewayGroups do
def on_delete(_old_data) do
:ok
end
def subscribe(gateway_group_id) do
gateway_group_id
|> topic()
|> PubSub.subscribe()
end
def subscribe_to_presence(gateway_group_id) do
gateway_group_id
|> presence_topic()
|> PubSub.subscribe()
end
def unsubscribe(gateway_group_id) do
gateway_group_id
|> topic()
|> PubSub.unsubscribe()
end
def presence_topic(gateway_group_id) do
"presences:#{topic(gateway_group_id)}"
end
defp topic(gateway_group_id) do
"group_gateways:#{gateway_group_id}"
end
end

View File

@@ -1,13 +1,61 @@
defmodule Domain.Events.Hooks.Gateways do
alias Domain.PubSub
alias Domain.Gateways
alias Domain.Events
def on_insert(_data) do
:ok
end
# Soft-delete
def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data)
when not is_nil(deleted_at) do
on_delete(old_data)
end
# Regular update
def on_update(_old_data, _data) do
:ok
end
def on_delete(_old_data) do
:ok
def on_delete(%{"id" => gateway_id} = _old_data) do
disconnect(gateway_id)
end
def connect(%Gateways.Gateway{} = gateway) do
with {:ok, _} <-
Gateways.Presence.track(
self(),
Events.Hooks.GatewayGroups.presence_topic(gateway.group_id),
gateway.id,
%{}
),
{:ok, _} <-
Gateways.Presence.track(
self(),
Events.Hooks.Accounts.gateways_presence_topic(gateway.account_id),
gateway.id,
%{
online_at: System.system_time(:second)
}
) do
:ok = PubSub.subscribe(topic(gateway.id))
:ok
end
end
def broadcast(gateway_id, payload) do
gateway_id
|> topic()
|> PubSub.broadcast(payload)
end
defp disconnect(gateway_id) do
gateway_id
|> broadcast("disconnect")
end
defp topic(gateway_id) do
"gateways:#{gateway_id}"
end
end

View File

@@ -1,7 +1,7 @@
defmodule Domain.Gateways do
use Supervisor
alias Domain.Accounts.Account
alias Domain.{Repo, Auth, Geo, PubSub}
alias Domain.{Repo, Auth, Events, Geo}
alias Domain.{Accounts, Resources, Tokens, Billing}
alias Domain.Gateways.{Authorizer, Gateway, Group, Presence}
@@ -118,15 +118,6 @@ defmodule Domain.Gateways do
|> Group.Changeset.update(attrs, subject)
end
)
|> case do
{:ok, group} ->
# TODO: WAL
:ok = broadcast_to_group(group, :updated)
{:ok, group}
{:error, reason} ->
{:error, reason}
end
end
end
@@ -137,6 +128,7 @@ defmodule Domain.Gateways do
|> Authorizer.for_subject(subject)
|> Repo.fetch_and_update(Group.Query,
with: fn group ->
# Token deletion will disconnect gateways
{:ok, _tokens} = Tokens.delete_tokens_for(group, subject)
{:ok, _count} = Resources.delete_connections_for(group, subject)
@@ -146,8 +138,7 @@ defmodule Domain.Gateways do
|> Repo.update_all(set: [deleted_at: DateTime.utc_now()])
Group.Changeset.delete(group)
end,
after_commit: &disconnect_gateways_in_group/1
end
)
end
end
@@ -230,7 +221,7 @@ defmodule Domain.Gateways do
@doc false
def preload_gateways_presence([gateway]) do
gateway.account_id
|> account_gateways_presence_topic()
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Presence.get_by_key(gateway.id)
|> case do
[] -> %{gateway | online?: false}
@@ -247,7 +238,9 @@ defmodule Domain.Gateways do
|> Enum.reject(&is_nil/1)
|> Enum.uniq()
|> Enum.reduce(%{}, fn account_id, acc ->
connected_gateways = account_id |> account_gateways_presence_topic() |> Presence.list()
connected_gateways =
account_id |> Events.Hooks.Accounts.gateways_presence_topic() |> Presence.list()
Map.merge(acc, connected_gateways)
end)
@@ -258,7 +251,7 @@ defmodule Domain.Gateways do
def all_online_gateway_ids_by_group_id!(group_id) do
group_id
|> group_gateways_presence_topic()
|> Events.Hooks.GatewayGroups.presence_topic()
|> Presence.list()
|> Map.keys()
end
@@ -273,7 +266,7 @@ defmodule Domain.Gateways do
connected_gateway_ids =
resource.account_id
|> account_gateways_presence_topic()
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Presence.list()
|> Map.keys()
@@ -296,7 +289,7 @@ defmodule Domain.Gateways do
def gateway_can_connect_to_resource?(%Gateway{} = gateway, %Resources.Resource{} = resource) do
connected_gateway_ids =
resource.account_id
|> account_gateways_presence_topic()
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Presence.list()
|> Map.keys()
@@ -367,7 +360,6 @@ defmodule Domain.Gateways do
|> Authorizer.for_subject(subject)
|> Repo.fetch_and_update(Gateway.Query,
with: &Gateway.Changeset.delete/1,
after_commit: &disconnect_gateway/1,
preload: [:online?]
)
end
@@ -419,30 +411,6 @@ defmodule Domain.Gateways do
end
end
def connect_gateway(%Gateway{} = gateway) do
with {:ok, _} <-
Presence.track(
self(),
group_gateways_presence_topic(gateway.group_id),
gateway.id,
%{}
),
{:ok, _} <-
Presence.track(
self(),
account_gateways_presence_topic(gateway.account_id),
gateway.id,
%{
online_at: System.system_time(:second)
}
) do
:ok = PubSub.subscribe(gateway_topic(gateway))
:ok = PubSub.subscribe(group_gateways_topic(gateway.group_id))
:ok = PubSub.subscribe(account_gateways_topic(gateway.account_id))
:ok
end
end
def gateway_outdated?(gateway) do
latest_release = Domain.ComponentVersions.gateway_version()
@@ -451,102 +419,4 @@ defmodule Domain.Gateways do
_ -> false
end
end
### Presence
def account_gateways_presence_topic(account_or_id),
do: "presences:#{account_gateways_topic(account_or_id)}"
defp group_gateways_presence_topic(group_or_id),
do: "presences:#{group_gateways_topic(group_or_id)}"
### PubSub
defp gateway_topic(%Gateway{} = gateway), do: gateway_topic(gateway.id)
defp gateway_topic(gateway_id), do: "gateways:#{gateway_id}"
defp account_gateways_topic(%Accounts.Account{} = account),
do: account_gateways_topic(account.id)
defp account_gateways_topic(account_id),
do: "account_gateways:#{account_id}"
defp group_gateways_topic(%Group{} = group), do: group_gateways_topic(group.id)
defp group_gateways_topic(group_id), do: "group_gateways:#{group_id}"
defp group_topic(%Group{} = group), do: group_topic(group.id)
defp group_topic(group_id), do: "group:#{group_id}"
def subscribe_to_group_updates(group_or_id) do
group_or_id
|> group_topic()
|> PubSub.subscribe()
end
def unsubscribe_from_group_updates(group_or_id) do
group_or_id
|> group_topic()
|> PubSub.unsubscribe()
end
def subscribe_to_gateways_presence_in_account(%Accounts.Account{} = account) do
account
|> account_gateways_presence_topic()
|> PubSub.subscribe()
end
def unsubscribe_from_gateways_presence_in_account(%Accounts.Account{} = account) do
account
|> account_gateways_presence_topic()
|> PubSub.unsubscribe()
end
def subscribe_to_gateways_presence_in_group(group_or_id) do
group_or_id
|> group_gateways_presence_topic()
|> PubSub.subscribe()
end
def unsubscribe_from_gateways_presence_in_group(group_or_id) do
group_or_id
|> group_gateways_presence_topic()
|> PubSub.unsubscribe()
end
# TODO: WAL
def broadcast_to_group(group_or_id, payload) do
group_or_id
|> group_topic()
|> PubSub.broadcast(payload)
end
def broadcast_to_gateway(gateway_or_id, payload) do
gateway_or_id
|> gateway_topic()
|> PubSub.broadcast(payload)
end
defp broadcast_to_gateways_in_group(group_or_id, payload) do
group_or_id
|> group_gateways_topic()
|> PubSub.broadcast(payload)
end
defp broadcast_to_gateways_in_account(account_or_id, payload) do
account_or_id
|> account_gateways_topic()
|> PubSub.broadcast(payload)
end
def disconnect_gateway(gateway_or_id) do
broadcast_to_gateway(gateway_or_id, "disconnect")
end
def disconnect_gateways_in_group(group_or_id) do
broadcast_to_gateways_in_group(group_or_id, "disconnect")
end
def disconnect_gateways_in_account(account_or_id) do
broadcast_to_gateways_in_account(account_or_id, "disconnect")
end
end

View File

@@ -1,5 +1,5 @@
defmodule Domain.Events.Hooks.GatewaysTest do
use ExUnit.Case, async: true
use Domain.DataCase, async: true
import Domain.Events.Hooks.Gateways
setup do
@@ -13,14 +13,41 @@ defmodule Domain.Events.Hooks.GatewaysTest do
end
describe "update/2" do
test "returns :ok", %{old_data: old_data, data: data} do
assert :ok == on_update(old_data, data)
test "soft-delete broadcasts disconnect" do
gateway = Fixtures.Gateways.create_gateway()
old_data = %{"id" => gateway.id, "deleted_at" => nil}
data = %{"id" => gateway.id, "deleted_at" => "2023-10-01T00:00:00Z"}
:ok = connect(gateway)
:ok = on_update(old_data, data)
assert_receive "disconnect"
end
test "regular update does not broadcast disconnect" do
gateway = Fixtures.Gateways.create_gateway()
old_data = %{"id" => gateway.id}
data = %{"id" => gateway.id, "name" => "New Gateway Name"}
:ok = connect(gateway)
:ok = on_update(old_data, data)
refute_receive "disconnect"
end
end
describe "delete/1" do
test "returns :ok", %{data: data} do
assert :ok == on_delete(data)
test "delete broadcasts disconnect" do
gateway = Fixtures.Gateways.create_gateway()
old_data = %{"id" => gateway.id}
:ok = connect(gateway)
:ok = on_delete(old_data)
assert_receive "disconnect"
end
end
end

View File

@@ -1,7 +1,7 @@
defmodule Domain.GatewaysTest do
use Domain.DataCase, async: true
import Domain.Gateways
alias Domain.{Gateways, Tokens, Resources}
alias Domain.{Events, Gateways, Tokens, Resources}
setup do
account = Fixtures.Accounts.create_account()
@@ -299,19 +299,6 @@ defmodule Domain.GatewaysTest do
assert group.name == "foo"
end
test "broadcasts an update event", %{account: account, subject: subject} do
group = Fixtures.Gateways.create_group(account: account)
:ok = subscribe_to_group_updates(group)
attrs = %{
name: "foo"
}
assert {:ok, _group} = update_group(group, attrs, subject)
assert_receive :updated
end
test "returns error when subject has no permission to manage groups", %{
account: account,
subject: subject
@@ -635,7 +622,7 @@ defmodule Domain.GatewaysTest do
} do
offline_gateway = Fixtures.Gateways.create_gateway(account: account)
online_gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = connect_gateway(online_gateway)
:ok = Events.Hooks.Gateways.connect(online_gateway)
Fixtures.Gateways.create_gateway()
assert {:ok, gateways, _metadata} = list_gateways(subject, preload: :online?)
@@ -702,7 +689,7 @@ defmodule Domain.GatewaysTest do
connections: [%{gateway_group_id: gateway.group_id}]
)
assert connect_gateway(gateway) == :ok
assert Events.Hooks.Gateways.connect(gateway) == :ok
assert {:ok, [connected_gateway]} = all_connected_gateways_for_resource(resource, subject)
assert connected_gateway.id == gateway.id
@@ -715,7 +702,7 @@ defmodule Domain.GatewaysTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
assert connect_gateway(gateway) == :ok
assert Events.Hooks.Gateways.connect(gateway) == :ok
assert all_connected_gateways_for_resource(resource, subject) == {:ok, []}
end
@@ -724,7 +711,7 @@ defmodule Domain.GatewaysTest do
describe "gateway_can_connect_to_resource?/2" do
test "returns true when gateway can connect to resource", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
resource =
Fixtures.Resources.create_resource(
@@ -737,7 +724,7 @@ defmodule Domain.GatewaysTest do
test "returns false when gateway cannot connect to resource", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
resource = Fixtures.Resources.create_resource(account: account)
@@ -1219,55 +1206,18 @@ defmodule Domain.GatewaysTest do
test "does not allow duplicate presence", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
assert connect_gateway(gateway) == :ok
assert {:error, {:already_tracked, _pid, _topic, _key}} = connect_gateway(gateway)
assert Events.Hooks.Gateways.connect(gateway) == :ok
assert {:error, {:already_tracked, _pid, _topic, _key}} =
Events.Hooks.Gateways.connect(gateway)
end
test "tracks gateway presence for account", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
assert connect_gateway(gateway) == :ok
assert Events.Hooks.Gateways.connect(gateway) == :ok
gateway = fetch_gateway_by_id!(gateway.id, preload: [:online?])
assert gateway.online? == true
end
test "tracks gateway presence for actor", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
assert connect_gateway(gateway) == :ok
assert broadcast_to_gateway(gateway, "test") == :ok
assert_receive "test"
end
test "subscribes to gateway events", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
assert connect_gateway(gateway) == :ok
assert disconnect_gateway(gateway) == :ok
assert_receive "disconnect"
end
test "subscribes to gateway group events", %{account: account} do
group = Fixtures.Gateways.create_group(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account, group: group)
assert connect_gateway(gateway) == :ok
assert disconnect_gateways_in_group(group) == :ok
assert_receive "disconnect"
end
test "subscribes to account events", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
assert connect_gateway(gateway) == :ok
assert disconnect_gateways_in_account(account) == :ok
assert_receive "disconnect"
end
end
end

View File

@@ -1,7 +1,8 @@
defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do
alias Domain.ComponentVersions
use Domain.DataCase, async: true
import Domain.Notifications.Jobs.OutdatedGateways
alias Domain.ComponentVersions
alias Domain.Events
describe "execute/1" do
setup do
@@ -41,8 +42,8 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do
new_config = update_component_versions_config(gateway: new_version)
Domain.Config.put_env_override(ComponentVersions, new_config)
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(gateway_group)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
assert execute(%{}) == :ok
@@ -65,8 +66,8 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do
new_config = update_component_versions_config(gateway: version)
Domain.Config.put_env_override(ComponentVersions, new_config)
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(gateway_group)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
assert execute(%{}) == :ok

View File

@@ -1,12 +1,13 @@
defmodule Web.Gateways.Show do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
def mount(%{"id" => id}, _session, socket) do
with {:ok, gateway} <-
Gateways.fetch_gateway_by_id(id, socket.assigns.subject, preload: [:group, :online?]) do
if connected?(socket) do
:ok = Gateways.subscribe_to_gateways_presence_in_group(gateway.group)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group_id)
end
socket =

View File

@@ -1,11 +1,12 @@
defmodule Web.Sites.Gateways.Index do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
def mount(%{"id" => id}, _session, socket) do
with {:ok, group} <- Gateways.fetch_group_by_id(id, socket.assigns.subject) do
if connected?(socket) do
:ok = Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
end
socket =

View File

@@ -1,11 +1,12 @@
defmodule Web.Sites.Index do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
require Logger
def mount(_params, _session, socket) do
if connected?(socket) do
:ok = Gateways.subscribe_to_gateways_presence_in_account(socket.assigns.account)
:ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(socket.assigns.account.id)
end
{:ok, managed_groups, _metadata} =

View File

@@ -1,6 +1,7 @@
defmodule Web.Sites.NewToken do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
def mount(%{"id" => id}, _session, socket) do
with {:ok, group} <-
@@ -12,7 +13,7 @@ defmodule Web.Sites.NewToken do
{group, token, env} =
if connected?(socket) do
{:ok, token, encoded_token} = Gateways.create_token(group, %{}, socket.assigns.subject)
:ok = Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
{group, token, env(encoded_token)}
else
{group, nil, nil}

View File

@@ -1,12 +1,12 @@
defmodule Web.Sites.Show do
use Web, :live_view
alias Domain.{Gateways, Resources, Policies, Flows, Tokens}
alias Domain.{Gateways, Resources, Policies, Events, Flows, Tokens}
def mount(%{"id" => id}, _session, socket) do
with {:ok, group} <-
Gateways.fetch_group_by_id(id, socket.assigns.subject) do
if connected?(socket) do
:ok = Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
end
socket =

View File

@@ -1,5 +1,6 @@
defmodule Web.Live.Gateways.ShowTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -102,7 +103,7 @@ defmodule Web.Live.Gateways.ShowTest do
identity: identity,
conn: conn
} do
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
{:ok, lv, _html} =
conn
@@ -149,8 +150,8 @@ defmodule Web.Live.Gateways.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/gateways/#{gateway}")
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(gateway.group_id)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %{topic: "presences:group_gateways:" <> _}
table =

View File

@@ -1,5 +1,6 @@
defmodule Web.Live.Sites.Gateways.IndexTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -95,8 +96,8 @@ defmodule Web.Live.Sites.Gateways.IndexTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}/gateways")
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->

View File

@@ -1,5 +1,6 @@
defmodule Web.Live.Sites.IndexTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -97,8 +98,8 @@ defmodule Web.Live.Sites.IndexTest do
|> live(~p"/#{account}/sites")
Domain.Config.put_env_override(:test_pid, self())
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_account(account)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _}
assert_receive {:live_table_reloaded, "groups"}, 250
@@ -142,7 +143,7 @@ defmodule Web.Live.Sites.IndexTest do
group = Fixtures.Gateways.create_internet_group(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account, group: group)
Domain.Config.put_env_override(:test_pid, self())
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_account(account)
:ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id)
{:ok, lv, _html} =
conn
@@ -155,7 +156,7 @@ defmodule Web.Live.Sites.IndexTest do
assert has_element?(lv, "#internet-site-banner a[href='/#{account.slug}/sites/#{group.id}']")
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _}
assert_receive {:live_table_reloaded, "groups"}, 250
assert lv |> element("#internet-site-banner") |> render() =~ "Online"

View File

@@ -1,5 +1,6 @@
defmodule Web.Live.Sites.NewTokenTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -36,11 +37,11 @@ defmodule Web.Live.Sites.NewTokenTest do
|> List.last()
|> String.trim("&quot;")
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
context = Fixtures.Auth.build_context(type: :gateway_group)
assert {:ok, group, token} = Domain.Gateways.authenticate(token, context)
gateway = Fixtures.Gateways.create_gateway(account: account, group: group, token: token)
Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _group_id}

View File

@@ -1,5 +1,6 @@
defmodule Web.Live.Sites.ShowTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -142,7 +143,7 @@ defmodule Web.Live.Sites.ShowTest do
gateway: gateway,
conn: conn
} do
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Fixtures.Gateways.create_gateway(account: account, group: group)
{:ok, lv, _html} =
@@ -179,8 +180,8 @@ defmodule Web.Live.Sites.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}")
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->
@@ -377,7 +378,7 @@ defmodule Web.Live.Sites.ShowTest do
gateway: gateway,
conn: conn
} do
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Fixtures.Gateways.create_gateway(account: account, group: group)
{:ok, lv, _html} =
@@ -413,8 +414,8 @@ defmodule Web.Live.Sites.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}")
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->
@@ -600,7 +601,7 @@ defmodule Web.Live.Sites.ShowTest do
gateway: gateway,
conn: conn
} do
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.Gateways.connect(gateway)
Fixtures.Gateways.create_gateway(account: account, group: group)
{:ok, lv, _html} =
@@ -636,8 +637,8 @@ defmodule Web.Live.Sites.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}")
:ok = Domain.Gateways.subscribe_to_gateways_presence_in_group(group)
:ok = Domain.Gateways.connect_gateway(gateway)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->