From e635ee377412b8040e9bcac4bf24ca0caed8b104 Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Wed, 20 Sep 2023 22:21:34 -0600 Subject: [PATCH] Properly set parent span ids for phoenix channels (#2101) --- elixir/README.md | 6 +- elixir/apps/api/lib/api/client/channel.ex | 76 +++++++++++++------ elixir/apps/api/lib/api/client/socket.ex | 3 +- elixir/apps/api/lib/api/gateway/channel.ex | 58 ++++++++++---- elixir/apps/api/lib/api/gateway/socket.ex | 3 +- elixir/apps/api/lib/api/relay/channel.ex | 27 +++++-- elixir/apps/api/lib/api/relay/socket.ex | 3 +- elixir/apps/api/lib/api/sockets.ex | 3 + .../apps/api/test/api/client/channel_test.exs | 11 ++- .../apps/api/test/api/client/socket_test.exs | 19 +++++ .../api/test/api/gateway/channel_test.exs | 15 ++-- .../apps/api/test/api/gateway/socket_test.exs | 18 +++++ .../apps/api/test/api/relay/channel_test.exs | 6 +- .../apps/api/test/api/relay/socket_test.exs | 18 +++++ 14 files changed, 203 insertions(+), 63 deletions(-) diff --git a/elixir/README.md b/elixir/README.md index 89e128bf1..d41e06cb8 100644 --- a/elixir/README.md +++ b/elixir/README.md @@ -97,7 +97,7 @@ Now you can verify that it's working by connecting to a websocket: Client ```elixir -❯ export CLIENT_TOKEN_FROM_SEEDS="SFMyNTY.g2gDaANkAAhpZGVudGl0eW0AAAAkN2RhN2QxY2QtMTExYy00NGE3LWI1YWMtNDAyN2I5ZDIzMGU1bQAAACDZI3ehOZSu3JOSMREkvzrtKjs8jkrW6fpbVw9opDYmi24GANjCD-qIAWIB4TOA.XhoLEDjIzuv1SXEVUV6lfIHW12n5-J5aBDUKCl8ovMk" +❯ export CLIENT_TOKEN_FROM_SEEDS="SFMyNTY.g2gDaAN3CGlkZW50aXR5bQAAACQ3ZGE3ZDFjZC0xMTFjLTQ0YTctYjVhYy00MDI3YjlkMjMwZTV3Bmlnbm9yZW4GAJhGr7WKAWIACTqA.mrPu5eFVwkfRml7zzHb5uYfosLGaYVHq03-wE02xUNc" # Panel will only accept token if it's coming with this User-Agent header and from IP 172.28.0.1 ❯ export CLIENT_USER_AGENT="iOS/12.5 (iPhone) connlib/0.7.412" @@ -115,9 +115,9 @@ Now you can verify that it's working by connecting to a websocket: {"ref":null,"topic":"client","event":"init","payload":{"interface":{"ipv6":"fd00:2021:1111::11:f4bd","upstream_dns":[],"ipv4":"100.71.71.245"},"resources":[{"id":"4429d3aa-53ea-4c03-9435-4dee2899672b","name":"172.20.0.1/16","type":"cidr","address":"172.20.0.0/16"},{"id":"85a1cffc-70d3-46dd-aa6b-776192af7b06","name":"gitlab.mycorp.com","type":"dns","address":"gitlab.mycorp.com","ipv6":"fd00:2021:1111::5:b370","ipv4":"100.85.109.146"}]}} # List online relays for a Resource -❯ {"event":"list_relays","topic":"client","payload":{"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b"},"ref":"unique_list_relays_ref"} +❯ {"event":"prepare_connection","topic":"client","payload":{"resource_id":"116c62dc-bae5-45b0-afa2-4afe7f195144"},"ref":"unique_prepare_connection_ref"} -{"ref":"unique_list_relays_ref","topic":"client","event":"phx_reply","payload":{"status":"ok","response":{"relays":[{"type":"stun","uri":"stun:172.28.0.101:3478"},{"type":"turn","username":"1719090081:UVxHhieTJWaD8_Sg","password":"Ml65XDZyYpuBiEIvk/q0Zy6EEJ1ZwGa4pWztXFP+tOo","uri":"turn:172.28.0.101:3478","expires_at":1719090081}],"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b"}}} +{"ref":"unique_prepare_connection_ref","topic":"client","event":"phx_reply","payload":{"status":"ok","response":{"relays":[{"type":"stun","uri":"stun:172.28.0.101:3478"},{"type":"turn","username":"1719090081:UVxHhieTJWaD8_Sg","password":"Ml65XDZyYpuBiEIvk/q0Zy6EEJ1ZwGa4pWztXFP+tOo","uri":"turn:172.28.0.101:3478","expires_at":1719090081}],"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b"}}} # Initiate connection to a resource ❯ {"event":"request_connection","topic":"client","payload":{"resource_id":"4429d3aa-53ea-4c03-9435-4dee2899672b","client_rtc_session_description":"RTC_SD","client_preshared_key":"+HapiGI5UdeRjKuKTwk4ZPPYpCnlXHvvqebcIevL+2A="},"ref":"unique_request_connection_ref"} diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index 3489e9a84..f6803c214 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -8,16 +8,27 @@ defmodule API.Client.Channel do @impl true def join("client", _payload, socket) do - OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do - opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "join" do + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() expires_in = DateTime.diff(socket.assigns.subject.expires_at, DateTime.utc_now(), :millisecond) if expires_in > 0 do Process.send_after(self(), :token_expired, expires_in) - send(self(), {:after_join, opentelemetry_ctx}) - {:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)} + send(self(), {:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}}) + + socket = + assign(socket, + opentelemetry_ctx: opentelemetry_ctx, + opentelemetry_span_ctx: opentelemetry_span_ctx + ) + + {:ok, socket} else {:error, %{"reason" => "token_expired"}} end @@ -25,8 +36,11 @@ defmodule API.Client.Channel do end @impl true - def handle_info({:after_join, opentelemetry_ctx}, socket) do - OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do + def handle_info({:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}}, socket) do + OpenTelemetry.Ctx.attach(opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "after_join" do API.Endpoint.subscribe("client:#{socket.assigns.client.id}") :ok = Clients.connect_client(socket.assigns.client) @@ -43,7 +57,8 @@ defmodule API.Client.Channel do end def handle_info(:token_expired, socket) do - OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) OpenTelemetry.Tracer.add_event("token_expired", %{}) push(socket, "token_expired", %{}) @@ -54,10 +69,11 @@ defmodule API.Client.Channel do # to accept the connection from the client def handle_info( {:connect, socket_ref, resource_id, gateway_public_key, rtc_session_description, - opentelemetry_ctx}, + {opentelemetry_ctx, opentelemetry_span_ctx}}, socket ) do - OpenTelemetry.Tracer.set_current_span(opentelemetry_ctx) + OpenTelemetry.Ctx.attach(opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) OpenTelemetry.Tracer.add_event("connect", %{resource_id: resource_id}) reply( @@ -75,7 +91,8 @@ defmodule API.Client.Channel do end def handle_info({:resource_added, resource_id}, socket) do - OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) OpenTelemetry.Tracer.add_event("resource_added", %{resource_id: resource_id}) with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do @@ -86,7 +103,8 @@ defmodule API.Client.Channel do end def handle_info({:resource_updated, resource_id}, socket) do - OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id}) with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do @@ -97,7 +115,8 @@ defmodule API.Client.Channel do end def handle_info({:resource_removed, resource_id}, socket) do - OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id}) push(socket, "resource_removed", resource_id) @@ -105,7 +124,10 @@ defmodule API.Client.Channel do end def handle_in("create_log_sink", _attrs, socket) do - OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "create_log_sink", %{} do + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "create_log_sink" do case Instrumentation.create_remote_log_sink(socket.assigns.client) do {:ok, signed_url} -> {:reply, {:ok, signed_url}, socket} {:error, :disabled} -> {:reply, {:error, :disabled}, socket} @@ -115,9 +137,10 @@ defmodule API.Client.Channel do @impl true def handle_in("prepare_connection", %{"resource_id" => resource_id} = attrs, socket) do - OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, - "prepare_connection", - attrs do + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "prepare_connection", attrs do connected_gateway_ids = Map.get(attrs, "connected_gateway_ids", []) with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), @@ -160,12 +183,16 @@ defmodule API.Client.Channel do } = attrs, socket ) do - OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "reuse_connection", attrs do + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "reuse_connection", attrs do with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), # :ok = Resource.authorize(resource, socket.assigns.subject), {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do - opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() :ok = API.Gateway.Channel.broadcast( @@ -175,7 +202,7 @@ defmodule API.Client.Channel do client_id: socket.assigns.client.id, resource_id: resource.id, authorization_expires_at: socket.assigns.subject.expires_at - }, opentelemetry_ctx} + }, {opentelemetry_ctx, opentelemetry_span_ctx}} ) {:noreply, socket} @@ -203,15 +230,16 @@ defmodule API.Client.Channel do socket ) do ctx_attrs = %{gateway_id: gateway_id, resource_id: resource_id} + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) - OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, - "request_connection", - ctx_attrs do + OpenTelemetry.Tracer.with_span "request_connection", ctx_attrs do with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), # :ok = Resource.authorize(resource, socket.assigns.subject), {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do - opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() :ok = API.Gateway.Channel.broadcast( @@ -223,7 +251,7 @@ defmodule API.Client.Channel do authorization_expires_at: socket.assigns.subject.expires_at, client_rtc_session_description: client_rtc_session_description, client_preshared_key: preshared_key - }, opentelemetry_ctx} + }, {opentelemetry_ctx, opentelemetry_span_ctx}} ) {:noreply, socket} diff --git a/elixir/apps/api/lib/api/client/socket.ex b/elixir/apps/api/lib/api/client/socket.ex index 314a6b637..0222b996f 100644 --- a/elixir/apps/api/lib/api/client/socket.ex +++ b/elixir/apps/api/lib/api/client/socket.ex @@ -29,7 +29,8 @@ defmodule API.Client.Socket do socket |> assign(:subject, subject) |> assign(:client, client) - |> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx()) + |> assign(:opentelemetry_span_ctx, OpenTelemetry.Tracer.current_span_ctx()) + |> assign(:opentelemetry_ctx, OpenTelemetry.Ctx.get_current()) {:ok, socket} else diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index bf2ba706b..9f4e21551 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -12,17 +12,31 @@ defmodule API.Gateway.Channel do @impl true def join("gateway", _payload, socket) do - OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do - opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() - send(self(), {:after_join, opentelemetry_ctx}) - socket = assign(socket, :refs, %{}) + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "join" do + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() + send(self(), {:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}}) + + socket = + assign(socket, + opentelemetry_ctx: opentelemetry_ctx, + opentelemetry_span_ctx: opentelemetry_span_ctx, + refs: %{} + ) + {:ok, socket} end end @impl true - def handle_info({:after_join, opentelemetry_ctx}, socket) do - OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do + def handle_info({:after_join, {opentelemetry_ctx, opentelemetry_span_ctx}}, socket) do + OpenTelemetry.Ctx.attach(opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "after_join" do :ok = Gateways.connect_gateway(socket.assigns.gateway) :ok = API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}") @@ -37,8 +51,11 @@ defmodule API.Gateway.Channel do end end - def handle_info({:allow_access, attrs, opentelemetry_ctx}, socket) do - OpenTelemetry.Tracer.with_span opentelemetry_ctx, "allow_access", %{} do + def handle_info({:allow_access, attrs, {opentelemetry_ctx, opentelemetry_span_ctx}}, socket) do + OpenTelemetry.Ctx.attach(opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "allow_access" do %{ client_id: client_id, resource_id: resource_id, @@ -58,11 +75,16 @@ defmodule API.Gateway.Channel do end def handle_info( - {:request_connection, {channel_pid, socket_ref}, attrs, opentelemetry_ctx}, + {:request_connection, {channel_pid, socket_ref}, attrs, + {opentelemetry_ctx, opentelemetry_span_ctx}}, socket ) do - OpenTelemetry.Tracer.with_span opentelemetry_ctx, "allow_access", %{} do - opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() + OpenTelemetry.Ctx.attach(opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "allow_access" do + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() %{ client_id: client_id, @@ -102,7 +124,7 @@ defmodule API.Gateway.Channel do Map.put( socket.assigns.refs, ref, - {channel_pid, socket_ref, resource_id, opentelemetry_ctx} + {channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}} ) socket = assign(socket, :refs, refs) @@ -120,16 +142,22 @@ defmodule API.Gateway.Channel do }, socket ) do - {{channel_pid, socket_ref, resource_id, opentelemetry_ctx}, refs} = + {{channel_pid, socket_ref, resource_id, {opentelemetry_ctx, opentelemetry_span_ctx}}, refs} = Map.pop(socket.assigns.refs, ref) - OpenTelemetry.Tracer.with_span opentelemetry_ctx, "connection_ready", %{} do + OpenTelemetry.Ctx.attach(opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "connection_ready" do + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() + socket = assign(socket, :refs, refs) send( channel_pid, {:connect, socket_ref, resource_id, socket.assigns.gateway.public_key, - rtc_session_description, opentelemetry_ctx} + rtc_session_description, {opentelemetry_ctx, opentelemetry_span_ctx}} ) Logger.debug("Gateway replied to the Client with :connect message", diff --git a/elixir/apps/api/lib/api/gateway/socket.ex b/elixir/apps/api/lib/api/gateway/socket.ex index beb778f51..e5e8ad1fa 100644 --- a/elixir/apps/api/lib/api/gateway/socket.ex +++ b/elixir/apps/api/lib/api/gateway/socket.ex @@ -34,7 +34,8 @@ defmodule API.Gateway.Socket do socket = socket |> assign(:gateway, gateway) - |> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx()) + |> assign(:opentelemetry_span_ctx, OpenTelemetry.Tracer.current_span_ctx()) + |> assign(:opentelemetry_ctx, OpenTelemetry.Ctx.get_current()) {:ok, socket} else diff --git a/elixir/apps/api/lib/api/relay/channel.ex b/elixir/apps/api/lib/api/relay/channel.ex index 6f28361fb..e3f3e8df6 100644 --- a/elixir/apps/api/lib/api/relay/channel.ex +++ b/elixir/apps/api/lib/api/relay/channel.ex @@ -5,16 +5,33 @@ defmodule API.Relay.Channel do @impl true def join("relay", %{"stamp_secret" => stamp_secret}, socket) do + OpenTelemetry.Ctx.attach(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_span_ctx) + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do - opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() - send(self(), {:after_join, stamp_secret, opentelemetry_ctx}) - {:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)} + opentelemetry_ctx = OpenTelemetry.Ctx.get_current() + opentelemetry_span_ctx = OpenTelemetry.Tracer.current_span_ctx() + send(self(), {:after_join, stamp_secret, {opentelemetry_ctx, opentelemetry_span_ctx}}) + + socket = + assign(socket, + opentelemetry_ctx: opentelemetry_ctx, + opentelemetry_span_ctx: opentelemetry_span_ctx + ) + + {:ok, socket} end end @impl true - def handle_info({:after_join, stamp_secret, opentelemetry_ctx}, socket) do - OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do + def handle_info( + {:after_join, stamp_secret, {opentelemetry_ctx, opentelemetry_span_ctx}}, + socket + ) do + OpenTelemetry.Ctx.attach(opentelemetry_ctx) + OpenTelemetry.Tracer.set_current_span(opentelemetry_span_ctx) + + OpenTelemetry.Tracer.with_span "after_join" do API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}") push(socket, "init", %{}) :ok = Relays.connect_relay(socket.assigns.relay, stamp_secret) diff --git a/elixir/apps/api/lib/api/relay/socket.ex b/elixir/apps/api/lib/api/relay/socket.ex index 5347bf837..cf169cfd0 100644 --- a/elixir/apps/api/lib/api/relay/socket.ex +++ b/elixir/apps/api/lib/api/relay/socket.ex @@ -34,7 +34,8 @@ defmodule API.Relay.Socket do socket = socket |> assign(:relay, relay) - |> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx()) + |> assign(:opentelemetry_span_ctx, OpenTelemetry.Tracer.current_span_ctx()) + |> assign(:opentelemetry_ctx, OpenTelemetry.Ctx.get_current()) {:ok, socket} else diff --git a/elixir/apps/api/lib/api/sockets.ex b/elixir/apps/api/lib/api/sockets.ex index 745e0369a..0c08d2ad2 100644 --- a/elixir/apps/api/lib/api/sockets.ex +++ b/elixir/apps/api/lib/api/sockets.ex @@ -19,6 +19,9 @@ defmodule API.Sockets do def handle_error(conn, :invalid_token), do: Plug.Conn.send_resp(conn, 401, "Invalid token") + def handle_error(conn, :missing_token), + do: Plug.Conn.send_resp(conn, 401, "Missing token") + def handle_error(conn, :unauthenticated), do: Plug.Conn.send_resp(conn, 403, "Forbidden") diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index b61f489f5..d64b702f1 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -32,7 +32,8 @@ defmodule API.Client.ChannelTest do {:ok, _reply, socket} = API.Client.Socket |> socket("client:#{client.id}", %{ - opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"), + opentelemetry_ctx: OpenTelemetry.Ctx.new(), + opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test"), client: client, subject: subject }) @@ -70,7 +71,8 @@ defmodule API.Client.ChannelTest do {:ok, _reply, _socket} = API.Client.Socket |> socket("client:#{client.id}", %{ - opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"), + opentelemetry_ctx: OpenTelemetry.Ctx.new(), + opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test"), client: client, subject: subject }) @@ -427,10 +429,11 @@ defmodule API.Client.ChannelTest do assert authorization_expires_at == socket.assigns.subject.expires_at + otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")} + send( channel_pid, - {:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD", - OpenTelemetry.Tracer.start_span("connect")} + {:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD", otel_ctx} ) assert_reply ref, :ok, %{ diff --git a/elixir/apps/api/test/api/client/socket_test.exs b/elixir/apps/api/test/api/client/socket_test.exs index cede2f0e4..6df42d1b7 100644 --- a/elixir/apps/api/test/api/client/socket_test.exs +++ b/elixir/apps/api/test/api/client/socket_test.exs @@ -37,6 +37,25 @@ defmodule API.Client.SocketTest do assert client.last_seen_version == "0.7.412" end + test "propagates trace context" do + subject = Fixtures.Auth.create_subject() + {:ok, token} = Auth.create_session_token_from_subject(subject) + + span_ctx = OpenTelemetry.Tracer.start_span("test") + OpenTelemetry.Tracer.set_current_span(span_ctx) + + attrs = connect_attrs(token: token) + + trace_context_headers = [ + {"traceparent", "00-a1bf53221e0be8000000000000000002-f316927eb144aa62-01"} + ] + + connect_info = %{connect_info(subject) | trace_context_headers: trace_context_headers} + + assert {:ok, _socket} = connect(Socket, attrs, connect_info: connect_info) + assert span_ctx != OpenTelemetry.Tracer.current_span_ctx() + end + test "updates existing client" do subject = Fixtures.Auth.create_subject() existing_client = Fixtures.Clients.create_client(subject: subject) diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 11c0d8ea5..585ee3ac1 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -19,7 +19,8 @@ defmodule API.Gateway.ChannelTest do API.Gateway.Socket |> socket("gateway:#{gateway.id}", %{ gateway: gateway, - opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test") + opentelemetry_ctx: OpenTelemetry.Ctx.new(), + opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test") }) |> subscribe_and_join(API.Gateway.Channel, "gateway") @@ -70,7 +71,7 @@ defmodule API.Gateway.ChannelTest do socket: socket } do expires_at = DateTime.utc_now() |> DateTime.add(30, :second) - opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test") + otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")} stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) @@ -82,7 +83,7 @@ defmodule API.Gateway.ChannelTest do client_id: client.id, resource_id: resource.id, authorization_expires_at: expires_at - }, opentelemetry_ctx} + }, otel_ctx} ) assert_push "allow_access", payload @@ -119,7 +120,7 @@ defmodule API.Gateway.ChannelTest do preshared_key = "PSK" rtc_session_description = "RTC_SD" - opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test") + otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")} stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) @@ -133,7 +134,7 @@ defmodule API.Gateway.ChannelTest do authorization_expires_at: expires_at, client_rtc_session_description: rtc_session_description, client_preshared_key: preshared_key - }, opentelemetry_ctx} + }, otel_ctx} ) assert_push "request_connection", payload @@ -223,7 +224,7 @@ defmodule API.Gateway.ChannelTest do gateway_public_key = gateway.public_key rtc_session_description = "RTC_SD" - opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test") + otel_ctx = {OpenTelemetry.Ctx.new(), OpenTelemetry.Tracer.start_span("connect")} stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) @@ -237,7 +238,7 @@ defmodule API.Gateway.ChannelTest do authorization_expires_at: expires_at, client_rtc_session_description: rtc_session_description, client_preshared_key: preshared_key - }, opentelemetry_ctx} + }, otel_ctx} ) assert_push "request_connection", %{ref: ref} diff --git a/elixir/apps/api/test/api/gateway/socket_test.exs b/elixir/apps/api/test/api/gateway/socket_test.exs index 873129d0a..4f924fe09 100644 --- a/elixir/apps/api/test/api/gateway/socket_test.exs +++ b/elixir/apps/api/test/api/gateway/socket_test.exs @@ -34,6 +34,24 @@ defmodule API.Gateway.SocketTest do assert gateway.last_seen_version == @connlib_version end + test "propagates trace context" do + token = Fixtures.Gateways.create_token() + encrypted_secret = Gateways.encode_token!(token) + attrs = connect_attrs(token: encrypted_secret) + + span_ctx = OpenTelemetry.Tracer.start_span("test") + OpenTelemetry.Tracer.set_current_span(span_ctx) + + trace_context_headers = [ + {"traceparent", "00-a1bf53221e0be8000000000000000002-f316927eb144aa62-01"} + ] + + connect_info = %{@connect_info | trace_context_headers: trace_context_headers} + + assert {:ok, _socket} = connect(Socket, attrs, connect_info: connect_info) + assert span_ctx != OpenTelemetry.Tracer.current_span_ctx() + end + test "updates existing gateway" do token = Fixtures.Gateways.create_token() existing_gateway = Fixtures.Gateways.create_gateway(token: token) diff --git a/elixir/apps/api/test/api/relay/channel_test.exs b/elixir/apps/api/test/api/relay/channel_test.exs index fcca30501..ad1f8373c 100644 --- a/elixir/apps/api/test/api/relay/channel_test.exs +++ b/elixir/apps/api/test/api/relay/channel_test.exs @@ -10,7 +10,8 @@ defmodule API.Relay.ChannelTest do API.Relay.Socket |> socket("relay:#{relay.id}", %{ relay: relay, - opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test") + opentelemetry_ctx: OpenTelemetry.Ctx.new(), + opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test") }) |> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret}) @@ -35,7 +36,8 @@ defmodule API.Relay.ChannelTest do API.Relay.Socket |> socket("relay:#{relay.id}", %{ relay: relay, - opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test") + opentelemetry_ctx: OpenTelemetry.Ctx.new(), + opentelemetry_span_ctx: OpenTelemetry.Tracer.start_span("test") }) |> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret}) diff --git a/elixir/apps/api/test/api/relay/socket_test.exs b/elixir/apps/api/test/api/relay/socket_test.exs index 40b6e2a84..cc556eb46 100644 --- a/elixir/apps/api/test/api/relay/socket_test.exs +++ b/elixir/apps/api/test/api/relay/socket_test.exs @@ -34,6 +34,24 @@ defmodule API.Relay.SocketTest do assert relay.last_seen_version == @connlib_version end + test "propagates trace context" do + token = Fixtures.Relays.create_token() + encrypted_secret = Relays.encode_token!(token) + attrs = connect_attrs(token: encrypted_secret) + + span_ctx = OpenTelemetry.Tracer.start_span("test") + OpenTelemetry.Tracer.set_current_span(span_ctx) + + trace_context_headers = [ + {"traceparent", "00-a1bf53221e0be8000000000000000002-f316927eb144aa62-01"} + ] + + connect_info = %{@connect_info | trace_context_headers: trace_context_headers} + + assert {:ok, _socket} = connect(Socket, attrs, connect_info: connect_info) + assert span_ctx != OpenTelemetry.Tracer.current_span_ctx() + end + test "updates existing relay" do token = Fixtures.Relays.create_token() existing_relay = Fixtures.Relays.create_relay(token: token)