From e9ad85e939c477b70f3418b754017dc44fd8cc19 Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Wed, 27 Mar 2024 14:08:42 -0600 Subject: [PATCH] chore(portal): Encode client reply pid and socket ref instead of storing it (#4349) --- elixir/apps/api/lib/api/gateway/channel.ex | 96 +++++++++------------- 1 file changed, 40 insertions(+), 56 deletions(-) diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index 0b8a31e98..18a9b85e6 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -18,8 +18,7 @@ defmodule API.Gateway.Channel do socket = assign(socket, opentelemetry_ctx: opentelemetry_ctx, - opentelemetry_span_ctx: opentelemetry_span_ctx, - refs: %{} + opentelemetry_span_ctx: opentelemetry_span_ctx ) {:ok, socket} @@ -106,7 +105,10 @@ defmodule API.Gateway.Channel do resource = Resources.fetch_resource_by_id!(resource_id) :ok = Resources.subscribe_to_events_for_resource(resource_id) - ref = Ecto.UUID.generate() + ref = + {channel_pid, socket_ref, resource_id, :otel_propagator_text_map.inject([])} + |> :erlang.term_to_binary() + |> Base.url_encode64() push(socket, "allow_access", %{ ref: ref, @@ -120,19 +122,9 @@ defmodule API.Gateway.Channel do Logger.debug("Awaiting gateway connection_ready message", client_id: client_id, resource_id: resource_id, - flow_id: flow_id, - ref: ref + flow_id: flow_id ) - refs = - Map.put( - socket.assigns.refs, - ref, - {channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}} - ) - - socket = assign(socket, :refs, refs) - {:noreply, socket} end end @@ -202,9 +194,6 @@ defmodule API.Gateway.Channel do OpenTelemetry.Tracer.with_span "gateway.request_connection" do :ok = Flows.subscribe_to_flow_expiration_events(flow_id) - opentelemetry_ctx = OpenTelemetry.Ctx.get_current() - opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() - Logger.debug("Gateway received connection request message", client_id: client_id, resource_id: resource_id @@ -219,7 +208,10 @@ defmodule API.Gateway.Channel do {:ok, relays} = Relays.all_connected_relays_for_resource(resource, relay_hosting_type) - ref = Ecto.UUID.generate() + ref = + {channel_pid, socket_ref, resource_id, :otel_propagator_text_map.inject([])} + |> :erlang.term_to_binary() + |> Base.url_encode64() push(socket, "request_connection", %{ ref: ref, @@ -234,19 +226,9 @@ defmodule API.Gateway.Channel do Logger.debug("Awaiting gateway connection_ready message", client_id: client_id, resource_id: resource_id, - flow_id: flow_id, - ref: ref + flow_id: flow_id ) - refs = - Map.put( - socket.assigns.refs, - ref, - {channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}} - ) - - socket = assign(socket, :refs, refs) - {:noreply, socket} end end @@ -260,38 +242,40 @@ defmodule API.Gateway.Channel do }, socket ) do - { - { - channel_pid, - socket_ref, - resource_id, - {opentelemetry_ctx, opentelemetry_span_ctx} - }, - refs - } = Map.pop(socket.assigns.refs, ref) - - OpenTelemetry.Ctx.attach(opentelemetry_ctx) - OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) - OpenTelemetry.Tracer.with_span "gateway.connection_ready" do - opentelemetry_ctx = OpenTelemetry.Ctx.get_current() - opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() + ref + |> Base.url_decode64!() + |> Plug.Crypto.non_executable_binary_to_term([:safe]) + |> case do + {channel_pid, socket_ref, resource_id, opentelemetry_headers} -> + :otel_propagator_text_map.extract(opentelemetry_headers) - socket = assign(socket, :refs, refs) + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() - send( - channel_pid, - {:connect, socket_ref, resource_id, socket.assigns.gateway.public_key, payload, - {opentelemetry_ctx, opentelemetry_span_ctx}} - ) + send( + channel_pid, + {:connect, socket_ref, resource_id, socket.assigns.gateway.public_key, payload, + {opentelemetry_ctx, opentelemetry_span_ctx}} + ) - Logger.debug("Gateway replied to the Client with :connect message", - resource_id: resource_id, - channel_pid: inspect(channel_pid), - ref: ref - ) + Logger.debug("Gateway replied to the Client with :connect message", + resource_id: resource_id, + channel_pid: inspect(channel_pid) + ) - {:reply, :ok, socket} + {:reply, :ok, socket} + + other -> + OpenTelemetry.Tracer.set_status(:error, "invalid ref") + + Logger.error("Gateway replied with an invalid ref", + ref: inspect(ref), + decoded: inspect(other) + ) + + {:reply, {:error, %{reason: :invalid_ref}}, socket} + end end end