From c6545fe853d21d52c15870aca8b1992f5bd51b53 Mon Sep 17 00:00:00 2001 From: Jamil Date: Sat, 14 Jun 2025 21:30:57 -0700 Subject: [PATCH] refactor(portal): consolidate pubsub functions (#9529) We issue broadcasts and subscribes in many places throughout the portal. To help keep the cognitive overhead low, this PR consolidates all PubSub functionality to the `Domain.PubSub` module. This allows for: - better maintainability - see all of the topics we use at a glance - consolidate repeated functionality (saved for a future PR) - use the module hierarchy to define function names, which feels more intuitive when reading and sets a convention We also introduce a `Domain.Events.Hooks` behavior to ensure all hooks comply with this simple contract, and we also introduce a convention to standardize on topic names using the module hierarchy defined herein. Lastly, we add convenience functions to the Presence modules to save a bit of duplication and chance for errors. This will make it much easier to maintain PubSub going forward. Related: #9501 --- elixir/apps/api/lib/api/client/channel.ex | 40 +-- elixir/apps/api/lib/api/gateway/channel.ex | 30 +- .../apps/api/test/api/client/channel_test.exs | 75 +++-- .../api/test/api/gateway/channel_test.exs | 17 +- elixir/apps/domain/lib/domain/clients.ex | 9 +- .../domain/lib/domain/clients/presence.ex | 77 +++++ elixir/apps/domain/lib/domain/events/hooks.ex | 9 + .../lib/domain/events/hooks/accounts.ex | 90 +---- .../events/hooks/actor_group_memberships.ex | 35 +- .../lib/domain/events/hooks/actor_groups.ex | 39 +-- .../domain/lib/domain/events/hooks/actors.ex | 57 +--- .../domain/events/hooks/auth_identities.ex | 17 +- .../lib/domain/events/hooks/auth_providers.ex | 17 +- .../domain/lib/domain/events/hooks/clients.ex | 51 +-- .../domain/events/hooks/flow_activities.ex | 17 +- .../domain/lib/domain/events/hooks/flows.ex | 34 +- .../lib/domain/events/hooks/gateway_groups.ex | 43 +-- .../lib/domain/events/hooks/gateways.ex | 53 +-- .../lib/domain/events/hooks/policies.ex | 82 +++-- .../lib/domain/events/hooks/relay_groups.ex | 17 +- .../domain/lib/domain/events/hooks/relays.ex | 17 +- .../events/hooks/resource_connections.ex | 5 + .../lib/domain/events/hooks/resources.ex | 48 +-- .../domain/lib/domain/events/hooks/tokens.ex | 24 +- .../apps/domain/lib/domain/events/topics.ex | 47 +++ elixir/apps/domain/lib/domain/gateways.ex | 21 +- .../domain/lib/domain/gateways/presence.ex | 70 ++++ elixir/apps/domain/lib/domain/pubsub.ex | 311 ++++++++++++++++++ .../apps/domain/test/domain/actors_test.exs | 2 +- .../jobs/sync_directory_test.exs | 14 +- .../jumpcloud/jobs/sync_directory_test.exs | 14 +- .../jobs/sync_directory_test.exs | 14 +- .../okta/jobs/sync_directory_test.exs | 18 +- .../apps/domain/test/domain/clients_test.exs | 20 +- .../domain/events/hooks/accounts_test.exs | 6 +- .../hooks/actor_group_memberships_test.exs | 11 +- .../test/domain/events/hooks/clients_test.exs | 27 +- .../test/domain/events/hooks/flows_test.exs | 6 +- .../domain/events/hooks/gateways_test.exs | 6 +- .../domain/events/hooks/policies_test.exs | 50 +-- .../domain/events/hooks/resources_test.exs | 39 +-- .../apps/domain/test/domain/gateways_test.exs | 18 +- .../jobs/outdated_gateways_test.exs | 15 +- elixir/apps/web/lib/web/live/actors/show.ex | 4 +- elixir/apps/web/lib/web/live/clients/index.ex | 3 +- elixir/apps/web/lib/web/live/clients/show.ex | 4 +- elixir/apps/web/lib/web/live/gateways/show.ex | 3 +- .../apps/web/lib/web/live/policies/index.ex | 4 +- elixir/apps/web/lib/web/live/policies/show.ex | 4 +- .../apps/web/lib/web/live/resources/index.ex | 4 +- .../apps/web/lib/web/live/resources/show.ex | 4 +- .../web/lib/web/live/sites/gateways/index.ex | 3 +- elixir/apps/web/lib/web/live/sites/index.ex | 3 +- .../apps/web/lib/web/live/sites/new_token.ex | 3 +- elixir/apps/web/lib/web/live/sites/show.ex | 4 +- .../web/test/web/live/actors/index_test.exs | 3 +- .../web/test/web/live/actors/show_test.exs | 5 +- .../web/test/web/live/clients/index_test.exs | 7 +- .../web/test/web/live/clients/show_test.exs | 7 +- .../web/test/web/live/gateways/show_test.exs | 7 +- .../web/test/web/live/resources/edit_test.exs | 4 +- .../web/live/sites/gateways/index_test.exs | 5 +- .../web/test/web/live/sites/index_test.exs | 9 +- .../test/web/live/sites/new_token_test.exs | 5 +- .../web/test/web/live/sites/show_test.exs | 19 +- 65 files changed, 964 insertions(+), 762 deletions(-) create mode 100644 elixir/apps/domain/lib/domain/events/hooks.ex create mode 100644 elixir/apps/domain/lib/domain/events/topics.ex diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index 212817a91..797f3d397 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -1,7 +1,7 @@ defmodule API.Client.Channel do use API, :channel alias API.Client.Views - alias Domain.{Accounts, Clients, Actors, Events, Resources, Gateways, Relays, Policies, Flows} + alias Domain.{Accounts, Clients, Actors, PubSub, Resources, Gateways, Relays, Policies, Flows} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -81,8 +81,8 @@ defmodule API.Client.Channel do Enum.each(resources, fn resource -> # TODO: WAL # Why are we unsubscribing and subscribing again? - :ok = Events.Hooks.Resources.unsubscribe(resource.id) - :ok = Events.Hooks.Resources.subscribe(resource.id) + :ok = PubSub.Resource.unsubscribe(resource.id) + :ok = PubSub.Resource.subscribe(resource.id) end) # Subscribe for known gateway group names so that if they are updated - we can render change in the UI @@ -93,8 +93,8 @@ defmodule API.Client.Channel do |> Enum.each(fn gateway_group -> # TODO: WAL # Why are we unsubscribing and subscribing again? - :ok = Events.Hooks.GatewayGroups.unsubscribe(gateway_group.id) - :ok = Events.Hooks.GatewayGroups.subscribe(gateway_group.id) + :ok = PubSub.GatewayGroup.unsubscribe(gateway_group.id) + :ok = PubSub.GatewayGroup.subscribe(gateway_group.id) end) # Return all connected relays for the account @@ -128,18 +128,18 @@ defmodule API.Client.Channel do OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) OpenTelemetry.Tracer.with_span "client.after_join" do - :ok = Events.Hooks.Clients.connect(socket.assigns.client) + :ok = Clients.Presence.connect(socket.assigns.client) # Subscribe for account config updates - :ok = Events.Hooks.Accounts.subscribe(socket.assigns.client.account_id) + :ok = PubSub.Account.subscribe(socket.assigns.client.account_id) # We subscribe for membership updates for all actor groups the client is a member of, - :ok = Events.Hooks.Actors.subscribe_to_memberships(socket.assigns.subject.actor.id) + :ok = PubSub.Actor.Memberships.subscribe(socket.assigns.subject.actor.id) # We subscribe for policy access events for the actor and the groups the client is a member of, actor_group_ids = Actors.all_actor_group_ids!(socket.assigns.subject.actor) - :ok = Enum.each(actor_group_ids, &Events.Hooks.ActorGroups.subscribe_to_policies/1) - :ok = Events.Hooks.Actors.subscribe_to_policies(socket.assigns.subject.actor.id) + :ok = Enum.each(actor_group_ids, &PubSub.ActorGroup.Policies.subscribe/1) + :ok = PubSub.Actor.Policies.subscribe(socket.assigns.subject.actor.id) {:ok, socket} = init(socket) @@ -299,12 +299,12 @@ defmodule API.Client.Channel do # Those events are broadcasted by Actors whenever a membership is created or deleted def handle_info({:create_membership, _actor_id, group_id}, socket) do - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(group_id) + :ok = PubSub.ActorGroup.Policies.subscribe(group_id) {:noreply, socket} end def handle_info({:delete_membership, _actor_id, group_id}, socket) do - :ok = Events.Hooks.ActorGroups.unsubscribe_from_policies(group_id) + :ok = PubSub.ActorGroup.Policies.unsubscribe(group_id) {:noreply, socket} end @@ -322,8 +322,8 @@ defmodule API.Client.Channel do } do # TODO: WAL # Why are we unsubscribing and subscribing again? - :ok = Events.Hooks.Resources.unsubscribe(resource_id) - :ok = Events.Hooks.Resources.subscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) + :ok = PubSub.Resource.subscribe(resource_id) case Resources.fetch_and_authorize_resource_by_id(resource_id, socket.assigns.subject, preload: [:gateway_groups] @@ -364,7 +364,7 @@ defmodule API.Client.Channel do actor_group_id: actor_group_id, resource_id: resource_id } do - :ok = Events.Hooks.Resources.unsubscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) # We potentially can re-create the flow but this will require keep tracking of client connections to gateways, # which is not worth it as this case should be pretty rare. Instead we just tell client to remove it @@ -648,7 +648,7 @@ defmodule API.Client.Channel do ice_credentials = generate_ice_credentials(socket.assigns.client, gateway) :ok = - Events.Hooks.Gateways.broadcast( + PubSub.Gateway.broadcast( gateway.id, {:authorize_flow, {self(), socket_ref(socket)}, %{ @@ -791,7 +791,7 @@ defmodule API.Client.Channel do opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() :ok = - Events.Hooks.Gateways.broadcast( + PubSub.Gateway.broadcast( gateway.id, {:allow_access, {self(), socket_ref(socket)}, %{ @@ -853,7 +853,7 @@ defmodule API.Client.Channel do opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() :ok = - Events.Hooks.Gateways.broadcast( + PubSub.Gateway.broadcast( gateway.id, {:request_connection, {self(), socket_ref(socket)}, %{ @@ -900,7 +900,7 @@ defmodule API.Client.Channel do :ok = Enum.each(gateway_ids, fn gateway_id -> - Events.Hooks.Gateways.broadcast( + PubSub.Gateway.broadcast( gateway_id, {:ice_candidates, socket.assigns.client.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} @@ -925,7 +925,7 @@ defmodule API.Client.Channel do :ok = Enum.each(gateway_ids, fn gateway_id -> - Events.Hooks.Gateways.broadcast( + PubSub.Gateway.broadcast( gateway_id, {:invalidate_ice_candidates, socket.assigns.client.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index d09aae0ac..b4d76e8e7 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -1,7 +1,7 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views - alias Domain.{Clients, Events, Resources, Relays, Flows} + alias Domain.{Clients, Gateways, PubSub, Resources, Relays, Flows} alias Domain.Relays.Presence.Debouncer require Logger require OpenTelemetry.Tracer @@ -36,7 +36,7 @@ defmodule API.Gateway.Channel do OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) OpenTelemetry.Tracer.with_span "gateway.after_join" do - :ok = Events.Hooks.Gateways.connect(socket.assigns.gateway) + :ok = Gateways.Presence.connect(socket.assigns.gateway) config = Domain.Config.fetch_env!(:domain, Domain.Gateways) ipv4_masquerade_enabled? = Keyword.fetch!(config, :gateway_ipv4_masquerade) @@ -117,7 +117,7 @@ defmodule API.Gateway.Channel do # This event is ignored because we will receive a reject_access message from # the Flows which will trigger a reject_access event def handle_info({:delete_resource, resource_id}, socket) do - :ok = Events.Hooks.Resources.unsubscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) {:noreply, socket} end @@ -134,7 +134,7 @@ defmodule API.Gateway.Channel do client_id: client_id, resource_id: resource_id } do - :ok = Events.Hooks.Flows.unsubscribe(flow_id) + :ok = PubSub.Flow.unsubscribe(flow_id) push(socket, "reject_access", %{ flow_id: flow_id, @@ -311,7 +311,7 @@ defmodule API.Gateway.Channel do } = payload OpenTelemetry.Tracer.with_span "gateway.authorize_flow" do - :ok = Events.Hooks.Flows.subscribe(flow_id) + :ok = PubSub.Flow.subscribe(flow_id) Logger.debug("Gateway authorizes a new network flow", flow_id: flow_id, @@ -324,8 +324,8 @@ defmodule API.Gateway.Channel do # TODO: WAL # Why are we unsubscribing and subscribing again? - :ok = Events.Hooks.Resources.unsubscribe(resource_id) - :ok = Events.Hooks.Resources.subscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) + :ok = PubSub.Resource.subscribe(resource_id) opentelemetry_headers = :otel_propagator_text_map.inject([]) @@ -384,7 +384,7 @@ defmodule API.Gateway.Channel do client_id: client_id, resource_id: resource_id } do - :ok = Events.Hooks.Flows.subscribe(flow_id) + :ok = PubSub.Flow.subscribe(flow_id) client = Clients.fetch_client_by_id!(client_id) resource = Resources.fetch_resource_by_id!(resource_id) @@ -396,8 +396,8 @@ defmodule API.Gateway.Channel do {:cont, resource} -> # TODO: WAL # Why are we unsubscribing and subscribing again? - :ok = Events.Hooks.Resources.unsubscribe(resource_id) - :ok = Events.Hooks.Resources.subscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) + :ok = PubSub.Resource.subscribe(resource_id) opentelemetry_headers = :otel_propagator_text_map.inject([]) ref = encode_ref(socket, {channel_pid, socket_ref, resource_id, opentelemetry_headers}) @@ -451,7 +451,7 @@ defmodule API.Gateway.Channel do } = attrs OpenTelemetry.Tracer.with_span "gateway.request_connection" do - :ok = Events.Hooks.Flows.subscribe(flow_id) + :ok = PubSub.Flow.subscribe(flow_id) Logger.debug("Gateway received connection request message", client_id: client_id, @@ -468,8 +468,8 @@ defmodule API.Gateway.Channel do {:cont, resource} -> # TODO: WAL # Why are we unsubscribing and subscribing again? - :ok = Events.Hooks.Resources.unsubscribe(resource_id) - :ok = Events.Hooks.Resources.subscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) + :ok = PubSub.Resource.subscribe(resource_id) opentelemetry_headers = :otel_propagator_text_map.inject([]) ref = encode_ref(socket, {channel_pid, socket_ref, resource_id, opentelemetry_headers}) @@ -609,7 +609,7 @@ defmodule API.Gateway.Channel do :ok = Enum.each(client_ids, fn client_id -> - Events.Hooks.Clients.broadcast( + PubSub.Client.broadcast( client_id, {:ice_candidates, socket.assigns.gateway.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} @@ -634,7 +634,7 @@ defmodule API.Gateway.Channel do :ok = Enum.each(client_ids, fn client_id -> - Events.Hooks.Clients.broadcast( + PubSub.Client.broadcast( client_id, {:invalidate_ice_candidates, socket.assigns.gateway.id, candidates, {opentelemetry_ctx, opentelemetry_span_ctx}} diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index 3e342a7ea..e4a4e5689 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -1,6 +1,6 @@ defmodule API.Client.ChannelTest do use API.ChannelCase, async: true - alias Domain.Events + alias Domain.{Clients, Events, PubSub} setup do account = @@ -168,8 +168,7 @@ defmodule API.Client.ChannelTest do describe "join/3" do test "tracks presence after join", %{account: account, client: client} do - presence = - Domain.Clients.Presence.list(Events.Hooks.Accounts.clients_presence_topic(account.id)) + presence = Clients.Presence.Account.list(account.id) assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, client.id) assert is_number(online_at) @@ -502,7 +501,7 @@ defmodule API.Client.ChannelTest do } do assert_push "init", %{} Process.flag(:trap_exit, true) - Events.Hooks.Clients.broadcast(client.id, :token_expired) + PubSub.Client.broadcast(client.id, :token_expired) assert_push "disconnect", %{reason: :token_expired}, 250 end @@ -1128,7 +1127,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -1175,7 +1174,7 @@ defmodule API.Client.ChannelTest do "connected_gateway_ids" => [] } - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) push(socket, "create_flow", attrs) # assert_reply ref, :error, %{reason: :not_found} @@ -1195,7 +1194,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) push(socket, "create_flow", %{ "resource_id" => resource.id, @@ -1235,7 +1234,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1290,7 +1289,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1340,7 +1339,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1444,7 +1443,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "create_flow", %{ @@ -1490,7 +1489,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) {:ok, _reply, socket} = API.Client.Socket @@ -1525,7 +1524,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) push(socket, "create_flow", %{ "resource_id" => resource.id, @@ -1562,7 +1561,7 @@ defmodule API.Client.ChannelTest do group: gateway_group ) - :ok = Events.Hooks.Gateways.connect(gateway1) + :ok = Domain.Gateways.Presence.connect(gateway1) gateway2 = Fixtures.Gateways.create_gateway( @@ -1570,7 +1569,7 @@ defmodule API.Client.ChannelTest do group: gateway_group ) - :ok = Events.Hooks.Gateways.connect(gateway2) + :ok = Domain.Gateways.Presence.connect(gateway2) push(socket, "create_flow", %{ "resource_id" => resource.id, @@ -1621,7 +1620,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) attrs = %{ "resource_id" => resource.id @@ -1637,7 +1636,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) assert_reply ref, :error, %{reason: :offline} @@ -1664,7 +1663,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) resource_id = resource.id @@ -1686,7 +1685,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => dns_resource.id}) assert_reply ref, :error, %{reason: :offline} @@ -1733,7 +1732,7 @@ defmodule API.Client.ChannelTest do resource: resource ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) resource_id = resource.id @@ -1753,7 +1752,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1798,7 +1797,7 @@ defmodule API.Client.ChannelTest do } ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) resource_id = resource.id @@ -1818,7 +1817,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1870,7 +1869,7 @@ defmodule API.Client.ChannelTest do last_seen_at: DateTime.utc_now() |> DateTime.add(-10, :second) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1912,7 +1911,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) {:ok, _reply, socket} = API.Client.Socket @@ -1939,7 +1938,7 @@ defmodule API.Client.ChannelTest do ) ) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "prepare_connection", %{"resource_id" => resource.id}) @@ -1976,7 +1975,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2021,7 +2020,7 @@ defmodule API.Client.ChannelTest do "payload" => "DNS_Q" } - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "reuse_connection", attrs) @@ -2038,7 +2037,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2075,7 +2074,7 @@ defmodule API.Client.ChannelTest do resource_id = resource.id client_id = client.id - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2135,7 +2134,7 @@ defmodule API.Client.ChannelTest do }) |> subscribe_and_join(API.Client.Channel, "client") - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token)) push(socket, "reuse_connection", %{ @@ -2179,7 +2178,7 @@ defmodule API.Client.ChannelTest do socket: socket } do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2199,7 +2198,7 @@ defmodule API.Client.ChannelTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) attrs = %{ "resource_id" => resource.id, @@ -2246,7 +2245,7 @@ defmodule API.Client.ChannelTest do "client_preshared_key" => "PSK" } - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) ref = push(socket, "request_connection", attrs) @@ -2283,7 +2282,7 @@ defmodule API.Client.ChannelTest do resource_id = resource.id client_id = client.id - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) attrs = %{ @@ -2346,7 +2345,7 @@ defmodule API.Client.ChannelTest do }) |> subscribe_and_join(API.Client.Channel, "client") - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Phoenix.PubSub.subscribe(Domain.PubSub, Domain.Tokens.socket_id(gateway_group_token)) push(socket, "request_connection", %{ @@ -2388,7 +2387,7 @@ defmodule API.Client.ChannelTest do "gateway_ids" => [gateway.id] } - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "broadcast_ice_candidates", attrs) @@ -2426,7 +2425,7 @@ defmodule API.Client.ChannelTest do "gateway_ids" => [gateway.id] } - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Domain.PubSub.subscribe(Domain.Tokens.socket_id(gateway_group_token)) push(socket, "broadcast_invalidated_ice_candidates", attrs) diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index ce2d478d3..cc67a7ac2 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -1,6 +1,6 @@ defmodule API.Gateway.ChannelTest do use API.ChannelCase, async: true - alias Domain.Events + alias Domain.{Events, Gateways, PubSub} setup do account = Fixtures.Accounts.create_account() @@ -9,7 +9,7 @@ defmodule API.Gateway.ChannelTest do subject = Fixtures.Auth.create_subject(identity: identity) client = Fixtures.Clients.create_client(subject: subject) gateway = Fixtures.Gateways.create_gateway(account: account) - {:ok, gateway_group} = Domain.Gateways.fetch_group_by_id(gateway.group_id, subject) + {:ok, gateway_group} = Gateways.fetch_group_by_id(gateway.group_id, subject) resource = Fixtures.Resources.create_resource( @@ -52,10 +52,7 @@ defmodule API.Gateway.ChannelTest do describe "join/3" do test "tracks presence after join", %{account: account, gateway: gateway} do - presence = - account.id - |> Events.Hooks.Accounts.gateways_presence_topic() - |> Domain.Gateways.Presence.list() + presence = Gateways.Presence.Account.list(account.id) assert %{metas: [%{online_at: online_at, phx_ref: _ref}]} = Map.fetch!(presence, gateway.id) assert is_number(online_at) @@ -1136,8 +1133,8 @@ defmodule API.Gateway.ChannelTest do "client_ids" => [client.id] } - :ok = Events.Hooks.Clients.connect(client) - Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id)) + :ok = Domain.Clients.Presence.connect(client) + PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id)) push(socket, "broadcast_ice_candidates", attrs) @@ -1174,8 +1171,8 @@ defmodule API.Gateway.ChannelTest do "client_ids" => [client.id] } - :ok = Events.Hooks.Clients.connect(client) - Domain.PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id)) + :ok = Domain.Clients.Presence.connect(client) + PubSub.subscribe(Domain.Tokens.socket_id(subject.token_id)) push(socket, "broadcast_invalidated_ice_candidates", attrs) diff --git a/elixir/apps/domain/lib/domain/clients.ex b/elixir/apps/domain/lib/domain/clients.ex index b947aba71..f03b39190 100644 --- a/elixir/apps/domain/lib/domain/clients.ex +++ b/elixir/apps/domain/lib/domain/clients.ex @@ -1,7 +1,7 @@ defmodule Domain.Clients do use Supervisor alias Domain.{Repo, Auth} - alias Domain.{Accounts, Actors, Events, Flows} + alias Domain.{Accounts, Actors, Flows} alias Domain.Clients.{Client, Authorizer, Presence} require Ecto.Query @@ -106,9 +106,7 @@ defmodule Domain.Clients do @doc false def preload_clients_presence([client]) do - client.account_id - |> Events.Hooks.Accounts.clients_presence_topic() - |> Presence.get_by_key(client.id) + Presence.Account.get(client.account_id, client.id) |> case do [] -> %{client | online?: false} %{metas: [_ | _]} -> %{client | online?: true} @@ -135,8 +133,7 @@ defmodule Domain.Clients do def online_client_ids(account_id) do account_id - |> Events.Hooks.Accounts.clients_presence_topic() - |> Presence.list() + |> Presence.Account.list() |> Map.keys() end diff --git a/elixir/apps/domain/lib/domain/clients/presence.ex b/elixir/apps/domain/lib/domain/clients/presence.ex index b0bf4f34d..4ff587178 100644 --- a/elixir/apps/domain/lib/domain/clients/presence.ex +++ b/elixir/apps/domain/lib/domain/clients/presence.ex @@ -2,4 +2,81 @@ defmodule Domain.Clients.Presence do use Phoenix.Presence, otp_app: :domain, pubsub_server: Domain.PubSub + + alias Domain.PubSub + alias Domain.Clients.Client + + def connect(%Client{} = client) do + with {:ok, _} <- __MODULE__.Account.track(client.account_id, client.id), + {:ok, _} <- __MODULE__.Actor.track(client.actor_id, client.id) do + :ok = PubSub.Client.subscribe(client.id) + :ok = PubSub.Account.Clients.subscribe(client.account_id) + end + end + + defmodule Account do + def track(account_id, client_id) do + Domain.Clients.Presence.track( + self(), + topic(account_id), + client_id, + %{online_at: System.system_time(:second)} + ) + end + + def subscribe(account_id) do + account_id + |> topic() + |> PubSub.subscribe() + end + + def get(account_id, client_id) do + account_id + |> topic() + |> Domain.Clients.Presence.get_by_key(client_id) + end + + def list(account_id) do + account_id + |> topic() + |> Domain.Clients.Presence.list() + end + + defp topic(account_id) do + "presences:account_clients:" <> account_id + end + end + + defmodule Actor do + def track(actor_id, client_id) do + Domain.Clients.Presence.track( + self(), + topic(actor_id), + client_id, + %{} + ) + end + + def get(actor_id, client_id) do + actor_id + |> topic() + |> Domain.Clients.Presence.get_by_key(client_id) + end + + def list(actor_id) do + actor_id + |> topic() + |> Domain.Clients.Presence.list() + end + + def subscribe(actor_id) do + actor_id + |> topic() + |> PubSub.subscribe() + end + + defp topic(actor_id) do + "presences:actor_clients:" <> actor_id + end + end end diff --git a/elixir/apps/domain/lib/domain/events/hooks.ex b/elixir/apps/domain/lib/domain/events/hooks.ex new file mode 100644 index 000000000..7ee419653 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/hooks.ex @@ -0,0 +1,9 @@ +defmodule Domain.Events.Hooks do + @moduledoc """ + A simple behavior to define hooks needed for processing WAL events. + """ + + @callback on_insert(data :: map()) :: :ok + @callback on_update(old_data :: map(), data :: map()) :: :ok + @callback on_delete(old_data :: map()) :: :ok +end diff --git a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex index 0b7214f7d..676154feb 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/accounts.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/accounts.ex @@ -1,109 +1,31 @@ defmodule Domain.Events.Hooks.Accounts do + @behaviour Domain.Events.Hooks alias Domain.PubSub require Logger + @impl true def on_insert(_data), do: :ok # Account disabled - disconnect clients + @impl true def on_update( %{"disabled_at" => nil} = _old_data, %{"disabled_at" => disabled_at, "id" => account_id} = _data ) when not is_nil(disabled_at) do - disconnect_clients(account_id) + PubSub.Account.Clients.disconnect(account_id) end def on_update(%{"config" => old_config}, %{"config" => config, "id" => account_id}) do if old_config != config do - broadcast(account_id, :config_changed) + PubSub.Account.broadcast(account_id, :config_changed) else :ok end end + @impl true def on_delete(_old_data) do :ok end - - def subscribe(account_id) do - PubSub.subscribe("accounts:#{account_id}") - end - - def subscribe_to_clients(account_id) do - account_id - |> clients_topic() - |> PubSub.subscribe() - end - - def subscribe_to_resources(account_id) do - account_id - |> resources_topic() - |> PubSub.subscribe() - end - - def subscribe_to_policies(account_id) do - account_id - |> policies_topic() - |> PubSub.subscribe() - end - - def subscribe_to_clients_presence(account_id) do - account_id - |> clients_presence_topic() - |> PubSub.subscribe() - end - - def subscribe_to_gateways_presence(account_id) do - account_id - |> gateways_presence_topic() - |> PubSub.subscribe() - end - - def clients_presence_topic(account_id) do - "presences:#{clients_topic(account_id)}" - end - - def clients_topic(account_id) do - "account_clients:#{account_id}" - end - - def gateways_presence_topic(account_id) do - "presences:account_gateways:#{account_id}" - end - - def broadcast_to_resources(account_id, payload) do - account_id - |> resources_topic() - |> PubSub.broadcast(payload) - end - - def broadcast_to_policies(account_id, payload) do - account_id - |> policies_topic() - |> PubSub.broadcast(payload) - end - - defp resources_topic(account_id) do - "account_resources:#{account_id}" - end - - defp policies_topic(account_id) do - "account_policies:#{account_id}" - end - - defp topic(account_id) do - "accounts:#{account_id}" - end - - defp broadcast(account_id, event) do - account_id - |> topic() - |> PubSub.broadcast(event) - end - - defp disconnect_clients(account_id) do - account_id - |> clients_topic() - |> PubSub.broadcast("disconnect") - end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex index eb323060e..5d4958e80 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_group_memberships.ex @@ -1,39 +1,38 @@ defmodule Domain.Events.Hooks.ActorGroupMemberships do - alias Domain.{Events, Flows, Policies, PubSub, Repo} + @behaviour Domain.Events.Hooks + alias Domain.{Flows, Policies, PubSub, Repo} + @impl true def on_insert(%{"actor_id" => actor_id, "group_id" => group_id} = _data) do - broadcast_access(:allow, actor_id, group_id) - broadcast(:create, actor_id, group_id) - end - - def on_update(_old_data, _data), do: :ok - - def on_delete(%{"actor_id" => actor_id, "group_id" => group_id} = _old_data) do Task.start(fn -> - {:ok, _flows} = Flows.expire_flows_for(actor_id, group_id) - broadcast_access(:reject, actor_id, group_id) - broadcast(:delete, actor_id, group_id) + :ok = PubSub.Actor.Memberships.broadcast(actor_id, {:create_membership, actor_id, group_id}) + broadcast_access(:allow, actor_id, group_id) end) :ok end - def broadcast(action, actor_id, group_id) do - payload = {:"#{action}_membership", actor_id, group_id} - topic = Events.Hooks.Actors.memberships_topic(actor_id) + @impl true + def on_update(_old_data, _data), do: :ok - :ok = PubSub.broadcast(topic, payload) + @impl true + def on_delete(%{"actor_id" => actor_id, "group_id" => group_id} = _old_data) do + Task.start(fn -> + {:ok, _flows} = Flows.expire_flows_for(actor_id, group_id) + :ok = PubSub.Actor.Memberships.broadcast(actor_id, {:delete_membership, actor_id, group_id}) + broadcast_access(:reject, actor_id, group_id) + end) + + :ok end defp broadcast_access(action, actor_id, group_id) do - # TODO: WAL - # This N+1 query will go away when we broadcast flows directly Policies.Policy.Query.not_deleted() |> Policies.Policy.Query.by_actor_group_id(group_id) |> Repo.all() |> Enum.each(fn policy -> payload = {:"#{action}_access", policy.id, policy.actor_group_id, policy.resource_id} - :ok = Events.Hooks.Actors.broadcast_to_policies(actor_id, payload) + :ok = PubSub.Actor.Policies.broadcast(actor_id, payload) end) end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex index b9938b142..ebf195081 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actor_groups.ex @@ -1,37 +1,12 @@ defmodule Domain.Events.Hooks.ActorGroups do - alias Domain.PubSub + @behaviour Domain.Events.Hooks - def on_insert(_data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok - def on_delete(_old_data) do - :ok - end - - def subscribe_to_policies(actor_group_id) do - actor_group_id - |> policies_topic() - |> PubSub.subscribe() - end - - def unsubscribe_from_policies(actor_group_id) do - actor_group_id - |> policies_topic() - |> PubSub.unsubscribe() - end - - def broadcast_to_policies(actor_group_id, payload) do - actor_group_id - |> policies_topic() - |> PubSub.broadcast(payload) - end - - defp policies_topic(actor_group_id) do - "actor_group_policies:#{actor_group_id}" - end + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/actors.ex b/elixir/apps/domain/lib/domain/events/hooks/actors.ex index 280b764c9..325a09da2 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/actors.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/actors.ex @@ -1,55 +1,12 @@ defmodule Domain.Events.Hooks.Actors do - alias Domain.PubSub + @behaviour Domain.Events.Hooks - def on_insert(_data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok - def on_delete(_old_data) do - :ok - end - - def subscribe_to_clients_presence(actor_id) do - actor_id - |> clients_presence_topic() - |> PubSub.subscribe() - end - - def clients_presence_topic(actor_id) do - "presences:#{clients_topic(actor_id)}" - end - - def subscribe_to_memberships(actor_id) do - actor_id - |> memberships_topic() - |> PubSub.subscribe() - end - - def memberships_topic(actor_id) do - "actor_memberships:#{actor_id}" - end - - def subscribe_to_policies(actor_id) do - actor_id - |> policies_topic() - |> PubSub.subscribe() - end - - def broadcast_to_policies(actor_id, payload) do - actor_id - |> policies_topic() - |> PubSub.broadcast(payload) - end - - defp policies_topic(actor_id) do - "actor_policies:#{actor_id}" - end - - defp clients_topic(actor_id) do - "actor_clients:#{actor_id}" - end + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex b/elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex index 24e7ca89a..8dd1cd6b8 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/auth_identities.ex @@ -1,13 +1,12 @@ defmodule Domain.Events.Hooks.AuthIdentities do - def on_insert(_data) do - :ok - end + @behaviour Domain.Events.Hooks - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_delete(_old_data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok + + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex b/elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex index 629023461..756b46aca 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/auth_providers.ex @@ -1,13 +1,12 @@ defmodule Domain.Events.Hooks.AuthProviders do - def on_insert(_data) do - :ok - end + @behaviour Domain.Events.Hooks - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_delete(_old_data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok + + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/clients.ex b/elixir/apps/domain/lib/domain/events/hooks/clients.ex index 912487837..b91ba1e71 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/clients.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/clients.ex @@ -1,13 +1,12 @@ defmodule Domain.Events.Hooks.Clients do + @behaviour Domain.Events.Hooks alias Domain.PubSub - alias Domain.Clients.{Client, Presence} - alias Domain.Events - def on_insert(_data) do - :ok - end + @impl true + def on_insert(_data), do: :ok # Soft-delete + @impl true def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data) when not is_nil(deleted_at) do on_delete(old_data) @@ -15,47 +14,11 @@ defmodule Domain.Events.Hooks.Clients do # Regular update def on_update(_old_data, %{"id" => client_id} = _data) do - broadcast(client_id, :updated) + PubSub.Client.broadcast(client_id, :updated) end + @impl true def on_delete(%{"id" => client_id} = _old_data) do - disconnect_client(client_id) - end - - def connect(%Client{} = client) do - with {:ok, _} <- - Presence.track( - self(), - Events.Hooks.Accounts.clients_presence_topic(client.account_id), - client.id, - %{ - online_at: System.system_time(:second) - } - ), - {:ok, _} <- - Presence.track( - self(), - Events.Hooks.Actors.clients_presence_topic(client.actor_id), - client.id, - %{} - ) do - :ok = PubSub.subscribe(topic(client.id)) - :ok = Events.Hooks.Accounts.subscribe_to_clients(client.account_id) - :ok - end - end - - ### PubSub - - def broadcast(client_id, payload) do - client_id - |> topic() - |> PubSub.broadcast(payload) - end - - defp topic(client_id), do: "clients:#{client_id}" - - defp disconnect_client(client_id) do - broadcast(client_id, "disconnect") + PubSub.Client.disconnect(client_id) end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex b/elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex index a1eaa45d9..e813ec64c 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/flow_activities.ex @@ -1,13 +1,12 @@ defmodule Domain.Events.Hooks.FlowActivities do - def on_insert(_data) do - :ok - end + @behaviour Domain.Events.Hooks - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_delete(_old_data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok + + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/flows.ex b/elixir/apps/domain/lib/domain/events/hooks/flows.ex index a0367cb3c..c6fbcac3f 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/flows.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/flows.ex @@ -1,11 +1,12 @@ defmodule Domain.Events.Hooks.Flows do + @behaviour Domain.Events.Hooks alias Domain.PubSub require Logger - def on_insert(_data) do - :ok - end + @impl true + def on_insert(_data), do: :ok + @impl true def on_update( _old_data, %{ @@ -17,7 +18,7 @@ defmodule Domain.Events.Hooks.Flows do ) do if expired?(expires_at) do # Flow has become expired - broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) + PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) else :ok end @@ -25,6 +26,7 @@ defmodule Domain.Events.Hooks.Flows do # During normal operation we don't expect to delete flows, however, this is implemented as a safeguard for cases # where we might manually clear flows in a migration or some other mechanism. + @impl true def on_delete( %{ "id" => flow_id, @@ -32,19 +34,7 @@ defmodule Domain.Events.Hooks.Flows do "resource_id" => resource_id } = _old_data ) do - broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) - end - - def subscribe(flow_id) do - flow_id - |> topic() - |> PubSub.subscribe() - end - - def unsubscribe(flow_id) do - flow_id - |> topic() - |> PubSub.unsubscribe() + PubSub.Flow.broadcast(flow_id, {:expire_flow, flow_id, client_id, resource_id}) end defp expired?(nil), do: false @@ -56,14 +46,4 @@ defmodule Domain.Events.Hooks.Flows do _ -> false end end - - defp topic(flow_id) do - "flows:#{flow_id}" - end - - defp broadcast(flow_id, payload) do - flow_id - |> topic() - |> PubSub.broadcast(payload) - end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex index 861dcde05..f717f7494 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/gateway_groups.ex @@ -1,41 +1,12 @@ defmodule Domain.Events.Hooks.GatewayGroups do - alias Domain.PubSub + @behaviour Domain.Events.Hooks - def on_insert(_data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok - def on_delete(_old_data) do - :ok - end - - def subscribe(gateway_group_id) do - gateway_group_id - |> topic() - |> PubSub.subscribe() - end - - def subscribe_to_presence(gateway_group_id) do - gateway_group_id - |> presence_topic() - |> PubSub.subscribe() - end - - def unsubscribe(gateway_group_id) do - gateway_group_id - |> topic() - |> PubSub.unsubscribe() - end - - def presence_topic(gateway_group_id) do - "presences:#{topic(gateway_group_id)}" - end - - defp topic(gateway_group_id) do - "group_gateways:#{gateway_group_id}" - end + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/gateways.ex b/elixir/apps/domain/lib/domain/events/hooks/gateways.ex index 6c55cd40c..6daad5e4c 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/gateways.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/gateways.ex @@ -1,61 +1,22 @@ defmodule Domain.Events.Hooks.Gateways do + @behaviour Domain.Events.Hooks alias Domain.PubSub - alias Domain.Gateways - alias Domain.Events - def on_insert(_data) do - :ok - end + @impl true + def on_insert(_data), do: :ok # Soft-delete + @impl true def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data) when not is_nil(deleted_at) do on_delete(old_data) end # Regular update - def on_update(_old_data, _data) do - :ok - end + def on_update(_old_data, _data), do: :ok + @impl true def on_delete(%{"id" => gateway_id} = _old_data) do - disconnect(gateway_id) - end - - def connect(%Gateways.Gateway{} = gateway) do - with {:ok, _} <- - Gateways.Presence.track( - self(), - Events.Hooks.GatewayGroups.presence_topic(gateway.group_id), - gateway.id, - %{} - ), - {:ok, _} <- - Gateways.Presence.track( - self(), - Events.Hooks.Accounts.gateways_presence_topic(gateway.account_id), - gateway.id, - %{ - online_at: System.system_time(:second) - } - ) do - :ok = PubSub.subscribe(topic(gateway.id)) - :ok - end - end - - def broadcast(gateway_id, payload) do - gateway_id - |> topic() - |> PubSub.broadcast(payload) - end - - defp disconnect(gateway_id) do - gateway_id - |> broadcast("disconnect") - end - - defp topic(gateway_id) do - "gateways:#{gateway_id}" + PubSub.Gateway.disconnect(gateway_id) end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/policies.ex b/elixir/apps/domain/lib/domain/events/hooks/policies.ex index 1ccea7c23..77d4f38fc 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/policies.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/policies.ex @@ -1,7 +1,9 @@ defmodule Domain.Events.Hooks.Policies do - alias Domain.{Events, Flows, PubSub} + @behaviour Domain.Events.Hooks + alias Domain.{Flows, PubSub} require Logger + @impl true def on_insert( %{ "id" => policy_id, @@ -14,12 +16,15 @@ defmodule Domain.Events.Hooks.Policies do # TODO: WAL # Creating a policy should broadcast directly to subscribed clients/gateways payload = {:create_policy, policy_id} - access_payload = {:allow_access, policy_id, actor_group_id, resource_id} - :ok = broadcast(policy_id, payload) - :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) - :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + :ok = PubSub.Policy.broadcast(policy_id, payload) + :ok = PubSub.Account.Policies.broadcast(account_id, payload) + + payload = {:allow_access, policy_id, actor_group_id, resource_id} + :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) end + @impl true + # Enable def on_update( %{"disabled_at" => disabled_at} = _old_data, @@ -35,10 +40,11 @@ defmodule Domain.Events.Hooks.Policies do # TODO: WAL # Enabling a policy should broadcast directly to subscribed clients/gateways payload = {:enable_policy, policy_id} - access_payload = {:allow_access, policy_id, actor_group_id, resource_id} - :ok = broadcast(policy_id, payload) - :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) - :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + :ok = PubSub.Policy.broadcast(policy_id, payload) + :ok = PubSub.Account.Policies.broadcast(account_id, payload) + + payload = {:allow_access, policy_id, actor_group_id, resource_id} + :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) end # Disable @@ -59,10 +65,11 @@ defmodule Domain.Events.Hooks.Policies do # TODO: WAL # Disabling a policy should broadcast directly to the subscribed clients/gateways payload = {:disable_policy, policy_id} - access_payload = {:reject_access, policy_id, actor_group_id, resource_id} - :ok = broadcast(policy_id, payload) - :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) - :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + :ok = PubSub.Policy.broadcast(policy_id, payload) + :ok = PubSub.Account.Policies.broadcast(account_id, payload) + + payload = {:reject_access, policy_id, actor_group_id, resource_id} + :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) end) :ok @@ -106,16 +113,18 @@ defmodule Domain.Events.Hooks.Policies do # TODO: WAL # Deleting a policy should broadcast directly to the subscribed clients/gateways payload = {:delete_policy, old_policy_id} - access_payload = {:reject_access, old_policy_id, old_actor_group_id, old_resource_id} - :ok = broadcast(old_policy_id, payload) - :ok = Events.Hooks.Accounts.broadcast_to_policies(old_account_id, payload) - :ok = Events.Hooks.ActorGroups.broadcast_to_policies(old_actor_group_id, access_payload) + :ok = PubSub.Policy.broadcast(old_policy_id, payload) + :ok = PubSub.Account.Policies.broadcast(old_account_id, payload) + + payload = {:reject_access, old_policy_id, old_actor_group_id, old_resource_id} + :ok = PubSub.ActorGroup.Policies.broadcast(old_actor_group_id, payload) payload = {:create_policy, policy_id} - access_payload = {:allow_access, policy_id, actor_group_id, resource_id} - :ok = broadcast(policy_id, payload) - :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) - :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + :ok = PubSub.Policy.broadcast(policy_id, payload) + :ok = PubSub.Account.Policies.broadcast(account_id, payload) + + payload = {:allow_access, policy_id, actor_group_id, resource_id} + :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) end) else Logger.warning("Breaking update ignored for policy as it is deleted or disabled", @@ -129,10 +138,11 @@ defmodule Domain.Events.Hooks.Policies do # Regular update - name, description, etc def on_update(_old_data, %{"id" => policy_id, "account_id" => account_id} = _data) do payload = {:update_policy, policy_id} - :ok = broadcast(policy_id, payload) - :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) + :ok = PubSub.Policy.broadcast(policy_id, payload) + :ok = PubSub.Account.Policies.broadcast(account_id, payload) end + @impl true def on_delete( %{ "id" => policy_id, @@ -143,31 +153,17 @@ defmodule Domain.Events.Hooks.Policies do ) do Task.start(fn -> Flows.expire_flows_for_policy_id(policy_id) + # TODO: WAL # Deleting a policy should broadcast directly to the subscribed clients/gateways payload = {:delete_policy, policy_id} - access_payload = {:reject_access, policy_id, actor_group_id, resource_id} - :ok = broadcast(policy_id, payload) - :ok = Events.Hooks.Accounts.broadcast_to_policies(account_id, payload) - :ok = Events.Hooks.ActorGroups.broadcast_to_policies(actor_group_id, access_payload) + :ok = PubSub.Policy.broadcast(policy_id, payload) + :ok = PubSub.Account.Policies.broadcast(account_id, payload) + + payload = {:reject_access, policy_id, actor_group_id, resource_id} + :ok = PubSub.ActorGroup.Policies.broadcast(actor_group_id, payload) end) :ok end - - def subscribe(policy_id) do - policy_id - |> topic() - |> PubSub.subscribe() - end - - defp broadcast(policy_id, payload) do - policy_id - |> topic() - |> PubSub.broadcast(payload) - end - - defp topic(policy_id) do - "policy:#{policy_id}" - end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex b/elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex index 8c59c91a9..2dcda9a8d 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/relay_groups.ex @@ -1,13 +1,12 @@ defmodule Domain.Events.Hooks.RelayGroups do - def on_insert(_data) do - :ok - end + @behaviour Domain.Events.Hooks - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_delete(_old_data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok + + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/relays.ex b/elixir/apps/domain/lib/domain/events/hooks/relays.ex index 0e347cb7c..48209a4fd 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/relays.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/relays.ex @@ -1,13 +1,12 @@ defmodule Domain.Events.Hooks.Relays do - def on_insert(_data) do - :ok - end + @behaviour Domain.Events.Hooks - def on_update(_old_data, _data) do - :ok - end + @impl true + def on_insert(_data), do: :ok - def on_delete(_old_data) do - :ok - end + @impl true + def on_update(_old_data, _data), do: :ok + + @impl true + def on_delete(_old_data), do: :ok end diff --git a/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex index 0a9194544..56a6012aa 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/resource_connections.ex @@ -1,9 +1,14 @@ defmodule Domain.Events.Hooks.ResourceConnections do + @behaviour Domain.Events.Hooks alias Domain.Flows + @impl true def on_insert(_data), do: :ok + + @impl true def on_update(_old_data, _data), do: :ok + @impl true def on_delete(%{"resource_id" => resource_id} = _old_data) do # TODO: WAL # The flow expires_at field is not used for any persistence-related reason. diff --git a/elixir/apps/domain/lib/domain/events/hooks/resources.ex b/elixir/apps/domain/lib/domain/events/hooks/resources.ex index 8bcd62b53..d28dabba7 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/resources.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/resources.ex @@ -1,13 +1,16 @@ defmodule Domain.Events.Hooks.Resources do - alias Domain.Events.Hooks.Accounts + @behaviour Domain.Events.Hooks alias Domain.{Flows, PubSub} + @impl true def on_insert(%{"id" => resource_id, "account_id" => account_id} = _data) do payload = {:create_resource, resource_id} - broadcast(resource_id, payload) - Accounts.broadcast_to_resources(account_id, payload) + PubSub.Resource.broadcast(resource_id, payload) + PubSub.Account.Resources.broadcast(account_id, payload) end + @impl true + # Soft-delete def on_update(%{"deleted_at" => nil} = old_data, %{"deleted_at" => deleted_at} = _data) when not is_nil(deleted_at) do @@ -41,12 +44,12 @@ defmodule Domain.Events.Hooks.Resources do {:ok, _flows} = Flows.expire_flows_for_resource_id(resource_id) payload = {:delete_resource, resource_id} - broadcast(resource_id, payload) - Accounts.broadcast_to_resources(account_id, payload) + PubSub.Resource.broadcast(resource_id, payload) + PubSub.Account.Resources.broadcast(account_id, payload) payload = {:create_resource, resource_id} - broadcast(resource_id, payload) - Accounts.broadcast_to_resources(account_id, payload) + PubSub.Resource.broadcast(resource_id, payload) + PubSub.Account.Resources.broadcast(account_id, payload) end) :ok @@ -55,35 +58,14 @@ defmodule Domain.Events.Hooks.Resources do # Non-breaking update - for non-addressability changes - e.g. name, description, etc. def on_update(_old_data, %{"id" => resource_id, "account_id" => account_id} = _data) do payload = {:update_resource, resource_id} - broadcast(resource_id, payload) - Accounts.broadcast_to_resources(account_id, payload) + PubSub.Resource.broadcast(resource_id, payload) + PubSub.Account.Resources.broadcast(account_id, payload) end + @impl true def on_delete(%{"id" => resource_id, "account_id" => account_id} = _old_data) do payload = {:delete_resource, resource_id} - broadcast(resource_id, payload) - Accounts.broadcast_to_resources(account_id, payload) - end - - def subscribe(resource_id) do - resource_id - |> topic() - |> PubSub.subscribe() - end - - def unsubscribe(resource_id) do - resource_id - |> topic() - |> PubSub.unsubscribe() - end - - def broadcast(resource_id, payload) do - resource_id - |> topic() - |> PubSub.broadcast(payload) - end - - defp topic(resource_id) do - "resource:#{resource_id}" + PubSub.Resource.broadcast(resource_id, payload) + PubSub.Account.Resources.broadcast(account_id, payload) end end diff --git a/elixir/apps/domain/lib/domain/events/hooks/tokens.ex b/elixir/apps/domain/lib/domain/events/hooks/tokens.ex index 8a217f273..1b03761ab 100644 --- a/elixir/apps/domain/lib/domain/events/hooks/tokens.ex +++ b/elixir/apps/domain/lib/domain/events/hooks/tokens.ex @@ -1,10 +1,15 @@ defmodule Domain.Events.Hooks.Tokens do - def on_insert(_data) do - :ok - end + @behaviour Domain.Events.Hooks + alias Domain.PubSub + + @impl true + def on_insert(_data), do: :ok + + @impl true # updates for email tokens have no side effects def on_update(%{"type" => "email"}, _data), do: :ok + def on_update(_old_data, %{"type" => "email"}), do: :ok # Soft-delete @@ -14,17 +19,10 @@ defmodule Domain.Events.Hooks.Tokens do end # Regular update - def on_update(_old_data, _new_data) do - :ok - end + def on_update(_old_data, _new_data), do: :ok + @impl true def on_delete(%{"id" => token_id}) do - broadcast_disconnect(token_id) - end - - defp broadcast_disconnect(token_id) do - topic = Domain.Tokens.socket_id(token_id) - payload = %Phoenix.Socket.Broadcast{topic: topic, event: "disconnect"} - Phoenix.PubSub.broadcast(Domain.PubSub, topic, payload) + PubSub.Token.disconnect(token_id) end end diff --git a/elixir/apps/domain/lib/domain/events/topics.ex b/elixir/apps/domain/lib/domain/events/topics.ex new file mode 100644 index 000000000..9a23e2412 --- /dev/null +++ b/elixir/apps/domain/lib/domain/events/topics.ex @@ -0,0 +1,47 @@ +defmodule Domain.Events.Topics do + @moduledoc """ + A simple module to house all of the topics and broadcasts so we can see + them and verify them in one place. + """ + alias Domain.PubSub + + defmodule Account do + def subscribe(account_id) do + account_id + |> topic() + |> PubSub.subscribe() + end + + defp topic(account_id) do + "accounts:" <> account_id + end + end + + defmodule Presence do + defmodule Account do + defmodule Clients do + def subscribe(account_id) do + account_id + |> topic() + |> PubSub.subscribe() + end + + defp topic(account_id) do + "presences:account_clients:" <> account_id + end + end + + defmodule Gateways do + def subscribe(account_id) do + account_id + |> topic() + |> PubSub.subscribe() + end + + defp topic(account_id) do + "presences:account_gateways:" <> account_id + end + end + end + end +end diff --git a/elixir/apps/domain/lib/domain/gateways.ex b/elixir/apps/domain/lib/domain/gateways.ex index 203486b3d..da2c0463f 100644 --- a/elixir/apps/domain/lib/domain/gateways.ex +++ b/elixir/apps/domain/lib/domain/gateways.ex @@ -1,7 +1,7 @@ defmodule Domain.Gateways do use Supervisor alias Domain.Accounts.Account - alias Domain.{Repo, Auth, Events, Geo} + alias Domain.{Repo, Auth, Geo} alias Domain.{Accounts, Resources, Tokens, Billing} alias Domain.Gateways.{Authorizer, Gateway, Group, Presence} @@ -220,9 +220,7 @@ defmodule Domain.Gateways do @doc false def preload_gateways_presence([gateway]) do - gateway.account_id - |> Events.Hooks.Accounts.gateways_presence_topic() - |> Presence.get_by_key(gateway.id) + Presence.Account.get(gateway.account_id, gateway.id) |> case do [] -> %{gateway | online?: false} %{metas: [_ | _]} -> %{gateway | online?: true} @@ -238,8 +236,7 @@ defmodule Domain.Gateways do |> Enum.reject(&is_nil/1) |> Enum.uniq() |> Enum.reduce(%{}, fn account_id, acc -> - connected_gateways = - account_id |> Events.Hooks.Accounts.gateways_presence_topic() |> Presence.list() + connected_gateways = Presence.Account.list(account_id) Map.merge(acc, connected_gateways) end) @@ -250,9 +247,7 @@ defmodule Domain.Gateways do end def all_online_gateway_ids_by_group_id!(group_id) do - group_id - |> Events.Hooks.GatewayGroups.presence_topic() - |> Presence.list() + Presence.Group.list(group_id) |> Map.keys() end @@ -265,9 +260,7 @@ defmodule Domain.Gateways do {preload, _opts} = Keyword.pop(opts, :preload, []) connected_gateway_ids = - resource.account_id - |> Events.Hooks.Accounts.gateways_presence_topic() - |> Presence.list() + Presence.Account.list(resource.account_id) |> Map.keys() gateways = @@ -284,9 +277,7 @@ defmodule Domain.Gateways do def gateway_can_connect_to_resource?(%Gateway{} = gateway, %Resources.Resource{} = resource) do connected_gateway_ids = - resource.account_id - |> Events.Hooks.Accounts.gateways_presence_topic() - |> Presence.list() + Presence.Account.list(resource.account_id) |> Map.keys() cond do diff --git a/elixir/apps/domain/lib/domain/gateways/presence.ex b/elixir/apps/domain/lib/domain/gateways/presence.ex index 764b33e8c..4082103f8 100644 --- a/elixir/apps/domain/lib/domain/gateways/presence.ex +++ b/elixir/apps/domain/lib/domain/gateways/presence.ex @@ -2,4 +2,74 @@ defmodule Domain.Gateways.Presence do use Phoenix.Presence, otp_app: :domain, pubsub_server: Domain.PubSub + + alias Domain.Gateways.Gateway + alias Domain.PubSub + + def connect(%Gateway{} = gateway) do + with {:ok, _} <- __MODULE__.Group.track(gateway.group_id, gateway.id), + {:ok, _} <- __MODULE__.Account.track(gateway.account_id, gateway.id) do + :ok = PubSub.Gateway.subscribe(gateway.id) + end + end + + defmodule Account do + def track(account_id, gateway_id) do + Domain.Gateways.Presence.track( + self(), + topic(account_id), + gateway_id, + %{online_at: System.system_time(:second)} + ) + end + + def subscribe(account_id) do + account_id + |> topic() + |> PubSub.subscribe() + end + + def get(account_id, gateway_id) do + account_id + |> topic() + |> Domain.Gateways.Presence.get_by_key(gateway_id) + end + + def list(account_id) do + account_id + |> topic() + |> Domain.Gateways.Presence.list() + end + + defp topic(account_id) do + "presences:account_gateways:" <> account_id + end + end + + defmodule Group do + def track(gateway_group_id, gateway_id) do + Domain.Gateways.Presence.track( + self(), + topic(gateway_group_id), + gateway_id, + %{} + ) + end + + def subscribe(gateway_group_id) do + gateway_group_id + |> topic() + |> PubSub.subscribe() + end + + def list(gateway_group_id) do + gateway_group_id + |> topic() + |> Domain.Gateways.Presence.list() + end + + defp topic(gateway_group_id) do + "presences:group_gateways:" <> gateway_group_id + end + end end diff --git a/elixir/apps/domain/lib/domain/pubsub.ex b/elixir/apps/domain/lib/domain/pubsub.ex index 9822f950a..2a8479c5e 100644 --- a/elixir/apps/domain/lib/domain/pubsub.ex +++ b/elixir/apps/domain/lib/domain/pubsub.ex @@ -41,4 +41,315 @@ defmodule Domain.PubSub do def unsubscribe(topic) do Phoenix.PubSub.unsubscribe(__MODULE__, topic) end + + # TODO: These are quite repetitive. We could simplify this with a `__using__` macro. + defmodule Account do + def subscribe(account_id) do + account_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(account_id, payload) do + account_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(account_id) do + Atom.to_string(__MODULE__) <> ":" <> account_id + end + + defmodule Clients do + def subscribe(account_id) do + account_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(account_id, payload) do + account_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + def disconnect(account_id) do + account_id + |> topic() + |> Domain.PubSub.broadcast("disconnect") + end + + defp topic(account_id) do + Atom.to_string(__MODULE__) <> ":" <> account_id + end + end + + defmodule Policies do + def subscribe(account_id) do + account_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(account_id, payload) do + account_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(account_id) do + Atom.to_string(__MODULE__) <> ":" <> account_id + end + end + + defmodule Resources do + def subscribe(account_id) do + account_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(account_id, payload) do + account_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(account_id) do + Atom.to_string(__MODULE__) <> ":" <> account_id + end + end + end + + defmodule Actor do + def subscribe(actor_id) do + actor_id + |> topic() + |> Domain.PubSub.subscribe() + end + + defp topic(actor_id) do + Atom.to_string(__MODULE__) <> ":" <> actor_id + end + + defmodule Memberships do + def subscribe(actor_id) do + actor_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(actor_id, payload) do + actor_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + def broadcast_access(action, actor_id, group_id) do + Domain.Policies.Policy.Query.not_deleted() + |> Domain.Policies.Policy.Query.by_actor_group_id(group_id) + |> Domain.Repo.all() + |> Enum.each(fn policy -> + payload = {:"#{action}_access", policy.id, policy.actor_group_id, policy.resource_id} + :ok = Actor.Policies.broadcast(actor_id, payload) + end) + end + + defp topic(actor_id) do + Atom.to_string(__MODULE__) <> ":" <> actor_id + end + end + + defmodule Policies do + def subscribe(actor_id) do + actor_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(actor_id, payload) do + actor_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(actor_id) do + Atom.to_string(__MODULE__) <> ":" <> actor_id + end + end + end + + defmodule ActorGroup do + defmodule Policies do + def subscribe(actor_group_id) do + actor_group_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def unsubscribe(actor_group_id) do + actor_group_id + |> topic() + |> Domain.PubSub.unsubscribe() + end + + def broadcast(actor_group_id, payload) do + actor_group_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(actor_group_id) do + Atom.to_string(__MODULE__) <> ":" <> actor_group_id + end + end + end + + defmodule Client do + def subscribe(client_id) do + client_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(client_id, payload) do + client_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + def disconnect(client_id) do + client_id + |> topic() + |> Domain.PubSub.broadcast("disconnect") + end + + defp topic(client_id) do + Atom.to_string(__MODULE__) <> ":" <> client_id + end + end + + defmodule Flow do + def subscribe(flow_id) do + flow_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def unsubscribe(flow_id) do + flow_id + |> topic() + |> Domain.PubSub.unsubscribe() + end + + def broadcast(flow_id, payload) do + flow_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(flow_id) do + Atom.to_string(__MODULE__) <> ":" <> flow_id + end + end + + defmodule GatewayGroup do + def subscribe(gateway_group_id) do + gateway_group_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def unsubscribe(gateway_group_id) do + gateway_group_id + |> topic() + |> Domain.PubSub.unsubscribe() + end + + defp topic(gateway_group_id) do + Atom.to_string(__MODULE__) <> ":" <> gateway_group_id + end + end + + defmodule Gateway do + def subscribe(gateway_id) do + gateway_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(gateway_id, payload) do + gateway_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + def disconnect(gateway_id) do + gateway_id + |> topic() + |> Domain.PubSub.broadcast("disconnect") + end + + defp topic(gateway_id) do + Atom.to_string(__MODULE__) <> ":" <> gateway_id + end + end + + defmodule Policy do + def subscribe(policy_id) do + policy_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def broadcast(policy_id, payload) do + policy_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(policy_id) do + Atom.to_string(__MODULE__) <> ":" <> policy_id + end + end + + defmodule Resource do + def subscribe(resource_id) do + resource_id + |> topic() + |> Domain.PubSub.subscribe() + end + + def unsubscribe(resource_id) do + resource_id + |> topic() + |> Domain.PubSub.unsubscribe() + end + + def broadcast(resource_id, payload) do + resource_id + |> topic() + |> Domain.PubSub.broadcast(payload) + end + + defp topic(resource_id) do + Atom.to_string(__MODULE__) <> ":" <> resource_id + end + end + + defmodule Token do + def disconnect(token_id) do + token_id + |> topic() + |> Domain.PubSub.broadcast(%Phoenix.Socket.Broadcast{ + topic: topic(token_id), + event: "disconnect" + }) + end + + defp topic(token_id) do + # This topic is managed by Phoenix + Domain.Tokens.socket_id(token_id) + end + end end diff --git a/elixir/apps/domain/test/domain/actors_test.exs b/elixir/apps/domain/test/domain/actors_test.exs index ff5878237..94e5762c4 100644 --- a/elixir/apps/domain/test/domain/actors_test.exs +++ b/elixir/apps/domain/test/domain/actors_test.exs @@ -632,7 +632,7 @@ defmodule Domain.ActorsTest do } do actor = Fixtures.Actors.create_actor(account: account) client = Fixtures.Clients.create_client(account: account, actor: actor) - Events.Hooks.Clients.connect(client) + Clients.Presence.connect(client) assert {:ok, peek} = peek_actor_clients([actor], 3, subject) assert [%Clients.Client{} = client] = peek[actor.id].items diff --git a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs index d118676c9..45b8da4ac 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/google_workspace/jobs/sync_directory_test.exs @@ -1,6 +1,6 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do use Domain.DataCase, async: true - alias Domain.{Auth, Actors, Events} + alias Domain.{Auth, Actors, Events, PubSub} alias Domain.Mocks.GoogleWorkspaceDirectory import Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectory @@ -629,12 +629,12 @@ defmodule Domain.Auth.Adapters.GoogleWorkspace.Jobs.SyncDirectoryTest do Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) Fixtures.Actors.create_membership(account: account, actor: actor, group: org_unit) - :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) + :ok = PubSub.Actor.Memberships.subscribe(actor.id) + :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) + :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) + :ok = PubSub.Actor.Policies.subscribe(actor.id) + :ok = PubSub.Actor.Policies.subscribe(other_actor.id) + :ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id) GoogleWorkspaceDirectory.override_endpoint_url("http://localhost:#{bypass.port}/") diff --git a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs index fd1872a46..dcbfbab15 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/jumpcloud/jobs/sync_directory_test.exs @@ -1,6 +1,6 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do use Domain.DataCase, async: true - alias Domain.{Auth, Actors, Events} + alias Domain.{Auth, Actors, Events, PubSub} alias Domain.Mocks.WorkOSDirectory import Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectory @@ -416,12 +416,12 @@ defmodule Domain.Auth.Adapters.JumpCloud.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) - :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) + :ok = PubSub.Actor.Memberships.subscribe(actor.id) + :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) + :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) + :ok = PubSub.Actor.Policies.subscribe(actor.id) + :ok = PubSub.Actor.Policies.subscribe(other_actor.id) + :ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id) WorkOSDirectory.override_base_url("http://localhost:#{bypass.port}") WorkOSDirectory.mock_list_directories_endpoint(bypass) diff --git a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs index bd20bcc7e..262ec3d84 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/microsoft_entra/jobs/sync_directory_test.exs @@ -1,6 +1,6 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do use Domain.DataCase, async: true - alias Domain.{Auth, Actors, Events} + alias Domain.{Auth, Actors, Events, PubSub} alias Domain.Mocks.MicrosoftEntraDirectory import Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectory @@ -464,12 +464,12 @@ defmodule Domain.Auth.Adapters.MicrosoftEntra.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) - :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) + :ok = PubSub.Actor.Memberships.subscribe(actor.id) + :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) + :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) + :ok = PubSub.Actor.Policies.subscribe(actor.id) + :ok = PubSub.Actor.Policies.subscribe(other_actor.id) + :ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id) MicrosoftEntraDirectory.mock_groups_list_endpoint( bypass, diff --git a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs index b61dfe108..650ff5ddd 100644 --- a/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs +++ b/elixir/apps/domain/test/domain/auth/adapters/okta/jobs/sync_directory_test.exs @@ -1,6 +1,6 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do use Domain.DataCase, async: true - alias Domain.{Auth, Actors, Events} + alias Domain.{Auth, Actors, Events, PubSub} alias Domain.Mocks.OktaDirectory import Domain.Auth.Adapters.Okta.Jobs.SyncDirectory @@ -708,12 +708,12 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do deleted_membership = Fixtures.Actors.create_membership(account: account, group: group) Fixtures.Actors.create_membership(account: account, actor: actor, group: deleted_group) - :ok = Events.Hooks.Actors.subscribe_to_memberships(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(other_actor.id) - :ok = Events.Hooks.Actors.subscribe_to_memberships(deleted_membership.actor_id) - :ok = Events.Hooks.Actors.subscribe_to_policies(actor.id) - :ok = Events.Hooks.Actors.subscribe_to_policies(other_actor.id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(deleted_group.id) + :ok = PubSub.Actor.Memberships.subscribe(actor.id) + :ok = PubSub.Actor.Memberships.subscribe(other_actor.id) + :ok = PubSub.Actor.Memberships.subscribe(deleted_membership.actor_id) + :ok = PubSub.Actor.Policies.subscribe(actor.id) + :ok = PubSub.Actor.Policies.subscribe(other_actor.id) + :ok = PubSub.ActorGroup.Policies.subscribe(deleted_group.id) OktaDirectory.mock_groups_list_endpoint(bypass, 200, Jason.encode!(groups)) OktaDirectory.mock_users_list_endpoint(bypass, 200, Jason.encode!(users)) @@ -809,12 +809,12 @@ defmodule Domain.Auth.Adapters.Okta.Jobs.SyncDirectoryTest do "account_id" => deleted_policy.account_id }) - assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} - # TODO: WAL # Remove this after direct broadcast Process.sleep(100) + assert_receive {:reject_access, ^policy_id, ^group_id, ^resource_id} + # Deleted policies expire all flows authorized by them deleted_group_flow = Repo.reload(deleted_group_flow) assert DateTime.compare(deleted_group_flow.expires_at, DateTime.utc_now()) == :lt diff --git a/elixir/apps/domain/test/domain/clients_test.exs b/elixir/apps/domain/test/domain/clients_test.exs index 7f13931d7..434591656 100644 --- a/elixir/apps/domain/test/domain/clients_test.exs +++ b/elixir/apps/domain/test/domain/clients_test.exs @@ -1,7 +1,7 @@ defmodule Domain.ClientsTest do use Domain.DataCase, async: true import Domain.Clients - alias Domain.{Clients, Events} + alias Domain.{Clients, PubSub} setup do account = Fixtures.Accounts.create_account() @@ -117,7 +117,11 @@ defmodule Domain.ClientsTest do assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?]) assert client.online? == false - assert Events.Hooks.Clients.connect(client) == :ok + {:ok, _} = Clients.Presence.Account.track(client.account_id, client.id) + {:ok, _} = Clients.Presence.Actor.track(client.actor_id, client.id) + :ok = PubSub.Client.subscribe(client.id) + :ok = PubSub.Account.Clients.subscribe(client.account_id) + assert {:ok, client} = fetch_client_by_id(client.id, subject, preload: [:online?]) assert client.online? == true end @@ -222,7 +226,10 @@ defmodule Domain.ClientsTest do assert client = fetch_client_by_id!(client.id, preload: [:online?]) assert client.online? == false - assert Events.Hooks.Clients.connect(client) == :ok + {:ok, _} = Clients.Presence.Account.track(client.account_id, client.id) + {:ok, _} = Clients.Presence.Actor.track(client.actor_id, client.id) + :ok = PubSub.Client.subscribe(client.id) + :ok = PubSub.Account.Clients.subscribe(client.account_id) assert client = fetch_client_by_id!(client.id, preload: [:online?]) assert client.online? == true end @@ -281,7 +288,10 @@ defmodule Domain.ClientsTest do assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?]) assert client.online? == false - assert Events.Hooks.Clients.connect(client) == :ok + {:ok, _} = Clients.Presence.Account.track(client.account_id, client.id) + {:ok, _} = Clients.Presence.Actor.track(client.actor_id, client.id) + :ok = PubSub.Client.subscribe(client.id) + :ok = PubSub.Account.Clients.subscribe(client.account_id) assert {:ok, [client], _metadata} = list_clients(subject, preload: [:online?]) assert client.online? == true end @@ -395,7 +405,7 @@ defmodule Domain.ClientsTest do client_attrs = Fixtures.Clients.client_attrs() assert changeset = change_client(client, client_attrs) - assert %Ecto.Changeset{data: %Domain.Clients.Client{}} = changeset + assert %Ecto.Changeset{data: %Clients.Client{}} = changeset assert changeset.changes == %{name: client_attrs.name} end diff --git a/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs index b5fe5e4f2..9dd348333 100644 --- a/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/accounts_test.exs @@ -16,7 +16,7 @@ defmodule Domain.Events.Hooks.AccountsTest do test "sends :config_changed if config changes" do account = Fixtures.Accounts.create_account() - :ok = subscribe(account.id) + :ok = Domain.PubSub.Account.subscribe(account.id) old_data = %{ "id" => account.id, @@ -38,7 +38,7 @@ defmodule Domain.Events.Hooks.AccountsTest do test "does not send :config_changed if config does not change" do account = Fixtures.Accounts.create_account() - :ok = subscribe(account.id) + :ok = Domain.PubSub.Account.subscribe(account.id) old_data = %{ "id" => account.id, @@ -60,7 +60,7 @@ defmodule Domain.Events.Hooks.AccountsTest do old_data = %{"id" => account_id, "disabled_at" => nil} data = %{"id" => account_id, "disabled_at" => DateTime.utc_now()} - :ok = subscribe_to_clients(account_id) + :ok = Domain.PubSub.Account.Clients.subscribe(account_id) assert :ok == on_update(old_data, data) diff --git a/elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs b/elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs index de60c0f06..6b6aeb746 100644 --- a/elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/actor_group_memberships_test.exs @@ -1,7 +1,7 @@ defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do use API.ChannelCase, async: true import Domain.Events.Hooks.ActorGroupMemberships - alias Domain.Events.Hooks.Actors + alias Domain.PubSub setup do %{old_data: %{}, data: %{}} @@ -17,9 +17,14 @@ defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do "group_id" => group_id } - :ok = Actors.subscribe_to_memberships(actor_id) + :ok = PubSub.Actor.Memberships.subscribe(actor_id) assert :ok == on_insert(data) + + # TODO: WAL + # Remove this when direct broadcast is implement + Process.sleep(100) + assert_receive {:create_membership, ^actor_id, ^group_id} end end @@ -81,7 +86,7 @@ defmodule Domain.Events.Hooks.ActorGroupMembershipsTest do "group_id" => group_id } - :ok = Actors.subscribe_to_memberships(actor_id) + :ok = PubSub.Actor.Memberships.subscribe(actor_id) assert :ok == on_delete(data) assert_receive {:delete_membership, ^actor_id, ^group_id} diff --git a/elixir/apps/domain/test/domain/events/hooks/clients_test.exs b/elixir/apps/domain/test/domain/events/hooks/clients_test.exs index 58c796252..5117b0185 100644 --- a/elixir/apps/domain/test/domain/events/hooks/clients_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/clients_test.exs @@ -1,6 +1,7 @@ defmodule Domain.Events.Hooks.ClientsTest do use Domain.DataCase, async: true import Domain.Events.Hooks.Clients + alias Domain.{Clients, PubSub} setup do %{old_data: %{}, data: %{}} @@ -15,7 +16,7 @@ defmodule Domain.Events.Hooks.ClientsTest do describe "update/2" do test "soft-delete broadcasts disconnect" do client = Fixtures.Clients.create_client() - :ok = connect(client) + :ok = Clients.Presence.connect(client) old_data = %{"id" => client.id, "deleted_at" => nil} data = %{"id" => client.id, "deleted_at" => DateTime.utc_now()} @@ -28,7 +29,7 @@ defmodule Domain.Events.Hooks.ClientsTest do test "update broadcasts :update" do client = Fixtures.Clients.create_client() - :ok = connect(client) + :ok = Clients.Presence.connect(client) old_data = %{"id" => client.id, "name" => "Old Client"} data = %{"id" => client.id, "name" => "Updated Client"} @@ -43,7 +44,7 @@ defmodule Domain.Events.Hooks.ClientsTest do describe "delete/1" do test "broadcasts disconnect" do client = Fixtures.Clients.create_client() - :ok = connect(client) + :ok = Clients.Presence.connect(client) old_data = %{"id" => client.id} @@ -57,20 +58,12 @@ defmodule Domain.Events.Hooks.ClientsTest do describe "connect/1" do test "tracks client presence and subscribes to topics" do client = Fixtures.Clients.create_client() - assert :ok == connect(client) + assert :ok == Clients.Presence.connect(client) - assert client.account_id - |> Domain.Events.Hooks.Accounts.clients_presence_topic() - |> Domain.Clients.Presence.get_by_key(client.id) + assert Clients.Presence.Account.get(client.account_id, client.id) + assert Clients.Presence.Actor.get(client.actor_id, client.id) - assert client.actor_id - |> Domain.Events.Hooks.Actors.clients_presence_topic() - |> Domain.Clients.Presence.get_by_key(client.id) - - Domain.PubSub.broadcast( - Domain.Events.Hooks.Accounts.clients_topic(client.account_id), - :test_event - ) + PubSub.Account.Clients.broadcast(client.account_id, :test_event) assert_receive :test_event end @@ -79,9 +72,9 @@ defmodule Domain.Events.Hooks.ClientsTest do describe "broadcast/2" do test "broadcasts payload to client topic" do client = Fixtures.Clients.create_client() - :ok = connect(client) + :ok = Clients.Presence.connect(client) - assert :ok == broadcast(client.id, :updated) + assert :ok == PubSub.Client.broadcast(client.id, :updated) assert_receive :updated end diff --git a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs index b38307b49..ca99f0262 100644 --- a/elixir/apps/domain/test/domain/events/hooks/flows_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/flows_test.exs @@ -27,7 +27,7 @@ defmodule Domain.Events.Hooks.FlowsTest do "resource_id" => resource_id } - :ok = subscribe(flow_id) + :ok = Domain.PubSub.Flow.subscribe(flow_id) assert :ok == on_update(old_data, data) assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} @@ -47,7 +47,7 @@ defmodule Domain.Events.Hooks.FlowsTest do "resource_id" => resource_id } - :ok = subscribe(flow_id) + :ok = Domain.PubSub.Flow.subscribe(flow_id) assert :ok == on_update(old_data, data) refute_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} @@ -84,7 +84,7 @@ defmodule Domain.Events.Hooks.FlowsTest do "resource_id" => resource_id } - :ok = subscribe(flow_id) + :ok = Domain.PubSub.Flow.subscribe(flow_id) assert :ok == on_delete(old_data) assert_receive {:expire_flow, ^flow_id, ^client_id, ^resource_id} diff --git a/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs b/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs index b09825810..4adcbe0df 100644 --- a/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/gateways_test.exs @@ -19,7 +19,7 @@ defmodule Domain.Events.Hooks.GatewaysTest do old_data = %{"id" => gateway.id, "deleted_at" => nil} data = %{"id" => gateway.id, "deleted_at" => "2023-10-01T00:00:00Z"} - :ok = connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) :ok = on_update(old_data, data) assert_receive "disconnect" @@ -31,7 +31,7 @@ defmodule Domain.Events.Hooks.GatewaysTest do old_data = %{"id" => gateway.id} data = %{"id" => gateway.id, "name" => "New Gateway Name"} - :ok = connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) :ok = on_update(old_data, data) refute_receive "disconnect" @@ -44,7 +44,7 @@ defmodule Domain.Events.Hooks.GatewaysTest do old_data = %{"id" => gateway.id} - :ok = connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) :ok = on_delete(old_data) assert_receive "disconnect" diff --git a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs index bb506c2a9..9cfa6c3f1 100644 --- a/elixir/apps/domain/test/domain/events/hooks/policies_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/policies_test.exs @@ -1,7 +1,7 @@ defmodule Domain.Events.Hooks.PoliciesTest do use Domain.DataCase, async: true import Domain.Events.Hooks.Policies - alias Domain.Events + alias Domain.PubSub describe "insert/1" do test "broadcasts :create_policy and :allow_access" do @@ -17,9 +17,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do "resource_id" => resource_id } - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = PubSub.Policy.subscribe(policy_id) + :ok = PubSub.Account.Policies.subscribe(account_id) + :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_insert(data) assert_receive {:create_policy, ^policy_id} @@ -45,9 +45,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "disabled_at", nil) - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = Domain.PubSub.Policy.subscribe(policy_id) + :ok = Domain.PubSub.Account.Policies.subscribe(account_id) + :ok = Domain.PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) assert_receive {:enable_policy, ^policy_id} @@ -72,9 +72,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "disabled_at", "2023-10-01T00:00:00Z") - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = PubSub.Policy.subscribe(policy_id) + :ok = PubSub.Account.Policies.subscribe(account_id) + :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) assert_receive {:disable_policy, ^policy_id} @@ -107,9 +107,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = PubSub.Policy.subscribe(policy_id) + :ok = PubSub.Account.Policies.subscribe(account_id) + :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) assert_receive {:delete_policy, ^policy_id} @@ -142,9 +142,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "resource_id", "new-resource-123") - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = PubSub.Policy.subscribe(policy_id) + :ok = PubSub.Account.Policies.subscribe(account_id) + :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) @@ -182,9 +182,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "resource_id", "new-resource-123") - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = PubSub.Policy.subscribe(policy_id) + :ok = PubSub.Account.Policies.subscribe(account_id) + :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) @@ -219,9 +219,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "resource_id", "new-resource-123") - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = PubSub.Policy.subscribe(policy_id) + :ok = PubSub.Account.Policies.subscribe(account_id) + :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) @@ -248,9 +248,9 @@ defmodule Domain.Events.Hooks.PoliciesTest do data = Map.put(old_data, "deleted_at", "2023-10-01T00:00:00Z") - :ok = subscribe(policy_id) - :ok = Events.Hooks.Accounts.subscribe_to_policies(account_id) - :ok = Events.Hooks.ActorGroups.subscribe_to_policies(actor_group_id) + :ok = PubSub.Policy.subscribe(policy_id) + :ok = PubSub.Account.Policies.subscribe(account_id) + :ok = PubSub.ActorGroup.Policies.subscribe(actor_group_id) assert :ok == on_update(old_data, data) assert_receive {:delete_policy, ^policy_id} diff --git a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs index 80c36284a..593cd3aed 100644 --- a/elixir/apps/domain/test/domain/events/hooks/resources_test.exs +++ b/elixir/apps/domain/test/domain/events/hooks/resources_test.exs @@ -1,13 +1,14 @@ defmodule Domain.Events.Hooks.ResourcesTest do use Domain.DataCase, async: true import Domain.Events.Hooks.Resources + alias Domain.PubSub describe "insert/1" do test "broadcasts :create_resource to subscribed" do resource_id = "test_resource" account_id = "test_account" - :ok = subscribe(resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(account_id) + :ok = PubSub.Resource.subscribe(resource_id) + :ok = PubSub.Account.Resources.subscribe(account_id) data = %{"id" => resource_id, "account_id" => account_id} @@ -17,7 +18,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert_receive {:create_resource, ^resource_id} assert_receive {:create_resource, ^resource_id} - :ok = unsubscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) assert :ok = on_insert(data) assert_receive {:create_resource, ^resource_id} @@ -44,8 +45,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do test "broadcasts :delete_resource to subscribed for soft-deletions" do resource_id = "test_resource" account_id = "test_account" - :ok = subscribe(resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(account_id) + :ok = PubSub.Resource.subscribe(resource_id) + :ok = PubSub.Account.Resources.subscribe(account_id) old_data = %{"id" => resource_id, "account_id" => account_id, "deleted_at" => nil} @@ -59,7 +60,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} - :ok = unsubscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) assert :ok = on_update(old_data, data) assert_receive {:delete_resource, ^resource_id} @@ -67,8 +68,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource type changes", %{flow: flow, old_data: old_data} do - :ok = subscribe(flow.resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id) + :ok = PubSub.Resource.subscribe(flow.resource_id) + :ok = PubSub.Account.Resources.subscribe(flow.account_id) data = Map.put(old_data, "type", "cidr") @@ -90,8 +91,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource address changes", %{flow: flow, old_data: old_data} do - :ok = subscribe(flow.resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id) + :ok = PubSub.Resource.subscribe(flow.resource_id) + :ok = PubSub.Account.Resources.subscribe(flow.account_id) data = Map.put(old_data, "address", "4.3.2.1") @@ -113,8 +114,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource filters change", %{flow: flow, old_data: old_data} do - :ok = subscribe(flow.resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id) + :ok = PubSub.Resource.subscribe(flow.resource_id) + :ok = PubSub.Account.Resources.subscribe(flow.account_id) data = Map.put(old_data, "filters", ["new_filter"]) @@ -136,8 +137,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "expires flows when resource ip_stack changes", %{flow: flow, old_data: old_data} do - :ok = subscribe(flow.resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id) + :ok = PubSub.Resource.subscribe(flow.resource_id) + :ok = PubSub.Account.Resources.subscribe(flow.account_id) data = Map.put(old_data, "ip_stack", "ipv4_only") @@ -159,8 +160,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do end test "broadcasts update for non-addressability change", %{flow: flow, old_data: old_data} do - :ok = subscribe(flow.resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(flow.account_id) + :ok = PubSub.Resource.subscribe(flow.resource_id) + :ok = PubSub.Account.Resources.subscribe(flow.account_id) data = Map.put(old_data, "name", "New Name") @@ -181,8 +182,8 @@ defmodule Domain.Events.Hooks.ResourcesTest do test "broadcasts :delete_resource to subscribed" do resource_id = "test_resource" account_id = "test_account" - :ok = subscribe(resource_id) - :ok = Domain.Events.Hooks.Accounts.subscribe_to_resources(account_id) + :ok = PubSub.Resource.subscribe(resource_id) + :ok = PubSub.Account.Resources.subscribe(account_id) old_data = %{"id" => resource_id, "account_id" => account_id} @@ -190,7 +191,7 @@ defmodule Domain.Events.Hooks.ResourcesTest do assert_receive {:delete_resource, ^resource_id} assert_receive {:delete_resource, ^resource_id} - :ok = unsubscribe(resource_id) + :ok = PubSub.Resource.unsubscribe(resource_id) assert :ok = on_delete(old_data) assert_receive {:delete_resource, ^resource_id} diff --git a/elixir/apps/domain/test/domain/gateways_test.exs b/elixir/apps/domain/test/domain/gateways_test.exs index 453b9d3c6..1f033d722 100644 --- a/elixir/apps/domain/test/domain/gateways_test.exs +++ b/elixir/apps/domain/test/domain/gateways_test.exs @@ -1,7 +1,7 @@ defmodule Domain.GatewaysTest do use Domain.DataCase, async: true import Domain.Gateways - alias Domain.{Events, Gateways, Tokens, Resources} + alias Domain.{Gateways, Tokens, Resources} setup do account = Fixtures.Accounts.create_account() @@ -622,7 +622,7 @@ defmodule Domain.GatewaysTest do } do offline_gateway = Fixtures.Gateways.create_gateway(account: account) online_gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(online_gateway) + :ok = Gateways.Presence.connect(online_gateway) Fixtures.Gateways.create_gateway() assert {:ok, gateways, _metadata} = list_gateways(subject, preload: :online?) @@ -689,7 +689,7 @@ defmodule Domain.GatewaysTest do connections: [%{gateway_group_id: gateway.group_id}] ) - assert Events.Hooks.Gateways.connect(gateway) == :ok + assert Gateways.Presence.connect(gateway) == :ok assert {:ok, [connected_gateway]} = all_connected_gateways_for_resource(resource, subject) assert connected_gateway.id == gateway.id @@ -702,7 +702,7 @@ defmodule Domain.GatewaysTest do resource = Fixtures.Resources.create_resource(account: account) gateway = Fixtures.Gateways.create_gateway(account: account) - assert Events.Hooks.Gateways.connect(gateway) == :ok + assert Gateways.Presence.connect(gateway) == :ok assert all_connected_gateways_for_resource(resource, subject) == {:ok, []} end @@ -711,7 +711,7 @@ defmodule Domain.GatewaysTest do describe "gateway_can_connect_to_resource?/2" do test "returns true when gateway can connect to resource", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Gateways.Presence.connect(gateway) resource = Fixtures.Resources.create_resource( @@ -724,7 +724,7 @@ defmodule Domain.GatewaysTest do test "returns false when gateway cannot connect to resource", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Gateways.Presence.connect(gateway) resource = Fixtures.Resources.create_resource(account: account) @@ -1206,15 +1206,15 @@ defmodule Domain.GatewaysTest do test "does not allow duplicate presence", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - assert Events.Hooks.Gateways.connect(gateway) == :ok + assert Gateways.Presence.connect(gateway) == :ok assert {:error, {:already_tracked, _pid, _topic, _key}} = - Events.Hooks.Gateways.connect(gateway) + Gateways.Presence.connect(gateway) end test "tracks gateway presence for account", %{account: account} do gateway = Fixtures.Gateways.create_gateway(account: account) - assert Events.Hooks.Gateways.connect(gateway) == :ok + assert Gateways.Presence.connect(gateway) == :ok gateway = fetch_gateway_by_id!(gateway.id, preload: [:online?]) assert gateway.online? == true diff --git a/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs b/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs index 0bea5610c..8428bb37c 100644 --- a/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs +++ b/elixir/apps/domain/test/domain/notifications/jobs/outdated_gateways_test.exs @@ -1,8 +1,7 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do use Domain.DataCase, async: true import Domain.Notifications.Jobs.OutdatedGateways - alias Domain.ComponentVersions - alias Domain.Events + alias Domain.{ComponentVersions, Gateways, PubSub} describe "execute/1" do setup do @@ -42,8 +41,10 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do new_config = update_component_versions_config(gateway: new_version) Domain.Config.put_env_override(ComponentVersions, new_config) - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Gateways.Presence.Group.subscribe(gateway_group.id) + {:ok, _} = Gateways.Presence.Group.track(gateway.group_id, gateway.id) + {:ok, _} = Gateways.Presence.Account.track(gateway.account_id, gateway.id) + :ok = PubSub.Gateway.subscribe(gateway.id) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} assert execute(%{}) == :ok @@ -66,8 +67,10 @@ defmodule Domain.Notifications.Jobs.OutdatedGatewaysTest do new_config = update_component_versions_config(gateway: version) Domain.Config.put_env_override(ComponentVersions, new_config) - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway_group.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Gateways.Presence.Group.subscribe(gateway_group.id) + {:ok, _} = Gateways.Presence.Group.track(gateway.group_id, gateway.id) + {:ok, _} = Gateways.Presence.Account.track(gateway.account_id, gateway.id) + :ok = PubSub.Gateway.subscribe(gateway.id) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} assert execute(%{}) == :ok diff --git a/elixir/apps/web/lib/web/live/actors/show.ex b/elixir/apps/web/lib/web/live/actors/show.ex index c882eba19..80c3f1a01 100644 --- a/elixir/apps/web/lib/web/live/actors/show.ex +++ b/elixir/apps/web/lib/web/live/actors/show.ex @@ -2,7 +2,7 @@ defmodule Web.Actors.Show do use Web, :live_view import Web.Actors.Components import Web.Clients.Components - alias Domain.{Accounts, Auth, Events, Tokens, Flows, Clients} + alias Domain.{Accounts, Auth, Tokens, Flows, Clients} alias Domain.Actors def mount(%{"id" => id}, _session, socket) do @@ -11,7 +11,7 @@ defmodule Web.Actors.Show do preload: [:identities, :last_seen_at, groups: [:provider]] ) do if connected?(socket) do - :ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id) + :ok = Clients.Presence.Actor.subscribe(actor.id) end available_providers = diff --git a/elixir/apps/web/lib/web/live/clients/index.ex b/elixir/apps/web/lib/web/live/clients/index.ex index db4a2e241..68014737b 100644 --- a/elixir/apps/web/lib/web/live/clients/index.ex +++ b/elixir/apps/web/lib/web/live/clients/index.ex @@ -3,11 +3,10 @@ defmodule Web.Clients.Index do import Web.Actors.Components import Web.Clients.Components alias Domain.Clients - alias Domain.Events def mount(_params, _session, socket) do if connected?(socket) do - :ok = Events.Hooks.Accounts.subscribe_to_clients_presence(socket.assigns.subject.account.id) + :ok = Clients.Presence.Account.subscribe(socket.assigns.subject.account.id) end socket = diff --git a/elixir/apps/web/lib/web/live/clients/show.ex b/elixir/apps/web/lib/web/live/clients/show.ex index 2dedd96ba..2dffd85d2 100644 --- a/elixir/apps/web/lib/web/live/clients/show.ex +++ b/elixir/apps/web/lib/web/live/clients/show.ex @@ -2,7 +2,7 @@ defmodule Web.Clients.Show do use Web, :live_view import Web.Policies.Components import Web.Clients.Components - alias Domain.{Accounts, Clients, Events, Flows} + alias Domain.{Accounts, Clients, Flows} def mount(%{"id" => id}, _session, socket) do with {:ok, client} <- @@ -14,7 +14,7 @@ defmodule Web.Clients.Show do ] ) do if connected?(socket) do - :ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id) + :ok = Clients.Presence.Actor.subscribe(client.actor_id) end socket = diff --git a/elixir/apps/web/lib/web/live/gateways/show.ex b/elixir/apps/web/lib/web/live/gateways/show.ex index 3a608817e..9e63f1756 100644 --- a/elixir/apps/web/lib/web/live/gateways/show.ex +++ b/elixir/apps/web/lib/web/live/gateways/show.ex @@ -1,13 +1,12 @@ defmodule Web.Gateways.Show do use Web, :live_view alias Domain.Gateways - alias Domain.Events def mount(%{"id" => id}, _session, socket) do with {:ok, gateway} <- Gateways.fetch_gateway_by_id(id, socket.assigns.subject, preload: [:group, :online?]) do if connected?(socket) do - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group_id) + :ok = Gateways.Presence.Group.subscribe(gateway.group_id) end socket = diff --git a/elixir/apps/web/lib/web/live/policies/index.ex b/elixir/apps/web/lib/web/live/policies/index.ex index 1ea3a2909..cc82c8e47 100644 --- a/elixir/apps/web/lib/web/live/policies/index.ex +++ b/elixir/apps/web/lib/web/live/policies/index.ex @@ -1,10 +1,10 @@ defmodule Web.Policies.Index do use Web, :live_view - alias Domain.{Events, Policies} + alias Domain.{PubSub, Policies} def mount(_params, _session, socket) do if connected?(socket) do - :ok = Events.Hooks.Accounts.subscribe_to_policies(socket.assigns.account.id) + :ok = PubSub.Account.Policies.subscribe(socket.assigns.account.id) end socket = diff --git a/elixir/apps/web/lib/web/live/policies/show.ex b/elixir/apps/web/lib/web/live/policies/show.ex index 4a05c16bd..adec338c5 100644 --- a/elixir/apps/web/lib/web/live/policies/show.ex +++ b/elixir/apps/web/lib/web/live/policies/show.ex @@ -1,7 +1,7 @@ defmodule Web.Policies.Show do use Web, :live_view import Web.Policies.Components - alias Domain.{Accounts, Policies, Events, Flows, Auth} + alias Domain.{Accounts, Policies, PubSub, Flows, Auth} def mount(%{"id" => id}, _session, socket) do with {:ok, policy} <- @@ -16,7 +16,7 @@ defmodule Web.Policies.Show do providers = Auth.all_active_providers_for_account!(socket.assigns.account) if connected?(socket) do - :ok = Events.Hooks.Policies.subscribe(policy.id) + :ok = PubSub.Policy.subscribe(policy.id) end socket = diff --git a/elixir/apps/web/lib/web/live/resources/index.ex b/elixir/apps/web/lib/web/live/resources/index.ex index 918af7260..50859c609 100644 --- a/elixir/apps/web/lib/web/live/resources/index.ex +++ b/elixir/apps/web/lib/web/live/resources/index.ex @@ -1,11 +1,11 @@ defmodule Web.Resources.Index do use Web, :live_view - alias Domain.Events + alias Domain.PubSub alias Domain.Resources def mount(_params, _session, socket) do if connected?(socket) do - :ok = Events.Hooks.Accounts.subscribe_to_resources(socket.assigns.account.id) + :ok = PubSub.Account.Resources.subscribe(socket.assigns.account.id) end socket = diff --git a/elixir/apps/web/lib/web/live/resources/show.ex b/elixir/apps/web/lib/web/live/resources/show.ex index 1a35b90a0..d4c206a51 100644 --- a/elixir/apps/web/lib/web/live/resources/show.ex +++ b/elixir/apps/web/lib/web/live/resources/show.ex @@ -2,14 +2,14 @@ defmodule Web.Resources.Show do use Web, :live_view import Web.Policies.Components import Web.Resources.Components - alias Domain.{Accounts, Events, Resources, Policies, Flows} + alias Domain.{Accounts, PubSub, Resources, Policies, Flows} def mount(%{"id" => id} = params, _session, socket) do with {:ok, resource} <- fetch_resource(id, socket.assigns.subject), {:ok, actor_groups_peek} <- Resources.peek_resource_actor_groups([resource], 3, socket.assigns.subject) do if connected?(socket) do - :ok = Events.Hooks.Resources.subscribe(resource.id) + :ok = PubSub.Resource.subscribe(resource.id) end socket = diff --git a/elixir/apps/web/lib/web/live/sites/gateways/index.ex b/elixir/apps/web/lib/web/live/sites/gateways/index.ex index 8990216fc..7be37a04a 100644 --- a/elixir/apps/web/lib/web/live/sites/gateways/index.ex +++ b/elixir/apps/web/lib/web/live/sites/gateways/index.ex @@ -1,12 +1,11 @@ defmodule Web.Sites.Gateways.Index do use Web, :live_view alias Domain.Gateways - alias Domain.Events def mount(%{"id" => id}, _session, socket) do with {:ok, group} <- Gateways.fetch_group_by_id(id, socket.assigns.subject) do if connected?(socket) do - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Gateways.Presence.Group.subscribe(group.id) end socket = diff --git a/elixir/apps/web/lib/web/live/sites/index.ex b/elixir/apps/web/lib/web/live/sites/index.ex index aa1e00a7c..8acf698a6 100644 --- a/elixir/apps/web/lib/web/live/sites/index.ex +++ b/elixir/apps/web/lib/web/live/sites/index.ex @@ -1,12 +1,11 @@ defmodule Web.Sites.Index do use Web, :live_view alias Domain.Gateways - alias Domain.Events require Logger def mount(_params, _session, socket) do if connected?(socket) do - :ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(socket.assigns.account.id) + :ok = Gateways.Presence.Account.subscribe(socket.assigns.account.id) end {:ok, managed_groups, _metadata} = diff --git a/elixir/apps/web/lib/web/live/sites/new_token.ex b/elixir/apps/web/lib/web/live/sites/new_token.ex index 6fd3c83b2..56252dc2d 100644 --- a/elixir/apps/web/lib/web/live/sites/new_token.ex +++ b/elixir/apps/web/lib/web/live/sites/new_token.ex @@ -1,7 +1,6 @@ defmodule Web.Sites.NewToken do use Web, :live_view alias Domain.Gateways - alias Domain.Events def mount(%{"id" => id}, _session, socket) do with {:ok, group} <- @@ -13,7 +12,7 @@ defmodule Web.Sites.NewToken do {group, token, env} = if connected?(socket) do {:ok, token, encoded_token} = Gateways.create_token(group, %{}, socket.assigns.subject) - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Gateways.Presence.Group.subscribe(group.id) {group, token, env(encoded_token)} else {group, nil, nil} diff --git a/elixir/apps/web/lib/web/live/sites/show.ex b/elixir/apps/web/lib/web/live/sites/show.ex index e10821167..6fdaf0d9b 100644 --- a/elixir/apps/web/lib/web/live/sites/show.ex +++ b/elixir/apps/web/lib/web/live/sites/show.ex @@ -1,12 +1,12 @@ defmodule Web.Sites.Show do use Web, :live_view - alias Domain.{Gateways, Resources, Policies, Events, Flows, Tokens} + alias Domain.{Gateways, Resources, Policies, Flows, Tokens} def mount(%{"id" => id}, _session, socket) do with {:ok, group} <- Gateways.fetch_group_by_id(id, socket.assigns.subject) do if connected?(socket) do - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Gateways.Presence.Group.subscribe(group.id) end socket = diff --git a/elixir/apps/web/test/web/live/actors/index_test.exs b/elixir/apps/web/test/web/live/actors/index_test.exs index 2c09d1d8d..1c1a4155c 100644 --- a/elixir/apps/web/test/web/live/actors/index_test.exs +++ b/elixir/apps/web/test/web/live/actors/index_test.exs @@ -1,5 +1,6 @@ defmodule Web.Live.Actors.IndexTest do use Web.ConnCase, async: true + alias Domain.Clients setup do account = Fixtures.Accounts.create_account() @@ -60,7 +61,7 @@ defmodule Web.Live.Actors.IndexTest do admin_actor = Fixtures.Actors.create_actor(account: account, type: :account_admin_user) Fixtures.Actors.create_membership(account: account, actor: admin_actor) client = Fixtures.Clients.create_client(account: account, actor: admin_actor) - Domain.Events.Hooks.Clients.connect(client) + :ok = Clients.Presence.connect(client) admin_actor = Repo.preload(admin_actor, identities: [:provider], groups: []) user_actor = Fixtures.Actors.create_actor(account: account, type: :account_user) diff --git a/elixir/apps/web/test/web/live/actors/show_test.exs b/elixir/apps/web/test/web/live/actors/show_test.exs index 84ff34a4a..525be8862 100644 --- a/elixir/apps/web/test/web/live/actors/show_test.exs +++ b/elixir/apps/web/test/web/live/actors/show_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Actors.ShowTest do use Web.ConnCase, async: true - alias Domain.Events test "redirects to sign in page for unauthorized user", %{conn: conn} do account = Fixtures.Accounts.create_account() @@ -92,8 +91,8 @@ defmodule Web.Live.Actors.ShowTest do |> live(~p"/#{account}/actors/#{actor}") Domain.Config.put_env_override(:test_pid, self()) - :ok = Events.Hooks.Actors.subscribe_to_clients_presence(actor.id) - assert Events.Hooks.Clients.connect(client) == :ok + :ok = Domain.Clients.Presence.Actor.subscribe(actor.id) + assert Domain.Clients.Presence.connect(client) == :ok assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _} assert_receive {:live_table_reloaded, "clients"}, 500 diff --git a/elixir/apps/web/test/web/live/clients/index_test.exs b/elixir/apps/web/test/web/live/clients/index_test.exs index 8c5f3766c..6692c08ce 100644 --- a/elixir/apps/web/test/web/live/clients/index_test.exs +++ b/elixir/apps/web/test/web/live/clients/index_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Clients.IndexTest do use Web.ConnCase, async: true - alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -60,7 +59,7 @@ defmodule Web.Live.Clients.IndexTest do online_client = Fixtures.Clients.create_client(account: account) offline_client = Fixtures.Clients.create_client(account: account) - :ok = Events.Hooks.Clients.connect(online_client) + :ok = Domain.Clients.Presence.connect(online_client) {:ok, lv, _html} = conn @@ -103,8 +102,8 @@ defmodule Web.Live.Clients.IndexTest do |> live(~p"/#{account}/clients") Domain.Config.put_env_override(:test_pid, self()) - :ok = Events.Hooks.Actors.subscribe_to_clients_presence(client.actor_id) - assert Events.Hooks.Clients.connect(client) == :ok + :ok = Domain.Clients.Presence.Actor.subscribe(client.actor_id) + assert Domain.Clients.Presence.connect(client) == :ok assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _} assert_receive {:live_table_reloaded, "clients"}, 250 diff --git a/elixir/apps/web/test/web/live/clients/show_test.exs b/elixir/apps/web/test/web/live/clients/show_test.exs index 61253ddf9..08583b797 100644 --- a/elixir/apps/web/test/web/live/clients/show_test.exs +++ b/elixir/apps/web/test/web/live/clients/show_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Clients.ShowTest do use Web.ConnCase, async: true - alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -117,7 +116,7 @@ defmodule Web.Live.Clients.ShowTest do identity: identity, conn: conn } do - :ok = Events.Hooks.Clients.connect(client) + :ok = Domain.Clients.Presence.connect(client) {:ok, lv, _html} = conn @@ -145,8 +144,8 @@ defmodule Web.Live.Clients.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/clients/#{client}") - Events.Hooks.Actors.subscribe_to_clients_presence(actor.id) - assert Events.Hooks.Clients.connect(client) == :ok + :ok = Domain.Clients.Presence.Actor.subscribe(actor.id) + assert Domain.Clients.Presence.connect(client) == :ok assert_receive %Phoenix.Socket.Broadcast{topic: "presences:actor_clients:" <> _} wait_for(fn -> diff --git a/elixir/apps/web/test/web/live/gateways/show_test.exs b/elixir/apps/web/test/web/live/gateways/show_test.exs index 6c09a8801..a7550a33e 100644 --- a/elixir/apps/web/test/web/live/gateways/show_test.exs +++ b/elixir/apps/web/test/web/live/gateways/show_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Gateways.ShowTest do use Web.ConnCase, async: true - alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -103,7 +102,7 @@ defmodule Web.Live.Gateways.ShowTest do identity: identity, conn: conn } do - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) {:ok, lv, _html} = conn @@ -150,8 +149,8 @@ defmodule Web.Live.Gateways.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/gateways/#{gateway}") - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(gateway.group.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.Group.subscribe(gateway.group.id) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %{topic: "presences:group_gateways:" <> _} table = diff --git a/elixir/apps/web/test/web/live/resources/edit_test.exs b/elixir/apps/web/test/web/live/resources/edit_test.exs index e34d85349..76740a5f9 100644 --- a/elixir/apps/web/test/web/live/resources/edit_test.exs +++ b/elixir/apps/web/test/web/live/resources/edit_test.exs @@ -1,6 +1,6 @@ defmodule Web.Live.Resources.EditTest do use Web.ConnCase, async: true - alias Domain.Events + alias Domain.{Events, PubSub} setup do account = Fixtures.Accounts.create_account() @@ -229,7 +229,7 @@ defmodule Web.Live.Resources.EditTest do } } - :ok = Events.Hooks.Accounts.subscribe_to_resources(account.id) + :ok = PubSub.Account.Resources.subscribe(account.id) {:ok, lv, _html} = conn diff --git a/elixir/apps/web/test/web/live/sites/gateways/index_test.exs b/elixir/apps/web/test/web/live/sites/gateways/index_test.exs index e9a3a6262..55f9a36ac 100644 --- a/elixir/apps/web/test/web/live/sites/gateways/index_test.exs +++ b/elixir/apps/web/test/web/live/sites/gateways/index_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Sites.Gateways.IndexTest do use Web.ConnCase, async: true - alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -96,8 +95,8 @@ defmodule Web.Live.Sites.Gateways.IndexTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}/gateways") - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.Group.subscribe(group.id) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn -> diff --git a/elixir/apps/web/test/web/live/sites/index_test.exs b/elixir/apps/web/test/web/live/sites/index_test.exs index ef0c7c636..0cad1cd00 100644 --- a/elixir/apps/web/test/web/live/sites/index_test.exs +++ b/elixir/apps/web/test/web/live/sites/index_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Sites.IndexTest do use Web.ConnCase, async: true - alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -98,8 +97,8 @@ defmodule Web.Live.Sites.IndexTest do |> live(~p"/#{account}/sites") Domain.Config.put_env_override(:test_pid, self()) - :ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.Account.subscribe(account.id) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _} assert_receive {:live_table_reloaded, "groups"}, 250 @@ -143,7 +142,7 @@ defmodule Web.Live.Sites.IndexTest do group = Fixtures.Gateways.create_internet_group(account: account) gateway = Fixtures.Gateways.create_gateway(account: account, group: group) Domain.Config.put_env_override(:test_pid, self()) - :ok = Events.Hooks.Accounts.subscribe_to_gateways_presence(account.id) + :ok = Domain.Gateways.Presence.Account.subscribe(account.id) {:ok, lv, _html} = conn @@ -156,7 +155,7 @@ defmodule Web.Live.Sites.IndexTest do assert has_element?(lv, "#internet-site-banner a[href='/#{account.slug}/sites/#{group.id}']") - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:account_gateways:" <> _} assert_receive {:live_table_reloaded, "groups"}, 250 assert lv |> element("#internet-site-banner") |> render() =~ "Online" diff --git a/elixir/apps/web/test/web/live/sites/new_token_test.exs b/elixir/apps/web/test/web/live/sites/new_token_test.exs index cf087e69a..fb92d6448 100644 --- a/elixir/apps/web/test/web/live/sites/new_token_test.exs +++ b/elixir/apps/web/test/web/live/sites/new_token_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Sites.NewTokenTest do use Web.ConnCase, async: true - alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -37,11 +36,11 @@ defmodule Web.Live.Sites.NewTokenTest do |> List.last() |> String.trim(""") - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) + :ok = Domain.Gateways.Presence.Group.subscribe(group.id) context = Fixtures.Auth.build_context(type: :gateway_group) assert {:ok, group, token} = Domain.Gateways.authenticate(token, context) gateway = Fixtures.Gateways.create_gateway(account: account, group: group, token: token) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _group_id} diff --git a/elixir/apps/web/test/web/live/sites/show_test.exs b/elixir/apps/web/test/web/live/sites/show_test.exs index e55fca907..94da34451 100644 --- a/elixir/apps/web/test/web/live/sites/show_test.exs +++ b/elixir/apps/web/test/web/live/sites/show_test.exs @@ -1,6 +1,5 @@ defmodule Web.Live.Sites.ShowTest do use Web.ConnCase, async: true - alias Domain.Events setup do account = Fixtures.Accounts.create_account() @@ -143,7 +142,7 @@ defmodule Web.Live.Sites.ShowTest do gateway: gateway, conn: conn } do - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Fixtures.Gateways.create_gateway(account: account, group: group) {:ok, lv, _html} = @@ -180,8 +179,8 @@ defmodule Web.Live.Sites.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}") - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.Group.subscribe(group.id) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn -> @@ -378,7 +377,7 @@ defmodule Web.Live.Sites.ShowTest do gateway: gateway, conn: conn } do - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Fixtures.Gateways.create_gateway(account: account, group: group) {:ok, lv, _html} = @@ -414,8 +413,8 @@ defmodule Web.Live.Sites.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}") - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.Group.subscribe(group.id) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn -> @@ -601,7 +600,7 @@ defmodule Web.Live.Sites.ShowTest do gateway: gateway, conn: conn } do - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.connect(gateway) Fixtures.Gateways.create_gateway(account: account, group: group) {:ok, lv, _html} = @@ -637,8 +636,8 @@ defmodule Web.Live.Sites.ShowTest do |> authorize_conn(identity) |> live(~p"/#{account}/sites/#{group}") - :ok = Events.Hooks.GatewayGroups.subscribe_to_presence(group.id) - :ok = Events.Hooks.Gateways.connect(gateway) + :ok = Domain.Gateways.Presence.Group.subscribe(group.id) + :ok = Domain.Gateways.Presence.connect(gateway) assert_receive %Phoenix.Socket.Broadcast{topic: "presences:group_gateways:" <> _} wait_for(fn ->