refactor(portal): consolidate pubsub functions (#9529)

We issue broadcasts and subscribes in many places throughout the portal.
To help keep the cognitive overhead low, this PR consolidates all PubSub
functionality to the `Domain.PubSub` module.

This allows for:

- better maintainability
- see all of the topics we use at a glance
- consolidate repeated functionality (saved for a future PR)
- use the module hierarchy to define function names, which feels more
intuitive when reading and sets a convention

We also introduce a `Domain.Events.Hooks` behavior to ensure all hooks
comply with this simple contract, and we also introduce a convention to
standardize on topic names using the module hierarchy defined herein.

Lastly, we add convenience functions to the Presence modules to save a
bit of duplication and chance for errors.

This will make it much easier to maintain PubSub going forward.


Related: #9501
This commit is contained in:
Jamil
2025-06-14 21:30:57 -07:00
committed by GitHub
parent 62c3dd9370
commit c6545fe853
65 changed files with 964 additions and 762 deletions

View File

@@ -1,7 +1,7 @@
defmodule API.Client.Channel do
use API, :channel
alias API.Client.Views
alias Domain.{Accounts, Clients, Actors, Events, Resources, Gateways, Relays, Policies, Flows}
alias Domain.{Accounts, Clients, Actors, PubSub, Resources, Gateways, Relays, Policies, Flows}
alias Domain.Relays.Presence.Debouncer
require Logger
require OpenTelemetry.Tracer
@@ -81,8 +81,8 @@ defmodule API.Client.Channel do
Enum.each(resources, fn resource ->
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Events.Hooks.Resources.unsubscribe(resource.id)
:ok = Events.Hooks.Resources.subscribe(resource.id)
:ok = PubSub.Resource.unsubscribe(resource.id)
:ok = PubSub.Resource.subscribe(resource.id)
end)
# Subscribe for known gateway group names so that if they are updated - we can render change in the UI
@@ -93,8 +93,8 @@ defmodule API.Client.Channel do
|> Enum.each(fn gateway_group ->
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Events.Hooks.GatewayGroups.unsubscribe(gateway_group.id)
:ok = Events.Hooks.GatewayGroups.subscribe(gateway_group.id)
:ok = PubSub.GatewayGroup.unsubscribe(gateway_group.id)
:ok = PubSub.GatewayGroup.subscribe(gateway_group.id)
end)
# Return all connected relays for the account
@@ -128,18 +128,18 @@ defmodule API.Client.Channel do
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "client.after_join" do
:ok = Events.Hooks.Clients.connect(socket.assigns.client)
:ok = Clients.Presence.connect(socket.assigns.client)
# Subscribe for account config updates
:ok = Events.Hooks.Accounts.subscribe(socket.assigns.client.account_id)
:ok = PubSub.Account.subscribe(socket.assigns.client.account_id)
# We subscribe for membership updates for all actor groups the client is a member of,
:ok = Events.Hooks.Actors.subscribe_to_memberships(socket.assigns.subject.actor.id)
:ok = PubSub.Actor.Memberships.subscribe(socket.assigns.subject.actor.id)
# We subscribe for policy access events for the actor and the groups the client is a member of,
actor_group_ids = Actors.all_actor_group_ids!(socket.assigns.subject.actor)
:ok = Enum.each(actor_group_ids, &Events.Hooks.ActorGroups.subscribe_to_policies/1)
:ok = Events.Hooks.Actors.subscribe_to_policies(socket.assigns.subject.actor.id)
:ok = Enum.each(actor_group_ids, &PubSub.ActorGroup.Policies.subscribe/1)
:ok = PubSub.Actor.Policies.subscribe(socket.assigns.subject.actor.id)
{:ok, socket} = init(socket)
@@ -299,12 +299,12 @@ defmodule API.Client.Channel do
# Those events are broadcasted by Actors whenever a membership is created or deleted
def handle_info({:create_membership, _actor_id, group_id}, socket) do
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(group_id)
:ok = PubSub.ActorGroup.Policies.subscribe(group_id)
{:noreply, socket}
end
def handle_info({:delete_membership, _actor_id, group_id}, socket) do
:ok = Events.Hooks.ActorGroups.unsubscribe_from_policies(group_id)
:ok = PubSub.ActorGroup.Policies.unsubscribe(group_id)
{:noreply, socket}
end
@@ -322,8 +322,8 @@ defmodule API.Client.Channel do
} do
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Events.Hooks.Resources.unsubscribe(resource_id)
:ok = Events.Hooks.Resources.subscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
:ok = PubSub.Resource.subscribe(resource_id)
case Resources.fetch_and_authorize_resource_by_id(resource_id, socket.assigns.subject,
preload: [:gateway_groups]
@@ -364,7 +364,7 @@ defmodule API.Client.Channel do
actor_group_id: actor_group_id,
resource_id: resource_id
} do
:ok = Events.Hooks.Resources.unsubscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
# We potentially can re-create the flow but this will require keep tracking of client connections to gateways,
# which is not worth it as this case should be pretty rare. Instead we just tell client to remove it
@@ -648,7 +648,7 @@ defmodule API.Client.Channel do
ice_credentials = generate_ice_credentials(socket.assigns.client, gateway)
:ok =
Events.Hooks.Gateways.broadcast(
PubSub.Gateway.broadcast(
gateway.id,
{:authorize_flow, {self(), socket_ref(socket)},
%{
@@ -791,7 +791,7 @@ defmodule API.Client.Channel do
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
:ok =
Events.Hooks.Gateways.broadcast(
PubSub.Gateway.broadcast(
gateway.id,
{:allow_access, {self(), socket_ref(socket)},
%{
@@ -853,7 +853,7 @@ defmodule API.Client.Channel do
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
:ok =
Events.Hooks.Gateways.broadcast(
PubSub.Gateway.broadcast(
gateway.id,
{:request_connection, {self(), socket_ref(socket)},
%{
@@ -900,7 +900,7 @@ defmodule API.Client.Channel do
:ok =
Enum.each(gateway_ids, fn gateway_id ->
Events.Hooks.Gateways.broadcast(
PubSub.Gateway.broadcast(
gateway_id,
{:ice_candidates, socket.assigns.client.id, candidates,
{opentelemetry_ctx, opentelemetry_span_ctx}}
@@ -925,7 +925,7 @@ defmodule API.Client.Channel do
:ok =
Enum.each(gateway_ids, fn gateway_id ->
Events.Hooks.Gateways.broadcast(
PubSub.Gateway.broadcast(
gateway_id,
{:invalidate_ice_candidates, socket.assigns.client.id, candidates,
{opentelemetry_ctx, opentelemetry_span_ctx}}

View File

@@ -1,7 +1,7 @@
defmodule API.Gateway.Channel do
use API, :channel
alias API.Gateway.Views
alias Domain.{Clients, Events, Resources, Relays, Flows}
alias Domain.{Clients, Gateways, PubSub, Resources, Relays, Flows}
alias Domain.Relays.Presence.Debouncer
require Logger
require OpenTelemetry.Tracer
@@ -36,7 +36,7 @@ defmodule API.Gateway.Channel do
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "gateway.after_join" do
:ok = Events.Hooks.Gateways.connect(socket.assigns.gateway)
:ok = Gateways.Presence.connect(socket.assigns.gateway)
config = Domain.Config.fetch_env!(:domain, Domain.Gateways)
ipv4_masquerade_enabled? = Keyword.fetch!(config, :gateway_ipv4_masquerade)
@@ -117,7 +117,7 @@ defmodule API.Gateway.Channel do
# This event is ignored because we will receive a reject_access message from
# the Flows which will trigger a reject_access event
def handle_info({:delete_resource, resource_id}, socket) do
:ok = Events.Hooks.Resources.unsubscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
{:noreply, socket}
end
@@ -134,7 +134,7 @@ defmodule API.Gateway.Channel do
client_id: client_id,
resource_id: resource_id
} do
:ok = Events.Hooks.Flows.unsubscribe(flow_id)
:ok = PubSub.Flow.unsubscribe(flow_id)
push(socket, "reject_access", %{
flow_id: flow_id,
@@ -311,7 +311,7 @@ defmodule API.Gateway.Channel do
} = payload
OpenTelemetry.Tracer.with_span "gateway.authorize_flow" do
:ok = Events.Hooks.Flows.subscribe(flow_id)
:ok = PubSub.Flow.subscribe(flow_id)
Logger.debug("Gateway authorizes a new network flow",
flow_id: flow_id,
@@ -324,8 +324,8 @@ defmodule API.Gateway.Channel do
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Events.Hooks.Resources.unsubscribe(resource_id)
:ok = Events.Hooks.Resources.subscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
:ok = PubSub.Resource.subscribe(resource_id)
opentelemetry_headers = :otel_propagator_text_map.inject([])
@@ -384,7 +384,7 @@ defmodule API.Gateway.Channel do
client_id: client_id,
resource_id: resource_id
} do
:ok = Events.Hooks.Flows.subscribe(flow_id)
:ok = PubSub.Flow.subscribe(flow_id)
client = Clients.fetch_client_by_id!(client_id)
resource = Resources.fetch_resource_by_id!(resource_id)
@@ -396,8 +396,8 @@ defmodule API.Gateway.Channel do
{:cont, resource} ->
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Events.Hooks.Resources.unsubscribe(resource_id)
:ok = Events.Hooks.Resources.subscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
:ok = PubSub.Resource.subscribe(resource_id)
opentelemetry_headers = :otel_propagator_text_map.inject([])
ref = encode_ref(socket, {channel_pid, socket_ref, resource_id, opentelemetry_headers})
@@ -451,7 +451,7 @@ defmodule API.Gateway.Channel do
} = attrs
OpenTelemetry.Tracer.with_span "gateway.request_connection" do
:ok = Events.Hooks.Flows.subscribe(flow_id)
:ok = PubSub.Flow.subscribe(flow_id)
Logger.debug("Gateway received connection request message",
client_id: client_id,
@@ -468,8 +468,8 @@ defmodule API.Gateway.Channel do
{:cont, resource} ->
# TODO: WAL
# Why are we unsubscribing and subscribing again?
:ok = Events.Hooks.Resources.unsubscribe(resource_id)
:ok = Events.Hooks.Resources.subscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
:ok = PubSub.Resource.subscribe(resource_id)
opentelemetry_headers = :otel_propagator_text_map.inject([])
ref = encode_ref(socket, {channel_pid, socket_ref, resource_id, opentelemetry_headers})
@@ -609,7 +609,7 @@ defmodule API.Gateway.Channel do
:ok =
Enum.each(client_ids, fn client_id ->
Events.Hooks.Clients.broadcast(
PubSub.Client.broadcast(
client_id,
{:ice_candidates, socket.assigns.gateway.id, candidates,
{opentelemetry_ctx, opentelemetry_span_ctx}}
@@ -634,7 +634,7 @@ defmodule API.Gateway.Channel do
:ok =
Enum.each(client_ids, fn client_id ->
Events.Hooks.Clients.broadcast(
PubSub.Client.broadcast(
client_id,
{:invalidate_ice_candidates, socket.assigns.gateway.id, candidates,
{opentelemetry_ctx, opentelemetry_span_ctx}}

View File

@@ -1,6 +1,6 @@
defmodule API.Client.ChannelTest do
use API.ChannelCase, async: true
alias Domain.Events
alias Domain.{Clients, Events, PubSub}
setup do
account =
@@ -168,8 +168,7 @@ defmodule API.Client.ChannelTest do
describe "join/3" do
test "tracks presence after join", %{account: account, client: client} do
presence =
Domain.Clients.Presence.list(Events.Hooks.Accounts.clients_presence_topic(account.id))
presence = Clients.Presence.Account.list(account.id)
assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, client.id)
assert is_number(online_at)
@@ -502,7 +501,7 @@ defmodule API.Client.ChannelTest do
} do
assert_push "init", %{}
Process.flag(:trap_exit, true)
Events.Hooks.Clients.broadcast(client.id, :token_expired)
PubSub.Client.broadcast(client.id, :token_expired)
assert_push "disconnect", %{reason: :token_expired}, 250
end
@@ -1128,7 +1127,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -1175,7 +1174,7 @@ defmodule API.Client.ChannelTest do
"connected_gateway_ids" => []
}
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
push(socket, "create_flow", attrs)
# assert_reply ref, :error, %{reason: :not_found}
@@ -1195,7 +1194,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
push(socket, "create_flow", %{
"resource_id" => resource.id,
@@ -1235,7 +1234,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1290,7 +1289,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1340,7 +1339,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1444,7 +1443,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "create_flow", %{
@@ -1490,7 +1489,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
{:ok, _reply, socket} =
API.Client.Socket
@@ -1525,7 +1524,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
push(socket, "create_flow", %{
"resource_id" => resource.id,
@@ -1562,7 +1561,7 @@ defmodule API.Client.ChannelTest do
group: gateway_group
)
:ok = Events.Hooks.Gateways.connect(gateway1)
:ok = Domain.Gateways.Presence.connect(gateway1)
gateway2 =
Fixtures.Gateways.create_gateway(
@@ -1570,7 +1569,7 @@ defmodule API.Client.ChannelTest do
group: gateway_group
)
:ok = Events.Hooks.Gateways.connect(gateway2)
:ok = Domain.Gateways.Presence.connect(gateway2)
push(socket, "create_flow", %{
"resource_id" => resource.id,
@@ -1621,7 +1620,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
attrs = %{
"resource_id" => resource.id
@@ -1637,7 +1636,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
assert_reply ref, :error, %{reason: :offline}
@@ -1664,7 +1663,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
resource_id = resource.id
@@ -1686,7 +1685,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => dns_resource.id})
assert_reply ref, :error, %{reason: :offline}
@@ -1733,7 +1732,7 @@ defmodule API.Client.ChannelTest do
resource: resource
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
resource_id = resource.id
@@ -1753,7 +1752,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1798,7 +1797,7 @@ defmodule API.Client.ChannelTest do
}
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
resource_id = resource.id
@@ -1818,7 +1817,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1870,7 +1869,7 @@ defmodule API.Client.ChannelTest do
last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1912,7 +1911,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
{:ok, _reply, socket} =
API.Client.Socket
@@ -1939,7 +1938,7 @@ defmodule API.Client.ChannelTest do
)
)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "prepare_connection", %{"resource_id" => resource.id})
@@ -1976,7 +1975,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2021,7 +2020,7 @@ defmodule API.Client.ChannelTest do
"payload" => "DNS_Q"
}
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "reuse_connection", attrs)
@@ -2038,7 +2037,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2075,7 +2074,7 @@ defmodule API.Client.ChannelTest do
resource_id = resource.id
client_id = client.id
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2135,7 +2134,7 @@ defmodule API.Client.ChannelTest do
})
|> subscribe_and_join(API.Client.Channel, "client")
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token))
push(socket, "reuse_connection", %{
@@ -2179,7 +2178,7 @@ defmodule API.Client.ChannelTest do
socket: socket
} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2199,7 +2198,7 @@ defmodule API.Client.ChannelTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
attrs = %{
"resource_id" => resource.id,
@@ -2246,7 +2245,7 @@ defmodule API.Client.ChannelTest do
"client_preshared_key" => "PSK"
}
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
ref = push(socket, "request_connection", attrs)
@@ -2283,7 +2282,7 @@ defmodule API.Client.ChannelTest do
resource_id = resource.id
client_id = client.id
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
attrs = %{
@@ -2346,7 +2345,7 @@ defmodule API.Client.ChannelTest do
})
|> subscribe_and_join(API.Client.Channel, "client")
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token))
push(socket, "request_connection", %{
@@ -2388,7 +2387,7 @@ defmodule API.Client.ChannelTest do
"gateway_ids" => [gateway.id]
}
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "broadcast_ice_candidates", attrs)
@@ -2426,7 +2425,7 @@ defmodule API.Client.ChannelTest do
"gateway_ids" => [gateway.id]
}
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token))
push(socket, "broadcast_invalidated_ice_candidates", attrs)

View File

@@ -1,6 +1,6 @@
defmodule API.Gateway.ChannelTest do
use API.ChannelCase, async: true
alias Domain.Events
alias Domain.{Events, Gateways, PubSub}
setup do
account = Fixtures.Accounts.create_account()
@@ -9,7 +9,7 @@ defmodule API.Gateway.ChannelTest do
subject = Fixtures.Auth.create_subject(identity: identity)
client = Fixtures.Clients.create_client(subject: subject)
gateway = Fixtures.Gateways.create_gateway(account: account)
{:ok, gateway_group} = Domain.Gateways.fetch_group_by_id(gateway.group_id, subject)
{:ok, gateway_group} = Gateways.fetch_group_by_id(gateway.group_id, subject)
resource =
Fixtures.Resources.create_resource(
@@ -52,10 +52,7 @@ defmodule API.Gateway.ChannelTest do
describe "join/3" do
test "tracks presence after join", %{account: account, gateway: gateway} do
presence =
account.id
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Domain.Gateways.Presence.list()
presence = Gateways.Presence.Account.list(account.id)
assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, gateway.id)
assert is_number(online_at)
@@ -1136,8 +1133,8 @@ defmodule API.Gateway.ChannelTest do
"client_ids" => [client.id]
}
:ok = Events.Hooks.Clients.connect(client)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id))
:ok = Domain.Clients.Presence.connect(client)
PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id))
push(socket, "broadcast_ice_candidates", attrs)
@@ -1174,8 +1171,8 @@ defmodule API.Gateway.ChannelTest do
"client_ids" => [client.id]
}
:ok = Events.Hooks.Clients.connect(client)
Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id))
:ok = Domain.Clients.Presence.connect(client)
PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id))
push(socket, "broadcast_invalidated_ice_candidates", attrs)

View File

@@ -1,7 +1,7 @@
defmodule Domain.Clients do
use Supervisor
alias Domain.{Repo, Auth}
alias Domain.{Accounts, Actors, Events, Flows}
alias Domain.{Accounts, Actors, Flows}
alias Domain.Clients.{Client, Authorizer, Presence}
require Ecto.Query
@@ -106,9 +106,7 @@ defmodule Domain.Clients do
@doc false
def preload_clients_presence([client]) do
client.account_id
|> Events.Hooks.Accounts.clients_presence_topic()
|> Presence.get_by_key(client.id)
Presence.Account.get(client.account_id, client.id)
|> case do
[] -> %{client | online?: false}
%{metas: [_ | _]} -> %{client | online?: true}
@@ -135,8 +133,7 @@ defmodule Domain.Clients do
def online_client_ids(account_id) do
account_id
|> Events.Hooks.Accounts.clients_presence_topic()
|> Presence.list()
|> Presence.Account.list()
|> Map.keys()
end

View File

@@ -2,4 +2,81 @@ defmodule Domain.Clients.Presence do
use Phoenix.Presence,
otp_app: :domain,
pubsub_server: Domain.PubSub
alias Domain.PubSub
alias Domain.Clients.Client
def connect(%Client{} = client) do
with {:ok, _} <- __MODULE__.Account.track(client.account_id, client.id),
{:ok, _} <- __MODULE__.Actor.track(client.actor_id, client.id) do
:ok = PubSub.Client.subscribe(client.id)
:ok = PubSub.Account.Clients.subscribe(client.account_id)
end
end
defmodule Account do
def track(account_id, client_id) do
Domain.Clients.Presence.track(
self(),
topic(account_id),
client_id,
%{online_at: System.system_time(:second)}
)
end
def subscribe(account_id) do
account_id
|> topic()
|> PubSub.subscribe()
end
def get(account_id, client_id) do
account_id
|> topic()
|> Domain.Clients.Presence.get_by_key(client_id)
end
def list(account_id) do
account_id
|> topic()
|> Domain.Clients.Presence.list()
end
defp topic(account_id) do
"presences:account_clients:" <> account_id
end
end
defmodule Actor do
def track(actor_id, client_id) do
Domain.Clients.Presence.track(
self(),
topic(actor_id),
client_id,
%{}
)
end
def get(actor_id, client_id) do
actor_id
|> topic()
|> Domain.Clients.Presence.get_by_key(client_id)
end
def list(actor_id) do
actor_id
|> topic()
|> Domain.Clients.Presence.list()
end
def subscribe(actor_id) do
actor_id
|> topic()
|> PubSub.subscribe()
end
defp topic(actor_id) do
"presences:actor_clients:" <> actor_id
end
end
end

View File

@@ -0,0 +1,9 @@
defmodule Domain.Events.Hooks do
@moduledoc """
A simple behavior to define hooks needed for processing WAL events.
"""
@callback on_insert(data :: map()) :: :ok
@callback on_update(old_data :: map(), data :: map()) :: :ok
@callback on_delete(old_data :: map()) :: :ok
end

View File

@@ -1,109 +1,31 @@
defmodule Domain.Events.Hooks.Accounts do
@behaviour Domain.Events.Hooks
alias Domain.PubSub
require Logger
@impl true
def on_insert(_data), do: :ok
# Account disabled - disconnect clients
@impl true
def on_update(
%{"disabled_at" => nil} = _old_data,
%{"disabled_at" => disabled_at, "id" => account_id} = _data
)
when not is_nil(disabled_at) do
disconnect_clients(account_id)
PubSub.Account.Clients.disconnect(account_id)
end
def on_update(%{"config" => old_config}, %{"config" => config, "id" => account_id}) do
if old_config != config do
broadcast(account_id, :config_changed)
PubSub.Account.broadcast(account_id, :config_changed)
else
:ok
end
end
@impl true
def on_delete(_old_data) do
:ok
end
def subscribe(account_id) do
PubSub.subscribe("accounts:#{account_id}")
end
def subscribe_to_clients(account_id) do
account_id
|> clients_topic()
|> PubSub.subscribe()
end
def subscribe_to_resources(account_id) do
account_id
|> resources_topic()
|> PubSub.subscribe()
end
def subscribe_to_policies(account_id) do
account_id
|> policies_topic()
|> PubSub.subscribe()
end
def subscribe_to_clients_presence(account_id) do
account_id
|> clients_presence_topic()
|> PubSub.subscribe()
end
def subscribe_to_gateways_presence(account_id) do
account_id
|> gateways_presence_topic()
|> PubSub.subscribe()
end
def clients_presence_topic(account_id) do
"presences:#{clients_topic(account_id)}"
end
def clients_topic(account_id) do
"account_clients:#{account_id}"
end
def gateways_presence_topic(account_id) do
"presences:account_gateways:#{account_id}"
end
def broadcast_to_resources(account_id, payload) do
account_id
|> resources_topic()
|> PubSub.broadcast(payload)
end
def broadcast_to_policies(account_id, payload) do
account_id
|> policies_topic()
|> PubSub.broadcast(payload)
end
defp resources_topic(account_id) do
"account_resources:#{account_id}"
end
defp policies_topic(account_id) do
"account_policies:#{account_id}"
end
defp topic(account_id) do
"accounts:#{account_id}"
end
defp broadcast(account_id, event) do
account_id
|> topic()
|> PubSub.broadcast(event)
end
defp disconnect_clients(account_id) do
account_id
|> clients_topic()
|> PubSub.broadcast("disconnect")
end
end

View File

@@ -1,39 +1,38 @@
defmodule Domain.Events.Hooks.ActorGroupMemberships do
alias Domain.{Events, Flows, Policies, PubSub, Repo}
@behaviour Domain.Events.Hooks
alias Domain.{Flows, Policies, PubSub, Repo}
@impl true
def on_insert(%{"actor_id" => actor_id, "group_id" => group_id} = _data) do
broadcast_access(:allow, actor_id, group_id)
broadcast(:create, actor_id, group_id)
end
def on_update(_old_data, _data), do: :ok
def on_delete(%{"actor_id" => actor_id, "group_id" => group_id} = _old_data) do
Task.start(fn ->
{:ok, _flows} = Flows.expire_flows_for(actor_id, group_id)
broadcast_access(:reject, actor_id, group_id)
broadcast(:delete, actor_id, group_id)
:ok = PubSub.Actor.Memberships.broadcast(actor_id, {:create_membership, actor_id, group_id})
broadcast_access(:allow, actor_id, group_id)
end)
:ok
end
def broadcast(action, actor_id, group_id) do
payload = {:"#{action}_membership", actor_id, group_id}
topic = Events.Hooks.Actors.memberships_topic(actor_id)
@impl true
def on_update(_old_data, _data), do: :ok
:ok = PubSub.broadcast(topic, payload)
@impl true
def on_delete(%{"actor_id" => actor_id, "group_id" => group_id} = _old_data) do
Task.start(fn ->
{:ok, _flows} = Flows.expire_flows_for(actor_id, group_id)
:ok = PubSub.Actor.Memberships.broadcast(actor_id, {:delete_membership, actor_id, group_id})
broadcast_access(:reject, actor_id, group_id)
end)
:ok
end
defp broadcast_access(action, actor_id, group_id) do
# TODO: WAL
# This N+1 query will go away when we broadcast flows directly
Policies.Policy.Query.not_deleted()
|> Policies.Policy.Query.by_actor_group_id(group_id)
|> Repo.all()
|> Enum.each(fn policy ->
payload = {:"#{action}_access", policy.id, policy.actor_group_id, policy.resource_id}
:ok = Events.Hooks.Actors.broadcast_to_policies(actor_id, payload)
:ok = PubSub.Actor.Policies.broadcast(actor_id, payload)
end)
end
end

View File

@@ -1,37 +1,12 @@
defmodule Domain.Events.Hooks.ActorGroups do
alias Domain.PubSub
@behaviour Domain.Events.Hooks
def on_insert(_data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
def on_delete(_old_data) do
:ok
end
def subscribe_to_policies(actor_group_id) do
actor_group_id
|> policies_topic()
|> PubSub.subscribe()
end
def unsubscribe_from_policies(actor_group_id) do
actor_group_id
|> policies_topic()
|> PubSub.unsubscribe()
end
def broadcast_to_policies(actor_group_id, payload) do
actor_group_id
|> policies_topic()
|> PubSub.broadcast(payload)
end
defp policies_topic(actor_group_id) do
"actor_group_policies:#{actor_group_id}"
end
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,55 +1,12 @@
defmodule Domain.Events.Hooks.Actors do
alias Domain.PubSub
@behaviour Domain.Events.Hooks
def on_insert(_data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
def on_delete(_old_data) do
:ok
end
def subscribe_to_clients_presence(actor_id) do
actor_id
|> clients_presence_topic()
|> PubSub.subscribe()
end
def clients_presence_topic(actor_id) do
"presences:#{clients_topic(actor_id)}"
end
def subscribe_to_memberships(actor_id) do
actor_id
|> memberships_topic()
|> PubSub.subscribe()
end
def memberships_topic(actor_id) do
"actor_memberships:#{actor_id}"
end
def subscribe_to_policies(actor_id) do
actor_id
|> policies_topic()
|> PubSub.subscribe()
end
def broadcast_to_policies(actor_id, payload) do
actor_id
|> policies_topic()
|> PubSub.broadcast(payload)
end
defp policies_topic(actor_id) do
"actor_policies:#{actor_id}"
end
defp clients_topic(actor_id) do
"actor_clients:#{actor_id}"
end
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,13 +1,12 @@
defmodule Domain.Events.Hooks.AuthIdentities do
def on_insert(_data) do
:ok
end
@behaviour Domain.Events.Hooks
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_delete(_old_data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,13 +1,12 @@
defmodule Domain.Events.Hooks.AuthProviders do
def on_insert(_data) do
:ok
end
@behaviour Domain.Events.Hooks
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_delete(_old_data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,13 +1,12 @@
defmodule Domain.Events.Hooks.Clients do
@behaviour Domain.Events.Hooks
alias Domain.PubSub
alias Domain.Clients.{Client, Presence}
alias Domain.Events
def on_insert(_data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
# Soft-delete
@impl true
def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data)
when not is_nil(deleted_at) do
on_delete(old_data)
@@ -15,47 +14,11 @@ defmodule Domain.Events.Hooks.Clients do
# Regular update
def on_update(_old_data, %{"id" => client_id} = _data) do
broadcast(client_id, :updated)
PubSub.Client.broadcast(client_id, :updated)
end
@impl true
def on_delete(%{"id" => client_id} = _old_data) do
disconnect_client(client_id)
end
def connect(%Client{} = client) do
with {:ok, _} <-
Presence.track(
self(),
Events.Hooks.Accounts.clients_presence_topic(client.account_id),
client.id,
%{
online_at: System.system_time(:second)
}
),
{:ok, _} <-
Presence.track(
self(),
Events.Hooks.Actors.clients_presence_topic(client.actor_id),
client.id,
%{}
) do
:ok = PubSub.subscribe(topic(client.id))
:ok = Events.Hooks.Accounts.subscribe_to_clients(client.account_id)
:ok
end
end
### PubSub
def broadcast(client_id, payload) do
client_id
|> topic()
|> PubSub.broadcast(payload)
end
defp topic(client_id), do: "clients:#{client_id}"
defp disconnect_client(client_id) do
broadcast(client_id, "disconnect")
PubSub.Client.disconnect(client_id)
end
end

View File

@@ -1,13 +1,12 @@
defmodule Domain.Events.Hooks.FlowActivities do
def on_insert(_data) do
:ok
end
@behaviour Domain.Events.Hooks
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_delete(_old_data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,11 +1,12 @@
defmodule Domain.Events.Hooks.Flows do
@behaviour Domain.Events.Hooks
alias Domain.PubSub
require Logger
def on_insert(_data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
@impl true
def on_update(
_old_data,
%{
@@ -17,7 +18,7 @@ defmodule Domain.Events.Hooks.Flows do
) do
if expired?(expires_at) do
# Flow has become expired
broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
else
:ok
end
@@ -25,6 +26,7 @@ defmodule Domain.Events.Hooks.Flows do
# During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases
# where we might manually clear flows in a migration or some other mechanism.
@impl true
def on_delete(
%{
"id" => flow_id,
@@ -32,19 +34,7 @@ defmodule Domain.Events.Hooks.Flows do
"resource_id" => resource_id
} = _old_data
) do
broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
end
def subscribe(flow_id) do
flow_id
|> topic()
|> PubSub.subscribe()
end
def unsubscribe(flow_id) do
flow_id
|> topic()
|> PubSub.unsubscribe()
PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id})
end
defp expired?(nil), do: false
@@ -56,14 +46,4 @@ defmodule Domain.Events.Hooks.Flows do
_ -> false
end
end
defp topic(flow_id) do
"flows:#{flow_id}"
end
defp broadcast(flow_id, payload) do
flow_id
|> topic()
|> PubSub.broadcast(payload)
end
end

View File

@@ -1,41 +1,12 @@
defmodule Domain.Events.Hooks.GatewayGroups do
alias Domain.PubSub
@behaviour Domain.Events.Hooks
def on_insert(_data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
def on_delete(_old_data) do
:ok
end
def subscribe(gateway_group_id) do
gateway_group_id
|> topic()
|> PubSub.subscribe()
end
def subscribe_to_presence(gateway_group_id) do
gateway_group_id
|> presence_topic()
|> PubSub.subscribe()
end
def unsubscribe(gateway_group_id) do
gateway_group_id
|> topic()
|> PubSub.unsubscribe()
end
def presence_topic(gateway_group_id) do
"presences:#{topic(gateway_group_id)}"
end
defp topic(gateway_group_id) do
"group_gateways:#{gateway_group_id}"
end
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,61 +1,22 @@
defmodule Domain.Events.Hooks.Gateways do
@behaviour Domain.Events.Hooks
alias Domain.PubSub
alias Domain.Gateways
alias Domain.Events
def on_insert(_data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
# Soft-delete
@impl true
def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data)
when not is_nil(deleted_at) do
on_delete(old_data)
end
# Regular update
def on_update(_old_data, _data) do
:ok
end
def on_update(_old_data, _data), do: :ok
@impl true
def on_delete(%{"id" => gateway_id} = _old_data) do
disconnect(gateway_id)
end
def connect(%Gateways.Gateway{} = gateway) do
with {:ok, _} <-
Gateways.Presence.track(
self(),
Events.Hooks.GatewayGroups.presence_topic(gateway.group_id),
gateway.id,
%{}
),
{:ok, _} <-
Gateways.Presence.track(
self(),
Events.Hooks.Accounts.gateways_presence_topic(gateway.account_id),
gateway.id,
%{
online_at: System.system_time(:second)
}
) do
:ok = PubSub.subscribe(topic(gateway.id))
:ok
end
end
def broadcast(gateway_id, payload) do
gateway_id
|> topic()
|> PubSub.broadcast(payload)
end
defp disconnect(gateway_id) do
gateway_id
|> broadcast("disconnect")
end
defp topic(gateway_id) do
"gateways:#{gateway_id}"
PubSub.Gateway.disconnect(gateway_id)
end
end

View File

@@ -1,7 +1,9 @@
defmodule Domain.Events.Hooks.Policies do
alias Domain.{Events, Flows, PubSub}
@behaviour Domain.Events.Hooks
alias Domain.{Flows, PubSub}
require Logger
@impl true
def on_insert(
%{
"id" => policy_id,
@@ -14,12 +16,15 @@ defmodule Domain.Events.Hooks.Policies do
# TODO: WAL
# Creating a policy should broadcast directly to subscribed clients/gateways
payload = {:create_policy, policy_id}
access_payload = {:allow_access, policy_id, actor_group_id, resource_id}
:ok = broadcast(policy_id, payload)
:ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload)
:ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload)
:ok = PubSub.Policy.broadcast(policy_id, payload)
:ok = PubSub.Account.Policies.broadcast(account_id, payload)
payload = {:allow_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
end
@impl true
# Enable
def on_update(
%{"disabled_at" => disabled_at} = _old_data,
@@ -35,10 +40,11 @@ defmodule Domain.Events.Hooks.Policies do
# TODO: WAL
# Enabling a policy should broadcast directly to subscribed clients/gateways
payload = {:enable_policy, policy_id}
access_payload = {:allow_access, policy_id, actor_group_id, resource_id}
:ok = broadcast(policy_id, payload)
:ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload)
:ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload)
:ok = PubSub.Policy.broadcast(policy_id, payload)
:ok = PubSub.Account.Policies.broadcast(account_id, payload)
payload = {:allow_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
end
# Disable
@@ -59,10 +65,11 @@ defmodule Domain.Events.Hooks.Policies do
# TODO: WAL
# Disabling a policy should broadcast directly to the subscribed clients/gateways
payload = {:disable_policy, policy_id}
access_payload = {:reject_access, policy_id, actor_group_id, resource_id}
:ok = broadcast(policy_id, payload)
:ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload)
:ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload)
:ok = PubSub.Policy.broadcast(policy_id, payload)
:ok = PubSub.Account.Policies.broadcast(account_id, payload)
payload = {:reject_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
end)
:ok
@@ -106,16 +113,18 @@ defmodule Domain.Events.Hooks.Policies do
# TODO: WAL
# Deleting a policy should broadcast directly to the subscribed clients/gateways
payload = {:delete_policy, old_policy_id}
access_payload = {:reject_access, old_policy_id, old_actor_group_id, old_resource_id}
:ok = broadcast(old_policy_id, payload)
:ok = Events.Hooks.Accounts.broadcast_to_policies(old_account_id, payload)
:ok = Events.Hooks.ActorGroups.broadcast_to_policies(old_actor_group_id, access_payload)
:ok = PubSub.Policy.broadcast(old_policy_id, payload)
:ok = PubSub.Account.Policies.broadcast(old_account_id, payload)
payload = {:reject_access, old_policy_id, old_actor_group_id, old_resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(old_actor_group_id, payload)
payload = {:create_policy, policy_id}
access_payload = {:allow_access, policy_id, actor_group_id, resource_id}
:ok = broadcast(policy_id, payload)
:ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload)
:ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload)
:ok = PubSub.Policy.broadcast(policy_id, payload)
:ok = PubSub.Account.Policies.broadcast(account_id, payload)
payload = {:allow_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
end)
else
Logger.warning("Breaking update ignored for policy as it is deleted or disabled",
@@ -129,10 +138,11 @@ defmodule Domain.Events.Hooks.Policies do
# Regular update - name, description, etc
def on_update(_old_data, %{"id" => policy_id, "account_id" => account_id} = _data) do
payload = {:update_policy, policy_id}
:ok = broadcast(policy_id, payload)
:ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload)
:ok = PubSub.Policy.broadcast(policy_id, payload)
:ok = PubSub.Account.Policies.broadcast(account_id, payload)
end
@impl true
def on_delete(
%{
"id" => policy_id,
@@ -143,31 +153,17 @@ defmodule Domain.Events.Hooks.Policies do
) do
Task.start(fn ->
Flows.expire_flows_for_policy_id(policy_id)
# TODO: WAL
# Deleting a policy should broadcast directly to the subscribed clients/gateways
payload = {:delete_policy, policy_id}
access_payload = {:reject_access, policy_id, actor_group_id, resource_id}
:ok = broadcast(policy_id, payload)
:ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload)
:ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload)
:ok = PubSub.Policy.broadcast(policy_id, payload)
:ok = PubSub.Account.Policies.broadcast(account_id, payload)
payload = {:reject_access, policy_id, actor_group_id, resource_id}
:ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload)
end)
:ok
end
def subscribe(policy_id) do
policy_id
|> topic()
|> PubSub.subscribe()
end
defp broadcast(policy_id, payload) do
policy_id
|> topic()
|> PubSub.broadcast(payload)
end
defp topic(policy_id) do
"policy:#{policy_id}"
end
end

View File

@@ -1,13 +1,12 @@
defmodule Domain.Events.Hooks.RelayGroups do
def on_insert(_data) do
:ok
end
@behaviour Domain.Events.Hooks
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_delete(_old_data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,13 +1,12 @@
defmodule Domain.Events.Hooks.Relays do
def on_insert(_data) do
:ok
end
@behaviour Domain.Events.Hooks
def on_update(_old_data, _data) do
:ok
end
@impl true
def on_insert(_data), do: :ok
def on_delete(_old_data) do
:ok
end
@impl true
def on_update(_old_data, _data), do: :ok
@impl true
def on_delete(_old_data), do: :ok
end

View File

@@ -1,9 +1,14 @@
defmodule Domain.Events.Hooks.ResourceConnections do
@behaviour Domain.Events.Hooks
alias Domain.Flows
@impl true
def on_insert(_data), do: :ok
@impl true
def on_update(_old_data, _data), do: :ok
@impl true
def on_delete(%{"resource_id" => resource_id} = _old_data) do
# TODO: WAL
# The flow expires_at field is not used for any persistence-related reason.

View File

@@ -1,13 +1,16 @@
defmodule Domain.Events.Hooks.Resources do
alias Domain.Events.Hooks.Accounts
@behaviour Domain.Events.Hooks
alias Domain.{Flows, PubSub}
@impl true
def on_insert(%{"id" => resource_id, "account_id" => account_id} = _data) do
payload = {:create_resource, resource_id}
broadcast(resource_id, payload)
Accounts.broadcast_to_resources(account_id, payload)
PubSub.Resource.broadcast(resource_id, payload)
PubSub.Account.Resources.broadcast(account_id, payload)
end
@impl true
# Soft-delete
def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data)
when not is_nil(deleted_at) do
@@ -41,12 +44,12 @@ defmodule Domain.Events.Hooks.Resources do
{:ok, _flows} = Flows.expire_flows_for_resource_id(resource_id)
payload = {:delete_resource, resource_id}
broadcast(resource_id, payload)
Accounts.broadcast_to_resources(account_id, payload)
PubSub.Resource.broadcast(resource_id, payload)
PubSub.Account.Resources.broadcast(account_id, payload)
payload = {:create_resource, resource_id}
broadcast(resource_id, payload)
Accounts.broadcast_to_resources(account_id, payload)
PubSub.Resource.broadcast(resource_id, payload)
PubSub.Account.Resources.broadcast(account_id, payload)
end)
:ok
@@ -55,35 +58,14 @@ defmodule Domain.Events.Hooks.Resources do
# Non-breaking update - for non-addressability changes - e.g. name, description, etc.
def on_update(_old_data, %{"id" => resource_id, "account_id" => account_id} = _data) do
payload = {:update_resource, resource_id}
broadcast(resource_id, payload)
Accounts.broadcast_to_resources(account_id, payload)
PubSub.Resource.broadcast(resource_id, payload)
PubSub.Account.Resources.broadcast(account_id, payload)
end
@impl true
def on_delete(%{"id" => resource_id, "account_id" => account_id} = _old_data) do
payload = {:delete_resource, resource_id}
broadcast(resource_id, payload)
Accounts.broadcast_to_resources(account_id, payload)
end
def subscribe(resource_id) do
resource_id
|> topic()
|> PubSub.subscribe()
end
def unsubscribe(resource_id) do
resource_id
|> topic()
|> PubSub.unsubscribe()
end
def broadcast(resource_id, payload) do
resource_id
|> topic()
|> PubSub.broadcast(payload)
end
defp topic(resource_id) do
"resource:#{resource_id}"
PubSub.Resource.broadcast(resource_id, payload)
PubSub.Account.Resources.broadcast(account_id, payload)
end
end

View File

@@ -1,10 +1,15 @@
defmodule Domain.Events.Hooks.Tokens do
def on_insert(_data) do
:ok
end
@behaviour Domain.Events.Hooks
alias Domain.PubSub
@impl true
def on_insert(_data), do: :ok
@impl true
# updates for email tokens have no side effects
def on_update(%{"type" => "email"}, _data), do: :ok
def on_update(_old_data, %{"type" => "email"}), do: :ok
# Soft-delete
@@ -14,17 +19,10 @@ defmodule Domain.Events.Hooks.Tokens do
end
# Regular update
def on_update(_old_data, _new_data) do
:ok
end
def on_update(_old_data, _new_data), do: :ok
@impl true
def on_delete(%{"id" => token_id}) do
broadcast_disconnect(token_id)
end
defp broadcast_disconnect(token_id) do
topic = Domain.Tokens.socket_id(token_id)
payload = %Phoenix.Socket.Broadcast{topic: topic, event: "disconnect"}
Phoenix.PubSub.broadcast(Domain.PubSub, topic, payload)
PubSub.Token.disconnect(token_id)
end
end

View File

@@ -0,0 +1,47 @@
defmodule Domain.Events.Topics do
@moduledoc """
A simple module to house all of the topics and broadcasts so we can see
them and verify them in one place.
"""
alias Domain.PubSub
defmodule Account do
def subscribe(account_id) do
account_id
|> topic()
|> PubSub.subscribe()
end
defp topic(account_id) do
"accounts:" <> account_id
end
end
defmodule Presence do
defmodule Account do
defmodule Clients do
def subscribe(account_id) do
account_id
|> topic()
|> PubSub.subscribe()
end
defp topic(account_id) do
"presences:account_clients:" <> account_id
end
end
defmodule Gateways do
def subscribe(account_id) do
account_id
|> topic()
|> PubSub.subscribe()
end
defp topic(account_id) do
"presences:account_gateways:" <> account_id
end
end
end
end
end

View File

@@ -1,7 +1,7 @@
defmodule Domain.Gateways do
use Supervisor
alias Domain.Accounts.Account
alias Domain.{Repo, Auth, Events, Geo}
alias Domain.{Repo, Auth, Geo}
alias Domain.{Accounts, Resources, Tokens, Billing}
alias Domain.Gateways.{Authorizer, Gateway, Group, Presence}
@@ -220,9 +220,7 @@ defmodule Domain.Gateways do
@doc false
def preload_gateways_presence([gateway]) do
gateway.account_id
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Presence.get_by_key(gateway.id)
Presence.Account.get(gateway.account_id, gateway.id)
|> case do
[] -> %{gateway | online?: false}
%{metas: [_ | _]} -> %{gateway | online?: true}
@@ -238,8 +236,7 @@ defmodule Domain.Gateways do
|> Enum.reject(&is_nil/1)
|> Enum.uniq()
|> Enum.reduce(%{}, fn account_id, acc ->
connected_gateways =
account_id |> Events.Hooks.Accounts.gateways_presence_topic() |> Presence.list()
connected_gateways = Presence.Account.list(account_id)
Map.merge(acc, connected_gateways)
end)
@@ -250,9 +247,7 @@ defmodule Domain.Gateways do
end
def all_online_gateway_ids_by_group_id!(group_id) do
group_id
|> Events.Hooks.GatewayGroups.presence_topic()
|> Presence.list()
Presence.Group.list(group_id)
|> Map.keys()
end
@@ -265,9 +260,7 @@ defmodule Domain.Gateways do
{preload, _opts} = Keyword.pop(opts, :preload, [])
connected_gateway_ids =
resource.account_id
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Presence.list()
Presence.Account.list(resource.account_id)
|> Map.keys()
gateways =
@@ -284,9 +277,7 @@ defmodule Domain.Gateways do
def gateway_can_connect_to_resource?(%Gateway{} = gateway, %Resources.Resource{} = resource) do
connected_gateway_ids =
resource.account_id
|> Events.Hooks.Accounts.gateways_presence_topic()
|> Presence.list()
Presence.Account.list(resource.account_id)
|> Map.keys()
cond do

View File

@@ -2,4 +2,74 @@ defmodule Domain.Gateways.Presence do
use Phoenix.Presence,
otp_app: :domain,
pubsub_server: Domain.PubSub
alias Domain.Gateways.Gateway
alias Domain.PubSub
def connect(%Gateway{} = gateway) do
with {:ok, _} <- __MODULE__.Group.track(gateway.group_id, gateway.id),
{:ok, _} <- __MODULE__.Account.track(gateway.account_id, gateway.id) do
:ok = PubSub.Gateway.subscribe(gateway.id)
end
end
defmodule Account do
def track(account_id, gateway_id) do
Domain.Gateways.Presence.track(
self(),
topic(account_id),
gateway_id,
%{online_at: System.system_time(:second)}
)
end
def subscribe(account_id) do
account_id
|> topic()
|> PubSub.subscribe()
end
def get(account_id, gateway_id) do
account_id
|> topic()
|> Domain.Gateways.Presence.get_by_key(gateway_id)
end
def list(account_id) do
account_id
|> topic()
|> Domain.Gateways.Presence.list()
end
defp topic(account_id) do
"presences:account_gateways:" <> account_id
end
end
defmodule Group do
def track(gateway_group_id, gateway_id) do
Domain.Gateways.Presence.track(
self(),
topic(gateway_group_id),
gateway_id,
%{}
)
end
def subscribe(gateway_group_id) do
gateway_group_id
|> topic()
|> PubSub.subscribe()
end
def list(gateway_group_id) do
gateway_group_id
|> topic()
|> Domain.Gateways.Presence.list()
end
defp topic(gateway_group_id) do
"presences:group_gateways:" <> gateway_group_id
end
end
end

View File

@@ -41,4 +41,315 @@ defmodule Domain.PubSub do
def unsubscribe(topic) do
Phoenix.PubSub.unsubscribe(__MODULE__, topic)
end
# TODO: These are quite repetitive. We could simplify this with a `__using__` macro.
defmodule Account do
def subscribe(account_id) do
account_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(account_id, payload) do
account_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(account_id) do
Atom.to_string(__MODULE__) <> ":" <> account_id
end
defmodule Clients do
def subscribe(account_id) do
account_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(account_id, payload) do
account_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
def disconnect(account_id) do
account_id
|> topic()
|> Domain.PubSub.broadcast("disconnect")
end
defp topic(account_id) do
Atom.to_string(__MODULE__) <> ":" <> account_id
end
end
defmodule Policies do
def subscribe(account_id) do
account_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(account_id, payload) do
account_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(account_id) do
Atom.to_string(__MODULE__) <> ":" <> account_id
end
end
defmodule Resources do
def subscribe(account_id) do
account_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(account_id, payload) do
account_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(account_id) do
Atom.to_string(__MODULE__) <> ":" <> account_id
end
end
end
defmodule Actor do
def subscribe(actor_id) do
actor_id
|> topic()
|> Domain.PubSub.subscribe()
end
defp topic(actor_id) do
Atom.to_string(__MODULE__) <> ":" <> actor_id
end
defmodule Memberships do
def subscribe(actor_id) do
actor_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(actor_id, payload) do
actor_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
def broadcast_access(action, actor_id, group_id) do
Domain.Policies.Policy.Query.not_deleted()
|> Domain.Policies.Policy.Query.by_actor_group_id(group_id)
|> Domain.Repo.all()
|> Enum.each(fn policy ->
payload = {:"#{action}_access", policy.id, policy.actor_group_id, policy.resource_id}
:ok = Actor.Policies.broadcast(actor_id, payload)
end)
end
defp topic(actor_id) do
Atom.to_string(__MODULE__) <> ":" <> actor_id
end
end
defmodule Policies do
def subscribe(actor_id) do
actor_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(actor_id, payload) do
actor_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(actor_id) do
Atom.to_string(__MODULE__) <> ":" <> actor_id
end
end
end
defmodule ActorGroup do
defmodule Policies do
def subscribe(actor_group_id) do
actor_group_id
|> topic()
|> Domain.PubSub.subscribe()
end
def unsubscribe(actor_group_id) do
actor_group_id
|> topic()
|> Domain.PubSub.unsubscribe()
end
def broadcast(actor_group_id, payload) do
actor_group_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(actor_group_id) do
Atom.to_string(__MODULE__) <> ":" <> actor_group_id
end
end
end
defmodule Client do
def subscribe(client_id) do
client_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(client_id, payload) do
client_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
def disconnect(client_id) do
client_id
|> topic()
|> Domain.PubSub.broadcast("disconnect")
end
defp topic(client_id) do
Atom.to_string(__MODULE__) <> ":" <> client_id
end
end
defmodule Flow do
def subscribe(flow_id) do
flow_id
|> topic()
|> Domain.PubSub.subscribe()
end
def unsubscribe(flow_id) do
flow_id
|> topic()
|> Domain.PubSub.unsubscribe()
end
def broadcast(flow_id, payload) do
flow_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(flow_id) do
Atom.to_string(__MODULE__) <> ":" <> flow_id
end
end
defmodule GatewayGroup do
def subscribe(gateway_group_id) do
gateway_group_id
|> topic()
|> Domain.PubSub.subscribe()
end
def unsubscribe(gateway_group_id) do
gateway_group_id
|> topic()
|> Domain.PubSub.unsubscribe()
end
defp topic(gateway_group_id) do
Atom.to_string(__MODULE__) <> ":" <> gateway_group_id
end
end
defmodule Gateway do
def subscribe(gateway_id) do
gateway_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(gateway_id, payload) do
gateway_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
def disconnect(gateway_id) do
gateway_id
|> topic()
|> Domain.PubSub.broadcast("disconnect")
end
defp topic(gateway_id) do
Atom.to_string(__MODULE__) <> ":" <> gateway_id
end
end
defmodule Policy do
def subscribe(policy_id) do
policy_id
|> topic()
|> Domain.PubSub.subscribe()
end
def broadcast(policy_id, payload) do
policy_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(policy_id) do
Atom.to_string(__MODULE__) <> ":" <> policy_id
end
end
defmodule Resource do
def subscribe(resource_id) do
resource_id
|> topic()
|> Domain.PubSub.subscribe()
end
def unsubscribe(resource_id) do
resource_id
|> topic()
|> Domain.PubSub.unsubscribe()
end
def broadcast(resource_id, payload) do
resource_id
|> topic()
|> Domain.PubSub.broadcast(payload)
end
defp topic(resource_id) do
Atom.to_string(__MODULE__) <> ":" <> resource_id
end
end
defmodule Token do
def disconnect(token_id) do
token_id
|> topic()
|> Domain.PubSub.broadcast(%Phoenix.Socket.Broadcast{
topic: topic(token_id),
event: "disconnect"
})
end
defp topic(token_id) do
# This topic is managed by Phoenix
Domain.Tokens.socket_id(token_id)
end
end
end

View File

@@ -632,7 +632,7 @@ defmodule Domain.ActorsTest do
} do
actor = Fixtures.Actors.create_actor(account: account)
client = Fixtures.Clients.create_client(account: account, actor: actor)
Events.Hooks.Clients.connect(client)
Clients.Presence.connect(client)
assert {:ok, peek} = peek_actor_clients([actor], 3, subject)
assert [%Clients.Client{} = client] = peek[actor.id].items

View File

@@ -1,6 +1,6 @@
defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
use Domain.DataCase, async: true
alias Domain.{Auth, Actors, Events}
alias Domain.{Auth, Actors, Events, PubSub}
alias Domain.Mocks.GoogleWorkspaceDirectory
import Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectory
@@ -629,12 +629,12 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: org_unit)
:ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id)
:ok = Events.Hooks.Actors.subscribe_to_policies(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
:ok = PubSub.Actor.Policies.subscribe(actor.id)
:ok = PubSub.Actor.Policies.subscribe(other_actor.id)
:ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id)
GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/")

View File

@@ -1,6 +1,6 @@
defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
use Domain.DataCase, async: true
alias Domain.{Auth, Actors, Events}
alias Domain.{Auth, Actors, Events, PubSub}
alias Domain.Mocks.WorkOSDirectory
import Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectory
@@ -416,12 +416,12 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
:ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id)
:ok = Events.Hooks.Actors.subscribe_to_policies(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
:ok = PubSub.Actor.Policies.subscribe(actor.id)
:ok = PubSub.Actor.Policies.subscribe(other_actor.id)
:ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id)
WorkOSDirectory.override_base_url("http://localhost:#{bypass.port}")
WorkOSDirectory.mock_list_directories_endpoint(bypass)

View File

@@ -1,6 +1,6 @@
defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
use Domain.DataCase, async: true
alias Domain.{Auth, Actors, Events}
alias Domain.{Auth, Actors, Events, PubSub}
alias Domain.Mocks.MicrosoftEntraDirectory
import Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectory
@@ -464,12 +464,12 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
:ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id)
:ok = Events.Hooks.Actors.subscribe_to_policies(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
:ok = PubSub.Actor.Policies.subscribe(actor.id)
:ok = PubSub.Actor.Policies.subscribe(other_actor.id)
:ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id)
MicrosoftEntraDirectory.mock_groups_list_endpoint(
bypass,

View File

@@ -1,6 +1,6 @@
defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
use Domain.DataCase, async: true
alias Domain.{Auth, Actors, Events}
alias Domain.{Auth, Actors, Events, PubSub}
alias Domain.Mocks.OktaDirectory
import Domain.Auth.Adapters.Okta.Jobs.SyncDirectory
@@ -708,12 +708,12 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
deleted_membership = Fixtures.Actors.create_membership(account: account, group: group)
Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group)
:ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id)
:ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id)
:ok = Events.Hooks.Actors.subscribe_to_policies(actor.id)
:ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id)
:ok = PubSub.Actor.Memberships.subscribe(actor.id)
:ok = PubSub.Actor.Memberships.subscribe(other_actor.id)
:ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id)
:ok = PubSub.Actor.Policies.subscribe(actor.id)
:ok = PubSub.Actor.Policies.subscribe(other_actor.id)
:ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id)
OktaDirectory.mock_groups_list_endpoint(bypass, 200, Jason.encode!(groups))
OktaDirectory.mock_users_list_endpoint(bypass, 200, Jason.encode!(users))
@@ -809,12 +809,12 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do
"account_id" => deleted_policy.account_id
})
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
# TODO: WAL
# Remove this after direct broadcast
Process.sleep(100)
assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id}
# Deleted policies expire all flows authorized by them
deleted_group_flow = Repo.reload(deleted_group_flow)
assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt

View File

@@ -1,7 +1,7 @@
defmodule Domain.ClientsTest do
use Domain.DataCase, async: true
import Domain.Clients
alias Domain.{Clients, Events}
alias Domain.{Clients, PubSub}
setup do
account = Fixtures.Accounts.create_account()
@@ -117,7 +117,11 @@ defmodule Domain.ClientsTest do
assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?])
assert client.online? == false
assert Events.Hooks.Clients.connect(client) == :ok
{:ok, _} = Clients.Presence.Account.track(client.account_id, client.id)
{:ok, _} = Clients.Presence.Actor.track(client.actor_id, client.id)
:ok = PubSub.Client.subscribe(client.id)
:ok = PubSub.Account.Clients.subscribe(client.account_id)
assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?])
assert client.online? == true
end
@@ -222,7 +226,10 @@ defmodule Domain.ClientsTest do
assert client = fetch_client_by_id!(client.id, preload: [:online?])
assert client.online? == false
assert Events.Hooks.Clients.connect(client) == :ok
{:ok, _} = Clients.Presence.Account.track(client.account_id, client.id)
{:ok, _} = Clients.Presence.Actor.track(client.actor_id, client.id)
:ok = PubSub.Client.subscribe(client.id)
:ok = PubSub.Account.Clients.subscribe(client.account_id)
assert client = fetch_client_by_id!(client.id, preload: [:online?])
assert client.online? == true
end
@@ -281,7 +288,10 @@ defmodule Domain.ClientsTest do
assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?])
assert client.online? == false
assert Events.Hooks.Clients.connect(client) == :ok
{:ok, _} = Clients.Presence.Account.track(client.account_id, client.id)
{:ok, _} = Clients.Presence.Actor.track(client.actor_id, client.id)
:ok = PubSub.Client.subscribe(client.id)
:ok = PubSub.Account.Clients.subscribe(client.account_id)
assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?])
assert client.online? == true
end
@@ -395,7 +405,7 @@ defmodule Domain.ClientsTest do
client_attrs = Fixtures.Clients.client_attrs()
assert changeset = change_client(client, client_attrs)
assert %Ecto.Changeset{data: %Domain.Clients.Client{}} = changeset
assert %Ecto.Changeset{data: %Clients.Client{}} = changeset
assert changeset.changes == %{name: client_attrs.name}
end

View File

@@ -16,7 +16,7 @@ defmodule Domain.Events.Hooks.AccountsTest do
test "sends :config_changed if config changes" do
account = Fixtures.Accounts.create_account()
:ok = subscribe(account.id)
:ok = Domain.PubSub.Account.subscribe(account.id)
old_data = %{
"id" => account.id,
@@ -38,7 +38,7 @@ defmodule Domain.Events.Hooks.AccountsTest do
test "does not send :config_changed if config does not change" do
account = Fixtures.Accounts.create_account()
:ok = subscribe(account.id)
:ok = Domain.PubSub.Account.subscribe(account.id)
old_data = %{
"id" => account.id,
@@ -60,7 +60,7 @@ defmodule Domain.Events.Hooks.AccountsTest do
old_data = %{"id" => account_id, "disabled_at" => nil}
data = %{"id" => account_id, "disabled_at" => DateTime.utc_now()}
:ok = subscribe_to_clients(account_id)
:ok = Domain.PubSub.Account.Clients.subscribe(account_id)
assert :ok == on_update(old_data, data)

View File

@@ -1,7 +1,7 @@
defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do
use API.ChannelCase, async: true
import Domain.Events.Hooks.ActorGroupMemberships
alias Domain.Events.Hooks.Actors
alias Domain.PubSub
setup do
%{old_data: %{}, data: %{}}
@@ -17,9 +17,14 @@ defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do
"group_id" => group_id
}
:ok = Actors.subscribe_to_memberships(actor_id)
:ok = PubSub.Actor.Memberships.subscribe(actor_id)
assert :ok == on_insert(data)
# TODO: WAL
# Remove this when direct broadcast is implement
Process.sleep(100)
assert_receive {:create_membership, ^actor_id, ^group_id}
end
end
@@ -81,7 +86,7 @@ defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do
"group_id" => group_id
}
:ok = Actors.subscribe_to_memberships(actor_id)
:ok = PubSub.Actor.Memberships.subscribe(actor_id)
assert :ok == on_delete(data)
assert_receive {:delete_membership, ^actor_id, ^group_id}

View File

@@ -1,6 +1,7 @@
defmodule Domain.Events.Hooks.ClientsTest do
use Domain.DataCase, async: true
import Domain.Events.Hooks.Clients
alias Domain.{Clients, PubSub}
setup do
%{old_data: %{}, data: %{}}
@@ -15,7 +16,7 @@ defmodule Domain.Events.Hooks.ClientsTest do
describe "update/2" do
test "soft-delete broadcasts disconnect" do
client = Fixtures.Clients.create_client()
:ok = connect(client)
:ok = Clients.Presence.connect(client)
old_data = %{"id" => client.id, "deleted_at" => nil}
data = %{"id" => client.id, "deleted_at" => DateTime.utc_now()}
@@ -28,7 +29,7 @@ defmodule Domain.Events.Hooks.ClientsTest do
test "update broadcasts :update" do
client = Fixtures.Clients.create_client()
:ok = connect(client)
:ok = Clients.Presence.connect(client)
old_data = %{"id" => client.id, "name" => "Old Client"}
data = %{"id" => client.id, "name" => "Updated Client"}
@@ -43,7 +44,7 @@ defmodule Domain.Events.Hooks.ClientsTest do
describe "delete/1" do
test "broadcasts disconnect" do
client = Fixtures.Clients.create_client()
:ok = connect(client)
:ok = Clients.Presence.connect(client)
old_data = %{"id" => client.id}
@@ -57,20 +58,12 @@ defmodule Domain.Events.Hooks.ClientsTest do
describe "connect/1" do
test "tracks client presence and subscribes to topics" do
client = Fixtures.Clients.create_client()
assert :ok == connect(client)
assert :ok == Clients.Presence.connect(client)
assert client.account_id
|> Domain.Events.Hooks.Accounts.clients_presence_topic()
|> Domain.Clients.Presence.get_by_key(client.id)
assert Clients.Presence.Account.get(client.account_id, client.id)
assert Clients.Presence.Actor.get(client.actor_id, client.id)
assert client.actor_id
|> Domain.Events.Hooks.Actors.clients_presence_topic()
|> Domain.Clients.Presence.get_by_key(client.id)
Domain.PubSub.broadcast(
Domain.Events.Hooks.Accounts.clients_topic(client.account_id),
:test_event
)
PubSub.Account.Clients.broadcast(client.account_id, :test_event)
assert_receive :test_event
end
@@ -79,9 +72,9 @@ defmodule Domain.Events.Hooks.ClientsTest do
describe "broadcast/2" do
test "broadcasts payload to client topic" do
client = Fixtures.Clients.create_client()
:ok = connect(client)
:ok = Clients.Presence.connect(client)
assert :ok == broadcast(client.id, :updated)
assert :ok == PubSub.Client.broadcast(client.id, :updated)
assert_receive :updated
end

View File

@@ -27,7 +27,7 @@ defmodule Domain.Events.Hooks.FlowsTest do
"resource_id" => resource_id
}
:ok = subscribe(flow_id)
:ok = Domain.PubSub.Flow.subscribe(flow_id)
assert :ok == on_update(old_data, data)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
@@ -47,7 +47,7 @@ defmodule Domain.Events.Hooks.FlowsTest do
"resource_id" => resource_id
}
:ok = subscribe(flow_id)
:ok = Domain.PubSub.Flow.subscribe(flow_id)
assert :ok == on_update(old_data, data)
refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}
@@ -84,7 +84,7 @@ defmodule Domain.Events.Hooks.FlowsTest do
"resource_id" => resource_id
}
:ok = subscribe(flow_id)
:ok = Domain.PubSub.Flow.subscribe(flow_id)
assert :ok == on_delete(old_data)
assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id}

View File

@@ -19,7 +19,7 @@ defmodule Domain.Events.Hooks.GatewaysTest do
old_data = %{"id" => gateway.id, "deleted_at" => nil}
data = %{"id" => gateway.id, "deleted_at" => "2023-10-01T00:00:00Z"}
:ok = connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
:ok = on_update(old_data, data)
assert_receive "disconnect"
@@ -31,7 +31,7 @@ defmodule Domain.Events.Hooks.GatewaysTest do
old_data = %{"id" => gateway.id}
data = %{"id" => gateway.id, "name" => "New Gateway Name"}
:ok = connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
:ok = on_update(old_data, data)
refute_receive "disconnect"
@@ -44,7 +44,7 @@ defmodule Domain.Events.Hooks.GatewaysTest do
old_data = %{"id" => gateway.id}
:ok = connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
:ok = on_delete(old_data)
assert_receive "disconnect"

View File

@@ -1,7 +1,7 @@
defmodule Domain.Events.Hooks.PoliciesTest do
use Domain.DataCase, async: true
import Domain.Events.Hooks.Policies
alias Domain.Events
alias Domain.PubSub
describe "insert/1" do
test "broadcasts :create_policy and :allow_access" do
@@ -17,9 +17,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
"resource_id" => resource_id
}
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_insert(data)
assert_receive {:create_policy, ^policy_id}
@@ -45,9 +45,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "disabled_at", nil)
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = Domain.PubSub.Policy.subscribe(policy_id)
:ok = Domain.PubSub.Account.Policies.subscribe(account_id)
:ok = Domain.PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
assert_receive {:enable_policy, ^policy_id}
@@ -72,9 +72,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z")
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
assert_receive {:disable_policy, ^policy_id}
@@ -107,9 +107,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
assert_receive {:delete_policy, ^policy_id}
@@ -142,9 +142,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "resource_id", "new-resource-123")
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
@@ -182,9 +182,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "resource_id", "new-resource-123")
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
@@ -219,9 +219,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "resource_id", "new-resource-123")
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
@@ -248,9 +248,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do
data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z")
:ok = subscribe(policy_id)
:ok = Events.Hooks.Accounts.subscribe_to_policies(account_id)
:ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id)
:ok = PubSub.Policy.subscribe(policy_id)
:ok = PubSub.Account.Policies.subscribe(account_id)
:ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id)
assert :ok == on_update(old_data, data)
assert_receive {:delete_policy, ^policy_id}

View File

@@ -1,13 +1,14 @@
defmodule Domain.Events.Hooks.ResourcesTest do
use Domain.DataCase, async: true
import Domain.Events.Hooks.Resources
alias Domain.PubSub
describe "insert/1" do
test "broadcasts :create_resource to subscribed" do
resource_id = "test_resource"
account_id = "test_account"
:ok = subscribe(resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(account_id)
:ok = PubSub.Resource.subscribe(resource_id)
:ok = PubSub.Account.Resources.subscribe(account_id)
data = %{"id" => resource_id, "account_id" => account_id}
@@ -17,7 +18,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
assert_receive {:create_resource, ^resource_id}
assert_receive {:create_resource, ^resource_id}
:ok = unsubscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
assert :ok = on_insert(data)
assert_receive {:create_resource, ^resource_id}
@@ -44,8 +45,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do
test "broadcasts :delete_resource to subscribed for soft-deletions" do
resource_id = "test_resource"
account_id = "test_account"
:ok = subscribe(resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(account_id)
:ok = PubSub.Resource.subscribe(resource_id)
:ok = PubSub.Account.Resources.subscribe(account_id)
old_data = %{"id" => resource_id, "account_id" => account_id, "deleted_at" => nil}
@@ -59,7 +60,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
assert_receive {:delete_resource, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
:ok = unsubscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
assert :ok = on_update(old_data, data)
assert_receive {:delete_resource, ^resource_id}
@@ -67,8 +68,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource type changes", %{flow: flow, old_data: old_data} do
:ok = subscribe(flow.resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
data = Map.put(old_data, "type", "cidr")
@@ -90,8 +91,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource address changes", %{flow: flow, old_data: old_data} do
:ok = subscribe(flow.resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
data = Map.put(old_data, "address", "4.3.2.1")
@@ -113,8 +114,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource filters change", %{flow: flow, old_data: old_data} do
:ok = subscribe(flow.resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
data = Map.put(old_data, "filters", ["new_filter"])
@@ -136,8 +137,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "expires flows when resource ip_stack changes", %{flow: flow, old_data: old_data} do
:ok = subscribe(flow.resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
data = Map.put(old_data, "ip_stack", "ipv4_only")
@@ -159,8 +160,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do
end
test "broadcasts update for non-addressability change", %{flow: flow, old_data: old_data} do
:ok = subscribe(flow.resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id)
:ok = PubSub.Resource.subscribe(flow.resource_id)
:ok = PubSub.Account.Resources.subscribe(flow.account_id)
data = Map.put(old_data, "name", "New Name")
@@ -181,8 +182,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do
test "broadcasts :delete_resource to subscribed" do
resource_id = "test_resource"
account_id = "test_account"
:ok = subscribe(resource_id)
:ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(account_id)
:ok = PubSub.Resource.subscribe(resource_id)
:ok = PubSub.Account.Resources.subscribe(account_id)
old_data = %{"id" => resource_id, "account_id" => account_id}
@@ -190,7 +191,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do
assert_receive {:delete_resource, ^resource_id}
assert_receive {:delete_resource, ^resource_id}
:ok = unsubscribe(resource_id)
:ok = PubSub.Resource.unsubscribe(resource_id)
assert :ok = on_delete(old_data)
assert_receive {:delete_resource, ^resource_id}

View File

@@ -1,7 +1,7 @@
defmodule Domain.GatewaysTest do
use Domain.DataCase, async: true
import Domain.Gateways
alias Domain.{Events, Gateways, Tokens, Resources}
alias Domain.{Gateways, Tokens, Resources}
setup do
account = Fixtures.Accounts.create_account()
@@ -622,7 +622,7 @@ defmodule Domain.GatewaysTest do
} do
offline_gateway = Fixtures.Gateways.create_gateway(account: account)
online_gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(online_gateway)
:ok = Gateways.Presence.connect(online_gateway)
Fixtures.Gateways.create_gateway()
assert {:ok, gateways, _metadata} = list_gateways(subject, preload: :online?)
@@ -689,7 +689,7 @@ defmodule Domain.GatewaysTest do
connections: [%{gateway_group_id: gateway.group_id}]
)
assert Events.Hooks.Gateways.connect(gateway) == :ok
assert Gateways.Presence.connect(gateway) == :ok
assert {:ok, [connected_gateway]} = all_connected_gateways_for_resource(resource, subject)
assert connected_gateway.id == gateway.id
@@ -702,7 +702,7 @@ defmodule Domain.GatewaysTest do
resource = Fixtures.Resources.create_resource(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account)
assert Events.Hooks.Gateways.connect(gateway) == :ok
assert Gateways.Presence.connect(gateway) == :ok
assert all_connected_gateways_for_resource(resource, subject) == {:ok, []}
end
@@ -711,7 +711,7 @@ defmodule Domain.GatewaysTest do
describe "gateway_can_connect_to_resource?/2" do
test "returns true when gateway can connect to resource", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Gateways.Presence.connect(gateway)
resource =
Fixtures.Resources.create_resource(
@@ -724,7 +724,7 @@ defmodule Domain.GatewaysTest do
test "returns false when gateway cannot connect to resource", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Gateways.Presence.connect(gateway)
resource = Fixtures.Resources.create_resource(account: account)
@@ -1206,15 +1206,15 @@ defmodule Domain.GatewaysTest do
test "does not allow duplicate presence", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
assert Events.Hooks.Gateways.connect(gateway) == :ok
assert Gateways.Presence.connect(gateway) == :ok
assert {:error, {:already_tracked, _pid, _topic, _key}} =
Events.Hooks.Gateways.connect(gateway)
Gateways.Presence.connect(gateway)
end
test "tracks gateway presence for account", %{account: account} do
gateway = Fixtures.Gateways.create_gateway(account: account)
assert Events.Hooks.Gateways.connect(gateway) == :ok
assert Gateways.Presence.connect(gateway) == :ok
gateway = fetch_gateway_by_id!(gateway.id, preload: [:online?])
assert gateway.online? == true

View File

@@ -1,8 +1,7 @@
defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do
use Domain.DataCase, async: true
import Domain.Notifications.Jobs.OutdatedGateways
alias Domain.ComponentVersions
alias Domain.Events
alias Domain.{ComponentVersions, Gateways, PubSub}
describe "execute/1" do
setup do
@@ -42,8 +41,10 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do
new_config = update_component_versions_config(gateway: new_version)
Domain.Config.put_env_override(ComponentVersions, new_config)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Gateways.Presence.Group.subscribe(gateway_group.id)
{:ok, _} = Gateways.Presence.Group.track(gateway.group_id, gateway.id)
{:ok, _} = Gateways.Presence.Account.track(gateway.account_id, gateway.id)
:ok = PubSub.Gateway.subscribe(gateway.id)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
assert execute(%{}) == :ok
@@ -66,8 +67,10 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do
new_config = update_component_versions_config(gateway: version)
Domain.Config.put_env_override(ComponentVersions, new_config)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Gateways.Presence.Group.subscribe(gateway_group.id)
{:ok, _} = Gateways.Presence.Group.track(gateway.group_id, gateway.id)
{:ok, _} = Gateways.Presence.Account.track(gateway.account_id, gateway.id)
:ok = PubSub.Gateway.subscribe(gateway.id)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
assert execute(%{}) == :ok

View File

@@ -2,7 +2,7 @@ defmodule Web.Actors.Show do
use Web, :live_view
import Web.Actors.Components
import Web.Clients.Components
alias Domain.{Accounts, Auth, Events, Tokens, Flows, Clients}
alias Domain.{Accounts, Auth, Tokens, Flows, Clients}
alias Domain.Actors
def mount(%{"id" => id}, _session, socket) do
@@ -11,7 +11,7 @@ defmodule Web.Actors.Show do
preload: [:identities, :last_seen_at, groups: [:provider]]
) do
if connected?(socket) do
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id)
:ok = Clients.Presence.Actor.subscribe(actor.id)
end
available_providers =

View File

@@ -3,11 +3,10 @@ defmodule Web.Clients.Index do
import Web.Actors.Components
import Web.Clients.Components
alias Domain.Clients
alias Domain.Events
def mount(_params, _session, socket) do
if connected?(socket) do
:ok = Events.Hooks.Accounts.subscribe_to_clients_presence(socket.assigns.subject.account.id)
:ok = Clients.Presence.Account.subscribe(socket.assigns.subject.account.id)
end
socket =

View File

@@ -2,7 +2,7 @@ defmodule Web.Clients.Show do
use Web, :live_view
import Web.Policies.Components
import Web.Clients.Components
alias Domain.{Accounts, Clients, Events, Flows}
alias Domain.{Accounts, Clients, Flows}
def mount(%{"id" => id}, _session, socket) do
with {:ok, client} <-
@@ -14,7 +14,7 @@ defmodule Web.Clients.Show do
]
) do
if connected?(socket) do
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id)
:ok = Clients.Presence.Actor.subscribe(client.actor_id)
end
socket =

View File

@@ -1,13 +1,12 @@
defmodule Web.Gateways.Show do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
def mount(%{"id" => id}, _session, socket) do
with {:ok, gateway} <-
Gateways.fetch_gateway_by_id(id, socket.assigns.subject, preload: [:group, :online?]) do
if connected?(socket) do
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group_id)
:ok = Gateways.Presence.Group.subscribe(gateway.group_id)
end
socket =

View File

@@ -1,10 +1,10 @@
defmodule Web.Policies.Index do
use Web, :live_view
alias Domain.{Events, Policies}
alias Domain.{PubSub, Policies}
def mount(_params, _session, socket) do
if connected?(socket) do
:ok = Events.Hooks.Accounts.subscribe_to_policies(socket.assigns.account.id)
:ok = PubSub.Account.Policies.subscribe(socket.assigns.account.id)
end
socket =

View File

@@ -1,7 +1,7 @@
defmodule Web.Policies.Show do
use Web, :live_view
import Web.Policies.Components
alias Domain.{Accounts, Policies, Events, Flows, Auth}
alias Domain.{Accounts, Policies, PubSub, Flows, Auth}
def mount(%{"id" => id}, _session, socket) do
with {:ok, policy} <-
@@ -16,7 +16,7 @@ defmodule Web.Policies.Show do
providers = Auth.all_active_providers_for_account!(socket.assigns.account)
if connected?(socket) do
:ok = Events.Hooks.Policies.subscribe(policy.id)
:ok = PubSub.Policy.subscribe(policy.id)
end
socket =

View File

@@ -1,11 +1,11 @@
defmodule Web.Resources.Index do
use Web, :live_view
alias Domain.Events
alias Domain.PubSub
alias Domain.Resources
def mount(_params, _session, socket) do
if connected?(socket) do
:ok = Events.Hooks.Accounts.subscribe_to_resources(socket.assigns.account.id)
:ok = PubSub.Account.Resources.subscribe(socket.assigns.account.id)
end
socket =

View File

@@ -2,14 +2,14 @@ defmodule Web.Resources.Show do
use Web, :live_view
import Web.Policies.Components
import Web.Resources.Components
alias Domain.{Accounts, Events, Resources, Policies, Flows}
alias Domain.{Accounts, PubSub, Resources, Policies, Flows}
def mount(%{"id" => id} = params, _session, socket) do
with {:ok, resource} <- fetch_resource(id, socket.assigns.subject),
{:ok, actor_groups_peek} <-
Resources.peek_resource_actor_groups([resource], 3, socket.assigns.subject) do
if connected?(socket) do
:ok = Events.Hooks.Resources.subscribe(resource.id)
:ok = PubSub.Resource.subscribe(resource.id)
end
socket =

View File

@@ -1,12 +1,11 @@
defmodule Web.Sites.Gateways.Index do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
def mount(%{"id" => id}, _session, socket) do
with {:ok, group} <- Gateways.fetch_group_by_id(id, socket.assigns.subject) do
if connected?(socket) do
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Gateways.Presence.Group.subscribe(group.id)
end
socket =

View File

@@ -1,12 +1,11 @@
defmodule Web.Sites.Index do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
require Logger
def mount(_params, _session, socket) do
if connected?(socket) do
:ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(socket.assigns.account.id)
:ok = Gateways.Presence.Account.subscribe(socket.assigns.account.id)
end
{:ok, managed_groups, _metadata} =

View File

@@ -1,7 +1,6 @@
defmodule Web.Sites.NewToken do
use Web, :live_view
alias Domain.Gateways
alias Domain.Events
def mount(%{"id" => id}, _session, socket) do
with {:ok, group} <-
@@ -13,7 +12,7 @@ defmodule Web.Sites.NewToken do
{group, token, env} =
if connected?(socket) do
{:ok, token, encoded_token} = Gateways.create_token(group, %{}, socket.assigns.subject)
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Gateways.Presence.Group.subscribe(group.id)
{group, token, env(encoded_token)}
else
{group, nil, nil}

View File

@@ -1,12 +1,12 @@
defmodule Web.Sites.Show do
use Web, :live_view
alias Domain.{Gateways, Resources, Policies, Events, Flows, Tokens}
alias Domain.{Gateways, Resources, Policies, Flows, Tokens}
def mount(%{"id" => id}, _session, socket) do
with {:ok, group} <-
Gateways.fetch_group_by_id(id, socket.assigns.subject) do
if connected?(socket) do
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Gateways.Presence.Group.subscribe(group.id)
end
socket =

View File

@@ -1,5 +1,6 @@
defmodule Web.Live.Actors.IndexTest do
use Web.ConnCase, async: true
alias Domain.Clients
setup do
account = Fixtures.Accounts.create_account()
@@ -60,7 +61,7 @@ defmodule Web.Live.Actors.IndexTest do
admin_actor = Fixtures.Actors.create_actor(account: account, type: :account_admin_user)
Fixtures.Actors.create_membership(account: account, actor: admin_actor)
client = Fixtures.Clients.create_client(account: account, actor: admin_actor)
Domain.Events.Hooks.Clients.connect(client)
:ok = Clients.Presence.connect(client)
admin_actor = Repo.preload(admin_actor, identities: [:provider], groups: [])
user_actor = Fixtures.Actors.create_actor(account: account, type: :account_user)

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Actors.ShowTest do
use Web.ConnCase, async: true
alias Domain.Events
test "redirects to sign in page for unauthorized user", %{conn: conn} do
account = Fixtures.Accounts.create_account()
@@ -92,8 +91,8 @@ defmodule Web.Live.Actors.ShowTest do
|> live(~p"/#{account}/actors/#{actor}")
Domain.Config.put_env_override(:test_pid, self())
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id)
assert Events.Hooks.Clients.connect(client) == :ok
:ok = Domain.Clients.Presence.Actor.subscribe(actor.id)
assert Domain.Clients.Presence.connect(client) == :ok
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _}
assert_receive {:live_table_reloaded, "clients"}, 500

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Clients.IndexTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -60,7 +59,7 @@ defmodule Web.Live.Clients.IndexTest do
online_client = Fixtures.Clients.create_client(account: account)
offline_client = Fixtures.Clients.create_client(account: account)
:ok = Events.Hooks.Clients.connect(online_client)
:ok = Domain.Clients.Presence.connect(online_client)
{:ok, lv, _html} =
conn
@@ -103,8 +102,8 @@ defmodule Web.Live.Clients.IndexTest do
|> live(~p"/#{account}/clients")
Domain.Config.put_env_override(:test_pid, self())
:ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id)
assert Events.Hooks.Clients.connect(client) == :ok
:ok = Domain.Clients.Presence.Actor.subscribe(client.actor_id)
assert Domain.Clients.Presence.connect(client) == :ok
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _}
assert_receive {:live_table_reloaded, "clients"}, 250

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Clients.ShowTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -117,7 +116,7 @@ defmodule Web.Live.Clients.ShowTest do
identity: identity,
conn: conn
} do
:ok = Events.Hooks.Clients.connect(client)
:ok = Domain.Clients.Presence.connect(client)
{:ok, lv, _html} =
conn
@@ -145,8 +144,8 @@ defmodule Web.Live.Clients.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/clients/#{client}")
Events.Hooks.Actors.subscribe_to_clients_presence(actor.id)
assert Events.Hooks.Clients.connect(client) == :ok
:ok = Domain.Clients.Presence.Actor.subscribe(actor.id)
assert Domain.Clients.Presence.connect(client) == :ok
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _}
wait_for(fn ->

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Gateways.ShowTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -103,7 +102,7 @@ defmodule Web.Live.Gateways.ShowTest do
identity: identity,
conn: conn
} do
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
{:ok, lv, _html} =
conn
@@ -150,8 +149,8 @@ defmodule Web.Live.Gateways.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/gateways/#{gateway}")
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.Group.subscribe(gateway.group.id)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %{topic: "presences:group_gateways:" <> _}
table =

View File

@@ -1,6 +1,6 @@
defmodule Web.Live.Resources.EditTest do
use Web.ConnCase, async: true
alias Domain.Events
alias Domain.{Events, PubSub}
setup do
account = Fixtures.Accounts.create_account()
@@ -229,7 +229,7 @@ defmodule Web.Live.Resources.EditTest do
}
}
:ok = Events.Hooks.Accounts.subscribe_to_resources(account.id)
:ok = PubSub.Account.Resources.subscribe(account.id)
{:ok, lv, _html} =
conn

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Sites.Gateways.IndexTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -96,8 +95,8 @@ defmodule Web.Live.Sites.Gateways.IndexTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}/gateways")
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.Group.subscribe(group.id)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Sites.IndexTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -98,8 +97,8 @@ defmodule Web.Live.Sites.IndexTest do
|> live(~p"/#{account}/sites")
Domain.Config.put_env_override(:test_pid, self())
:ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.Account.subscribe(account.id)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _}
assert_receive {:live_table_reloaded, "groups"}, 250
@@ -143,7 +142,7 @@ defmodule Web.Live.Sites.IndexTest do
group = Fixtures.Gateways.create_internet_group(account: account)
gateway = Fixtures.Gateways.create_gateway(account: account, group: group)
Domain.Config.put_env_override(:test_pid, self())
:ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id)
:ok = Domain.Gateways.Presence.Account.subscribe(account.id)
{:ok, lv, _html} =
conn
@@ -156,7 +155,7 @@ defmodule Web.Live.Sites.IndexTest do
assert has_element?(lv, "#internet-site-banner a[href='/#{account.slug}/sites/#{group.id}']")
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _}
assert_receive {:live_table_reloaded, "groups"}, 250
assert lv |> element("#internet-site-banner") |> render() =~ "Online"

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Sites.NewTokenTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -37,11 +36,11 @@ defmodule Web.Live.Sites.NewTokenTest do
|> List.last()
|> String.trim("&quot;")
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Domain.Gateways.Presence.Group.subscribe(group.id)
context = Fixtures.Auth.build_context(type: :gateway_group)
assert {:ok, group, token} = Domain.Gateways.authenticate(token, context)
gateway = Fixtures.Gateways.create_gateway(account: account, group: group, token: token)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _group_id}

View File

@@ -1,6 +1,5 @@
defmodule Web.Live.Sites.ShowTest do
use Web.ConnCase, async: true
alias Domain.Events
setup do
account = Fixtures.Accounts.create_account()
@@ -143,7 +142,7 @@ defmodule Web.Live.Sites.ShowTest do
gateway: gateway,
conn: conn
} do
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Fixtures.Gateways.create_gateway(account: account, group: group)
{:ok, lv, _html} =
@@ -180,8 +179,8 @@ defmodule Web.Live.Sites.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}")
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.Group.subscribe(group.id)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->
@@ -378,7 +377,7 @@ defmodule Web.Live.Sites.ShowTest do
gateway: gateway,
conn: conn
} do
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Fixtures.Gateways.create_gateway(account: account, group: group)
{:ok, lv, _html} =
@@ -414,8 +413,8 @@ defmodule Web.Live.Sites.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}")
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.Group.subscribe(group.id)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->
@@ -601,7 +600,7 @@ defmodule Web.Live.Sites.ShowTest do
gateway: gateway,
conn: conn
} do
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.connect(gateway)
Fixtures.Gateways.create_gateway(account: account, group: group)
{:ok, lv, _html} =
@@ -637,8 +636,8 @@ defmodule Web.Live.Sites.ShowTest do
|> authorize_conn(identity)
|> live(~p"/#{account}/sites/#{group}")
:ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id)
:ok = Events.Hooks.Gateways.connect(gateway)
:ok = Domain.Gateways.Presence.Group.subscribe(group.id)
:ok = Domain.Gateways.Presence.connect(gateway)
assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _}
wait_for(fn ->