chore(portal): Encode client reply pid and socket ref instead of storing it (#4349)

This commit is contained in:
Andrew Dryga
2024-03-27 14:08:42 -06:00
committed by GitHub
parent ab35a5ea76
commit e9ad85e939

View File

@@ -18,8 +18,7 @@ defmodule API.Gateway.Channel do
socket =
assign(socket,
opentelemetry_ctx: opentelemetry_ctx,
opentelemetry_span_ctx: opentelemetry_span_ctx,
refs: %{}
opentelemetry_span_ctx: opentelemetry_span_ctx
)
{:ok, socket}
@@ -106,7 +105,10 @@ defmodule API.Gateway.Channel do
resource = Resources.fetch_resource_by_id!(resource_id)
:ok = Resources.subscribe_to_events_for_resource(resource_id)
ref = Ecto.UUID.generate()
ref =
{channel_pid, socket_ref, resource_id, :otel_propagator_text_map.inject([])}
|> :erlang.term_to_binary()
|> Base.url_encode64()
push(socket, "allow_access", %{
ref: ref,
@@ -120,19 +122,9 @@ defmodule API.Gateway.Channel do
Logger.debug("Awaiting gateway connection_ready message",
client_id: client_id,
resource_id: resource_id,
flow_id: flow_id,
ref: ref
flow_id: flow_id
)
refs =
Map.put(
socket.assigns.refs,
ref,
{channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}}
)
socket = assign(socket, :refs, refs)
{:noreply, socket}
end
end
@@ -202,9 +194,6 @@ defmodule API.Gateway.Channel do
OpenTelemetry.Tracer.with_span "gateway.request_connection" do
:ok = Flows.subscribe_to_flow_expiration_events(flow_id)
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
Logger.debug("Gateway received connection request message",
client_id: client_id,
resource_id: resource_id
@@ -219,7 +208,10 @@ defmodule API.Gateway.Channel do
{:ok, relays} = Relays.all_connected_relays_for_resource(resource, relay_hosting_type)
ref = Ecto.UUID.generate()
ref =
{channel_pid, socket_ref, resource_id, :otel_propagator_text_map.inject([])}
|> :erlang.term_to_binary()
|> Base.url_encode64()
push(socket, "request_connection", %{
ref: ref,
@@ -234,19 +226,9 @@ defmodule API.Gateway.Channel do
Logger.debug("Awaiting gateway connection_ready message",
client_id: client_id,
resource_id: resource_id,
flow_id: flow_id,
ref: ref
flow_id: flow_id
)
refs =
Map.put(
socket.assigns.refs,
ref,
{channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}}
)
socket = assign(socket, :refs, refs)
{:noreply, socket}
end
end
@@ -260,38 +242,40 @@ defmodule API.Gateway.Channel do
},
socket
) do
{
{
channel_pid,
socket_ref,
resource_id,
{opentelemetry_ctx, opentelemetry_span_ctx}
},
refs
} = Map.pop(socket.assigns.refs, ref)
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "gateway.connection_ready" do
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
ref
|> Base.url_decode64!()
|> Plug.Crypto.non_executable_binary_to_term([:safe])
|> case do
{channel_pid, socket_ref, resource_id, opentelemetry_headers} ->
:otel_propagator_text_map.extract(opentelemetry_headers)
socket = assign(socket, :refs, refs)
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(
channel_pid,
{:connect, socket_ref, resource_id, socket.assigns.gateway.public_key, payload,
{opentelemetry_ctx, opentelemetry_span_ctx}}
)
send(
channel_pid,
{:connect, socket_ref, resource_id, socket.assigns.gateway.public_key, payload,
{opentelemetry_ctx, opentelemetry_span_ctx}}
)
Logger.debug("Gateway replied to the Client with :connect message",
resource_id: resource_id,
channel_pid: inspect(channel_pid),
ref: ref
)
Logger.debug("Gateway replied to the Client with :connect message",
resource_id: resource_id,
channel_pid: inspect(channel_pid)
)
{:reply, :ok, socket}
{:reply, :ok, socket}
other ->
OpenTelemetry.Tracer.set_status(:error, "invalid ref")
Logger.error("Gateway replied with an invalid ref",
ref: inspect(ref),
decoded: inspect(other)
)
{:reply, {:error, %{reason: :invalid_ref}}, socket}
end
end
end