From 2a731ba25c1e1ff6661351e2048780b1d69dbe98 Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 29 Jun 2023 14:06:09 -0600 Subject: [PATCH] Explicitly subscribe to id channels Looks like for some reason the id/1 callback doesn't subscribe the channel process any more (only the socket itself), so we are doing that explicitly now. --- elixir/apps/api/lib/api/device/channel.ex | 27 ++++++++++++---------- elixir/apps/api/lib/api/gateway/channel.ex | 25 ++++++++++++++++++++ elixir/apps/api/lib/api/relay/channel.ex | 1 + elixir/apps/domain/lib/domain/devices.ex | 1 + 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/elixir/apps/api/lib/api/device/channel.ex b/elixir/apps/api/lib/api/device/channel.ex index 9c6164fec..dd1510699 100644 --- a/elixir/apps/api/lib/api/device/channel.ex +++ b/elixir/apps/api/lib/api/device/channel.ex @@ -2,6 +2,7 @@ defmodule API.Device.Channel do use API, :channel alias API.Device.Views alias Domain.{Devices, Resources, Gateways, Relays} + require Logger @impl true def join("device", _payload, socket) do @@ -19,6 +20,8 @@ defmodule API.Device.Channel do @impl true def handle_info(:after_join, socket) do + API.Endpoint.subscribe("device:#{socket.assigns.device.id}") + {:ok, resources} = Domain.Resources.list_resources(socket.assigns.subject) :ok = @@ -110,18 +113,18 @@ defmodule API.Device.Channel do Gateways.list_connected_gateways_for_resource(resource) do gateway = Enum.random(gateways) - Phoenix.PubSub.broadcast( - Domain.PubSub, - API.Gateway.Socket.id(gateway), - {:request_connection, {self(), socket_ref(socket)}, - %{ - device_id: socket.assigns.device.id, - resource_id: resource_id, - authorization_expires_at: socket.assigns.subject.expires_at, - device_rtc_session_description: device_rtc_session_description, - device_preshared_key: preshared_key - }} - ) + :ok = + API.Gateway.Channel.broadcast( + gateway, + {:request_connection, {self(), socket_ref(socket)}, + %{ + device_id: socket.assigns.device.id, + resource_id: resource_id, + authorization_expires_at: socket.assigns.subject.expires_at, + device_rtc_session_description: device_rtc_session_description, + device_preshared_key: preshared_key + }} + ) {:noreply, socket} else diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index 88c511543..da86bb672 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -2,6 +2,12 @@ defmodule API.Gateway.Channel do use API, :channel alias API.Gateway.Views alias Domain.{Devices, Resources, Relays, Gateways} + require Logger + + def broadcast(%Gateways.Gateway{} = gateway, payload) do + Logger.debug("Gateway message is being dispatched", gateway_id: gateway.id) + Phoenix.PubSub.broadcast(Domain.PubSub, "gateway:#{gateway.id}", payload) + end @impl true def join("gateway", _payload, socket) do @@ -12,6 +18,8 @@ defmodule API.Gateway.Channel do @impl true def handle_info(:after_join, socket) do + API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}") + push(socket, "init", %{ interface: Views.Interface.render(socket.assigns.gateway), # TODO: move to settings @@ -33,6 +41,11 @@ defmodule API.Gateway.Channel do device_preshared_key: preshared_key } = attrs + Logger.debug("Gateway received connection request message", + device_id: device_id, + resource_id: resource_id + ) + device = Devices.fetch_device_by_id!(device_id, preload: [:actor]) resource = Resources.fetch_resource_by_id!(resource_id) {:ok, relays} = Relays.list_connected_relays_for_resource(resource) @@ -48,6 +61,12 @@ defmodule API.Gateway.Channel do expires_at: DateTime.to_unix(authorization_expires_at, :second) }) + Logger.debug("Awaiting gateway connection_ready message", + device_id: device_id, + resource_id: resource_id, + ref: ref + ) + refs = Map.put(socket.assigns.refs, ref, {channel_pid, socket_ref, resource_id}) socket = assign(socket, :refs, refs) @@ -72,6 +91,12 @@ defmodule API.Gateway.Channel do rtc_session_description} ) + Logger.debug("Gateway replied to the Device with :connect message", + resource_id: resource_id, + channel_pid: inspect(channel_pid), + ref: ref + ) + {:reply, :ok, socket} end diff --git a/elixir/apps/api/lib/api/relay/channel.ex b/elixir/apps/api/lib/api/relay/channel.ex index 7eafade26..237236c85 100644 --- a/elixir/apps/api/lib/api/relay/channel.ex +++ b/elixir/apps/api/lib/api/relay/channel.ex @@ -10,6 +10,7 @@ defmodule API.Relay.Channel do @impl true def handle_info({:after_join, stamp_secret}, socket) do + API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}") push(socket, "init", %{}) :ok = Relays.connect_relay(socket.assigns.relay, stamp_secret) {:noreply, socket} diff --git a/elixir/apps/domain/lib/domain/devices.ex b/elixir/apps/domain/lib/domain/devices.ex index 0b03a9123..2dd121ee0 100644 --- a/elixir/apps/domain/lib/domain/devices.ex +++ b/elixir/apps/domain/lib/domain/devices.ex @@ -158,6 +158,7 @@ defmodule Domain.Devices do end def connect_device(%Device{} = device) do + # TODO: use new Phoenix.Tracker instead Phoenix.PubSub.subscribe(Domain.PubSub, "actor:#{device.actor_id}") {:ok, _} =