Properly set parent span ids for phoenix channels (#2101)

This commit is contained in:
Andrew Dryga
2023-09-20 22:21:34 -06:00
committed by GitHub
parent 5ed3601231
commit e635ee3774
14 changed files with 203 additions and 63 deletions

View File

@@ -97,7 +97,7 @@ Now you can verify that it's working by connecting to a websocket:
<summary>Client</summary>
```elixir
export CLIENT_TOKEN_FROM_SEEDS="SFMyNTY.g2gDaANkAAhpZGVudGl0eW0AAAAkN2RhN2QxY2QtMTExYy00NGE3LWI1YWMtNDAyN2I5ZDIzMGU1bQAAACDZI3ehOZSu3JOSMREkvzrtKjs8jkrW6fpbVw9opDYmi24GANjCD-qIAWIB4TOA.XhoLEDjIzuv1SXEVUV6lfIHW12n5-J5aBDUKCl8ovMk"
export CLIENT_TOKEN_FROM_SEEDS="SFMyNTY.g2gDaAN3CGlkZW50aXR5bQAAACQ3ZGE3ZDFjZC0xMTFjLTQ0YTctYjVhYy00MDI3YjlkMjMwZTV3Bmlnbm9yZW4GAJhGr7WKAWIACTqA.mrPu5eFVwkfRml7zzHb5uYfosLGaYVHq03-wE02xUNc"
# Panel will only accept token if it's coming with this User-Agent header and from IP 172.28.0.1
export CLIENT_USER_AGENT="iOS/12.5 (iPhone) connlib/0.7.412"
@@ -115,9 +115,9 @@ Now you can verify that it's working by connecting to a websocket:
{"ref":null,"topic":"client","event":"init","payload":{"interface":{"ipv6":"fd00:2021:1111::11:f4bd","upstream_dns":[],"ipv4":"100.71.71.245"},"resources":[{"id":"4429d3aa-53ea-4c03-9435-4dee2899672b","name":"172.20.0.1/16","type":"cidr","address":"172.20.0.0/16"},{"id":"85a1cffc-70d3-46dd-aa6b-776192af7b06","name":"gitlab.mycorp.com","type":"dns","address":"gitlab.mycorp.com","ipv6":"fd00:2021:1111::5:b370","ipv4":"100.85.109.146"}]}}
# List online relays for a Resource
{"event":"list_relays","topic":"client","payload":{"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b"},"ref":"unique_list_relays_ref"}
{"event":"prepare_connection","topic":"client","payload":{"resource_id":"116c62dc-bae5-45b0-afa2-4afe7f195144"},"ref":"unique_prepare_connection_ref"}
{"ref":"unique_list_relays_ref","topic":"client","event":"phx_reply","payload":{"status":"ok","response":{"relays":[{"type":"stun","uri":"stun:172.28.0.101:3478"},{"type":"turn","username":"1719090081:UVxHhieTJWaD8_Sg","password":"Ml65XDZyYpuBiEIvk/q0Zy6EEJ1ZwGa4pWztXFP+tOo","uri":"turn:172.28.0.101:3478","expires_at":1719090081}],"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b"}}}
{"ref":"unique_prepare_connection_ref","topic":"client","event":"phx_reply","payload":{"status":"ok","response":{"relays":[{"type":"stun","uri":"stun:172.28.0.101:3478"},{"type":"turn","username":"1719090081:UVxHhieTJWaD8_Sg","password":"Ml65XDZyYpuBiEIvk/q0Zy6EEJ1ZwGa4pWztXFP+tOo","uri":"turn:172.28.0.101:3478","expires_at":1719090081}],"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b"}}}
# Initiate connection to a resource
{"event":"request_connection","topic":"client","payload":{"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b","client_rtc_session_description":"RTC_SD","client_preshared_key":"+HapiGI5UdeRjKuKTwk4ZPPYpCnlXHvvqebcIevL+2A="},"ref":"unique_request_connection_ref"}

View File

@@ -8,16 +8,27 @@ defmodule API.Client.Channel do
@impl true
def join("client", _payload, socket) do
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "join" do
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
expires_in =
DateTime.diff(socket.assigns.subject.expires_at, DateTime.utc_now(), :millisecond)
if expires_in > 0 do
Process.send_after(self(), :token_expired, expires_in)
send(self(), {:after_join, opentelemetry_ctx})
{:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)}
send(self(), {:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}})
socket =
assign(socket,
opentelemetry_ctx: opentelemetry_ctx,
opentelemetry_span_ctx: opentelemetry_span_ctx
)
{:ok, socket}
else
{:error, %{"reason" => "token_expired"}}
end
@@ -25,8 +36,11 @@ defmodule API.Client.Channel do
end
@impl true
def handle_info({:after_join, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do
def handle_info({:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}}, socket) do
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "after_join" do
API.Endpoint.subscribe("client:#{socket.assigns.client.id}")
:ok = Clients.connect_client(socket.assigns.client)
@@ -43,7 +57,8 @@ defmodule API.Client.Channel do
end
def handle_info(:token_expired, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.add_event("token_expired", %{})
push(socket, "token_expired", %{})
@@ -54,10 +69,11 @@ defmodule API.Client.Channel do
# to accept the connection from the client
def handle_info(
{:connect, socket_ref, resource_id, gateway_public_key, rtc_session_description,
opentelemetry_ctx},
{opentelemetry_ctx, opentelemetry_span_ctx}},
socket
) do
OpenTelemetry.Tracer.set_current_span(opentelemetry_ctx)
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.add_event("connect", %{resource_id: resource_id})
reply(
@@ -75,7 +91,8 @@ defmodule API.Client.Channel do
end
def handle_info({:resource_added, resource_id}, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.add_event("resource_added", %{resource_id: resource_id})
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do
@@ -86,7 +103,8 @@ defmodule API.Client.Channel do
end
def handle_info({:resource_updated, resource_id}, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id})
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do
@@ -97,7 +115,8 @@ defmodule API.Client.Channel do
end
def handle_info({:resource_removed, resource_id}, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id})
push(socket, "resource_removed", resource_id)
@@ -105,7 +124,10 @@ defmodule API.Client.Channel do
end
def handle_in("create_log_sink", _attrs, socket) do
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "create_log_sink", %{} do
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "create_log_sink" do
case Instrumentation.create_remote_log_sink(socket.assigns.client) do
{:ok, signed_url} -> {:reply, {:ok, signed_url}, socket}
{:error, :disabled} -> {:reply, {:error, :disabled}, socket}
@@ -115,9 +137,10 @@ defmodule API.Client.Channel do
@impl true
def handle_in("prepare_connection", %{"resource_id" => resource_id} = attrs, socket) do
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx,
"prepare_connection",
attrs do
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "prepare_connection", attrs do
connected_gateway_ids = Map.get(attrs, "connected_gateway_ids", [])
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
@@ -160,12 +183,16 @@ defmodule API.Client.Channel do
} = attrs,
socket
) do
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "reuse_connection", attrs do
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "reuse_connection", attrs do
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
:ok =
API.Gateway.Channel.broadcast(
@@ -175,7 +202,7 @@ defmodule API.Client.Channel do
client_id: socket.assigns.client.id,
resource_id: resource.id,
authorization_expires_at: socket.assigns.subject.expires_at
}, opentelemetry_ctx}
}, {opentelemetry_ctx, opentelemetry_span_ctx}}
)
{:noreply, socket}
@@ -203,15 +230,16 @@ defmodule API.Client.Channel do
socket
) do
ctx_attrs = %{gateway_id: gateway_id, resource_id: resource_id}
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx,
"request_connection",
ctx_attrs do
OpenTelemetry.Tracer.with_span "request_connection", ctx_attrs do
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
:ok =
API.Gateway.Channel.broadcast(
@@ -223,7 +251,7 @@ defmodule API.Client.Channel do
authorization_expires_at: socket.assigns.subject.expires_at,
client_rtc_session_description: client_rtc_session_description,
client_preshared_key: preshared_key
}, opentelemetry_ctx}
}, {opentelemetry_ctx, opentelemetry_span_ctx}}
)
{:noreply, socket}

View File

@@ -29,7 +29,8 @@ defmodule API.Client.Socket do
socket
|> assign(:subject, subject)
|> assign(:client, client)
|> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx())
|> assign(:opentelemetry_span_ctx, OpenTelemetry.Tracer.current_span_ctx())
|> assign(:opentelemetry_ctx, OpenTelemetry.Ctx.get_current())
{:ok, socket}
else

View File

@@ -12,17 +12,31 @@ defmodule API.Gateway.Channel do
@impl true
def join("gateway", _payload, socket) do
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(self(), {:after_join, opentelemetry_ctx})
socket = assign(socket, :refs, %{})
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "join" do
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(self(), {:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}})
socket =
assign(socket,
opentelemetry_ctx: opentelemetry_ctx,
opentelemetry_span_ctx: opentelemetry_span_ctx,
refs: %{}
)
{:ok, socket}
end
end
@impl true
def handle_info({:after_join, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do
def handle_info({:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}}, socket) do
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "after_join" do
:ok = Gateways.connect_gateway(socket.assigns.gateway)
:ok = API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}")
@@ -37,8 +51,11 @@ defmodule API.Gateway.Channel do
end
end
def handle_info({:allow_access, attrs, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "allow_access", %{} do
def handle_info({:allow_access, attrs, {opentelemetry_ctx, opentelemetry_span_ctx}}, socket) do
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "allow_access" do
%{
client_id: client_id,
resource_id: resource_id,
@@ -58,11 +75,16 @@ defmodule API.Gateway.Channel do
end
def handle_info(
{:request_connection, {channel_pid, socket_ref}, attrs, opentelemetry_ctx},
{:request_connection, {channel_pid, socket_ref}, attrs,
{opentelemetry_ctx, opentelemetry_span_ctx}},
socket
) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "allow_access", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "allow_access" do
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
%{
client_id: client_id,
@@ -102,7 +124,7 @@ defmodule API.Gateway.Channel do
Map.put(
socket.assigns.refs,
ref,
{channel_pid, socket_ref, resource_id, opentelemetry_ctx}
{channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}}
)
socket = assign(socket, :refs, refs)
@@ -120,16 +142,22 @@ defmodule API.Gateway.Channel do
},
socket
) do
{{channel_pid, socket_ref, resource_id, opentelemetry_ctx}, refs} =
{{channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}}, refs} =
Map.pop(socket.assigns.refs, ref)
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "connection_ready", %{} do
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "connection_ready" do
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
socket = assign(socket, :refs, refs)
send(
channel_pid,
{:connect, socket_ref, resource_id, socket.assigns.gateway.public_key,
rtc_session_description, opentelemetry_ctx}
rtc_session_description, {opentelemetry_ctx, opentelemetry_span_ctx}}
)
Logger.debug("Gateway replied to the Client with :connect message",

View File

@@ -34,7 +34,8 @@ defmodule API.Gateway.Socket do
socket =
socket
|> assign(:gateway, gateway)
|> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx())
|> assign(:opentelemetry_span_ctx, OpenTelemetry.Tracer.current_span_ctx())
|> assign(:opentelemetry_ctx, OpenTelemetry.Ctx.get_current())
{:ok, socket}
else

View File

@@ -5,16 +5,33 @@ defmodule API.Relay.Channel do
@impl true
def join("relay", %{"stamp_secret" => stamp_secret}, socket) do
OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(self(), {:after_join, stamp_secret, opentelemetry_ctx})
{:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)}
opentelemetry_ctx = OpenTelemetry.Ctx.get_current()
opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(self(), {:after_join, stamp_secret, {opentelemetry_ctx, opentelemetry_span_ctx}})
socket =
assign(socket,
opentelemetry_ctx: opentelemetry_ctx,
opentelemetry_span_ctx: opentelemetry_span_ctx
)
{:ok, socket}
end
end
@impl true
def handle_info({:after_join, stamp_secret, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do
def handle_info(
{:after_join, stamp_secret, {opentelemetry_ctx, opentelemetry_span_ctx}},
socket
) do
OpenTelemetry.Ctx.attach(opentelemetry_ctx)
OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx)
OpenTelemetry.Tracer.with_span "after_join" do
API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}")
push(socket, "init", %{})
:ok = Relays.connect_relay(socket.assigns.relay, stamp_secret)

View File

@@ -34,7 +34,8 @@ defmodule API.Relay.Socket do
socket =
socket
|> assign(:relay, relay)
|> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx())
|> assign(:opentelemetry_span_ctx, OpenTelemetry.Tracer.current_span_ctx())
|> assign(:opentelemetry_ctx, OpenTelemetry.Ctx.get_current())
{:ok, socket}
else

View File

@@ -19,6 +19,9 @@ defmodule API.Sockets do
def handle_error(conn, :invalid_token),
do: Plug.Conn.send_resp(conn, 401, "Invalid token")
def handle_error(conn, :missing_token),
do: Plug.Conn.send_resp(conn, 401, "Missing token")
def handle_error(conn, :unauthenticated),
do: Plug.Conn.send_resp(conn, 403, "Forbidden")

View File

@@ -32,7 +32,8 @@ defmodule API.Client.ChannelTest do
{:ok, _reply, socket} =
API.Client.Socket
|> socket("client:#{client.id}", %{
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"),
opentelemetry_ctx: OpenTelemetry.Ctx.new(),
opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test"),
client: client,
subject: subject
})
@@ -70,7 +71,8 @@ defmodule API.Client.ChannelTest do
{:ok, _reply, _socket} =
API.Client.Socket
|> socket("client:#{client.id}", %{
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"),
opentelemetry_ctx: OpenTelemetry.Ctx.new(),
opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test"),
client: client,
subject: subject
})
@@ -427,10 +429,11 @@ defmodule API.Client.ChannelTest do
assert authorization_expires_at == socket.assigns.subject.expires_at
otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")}
send(
channel_pid,
{:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD",
OpenTelemetry.Tracer.start_span("connect")}
{:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD", otel_ctx}
)
assert_reply ref, :ok, %{

View File

@@ -37,6 +37,25 @@ defmodule API.Client.SocketTest do
assert client.last_seen_version == "0.7.412"
end
test "propagates trace context" do
subject = Fixtures.Auth.create_subject()
{:ok, token} = Auth.create_session_token_from_subject(subject)
span_ctx = OpenTelemetry.Tracer.start_span("test")
OpenTelemetry.Tracer.set_current_span(span_ctx)
attrs = connect_attrs(token: token)
trace_context_headers = [
{"traceparent", "00-a1bf53221e0be8000000000000000002-f316927eb144aa62-01"}
]
connect_info = %{connect_info(subject) | trace_context_headers: trace_context_headers}
assert {:ok, _socket} = connect(Socket, attrs, connect_info: connect_info)
assert span_ctx != OpenTelemetry.Tracer.current_span_ctx()
end
test "updates existing client" do
subject = Fixtures.Auth.create_subject()
existing_client = Fixtures.Clients.create_client(subject: subject)

View File

@@ -19,7 +19,8 @@ defmodule API.Gateway.ChannelTest do
API.Gateway.Socket
|> socket("gateway:#{gateway.id}", %{
gateway: gateway,
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test")
opentelemetry_ctx: OpenTelemetry.Ctx.new(),
opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test")
})
|> subscribe_and_join(API.Gateway.Channel, "gateway")
@@ -70,7 +71,7 @@ defmodule API.Gateway.ChannelTest do
socket: socket
} do
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test")
otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")}
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
@@ -82,7 +83,7 @@ defmodule API.Gateway.ChannelTest do
client_id: client.id,
resource_id: resource.id,
authorization_expires_at: expires_at
}, opentelemetry_ctx}
}, otel_ctx}
)
assert_push "allow_access", payload
@@ -119,7 +120,7 @@ defmodule API.Gateway.ChannelTest do
preshared_key = "PSK"
rtc_session_description = "RTC_SD"
opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test")
otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")}
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
@@ -133,7 +134,7 @@ defmodule API.Gateway.ChannelTest do
authorization_expires_at: expires_at,
client_rtc_session_description: rtc_session_description,
client_preshared_key: preshared_key
}, opentelemetry_ctx}
}, otel_ctx}
)
assert_push "request_connection", payload
@@ -223,7 +224,7 @@ defmodule API.Gateway.ChannelTest do
gateway_public_key = gateway.public_key
rtc_session_description = "RTC_SD"
opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test")
otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")}
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
@@ -237,7 +238,7 @@ defmodule API.Gateway.ChannelTest do
authorization_expires_at: expires_at,
client_rtc_session_description: rtc_session_description,
client_preshared_key: preshared_key
}, opentelemetry_ctx}
}, otel_ctx}
)
assert_push "request_connection", %{ref: ref}

View File

@@ -34,6 +34,24 @@ defmodule API.Gateway.SocketTest do
assert gateway.last_seen_version == @connlib_version
end
test "propagates trace context" do
token = Fixtures.Gateways.create_token()
encrypted_secret = Gateways.encode_token!(token)
attrs = connect_attrs(token: encrypted_secret)
span_ctx = OpenTelemetry.Tracer.start_span("test")
OpenTelemetry.Tracer.set_current_span(span_ctx)
trace_context_headers = [
{"traceparent", "00-a1bf53221e0be8000000000000000002-f316927eb144aa62-01"}
]
connect_info = %{@connect_info | trace_context_headers: trace_context_headers}
assert {:ok, _socket} = connect(Socket, attrs, connect_info: connect_info)
assert span_ctx != OpenTelemetry.Tracer.current_span_ctx()
end
test "updates existing gateway" do
token = Fixtures.Gateways.create_token()
existing_gateway = Fixtures.Gateways.create_gateway(token: token)

View File

@@ -10,7 +10,8 @@ defmodule API.Relay.ChannelTest do
API.Relay.Socket
|> socket("relay:#{relay.id}", %{
relay: relay,
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test")
opentelemetry_ctx: OpenTelemetry.Ctx.new(),
opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test")
})
|> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret})
@@ -35,7 +36,8 @@ defmodule API.Relay.ChannelTest do
API.Relay.Socket
|> socket("relay:#{relay.id}", %{
relay: relay,
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test")
opentelemetry_ctx: OpenTelemetry.Ctx.new(),
opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test")
})
|> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret})

View File

@@ -34,6 +34,24 @@ defmodule API.Relay.SocketTest do
assert relay.last_seen_version == @connlib_version
end
test "propagates trace context" do
token = Fixtures.Relays.create_token()
encrypted_secret = Relays.encode_token!(token)
attrs = connect_attrs(token: encrypted_secret)
span_ctx = OpenTelemetry.Tracer.start_span("test")
OpenTelemetry.Tracer.set_current_span(span_ctx)
trace_context_headers = [
{"traceparent", "00-a1bf53221e0be8000000000000000002-f316927eb144aa62-01"}
]
connect_info = %{@connect_info | trace_context_headers: trace_context_headers}
assert {:ok, _socket} = connect(Socket, attrs, connect_info: connect_info)
assert span_ctx != OpenTelemetry.Tracer.current_span_ctx()
end
test "updates existing relay" do
token = Fixtures.Relays.create_token()
existing_relay = Fixtures.Relays.create_relay(token: token)