Explicitly subscribe to id channels

Looks like for some reason the id/1 callback doesn't subscribe the channel process any more (only the socket itself), so we are doing that explicitly now.
This commit is contained in:
Andrew Dryga
2023-06-29 14:06:09 -06:00
parent 4bebccb6a0
commit 2a731ba25c
4 changed files with 42 additions and 12 deletions

View File

@@ -2,6 +2,7 @@ defmodule API.Device.Channel do
use API, :channel
alias API.Device.Views
alias Domain.{Devices, Resources, Gateways, Relays}
require Logger
@impl true
def join("device", _payload, socket) do
@@ -19,6 +20,8 @@ defmodule API.Device.Channel do
@impl true
def handle_info(:after_join, socket) do
API.Endpoint.subscribe("device:#{socket.assigns.device.id}")
{:ok, resources} = Domain.Resources.list_resources(socket.assigns.subject)
:ok =
@@ -110,18 +113,18 @@ defmodule API.Device.Channel do
Gateways.list_connected_gateways_for_resource(resource) do
gateway = Enum.random(gateways)
Phoenix.PubSub.broadcast(
Domain.PubSub,
API.Gateway.Socket.id(gateway),
{:request_connection, {self(), socket_ref(socket)},
%{
device_id: socket.assigns.device.id,
resource_id: resource_id,
authorization_expires_at: socket.assigns.subject.expires_at,
device_rtc_session_description: device_rtc_session_description,
device_preshared_key: preshared_key
}}
)
:ok =
API.Gateway.Channel.broadcast(
gateway,
{:request_connection, {self(), socket_ref(socket)},
%{
device_id: socket.assigns.device.id,
resource_id: resource_id,
authorization_expires_at: socket.assigns.subject.expires_at,
device_rtc_session_description: device_rtc_session_description,
device_preshared_key: preshared_key
}}
)
{:noreply, socket}
else

View File

@@ -2,6 +2,12 @@ defmodule API.Gateway.Channel do
use API, :channel
alias API.Gateway.Views
alias Domain.{Devices, Resources, Relays, Gateways}
require Logger
def broadcast(%Gateways.Gateway{} = gateway, payload) do
Logger.debug("Gateway message is being dispatched", gateway_id: gateway.id)
Phoenix.PubSub.broadcast(Domain.PubSub, "gateway:#{gateway.id}", payload)
end
@impl true
def join("gateway", _payload, socket) do
@@ -12,6 +18,8 @@ defmodule API.Gateway.Channel do
@impl true
def handle_info(:after_join, socket) do
API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}")
push(socket, "init", %{
interface: Views.Interface.render(socket.assigns.gateway),
# TODO: move to settings
@@ -33,6 +41,11 @@ defmodule API.Gateway.Channel do
device_preshared_key: preshared_key
} = attrs
Logger.debug("Gateway received connection request message",
device_id: device_id,
resource_id: resource_id
)
device = Devices.fetch_device_by_id!(device_id, preload: [:actor])
resource = Resources.fetch_resource_by_id!(resource_id)
{:ok, relays} = Relays.list_connected_relays_for_resource(resource)
@@ -48,6 +61,12 @@ defmodule API.Gateway.Channel do
expires_at: DateTime.to_unix(authorization_expires_at, :second)
})
Logger.debug("Awaiting gateway connection_ready message",
device_id: device_id,
resource_id: resource_id,
ref: ref
)
refs = Map.put(socket.assigns.refs, ref, {channel_pid, socket_ref, resource_id})
socket = assign(socket, :refs, refs)
@@ -72,6 +91,12 @@ defmodule API.Gateway.Channel do
rtc_session_description}
)
Logger.debug("Gateway replied to the Device with :connect message",
resource_id: resource_id,
channel_pid: inspect(channel_pid),
ref: ref
)
{:reply, :ok, socket}
end

View File

@@ -10,6 +10,7 @@ defmodule API.Relay.Channel do
@impl true
def handle_info({:after_join, stamp_secret}, socket) do
API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}")
push(socket, "init", %{})
:ok = Relays.connect_relay(socket.assigns.relay, stamp_secret)
{:noreply, socket}

View File

@@ -158,6 +158,7 @@ defmodule Domain.Devices do
end
def connect_device(%Device{} = device) do
# TODO: use new Phoenix.Tracker instead
Phoenix.PubSub.subscribe(Domain.PubSub, "actor:#{device.actor_id}")
{:ok, _} =