diff --git a/docker-compose.yml b/docker-compose.yml index 4abf6eefa..d8767ddb0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -309,8 +309,6 @@ services: EXTERNAL_URL: http://localhost:8081/ # Erlang ERLANG_DISTRIBUTION_PORT: 9000 - ERLANG_CLUSTER_ADAPTER: "Elixir.Domain.Cluster.Local" - ERLANG_CLUSTER_ADAPTER_CONFIG: '{}' RELEASE_COOKIE: "NksuBhJFBhjHD1uUa9mDOHV" RELEASE_HOSTNAME: "mix.cluster.local" RELEASE_NAME: "mix" diff --git a/elixir/apps/api/lib/api/client/channel.ex b/elixir/apps/api/lib/api/client/channel.ex index d1361feae..3489e9a84 100644 --- a/elixir/apps/api/lib/api/client/channel.ex +++ b/elixir/apps/api/lib/api/client/channel.ex @@ -1,40 +1,51 @@ defmodule API.Client.Channel do use API, :channel alias API.Client.Views + alias Domain.Instrumentation alias Domain.{Clients, Resources, Gateways, Relays} require Logger + require OpenTelemetry.Tracer @impl true def join("client", _payload, socket) do - expires_in = - DateTime.diff(socket.assigns.subject.expires_at, DateTime.utc_now(), :millisecond) + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do + opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() - if expires_in > 0 do - Process.send_after(self(), :token_expired, expires_in) - send(self(), :after_join) - {:ok, socket} - else - {:error, %{"reason" => "token_expired"}} + expires_in = + DateTime.diff(socket.assigns.subject.expires_at, DateTime.utc_now(), :millisecond) + + if expires_in > 0 do + Process.send_after(self(), :token_expired, expires_in) + send(self(), {:after_join, opentelemetry_ctx}) + {:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)} + else + {:error, %{"reason" => "token_expired"}} + end end end @impl true - def handle_info(:after_join, socket) do - API.Endpoint.subscribe("client:#{socket.assigns.client.id}") - :ok = Clients.connect_client(socket.assigns.client) + def handle_info({:after_join, opentelemetry_ctx}, socket) do + OpenTelemetry.Tracer.with_span opentelemetry_ctx, 
"after_join", %{} do + API.Endpoint.subscribe("client:#{socket.assigns.client.id}") + :ok = Clients.connect_client(socket.assigns.client) - {:ok, resources} = Domain.Resources.list_resources(socket.assigns.subject) + {:ok, resources} = Domain.Resources.list_resources(socket.assigns.subject) - :ok = - push(socket, "init", %{ - resources: Views.Resource.render_many(resources), - interface: Views.Interface.render(socket.assigns.client) - }) + :ok = + push(socket, "init", %{ + resources: Views.Resource.render_many(resources), + interface: Views.Interface.render(socket.assigns.client) + }) - {:noreply, socket} + {:noreply, socket} + end end def handle_info(:token_expired, socket) do + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.add_event("token_expired", %{}) + push(socket, "token_expired", %{}) {:stop, :token_expired, socket} end @@ -42,9 +53,13 @@ defmodule API.Client.Channel do # This message is sent by the gateway when it is ready # to accept the connection from the client def handle_info( - {:connect, socket_ref, resource_id, gateway_public_key, rtc_session_description}, + {:connect, socket_ref, resource_id, gateway_public_key, rtc_session_description, + opentelemetry_ctx}, socket ) do + OpenTelemetry.Tracer.set_current_span(opentelemetry_ctx) + OpenTelemetry.Tracer.add_event("connect", %{resource_id: resource_id}) + reply( socket_ref, {:ok, @@ -60,6 +75,9 @@ defmodule API.Client.Channel do end def handle_info({:resource_added, resource_id}, socket) do + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.add_event("resource_added", %{resource_id: resource_id}) + with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do push(socket, "resource_added", Views.Resource.render(resource)) end @@ -68,6 +86,9 @@ defmodule API.Client.Channel do end def handle_info({:resource_updated, resource_id}, socket) do + 
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id}) + with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do push(socket, "resource_updated", Views.Resource.render(resource)) end @@ -76,35 +97,56 @@ defmodule API.Client.Channel do end def handle_info({:resource_removed, resource_id}, socket) do + OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx) + OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id}) + push(socket, "resource_removed", resource_id) {:noreply, socket} end + def handle_in("create_log_sink", _attrs, socket) do + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "create_log_sink", %{} do + case Instrumentation.create_remote_log_sink(socket.assigns.client) do + {:ok, signed_url} -> {:reply, {:ok, signed_url}, socket} + {:error, :disabled} -> {:reply, {:error, :disabled}, socket} + end + end + end + @impl true def handle_in("prepare_connection", %{"resource_id" => resource_id} = attrs, socket) do - connected_gateway_ids = Map.get(attrs, "connected_gateway_ids", []) + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, + "prepare_connection", + attrs do + connected_gateway_ids = Map.get(attrs, "connected_gateway_ids", []) - with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), - # TODO: - # :ok = Resource.authorize(resource, socket.assigns.subject), - {:ok, [_ | _] = gateways} <- - Gateways.list_connected_gateways_for_resource(resource), - {:ok, [_ | _] = relays} <- Relays.list_connected_relays_for_resource(resource) do - gateway = Gateways.load_balance_gateways(gateways, connected_gateway_ids) + with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), + # TODO: + # :ok = Resource.authorize(resource, socket.assigns.subject), + {:ok, [_ | _] = gateways} <- + 
Gateways.list_connected_gateways_for_resource(resource), + {:ok, [_ | _] = relays} <- Relays.list_connected_relays_for_resource(resource) do + gateway = Gateways.load_balance_gateways(gateways, connected_gateway_ids) - reply = - {:ok, - %{ - relays: Views.Relay.render_many(relays, socket.assigns.subject.expires_at), - resource_id: resource_id, - gateway_id: gateway.id, - gateway_remote_ip: gateway.last_seen_remote_ip - }} + reply = + {:ok, + %{ + relays: Views.Relay.render_many(relays, socket.assigns.subject.expires_at), + resource_id: resource_id, + gateway_id: gateway.id, + gateway_remote_ip: gateway.last_seen_remote_ip + }} - {:reply, reply, socket} - else - {:ok, []} -> {:reply, {:error, :offline}, socket} - {:error, :not_found} -> {:reply, {:error, :not_found}, socket} + {:reply, reply, socket} + else + {:ok, []} -> + OpenTelemetry.Tracer.set_status(:error, "offline") + {:reply, {:error, :offline}, socket} + + {:error, :not_found} -> + OpenTelemetry.Tracer.set_status(:error, "not_found") + {:reply, {:error, :not_found}, socket} + end end end @@ -115,28 +157,37 @@ defmodule API.Client.Channel do %{ "gateway_id" => gateway_id, "resource_id" => resource_id - }, + } = attrs, socket ) do - with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), - # :ok = Resource.authorize(resource, socket.assigns.subject), - {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do - :ok = - API.Gateway.Channel.broadcast( - gateway, - {:allow_access, - %{ - client_id: socket.assigns.client.id, - resource_id: resource.id, - authorization_expires_at: socket.assigns.subject.expires_at - }} - ) + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "reuse_connection", attrs do + with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), + # :ok = Resource.authorize(resource, socket.assigns.subject), + 
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), + true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do + opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() - {:noreply, socket} - else - {:error, :not_found} -> {:reply, {:error, :not_found}, socket} - false -> {:reply, {:error, :offline}, socket} + :ok = + API.Gateway.Channel.broadcast( + gateway, + {:allow_access, + %{ + client_id: socket.assigns.client.id, + resource_id: resource.id, + authorization_expires_at: socket.assigns.subject.expires_at + }, opentelemetry_ctx} + ) + + {:noreply, socket} + else + {:error, :not_found} -> + OpenTelemetry.Tracer.set_status(:error, "not_found") + {:reply, {:error, :not_found}, socket} + + false -> + OpenTelemetry.Tracer.set_status(:error, "offline") + {:reply, {:error, :offline}, socket} + end end end @@ -151,27 +202,40 @@ defmodule API.Client.Channel do }, socket ) do - with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject), - # :ok = Resource.authorize(resource, socket.assigns.subject), - {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), - true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do - :ok = - API.Gateway.Channel.broadcast( - gateway, - {:request_connection, {self(), socket_ref(socket)}, - %{ - client_id: socket.assigns.client.id, - resource_id: resource.id, - authorization_expires_at: socket.assigns.subject.expires_at, - client_rtc_session_description: client_rtc_session_description, - client_preshared_key: preshared_key - }} - ) + ctx_attrs = %{gateway_id: gateway_id, resource_id: resource_id} - {:noreply, socket} - else - {:error, :not_found} -> {:reply, {:error, :not_found}, socket} - false -> {:reply, {:error, :offline}, socket} + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, + "request_connection", + ctx_attrs do + with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, 
socket.assigns.subject), + # :ok = Resource.authorize(resource, socket.assigns.subject), + {:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject), + true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do + opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() + + :ok = + API.Gateway.Channel.broadcast( + gateway, + {:request_connection, {self(), socket_ref(socket)}, + %{ + client_id: socket.assigns.client.id, + resource_id: resource.id, + authorization_expires_at: socket.assigns.subject.expires_at, + client_rtc_session_description: client_rtc_session_description, + client_preshared_key: preshared_key + }, opentelemetry_ctx} + ) + + {:noreply, socket} + else + {:error, :not_found} -> + OpenTelemetry.Tracer.set_status(:error, "not_found") + {:reply, {:error, :not_found}, socket} + + false -> + OpenTelemetry.Tracer.set_status(:error, "offline") + {:reply, {:error, :offline}, socket} + end end end end diff --git a/elixir/apps/api/lib/api/client/socket.ex b/elixir/apps/api/lib/api/client/socket.ex index a8a8ec438..314a6b637 100644 --- a/elixir/apps/api/lib/api/client/socket.ex +++ b/elixir/apps/api/lib/api/client/socket.ex @@ -2,6 +2,7 @@ defmodule API.Client.Socket do use Phoenix.Socket alias Domain.{Auth, Clients} require Logger + require OpenTelemetry.Tracer ## Channels @@ -11,29 +12,36 @@ defmodule API.Client.Socket do @impl true def connect(%{"token" => token} = attrs, socket, connect_info) do - %{ - user_agent: user_agent, - x_headers: x_headers, - peer_data: peer_data - } = connect_info + :otel_propagator_text_map.extract(connect_info.trace_context_headers) - real_ip = API.Sockets.real_ip(x_headers, peer_data) + OpenTelemetry.Tracer.with_span "connect" do + %{ + user_agent: user_agent, + x_headers: x_headers, + peer_data: peer_data + } = connect_info - with {:ok, subject} <- Auth.sign_in(token, user_agent, real_ip), - {:ok, client} <- Clients.upsert_client(attrs, subject) do - socket = - socket - |> 
assign(:subject, subject) - |> assign(:client, client) + real_ip = API.Sockets.real_ip(x_headers, peer_data) - {:ok, socket} - else - {:error, :unauthorized} -> - {:error, :invalid_token} + with {:ok, subject} <- Auth.sign_in(token, user_agent, real_ip), + {:ok, client} <- Clients.upsert_client(attrs, subject) do + socket = + socket + |> assign(:subject, subject) + |> assign(:client, client) + |> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx()) - {:error, reason} -> - Logger.debug("Error connecting client websocket: #{inspect(reason)}") - {:error, reason} + {:ok, socket} + else + {:error, :unauthorized} -> + OpenTelemetry.Tracer.set_status(:error, "unauthorized") + {:error, :invalid_token} + + {:error, reason} -> + OpenTelemetry.Tracer.set_status(:error, inspect(reason)) + Logger.debug("Error connecting client websocket: #{inspect(reason)}") + {:error, reason} + end end end diff --git a/elixir/apps/api/lib/api/gateway/channel.ex b/elixir/apps/api/lib/api/gateway/channel.ex index ec445cf9b..bf2ba706b 100644 --- a/elixir/apps/api/lib/api/gateway/channel.ex +++ b/elixir/apps/api/lib/api/gateway/channel.ex @@ -3,6 +3,7 @@ defmodule API.Gateway.Channel do alias API.Gateway.Views alias Domain.{Clients, Resources, Relays, Gateways} require Logger + require OpenTelemetry.Tracer def broadcast(%Gateways.Gateway{} = gateway, payload) do Logger.debug("Gateway message is being dispatched", gateway_id: gateway.id) @@ -11,83 +12,103 @@ defmodule API.Gateway.Channel do @impl true def join("gateway", _payload, socket) do - send(self(), :after_join) - socket = assign(socket, :refs, %{}) - {:ok, socket} + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do + opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() + send(self(), {:after_join, opentelemetry_ctx}) + socket = assign(socket, :refs, %{}) + {:ok, socket} + end end @impl true - def handle_info(:after_join, socket) do - :ok = Gateways.connect_gateway(socket.assigns.gateway) - 
:ok = API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}") + def handle_info({:after_join, opentelemetry_ctx}, socket) do + OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do + :ok = Gateways.connect_gateway(socket.assigns.gateway) + :ok = API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}") - push(socket, "init", %{ - interface: Views.Interface.render(socket.assigns.gateway), - # TODO: move to settings - ipv4_masquerade_enabled: true, - ipv6_masquerade_enabled: true - }) + push(socket, "init", %{ + interface: Views.Interface.render(socket.assigns.gateway), + # TODO: move to settings + ipv4_masquerade_enabled: true, + ipv6_masquerade_enabled: true + }) - {:noreply, socket} + {:noreply, socket} + end end - def handle_info({:allow_access, attrs}, socket) do - %{ - client_id: client_id, - resource_id: resource_id, - authorization_expires_at: authorization_expires_at - } = attrs + def handle_info({:allow_access, attrs, opentelemetry_ctx}, socket) do + OpenTelemetry.Tracer.with_span opentelemetry_ctx, "allow_access", %{} do + %{ + client_id: client_id, + resource_id: resource_id, + authorization_expires_at: authorization_expires_at + } = attrs - resource = Resources.fetch_resource_by_id!(resource_id) + resource = Resources.fetch_resource_by_id!(resource_id) - push(socket, "allow_access", %{ - client_id: client_id, - resource: Views.Resource.render(resource), - expires_at: DateTime.to_unix(authorization_expires_at, :second) - }) + push(socket, "allow_access", %{ + client_id: client_id, + resource: Views.Resource.render(resource), + expires_at: DateTime.to_unix(authorization_expires_at, :second) + }) - {:noreply, socket} + {:noreply, socket} + end end - def handle_info({:request_connection, {channel_pid, socket_ref}, attrs}, socket) do - %{ - client_id: client_id, - resource_id: resource_id, - authorization_expires_at: authorization_expires_at, - client_rtc_session_description: rtc_session_description, - client_preshared_key: 
preshared_key - } = attrs + def handle_info( + {:request_connection, {channel_pid, socket_ref}, attrs, opentelemetry_ctx}, + socket + ) do + OpenTelemetry.Tracer.with_span opentelemetry_ctx, "allow_access", %{} do + opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() - Logger.debug("Gateway received connection request message", - client_id: client_id, - resource_id: resource_id - ) + %{ + client_id: client_id, + resource_id: resource_id, + authorization_expires_at: authorization_expires_at, + client_rtc_session_description: rtc_session_description, + client_preshared_key: preshared_key + } = attrs - client = Clients.fetch_client_by_id!(client_id, preload: [:actor]) - resource = Resources.fetch_resource_by_id!(resource_id) - {:ok, relays} = Relays.list_connected_relays_for_resource(resource) + Logger.debug("Gateway received connection request message", + client_id: client_id, + resource_id: resource_id + ) - ref = Ecto.UUID.generate() + client = Clients.fetch_client_by_id!(client_id, preload: [:actor]) + resource = Resources.fetch_resource_by_id!(resource_id) + {:ok, relays} = Relays.list_connected_relays_for_resource(resource) - push(socket, "request_connection", %{ - ref: ref, - actor: Views.Actor.render(client.actor), - relays: Views.Relay.render_many(relays, authorization_expires_at), - resource: Views.Resource.render(resource), - client: Views.Client.render(client, rtc_session_description, preshared_key), - expires_at: DateTime.to_unix(authorization_expires_at, :second) - }) + ref = Ecto.UUID.generate() - Logger.debug("Awaiting gateway connection_ready message", - client_id: client_id, - resource_id: resource_id, - ref: ref - ) + push(socket, "request_connection", %{ + ref: ref, + actor: Views.Actor.render(client.actor), + relays: Views.Relay.render_many(relays, authorization_expires_at), + resource: Views.Resource.render(resource), + client: Views.Client.render(client, rtc_session_description, preshared_key), + expires_at: 
DateTime.to_unix(authorization_expires_at, :second) + }) - refs = Map.put(socket.assigns.refs, ref, {channel_pid, socket_ref, resource_id}) - socket = assign(socket, :refs, refs) + Logger.debug("Awaiting gateway connection_ready message", + client_id: client_id, + resource_id: resource_id, + ref: ref + ) - {:noreply, socket} + refs = + Map.put( + socket.assigns.refs, + ref, + {channel_pid, socket_ref, resource_id, opentelemetry_ctx} + ) + + socket = assign(socket, :refs, refs) + + {:noreply, socket} + end end @impl true @@ -99,22 +120,26 @@ defmodule API.Gateway.Channel do }, socket ) do - {{channel_pid, socket_ref, resource_id}, refs} = Map.pop(socket.assigns.refs, ref) - socket = assign(socket, :refs, refs) + {{channel_pid, socket_ref, resource_id, opentelemetry_ctx}, refs} = + Map.pop(socket.assigns.refs, ref) - send( - channel_pid, - {:connect, socket_ref, resource_id, socket.assigns.gateway.public_key, - rtc_session_description} - ) + OpenTelemetry.Tracer.with_span opentelemetry_ctx, "connection_ready", %{} do + socket = assign(socket, :refs, refs) - Logger.debug("Gateway replied to the Client with :connect message", - resource_id: resource_id, - channel_pid: inspect(channel_pid), - ref: ref - ) + send( + channel_pid, + {:connect, socket_ref, resource_id, socket.assigns.gateway.public_key, + rtc_session_description, opentelemetry_ctx} + ) - {:reply, :ok, socket} + Logger.debug("Gateway replied to the Client with :connect message", + resource_id: resource_id, + channel_pid: inspect(channel_pid), + ref: ref + ) + + {:reply, :ok, socket} + end end # def handle_in("metrics", params, socket) do diff --git a/elixir/apps/api/lib/api/gateway/socket.ex b/elixir/apps/api/lib/api/gateway/socket.ex index 9d56848ab..beb778f51 100644 --- a/elixir/apps/api/lib/api/gateway/socket.ex +++ b/elixir/apps/api/lib/api/gateway/socket.ex @@ -2,6 +2,7 @@ defmodule API.Gateway.Socket do use Phoenix.Socket alias Domain.Gateways require Logger + require OpenTelemetry.Tracer ## Channels 
@@ -11,34 +12,41 @@ defmodule API.Gateway.Socket do @impl true def connect(%{"token" => encrypted_secret} = attrs, socket, connect_info) do - %{ - user_agent: user_agent, - x_headers: x_headers, - peer_data: peer_data - } = connect_info + :otel_propagator_text_map.extract(connect_info.trace_context_headers) - real_ip = API.Sockets.real_ip(x_headers, peer_data) + OpenTelemetry.Tracer.with_span "connect" do + %{ + user_agent: user_agent, + x_headers: x_headers, + peer_data: peer_data + } = connect_info - attrs = - attrs - |> Map.take(~w[external_id name_suffix public_key]) - |> Map.put("last_seen_user_agent", user_agent) - |> Map.put("last_seen_remote_ip", real_ip) + real_ip = API.Sockets.real_ip(x_headers, peer_data) - with {:ok, token} <- Gateways.authorize_gateway(encrypted_secret), - {:ok, gateway} <- Gateways.upsert_gateway(token, attrs) do - socket = - socket - |> assign(:gateway, gateway) + attrs = + attrs + |> Map.take(~w[external_id name_suffix public_key]) + |> Map.put("last_seen_user_agent", user_agent) + |> Map.put("last_seen_remote_ip", real_ip) - {:ok, socket} - else - {:error, :invalid_token} -> - {:error, :invalid_token} + with {:ok, token} <- Gateways.authorize_gateway(encrypted_secret), + {:ok, gateway} <- Gateways.upsert_gateway(token, attrs) do + socket = + socket + |> assign(:gateway, gateway) + |> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx()) - {:error, reason} -> - Logger.debug("Error connecting gateway websocket: #{inspect(reason)}") - {:error, reason} + {:ok, socket} + else + {:error, :invalid_token} -> + OpenTelemetry.Tracer.set_status(:error, "invalid_token") + {:error, :invalid_token} + + {:error, reason} -> + OpenTelemetry.Tracer.set_status(:error, inspect(reason)) + Logger.debug("Error connecting gateway websocket: #{inspect(reason)}") + {:error, reason} + end end end diff --git a/elixir/apps/api/lib/api/relay/channel.ex b/elixir/apps/api/lib/api/relay/channel.ex index 237236c85..6f28361fb 100644 --- 
a/elixir/apps/api/lib/api/relay/channel.ex +++ b/elixir/apps/api/lib/api/relay/channel.ex @@ -1,18 +1,24 @@ defmodule API.Relay.Channel do use API, :channel alias Domain.Relays + require OpenTelemetry.Tracer @impl true def join("relay", %{"stamp_secret" => stamp_secret}, socket) do - send(self(), {:after_join, stamp_secret}) - {:ok, socket} + OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do + opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx() + send(self(), {:after_join, stamp_secret, opentelemetry_ctx}) + {:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)} + end end @impl true - def handle_info({:after_join, stamp_secret}, socket) do - API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}") - push(socket, "init", %{}) - :ok = Relays.connect_relay(socket.assigns.relay, stamp_secret) - {:noreply, socket} + def handle_info({:after_join, stamp_secret, opentelemetry_ctx}, socket) do + OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do + API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}") + push(socket, "init", %{}) + :ok = Relays.connect_relay(socket.assigns.relay, stamp_secret) + {:noreply, socket} + end end end diff --git a/elixir/apps/api/lib/api/relay/socket.ex b/elixir/apps/api/lib/api/relay/socket.ex index 29ceee9dd..5347bf837 100644 --- a/elixir/apps/api/lib/api/relay/socket.ex +++ b/elixir/apps/api/lib/api/relay/socket.ex @@ -2,6 +2,7 @@ defmodule API.Relay.Socket do use Phoenix.Socket alias Domain.Relays require Logger + require OpenTelemetry.Tracer ## Channels @@ -11,34 +12,41 @@ defmodule API.Relay.Socket do @impl true def connect(%{"token" => encrypted_secret} = attrs, socket, connect_info) do - %{ - user_agent: user_agent, - x_headers: x_headers, - peer_data: peer_data - } = connect_info + :otel_propagator_text_map.extract(connect_info.trace_context_headers) - real_ip = API.Sockets.real_ip(x_headers, peer_data) + OpenTelemetry.Tracer.with_span "connect" do + %{ + user_agent: 
user_agent, + x_headers: x_headers, + peer_data: peer_data + } = connect_info - attrs = - attrs - |> Map.take(~w[ipv4 ipv6]) - |> Map.put("last_seen_user_agent", user_agent) - |> Map.put("last_seen_remote_ip", real_ip) + real_ip = API.Sockets.real_ip(x_headers, peer_data) - with {:ok, token} <- Relays.authorize_relay(encrypted_secret), - {:ok, relay} <- Relays.upsert_relay(token, attrs) do - socket = - socket - |> assign(:relay, relay) + attrs = + attrs + |> Map.take(~w[ipv4 ipv6]) + |> Map.put("last_seen_user_agent", user_agent) + |> Map.put("last_seen_remote_ip", real_ip) - {:ok, socket} - else - {:error, :invalid_token} -> - {:error, :invalid_token} + with {:ok, token} <- Relays.authorize_relay(encrypted_secret), + {:ok, relay} <- Relays.upsert_relay(token, attrs) do + socket = + socket + |> assign(:relay, relay) + |> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx()) - {:error, reason} -> - Logger.debug("Error connecting relay websocket: #{inspect(reason)}") - {:error, reason} + {:ok, socket} + else + {:error, :invalid_token} -> + OpenTelemetry.Tracer.set_status(:error, "invalid_token") + {:error, :invalid_token} + + {:error, reason} -> + OpenTelemetry.Tracer.set_status(:error, inspect(reason)) + Logger.debug("Error connecting relay websocket: #{inspect(reason)}") + {:error, reason} + end end end diff --git a/elixir/apps/api/test/api/client/channel_test.exs b/elixir/apps/api/test/api/client/channel_test.exs index a6028537a..b61f489f5 100644 --- a/elixir/apps/api/test/api/client/channel_test.exs +++ b/elixir/apps/api/test/api/client/channel_test.exs @@ -1,5 +1,6 @@ defmodule API.Client.ChannelTest do use API.ChannelCase + alias Domain.Mocks.GoogleCloudPlatform setup do account = Fixtures.Accounts.create_account() @@ -31,6 +32,7 @@ defmodule API.Client.ChannelTest do {:ok, _reply, socket} = API.Client.Socket |> socket("client:#{client.id}", %{ + opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"), client: client, subject: subject }) @@ 
-68,6 +70,7 @@ defmodule API.Client.ChannelTest do {:ok, _reply, _socket} = API.Client.Socket |> socket("client:#{client.id}", %{ + opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"), client: client, subject: subject }) @@ -109,6 +112,34 @@ defmodule API.Client.ChannelTest do end end + describe "handle_in/3 create_log_sink" do + test "returns error when feature is disabled", %{socket: socket} do + Domain.Config.put_env_override(Domain.Instrumentation, client_logs_enabled: false) + + ref = push(socket, "create_log_sink", %{}) + assert_reply ref, :error, :disabled + end + + test "returns a signed URL which can be used to upload the logs", %{ + socket: socket, + client: client + } do + bypass = Bypass.open() + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo") + + ref = push(socket, "create_log_sink", %{}) + assert_reply ref, :ok, signed_url + + assert signed_uri = URI.parse(signed_url) + assert signed_uri.scheme == "https" + assert signed_uri.host == "storage.googleapis.com" + + assert String.starts_with?(signed_uri.path, "/logs/clients/#{client.id}/") + assert String.ends_with?(signed_uri.path, ".json") + end + end + describe "handle_in/3 prepare_connection" do test "returns error when resource is not found", %{socket: socket} do ref = push(socket, "prepare_connection", %{"resource_id" => Ecto.UUID.generate()}) @@ -290,7 +321,7 @@ defmodule API.Client.ChannelTest do push(socket, "reuse_connection", attrs) - assert_receive {:allow_access, payload} + assert_receive {:allow_access, payload, _opentelemetry_ctx} assert %{ resource_id: ^resource_id, @@ -384,7 +415,7 @@ defmodule API.Client.ChannelTest do ref = push(socket, "request_connection", attrs) - assert_receive {:request_connection, {channel_pid, socket_ref}, payload} + assert_receive {:request_connection, {channel_pid, socket_ref}, payload, _opentelemetry_ctx} assert %{ resource_id: ^resource_id, @@ -396,7 +427,11 @@ defmodule 
API.Client.ChannelTest do assert authorization_expires_at == socket.assigns.subject.expires_at - send(channel_pid, {:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD"}) + send( + channel_pid, + {:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD", + OpenTelemetry.Tracer.start_span("connect")} + ) assert_reply ref, :ok, %{ resource_id: ^resource_id, diff --git a/elixir/apps/api/test/api/client/socket_test.exs b/elixir/apps/api/test/api/client/socket_test.exs index b474df514..cede2f0e4 100644 --- a/elixir/apps/api/test/api/client/socket_test.exs +++ b/elixir/apps/api/test/api/client/socket_test.exs @@ -7,7 +7,8 @@ defmodule API.Client.SocketTest do @connect_info %{ user_agent: "iOS/12.7 (iPhone) connlib/0.1.1", peer_data: %{address: {189, 172, 73, 001}}, - x_headers: [{"x-forwarded-for", "189.172.73.153"}] + x_headers: [{"x-forwarded-for", "189.172.73.153"}], + trace_context_headers: [] } describe "connect/3" do @@ -62,7 +63,8 @@ defmodule API.Client.SocketTest do %{ user_agent: subject.context.user_agent, peer_data: %{address: subject.context.remote_ip}, - x_headers: [] + x_headers: [], + trace_context_headers: [] } end diff --git a/elixir/apps/api/test/api/gateway/channel_test.exs b/elixir/apps/api/test/api/gateway/channel_test.exs index 387e47c0f..11c0d8ea5 100644 --- a/elixir/apps/api/test/api/gateway/channel_test.exs +++ b/elixir/apps/api/test/api/gateway/channel_test.exs @@ -17,7 +17,10 @@ defmodule API.Gateway.ChannelTest do {:ok, _, socket} = API.Gateway.Socket - |> socket("gateway:#{gateway.id}", %{gateway: gateway}) + |> socket("gateway:#{gateway.id}", %{ + gateway: gateway, + opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test") + }) |> subscribe_and_join(API.Gateway.Channel, "gateway") relay = Fixtures.Relays.create_relay(account: account) @@ -67,6 +70,7 @@ defmodule API.Gateway.ChannelTest do socket: socket } do expires_at = DateTime.utc_now() |> DateTime.add(30, :second) + opentelemetry_ctx = 
OpenTelemetry.Tracer.start_span("test") stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) @@ -78,7 +82,7 @@ defmodule API.Gateway.ChannelTest do client_id: client.id, resource_id: resource.id, authorization_expires_at: expires_at - }} + }, opentelemetry_ctx} ) assert_push "allow_access", payload @@ -115,6 +119,8 @@ defmodule API.Gateway.ChannelTest do preshared_key = "PSK" rtc_session_description = "RTC_SD" + opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test") + stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) @@ -127,7 +133,7 @@ defmodule API.Gateway.ChannelTest do authorization_expires_at: expires_at, client_rtc_session_description: rtc_session_description, client_preshared_key: preshared_key - }} + }, opentelemetry_ctx} ) assert_push "request_connection", payload @@ -217,6 +223,8 @@ defmodule API.Gateway.ChannelTest do gateway_public_key = gateway.public_key rtc_session_description = "RTC_SD" + opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test") + stamp_secret = Ecto.UUID.generate() :ok = Domain.Relays.connect_relay(relay, stamp_secret) @@ -229,7 +237,7 @@ defmodule API.Gateway.ChannelTest do authorization_expires_at: expires_at, client_rtc_session_description: rtc_session_description, client_preshared_key: preshared_key - }} + }, opentelemetry_ctx} ) assert_push "request_connection", %{ref: ref} @@ -243,7 +251,7 @@ defmodule API.Gateway.ChannelTest do assert_reply push_ref, :ok assert_receive {:connect, ^socket_ref, resource_id, ^gateway_public_key, - ^rtc_session_description} + ^rtc_session_description, _opentelemetry_ctx} assert resource_id == resource.id end diff --git a/elixir/apps/api/test/api/gateway/socket_test.exs b/elixir/apps/api/test/api/gateway/socket_test.exs index 54ee9de4a..873129d0a 100644 --- a/elixir/apps/api/test/api/gateway/socket_test.exs +++ b/elixir/apps/api/test/api/gateway/socket_test.exs @@ -9,7 +9,8 @@ defmodule API.Gateway.SocketTest do 
@connect_info %{ user_agent: "iOS/12.7 (iPhone) connlib/#{@connlib_version}", peer_data: %{address: {189, 172, 73, 001}}, - x_headers: [{"x-forwarded-for", "189.172.73.153"}] + x_headers: [{"x-forwarded-for", "189.172.73.153"}], + trace_context_headers: [] } describe "connect/3" do diff --git a/elixir/apps/api/test/api/relay/channel_test.exs b/elixir/apps/api/test/api/relay/channel_test.exs index 5ad1b754c..fcca30501 100644 --- a/elixir/apps/api/test/api/relay/channel_test.exs +++ b/elixir/apps/api/test/api/relay/channel_test.exs @@ -8,7 +8,10 @@ defmodule API.Relay.ChannelTest do {:ok, _, socket} = API.Relay.Socket - |> socket("relay:#{relay.id}", %{relay: relay}) + |> socket("relay:#{relay.id}", %{ + relay: relay, + opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test") + }) |> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret}) %{relay: relay, socket: socket} @@ -30,7 +33,10 @@ defmodule API.Relay.ChannelTest do {:ok, _, _socket} = API.Relay.Socket - |> socket("relay:#{relay.id}", %{relay: relay}) + |> socket("relay:#{relay.id}", %{ + relay: relay, + opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test") + }) |> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret}) presence = Domain.Relays.Presence.list("relays") diff --git a/elixir/apps/api/test/api/relay/socket_test.exs b/elixir/apps/api/test/api/relay/socket_test.exs index b9a0a50fa..40b6e2a84 100644 --- a/elixir/apps/api/test/api/relay/socket_test.exs +++ b/elixir/apps/api/test/api/relay/socket_test.exs @@ -9,7 +9,8 @@ defmodule API.Relay.SocketTest do @connect_info %{ user_agent: "iOS/12.7 (iPhone) connlib/#{@connlib_version}", peer_data: %{address: {189, 172, 73, 001}}, - x_headers: [{"x-forwarded-for", "189.172.73.153"}] + x_headers: [{"x-forwarded-for", "189.172.73.153"}], + trace_context_headers: [] } describe "connect/3" do diff --git a/elixir/apps/api/test/support/channel_case.ex b/elixir/apps/api/test/support/channel_case.ex index 
880e01512..6299d66ec 100644 --- a/elixir/apps/api/test/support/channel_case.ex +++ b/elixir/apps/api/test/support/channel_case.ex @@ -15,6 +15,7 @@ defmodule API.ChannelCase do import API.ChannelCase alias Domain.Repo alias Domain.Fixtures + require OpenTelemetry.Tracer # The default endpoint for testing @endpoint API.Endpoint diff --git a/elixir/apps/domain/lib/domain/application.ex b/elixir/apps/domain/lib/domain/application.ex index 10c636385..e61f3a665 100644 --- a/elixir/apps/domain/lib/domain/application.ex +++ b/elixir/apps/domain/lib/domain/application.ex @@ -22,21 +22,23 @@ defmodule Domain.Application do def children do [ - # Infrastructure services + # Core services Domain.Repo, {Phoenix.PubSub, name: Domain.PubSub}, + # Infrastructure services + # Note: only one of platform adapters will be actually started. + Domain.GoogleCloudPlatform, + Domain.Cluster, + # Application Domain.Auth, Domain.Relays, Domain.Gateways, - Domain.Clients, + Domain.Clients # Observability # Domain.Telemetry - - # Erlang Clustering - Domain.Cluster ] end end diff --git a/elixir/apps/domain/lib/domain/cluster.ex b/elixir/apps/domain/lib/domain/cluster.ex index b74016dbd..45827137d 100644 --- a/elixir/apps/domain/lib/domain/cluster.ex +++ b/elixir/apps/domain/lib/domain/cluster.ex @@ -10,7 +10,6 @@ defmodule Domain.Cluster do config = Domain.Config.fetch_env!(:domain, __MODULE__) adapter = Keyword.fetch!(config, :adapter) adapter_config = Keyword.fetch!(config, :adapter_config) - pool_opts = Domain.Config.fetch_env!(:domain, :http_client_ssl_opts) topology_config = [ default: [ @@ -19,12 +18,8 @@ defmodule Domain.Cluster do ] ] - shared_children = [ - {Finch, name: __MODULE__.Finch, pools: %{default: pool_opts}} - ] - children = - if adapter != Domain.Cluster.Local do + if adapter do [ {Cluster.Supervisor, [topology_config, [name: __MODULE__]]} ] @@ -32,6 +27,6 @@ defmodule Domain.Cluster do [] end - Supervisor.init(shared_children ++ children, strategy: :rest_for_one) + 
Supervisor.init(children, strategy: :rest_for_one) end end diff --git a/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex b/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex index c1fb040ac..3b85158af 100644 --- a/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex +++ b/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex @@ -21,22 +21,21 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do defmodule Meta do @type t :: %{ - access_token: String.t(), - access_token_expires_at: DateTime.t(), nodes: MapSet.t() } - defstruct access_token: nil, - access_token_expires_at: nil, - nodes: MapSet.new() + defstruct nodes: MapSet.new() end - @default_polling_interval 5_000 + @default_polling_interval :timer.seconds(10) def start_link(args), do: GenServer.start_link(__MODULE__, args) @impl true def init([%State{} = state]) do + unless Domain.GoogleCloudPlatform.enabled?(), + do: raise "Google Cloud Platform clustering strategy requires GoogleCloudPlatform to be enabled" + {:ok, %{state | meta: %Meta{}}, {:continue, :start}} end @@ -103,63 +102,12 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do %State{state | meta: %{state.meta | nodes: new_nodes}} end - @doc false - # We use Google Compute Engine metadata server to fetch the node access token, - # it will have scopes declared in the instance template but actual permissions - # are limited by the service account attached to it. 
- def refresh_access_token(state) do - config = fetch_config!() - token_endpoint_url = Keyword.fetch!(config, :token_endpoint_url) - request = Finch.build(:get, token_endpoint_url, [{"Metadata-Flavor", "Google"}]) - - case Finch.request(request, Domain.Cluster.Finch) do - {:ok, %Finch.Response{status: 200, body: response}} -> - %{"access_token" => access_token, "expires_in" => expires_in} = Jason.decode!(response) - access_token_expires_at = DateTime.utc_now() |> DateTime.add(expires_in - 1, :second) - - {:ok, - %{ - state - | meta: %{ - state.meta - | access_token: access_token, - access_token_expires_at: access_token_expires_at - } - }} - - {:ok, response} -> - Cluster.Logger.warn(state.topology, "Can't fetch instance metadata: #{inspect(response)}") - {:error, {response.status, response.body}} - - {:error, reason} -> - Cluster.Logger.warn(state.topology, "Can not fetch instance metadata: #{inspect(reason)}") - {:error, reason} - end - end - - defp maybe_refresh_access_token(state) do - cond do - is_nil(state.meta.access_token) -> - refresh_access_token(state) - - is_nil(state.meta.access_token_expires_at) -> - refresh_access_token(state) - - DateTime.diff(state.meta.access_token_expires_at, DateTime.utc_now()) > 0 -> - {:ok, state} - - true -> - refresh_access_token(state) - end - end - @doc false # We use Google Compute API to fetch the list of instances in all regions of a project, # instances are filtered by cluster name and status, and then we use this instance labels # to figure out the actual node name (which is set in `rel/env.sh.eex` by also reading node metadata). 
def fetch_nodes(state, remaining_retry_count \\ 3) do - with {:ok, state} <- maybe_refresh_access_token(state), - {:ok, nodes} <- fetch_google_cloud_instances(state) do + with {:ok, nodes} <- list_google_cloud_cluster_nodes(state) do {:ok, nodes, state} else {:error, %{"error" => %{"code" => 401}} = reason} -> @@ -171,7 +119,6 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do if remaining_retry_count == 0 do {:error, reason} else - {:ok, state} = refresh_access_token(state) fetch_nodes(state, remaining_retry_count - 1) end @@ -191,41 +138,20 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do end end - defp fetch_google_cloud_instances(state) do + defp list_google_cloud_cluster_nodes(state) do project_id = Keyword.fetch!(state.config, :project_id) cluster_name = Keyword.fetch!(state.config, :cluster_name) cluster_name_label = Keyword.get(state.config, :cluster_name_label, "cluster_name") node_name_label = Keyword.get(state.config, :node_name_label, "application") - aggregated_list_endpoint_url = - fetch_config!() - |> Keyword.fetch!(:aggregated_list_endpoint_url) - |> String.replace("${project_id}", project_id) - - filter = "labels.#{cluster_name_label}=#{cluster_name} AND status=RUNNING" - query = URI.encode_query(%{"filter" => filter}) - - request = - Finch.build(:get, aggregated_list_endpoint_url <> "?" 
<> query, [ - {"Authorization", "Bearer #{state.meta.access_token}"} - ]) - - with {:ok, %Finch.Response{status: 200, body: response}} <- - Finch.request(request, Domain.Cluster.Finch), - {:ok, %{"items" => items}} <- Jason.decode(response) do + with {:ok, instances} <- + Domain.GoogleCloudPlatform.list_google_cloud_instances_by_label( + project_id, + cluster_name_label, + cluster_name + ) do nodes = - items - |> Enum.flat_map(fn - {_zone, %{"instances" => instances}} -> - instances - - {_zone, %{"warning" => %{"code" => "NO_RESULTS_ON_PAGE"}}} -> - [] - end) - |> Enum.filter(fn - %{"status" => "RUNNING", "labels" => %{^cluster_name_label => ^cluster_name}} -> true - %{"status" => _status, "labels" => _labels} -> false - end) + instances |> Enum.map(fn %{"zone" => zone, "name" => name, "labels" => labels} -> release_name = Map.fetch!(labels, node_name_label) zone = String.split(zone, "/") |> List.last() @@ -235,22 +161,9 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do end) {:ok, nodes} - else - {:ok, %Finch.Response{status: status, body: body}} -> - {:error, {status, body}} - - {:ok, map} -> - {:error, map} - - {:error, reason} -> - {:error, reason} end end - defp fetch_config! 
do - Domain.Config.fetch_env!(:domain, __MODULE__) - end - defp polling_interval(%State{config: config}) do Keyword.get(config, :polling_interval, @default_polling_interval) end diff --git a/elixir/apps/domain/lib/domain/config/definitions.ex b/elixir/apps/domain/lib/domain/config/definitions.ex index 9e296afb8..08cef280f 100644 --- a/elixir/apps/domain/lib/domain/config/definitions.ex +++ b/elixir/apps/domain/lib/domain/config/definitions.ex @@ -63,6 +63,11 @@ defmodule Domain.Config.Definitions do :database_ssl_opts, :database_parameters ]}, + {"Cloud Platform", + [ + :platform_adapter, + :platform_adapter_config + ]}, {"Erlang Cluster", [ :erlang_cluster_adapter, @@ -114,6 +119,11 @@ defmodule Domain.Config.Definitions do :outbound_email_adapter, :outbound_email_adapter_opts ]}, + {"Instrumentation", + [ + :instrumentation_client_logs_enabled, + :instrumentation_client_logs_bucket + ]}, {"Telemetry", [ :telemetry_enabled, @@ -278,14 +288,39 @@ defmodule Domain.Config.Definitions do dump: &Dumper.keyword/1 ) + ############################################## + ## Platform + ############################################## + + @doc """ + Cloud platform on which the Firezone runs on which is used to unlock + platform-specific features (logging, tracing, monitoring, clustering). + """ + defconfig( + :platform_adapter, + {:parameterized, Ecto.Enum, + Ecto.Enum.init( + values: [ + Elixir.Domain.GoogleCloudPlatform + ] + )}, + default: nil + ) + + @doc """ + Config for the platform adapter. + """ + defconfig(:platform_adapter_config, :map, + default: [], + dump: &Dumper.keyword/1 + ) + ############################################## ## Erlang Cluster ############################################## @doc """ An adapter that will be used to discover and connect nodes to the Erlang cluster. 
- - Set to `Domain.Cluster.Local` to disable """ defconfig( :erlang_cluster_adapter, @@ -295,18 +330,17 @@ defmodule Domain.Config.Definitions do Elixir.Cluster.Strategy.LocalEpmd, Elixir.Cluster.Strategy.Epmd, Elixir.Cluster.Strategy.Gossip, - Elixir.Domain.Cluster.GoogleComputeLabelsStrategy, - Domain.Cluster.Local + Elixir.Domain.Cluster.GoogleComputeLabelsStrategy ] )}, - default: Domain.Cluster.Local + default: nil ) @doc """ Config for the Erlang cluster adapter. """ defconfig(:erlang_cluster_adapter_config, :map, - default: [], + default: %{}, dump: fn map -> keyword = Dumper.keyword(map) @@ -457,6 +491,22 @@ defmodule Domain.Config.Definitions do @doc """ Enable or disable the Firezone telemetry collection. + For more details see https://docs.firezone.dev/reference/telemetry/. + """ + defconfig(:instrumentation_client_logs_enabled, :boolean, default: true) + + @doc """ + Name of the bucket to store client-, relay- and gateway-submitted instrumentation logs in. + """ + defconfig(:instrumentation_client_logs_bucket, :string, default: "logs") + + ############################################## + ## Telemetry + ############################################## + + @doc """ + Enable or disable the Firezone telemetry collection. + For more details see https://docs.firezone.dev/reference/telemetry/. 
""" defconfig(:telemetry_enabled, :boolean, default: true) diff --git a/elixir/apps/domain/lib/domain/config/resolver.ex b/elixir/apps/domain/lib/domain/config/resolver.ex index 099eab2e2..2e20d7bbd 100644 --- a/elixir/apps/domain/lib/domain/config/resolver.ex +++ b/elixir/apps/domain/lib/domain/config/resolver.ex @@ -35,6 +35,7 @@ defmodule Domain.Config.Resolver do if Mix.env() == :test do def fetch_process_env(pdict_key) do with :error <- fetch_process_value(pdict_key), + :error <- fetch_process_value(Process.get(:last_caller_pid), pdict_key), :error <- fetch_process_value(get_last_pid_from_pdict_list(:"$ancestors"), pdict_key), :error <- fetch_process_value(get_last_pid_from_pdict_list(:"$callers"), pdict_key) do :error diff --git a/elixir/apps/domain/lib/domain/crypto.ex b/elixir/apps/domain/lib/domain/crypto.ex index 55e1e11b6..3c6137483 100644 --- a/elixir/apps/domain/lib/domain/crypto.ex +++ b/elixir/apps/domain/lib/domain/crypto.ex @@ -61,6 +61,12 @@ defmodule Domain.Crypto do defp replace_ambiguous_characters(<>, acc), do: replace_ambiguous_characters(rest, <>) + def hash(type, value) do + :crypto.hash(type, value) + |> Base.encode16() + |> String.downcase() + end + def hash(value), do: Argon2.hash_pwd_salt(value) def equal?(token, hash) when is_nil(token) or is_nil(hash), do: Argon2.no_user_verify() diff --git a/elixir/apps/domain/lib/domain/google_cloud_platform.ex b/elixir/apps/domain/lib/domain/google_cloud_platform.ex new file mode 100644 index 000000000..d64601231 --- /dev/null +++ b/elixir/apps/domain/lib/domain/google_cloud_platform.ex @@ -0,0 +1,139 @@ +defmodule Domain.GoogleCloudPlatform do + use Supervisor + alias Domain.GoogleCloudPlatform.{Instance, URLSigner} + require Logger + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__.Supervisor) + end + + @impl true + def init(_opts) do + if enabled?() do + pool_opts = Domain.Config.fetch_env!(:domain, :http_client_ssl_opts) + + children = [ + {Finch, name: 
__MODULE__.Finch, pools: %{default: pool_opts}}, + Instance + ] + + Supervisor.init(children, strategy: :rest_for_one) + else + :ignore + end + end + + def enabled? do + Application.fetch_env!(:domain, :platform_adapter) == __MODULE__ + end + + def fetch_and_cache_access_token do + Domain.GoogleCloudPlatform.Instance.fetch_access_token() + end + + # We use Google Compute Engine metadata server to fetch the node access token, + # it will have scopes declared in the instance template but actual permissions + # are limited by the service account attached to it. + def fetch_access_token do + config = fetch_config!() + token_endpoint_url = Keyword.fetch!(config, :token_endpoint_url) + request = Finch.build(:get, token_endpoint_url, [{"Metadata-Flavor", "Google"}]) + + case Finch.request(request, __MODULE__.Finch) do + {:ok, %Finch.Response{status: 200, body: response}} -> + %{"access_token" => access_token, "expires_in" => expires_in} = Jason.decode!(response) + access_token_expires_at = DateTime.utc_now() |> DateTime.add(expires_in - 1, :second) + {:ok, access_token, access_token_expires_at} + + {:ok, response} -> + Logger.error("Can't fetch instance token", reason: inspect(response)) + {:error, {response.status, response.body}} + + {:error, reason} -> + Logger.error("Can not fetch instance token", reason: inspect(reason)) + {:error, reason} + end + end + + def list_google_cloud_instances_by_label(project_id, label, value) do + aggregated_list_endpoint_url = + fetch_config!() + |> Keyword.fetch!(:aggregated_list_endpoint_url) + |> String.replace("${project_id}", project_id) + + filter = "labels.#{label}=#{value} AND status=RUNNING" + query = URI.encode_query(%{"filter" => filter}) + url = aggregated_list_endpoint_url <> "?" 
<> query + + with {:ok, access_token} <- fetch_and_cache_access_token(), + request = Finch.build(:get, url, [{"Authorization", "Bearer #{access_token}"}]), + {:ok, %Finch.Response{status: 200, body: response}} <- + Finch.request(request, __MODULE__.Finch), + {:ok, %{"items" => items}} <- Jason.decode(response) do + instances = + items + |> Enum.flat_map(fn + {_zone, %{"instances" => instances}} -> + instances + + {_zone, %{"warning" => %{"code" => "NO_RESULTS_ON_PAGE"}}} -> + [] + end) + |> Enum.filter(fn + %{"status" => "RUNNING", "labels" => %{^label => ^value}} -> true + %{"status" => _status, "labels" => _labels} -> false + end) + + {:ok, instances} + else + {:ok, %Finch.Response{status: status, body: body}} -> + {:error, {status, body}} + + {:ok, map} -> + {:error, map} + + {:error, reason} -> + {:error, reason} + end + end + + @doc """ + Signs a URL which can be used to write or read a file in a Google Cloud Storage bucket. + The signature is requested from the configured `signBlob` endpoint (one network request per call). + + ## Available options + + * `:verb` - HTTP verb which would be used to access the resource ("PUT", "GET", "HEAD"). + Default: `GET`. + + * `:expires_in` - time in seconds after which the signed URL expires. Default: `604800` (7 days). + + * `:headers` - Enforce any other headers, e.g. `Content-Type`, to make sure that signed URL requests + are going to have a specific `Content-Type` header (only when `:verb` is `PUT`). 
+ """ + def sign_url(bucket, filename, opts \\ []) do + with {:ok, service_account_access_token} <- fetch_and_cache_access_token() do + config = fetch_config!() + service_account_email = Keyword.fetch!(config, :service_account_email) + sign_endpoint_url = Keyword.fetch!(config, :sign_endpoint_url) + cloud_storage_url = Keyword.fetch!(config, :cloud_storage_url) + + opts = + opts + |> Keyword.put_new(:sign_endpoint_url, sign_endpoint_url) + |> Keyword.put_new(:cloud_storage_url, cloud_storage_url) + + URLSigner.sign_url( + service_account_email, + service_account_access_token, + bucket, + filename, + opts + ) + end + end + + defp fetch_config! do + Domain.Config.fetch_env!(:domain, __MODULE__) + end +end diff --git a/elixir/apps/domain/lib/domain/google_cloud_platform/instance.ex b/elixir/apps/domain/lib/domain/google_cloud_platform/instance.ex new file mode 100644 index 000000000..4a54b0d74 --- /dev/null +++ b/elixir/apps/domain/lib/domain/google_cloud_platform/instance.ex @@ -0,0 +1,65 @@ +defmodule Domain.GoogleCloudPlatform.Instance do + use GenServer + alias Domain.GoogleCloudPlatform + + def start_link(_opts) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @impl true + def init(nil) do + {:ok, %{access_token: nil, access_token_expires_at: nil}} + end + + def fetch_access_token do + callers = Process.get(:"$callers") || [] + last_caller = List.first(callers) || self() + metadata = Logger.metadata() + GenServer.call(__MODULE__, {:fetch_access_token, last_caller, metadata}) + end + + @impl true + def handle_call({:fetch_access_token, last_caller, metadata}, _from, state) do + # Propagate logger metadata + Logger.metadata(metadata) + + # Allows GenServer to find the caller process for pdict config overrides + Process.put(:last_caller_pid, last_caller) + + case maybe_refresh_access_token(state) do + {:ok, access_token, access_token_expires_at} -> + state = %{ + state + | access_token: access_token, + access_token_expires_at: 
access_token_expires_at + } + + {:reply, {:ok, access_token}, state} + + {:error, reason} -> + state = %{ + state + | access_token: nil, + access_token_expires_at: nil + } + + {:reply, {:error, reason}, state} + end + end + + defp maybe_refresh_access_token(state) do + cond do + is_nil(state.access_token) -> + GoogleCloudPlatform.fetch_access_token() + + is_nil(state.access_token_expires_at) -> + GoogleCloudPlatform.fetch_access_token() + + DateTime.diff(state.access_token_expires_at, DateTime.utc_now()) > 0 -> + {:ok, state.access_token, state.access_token_expires_at} + + true -> + GoogleCloudPlatform.fetch_access_token() + end + end +end diff --git a/elixir/apps/domain/lib/domain/google_cloud_platform/url_signer.ex b/elixir/apps/domain/lib/domain/google_cloud_platform/url_signer.ex new file mode 100644 index 000000000..bb5223b28 --- /dev/null +++ b/elixir/apps/domain/lib/domain/google_cloud_platform/url_signer.ex @@ -0,0 +1,110 @@ +defmodule Domain.GoogleCloudPlatform.URLSigner do + def sign_url(oauth_identity, oauth_access_token, bucket, filename, opts \\ []) do + sign_endpoint_url = Keyword.fetch!(opts, :sign_endpoint_url) + sign_endpoint_url = sign_endpoint_url <> oauth_identity <> ":signBlob" + + cloud_storage_url = Keyword.fetch!(opts, :cloud_storage_url) + cloud_storage_host = URI.parse(cloud_storage_url).host + + valid_from = Keyword.get(opts, :valid_from, DateTime.utc_now()) + valid_from = DateTime.truncate(valid_from, :second) + valid_from_date = DateTime.to_date(valid_from) + + verb = Keyword.get(opts, :verb, "GET") + expires_in = Keyword.get(opts, :expires_in, 60 * 60 * 24 * 7) + + headers = Keyword.get(opts, :headers, []) + headers = prepare_headers(headers, cloud_storage_host) + canonical_headers = canonical_headers(headers) + signed_headers = signed_headers(headers) + + path = Path.join("/", Path.join(bucket, filename)) + + credential_scope = "#{Date.to_iso8601(valid_from_date, :basic)}/auto/storage/goog4_request" + + canonical_query_string = + [ + 
{"X-Goog-Algorithm", "GOOG4-RSA-SHA256"}, + {"X-Goog-Credential", "#{oauth_identity}/#{credential_scope}"}, + {"X-Goog-Date", DateTime.to_iso8601(valid_from, :basic)}, + {"X-Goog-SignedHeaders", signed_headers}, + {"X-Goog-Expires", expires_in} + ] + |> Enum.sort() + |> URI.encode_query(:rfc3986) + + canonical_request = + [ + verb, + path, + canonical_query_string, + canonical_headers, + "", + signed_headers, + "UNSIGNED-PAYLOAD" + ] + |> Enum.join("\n") + + string_to_sign = + [ + "GOOG4-RSA-SHA256", + DateTime.to_iso8601(valid_from, :basic), + "#{Date.to_iso8601(valid_from_date, :basic)}/auto/storage/goog4_request", + Domain.Crypto.hash(:sha256, canonical_request) + ] + |> Enum.join("\n") + |> Base.encode64() + + request = + Finch.build( + :post, + sign_endpoint_url, + [{"Authorization", "Bearer #{oauth_access_token}"}], + Jason.encode!(%{"payload" => string_to_sign}) + ) + + with {:ok, %Finch.Response{status: 200, body: response}} <- + Finch.request(request, Domain.GoogleCloudPlatform.Finch), + {:ok, %{"signedBlob" => signature}} <- Jason.decode(response) do + signature = + signature + |> Base.decode64!() + |> Base.encode16() + |> String.downcase() + + {:ok, + "https://#{cloud_storage_host}#{path}?#{canonical_query_string}&X-Goog-Signature=#{signature}"} + else + {:ok, %Finch.Response{status: status, body: body}} -> + {:error, {status, body}} + + {:ok, map} -> + {:error, map} + + {:error, reason} -> + {:error, reason} + end + end + + defp prepare_headers(headers, host) do + headers = [host: host] ++ headers + + headers + |> Enum.map(fn {k, v} -> {k |> to_string() |> String.downcase(), v} end) + |> Enum.sort(fn {k1, _}, {k2, _} -> k1 <= k2 end) + end + + @doc false + def canonical_headers(headers) do + headers + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + |> Enum.map_join("\n", fn {k, v} -> "#{k}:#{Enum.join(v, ",")}" end) + end + + def signed_headers(headers) do + headers + |> Enum.map(&elem(&1, 0)) + |> Enum.uniq() + |> Enum.join(";") + end +end diff --git 
a/elixir/apps/domain/lib/domain/instrumentation.ex b/elixir/apps/domain/lib/domain/instrumentation.ex new file mode 100644 index 000000000..1ff6130d0 --- /dev/null +++ b/elixir/apps/domain/lib/domain/instrumentation.ex @@ -0,0 +1,27 @@ +defmodule Domain.Instrumentation do + alias Domain.Clients + alias Domain.GoogleCloudPlatform + + def create_remote_log_sink(%Clients.Client{} = client) do + config = config!() + enabled? = Keyword.fetch!(config, :client_logs_enabled) + + if enabled? and GoogleCloudPlatform.enabled?() do + now = DateTime.utc_now() |> DateTime.to_iso8601() + + bucket = + Application.fetch_env!(:domain, __MODULE__) + |> Keyword.fetch!(:client_logs_bucket) + + filename = "clients/#{client.id}/#{now}-#{System.unique_integer([:positive])}.json" + + GoogleCloudPlatform.sign_url(bucket, filename, verb: "PUT") + else + {:error, :disabled} + end + end + + defp config! do + Domain.Config.fetch_env!(:domain, __MODULE__) + end +end diff --git a/elixir/apps/domain/test/domain/cluster/google_compute_labels_strategy_test.exs b/elixir/apps/domain/test/domain/cluster/google_compute_labels_strategy_test.exs index 213345309..2106c64cf 100644 --- a/elixir/apps/domain/test/domain/cluster/google_compute_labels_strategy_test.exs +++ b/elixir/apps/domain/test/domain/cluster/google_compute_labels_strategy_test.exs @@ -5,39 +5,6 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategyTest do alias Cluster.Strategy.State alias Domain.Mocks.GoogleCloudPlatform - describe "refresh_access_token/1" do - test "returns access token" do - bypass = Bypass.open() - GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) - - state = %State{meta: %Meta{}} - assert {:ok, state} = refresh_access_token(state) - assert state.meta.access_token == "GCP_ACCESS_TOKEN" - - expected_access_token_expires_at = DateTime.add(DateTime.utc_now(), 3595, :second) - - assert DateTime.diff(state.meta.access_token_expires_at, expected_access_token_expires_at) in -2..2 - - assert_receive 
{:bypass_request, conn} - assert {"metadata-flavor", "Google"} in conn.req_headers - end - - test "returns error when endpoint is not available" do - bypass = Bypass.open() - Bypass.down(bypass) - - GoogleCloudPlatform.override_endpoint_url( - :token_endpoint_url, - "http://localhost:#{bypass.port}/" - ) - - state = %State{meta: %Meta{}} - - assert refresh_access_token(state) == - {:error, %Mint.TransportError{reason: :econnrefused}} - end - end - describe "fetch_nodes/1" do test "returns list of nodes in all regions when access token is not set" do bypass = Bypass.open() @@ -52,130 +19,11 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategyTest do ] } - assert {:ok, nodes, state} = fetch_nodes(state) + assert {:ok, nodes, _state} = fetch_nodes(state) assert nodes == [ :"api@api-q3j6.us-east1-d.c.firezone-staging.internal" ] - - assert state.meta.access_token - assert state.meta.access_token_expires_at - end - - test "returns list of nodes when token is not expired" do - bypass = Bypass.open() - GoogleCloudPlatform.mock_instances_list_endpoint(bypass) - - state = %State{ - meta: %Meta{ - access_token: "ACCESS_TOKEN", - access_token_expires_at: DateTime.utc_now() |> DateTime.add(5, :second) - }, - config: [ - project_id: "firezone-staging", - cluster_name: "firezone", - backoff_interval: 1 - ] - } - - assert {:ok, nodes, ^state} = fetch_nodes(state) - - assert nodes == [ - :"api@api-q3j6.us-east1-d.c.firezone-staging.internal" - ] - - assert_receive {:bypass_request, conn} - assert {"authorization", "Bearer ACCESS_TOKEN"} in conn.req_headers - end - - test "returns error when compute endpoint is down" do - bypass = Bypass.open() - Bypass.down(bypass) - - GoogleCloudPlatform.override_endpoint_url( - :aggregated_list_endpoint_url, - "http://localhost:#{bypass.port}/" - ) - - state = %State{ - meta: %Meta{ - access_token: "ACCESS_TOKEN", - access_token_expires_at: DateTime.utc_now() |> DateTime.add(5, :second) - }, - config: [ - project_id: "firezone-staging", - 
cluster_name: "firezone", - backoff_interval: 1 - ] - } - - assert fetch_nodes(state) == {:error, %Mint.TransportError{reason: :econnrefused}} - - GoogleCloudPlatform.override_endpoint_url( - :token_endpoint_url, - "http://localhost:#{bypass.port}/" - ) - - state = %State{ - meta: %Meta{}, - config: [ - project_id: "firezone-staging", - cluster_name: "firezone", - backoff_interval: 1 - ] - } - - assert fetch_nodes(state) == {:error, %Mint.TransportError{reason: :econnrefused}} - end - - test "refreshes the access token if it expired" do - bypass = Bypass.open() - GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) - GoogleCloudPlatform.mock_instances_list_endpoint(bypass) - - state = %State{ - meta: %Meta{ - access_token: "ACCESS_TOKEN", - access_token_expires_at: DateTime.utc_now() |> DateTime.add(-5, :second) - }, - config: [ - project_id: "firezone-staging", - cluster_name: "firezone", - backoff_interval: 1 - ] - } - - assert {:ok, _nodes, updated_state} = fetch_nodes(state) - - assert updated_state.meta.access_token != state.meta.access_token - assert updated_state.meta.access_token_expires_at != state.meta.access_token_expires_at - end - - test "refreshes the access token if it became invalid even through did not expire" do - resp = %{ - "error" => %{ - "code" => 401, - "status" => "UNAUTHENTICATED" - } - } - - bypass = Bypass.open() - GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) - GoogleCloudPlatform.mock_instances_list_endpoint(bypass, resp) - - state = %State{ - meta: %Meta{ - access_token: "ACCESS_TOKEN", - access_token_expires_at: DateTime.utc_now() |> DateTime.add(5, :second) - }, - config: [ - project_id: "firezone-staging", - cluster_name: "firezone", - backoff_interval: 1 - ] - } - - assert {:error, _reason} = fetch_nodes(state) end end end diff --git a/elixir/apps/domain/test/domain/google_cloud_platform_test.exs b/elixir/apps/domain/test/domain/google_cloud_platform_test.exs new file mode 100644 index 
000000000..9ba08abe2 --- /dev/null +++ b/elixir/apps/domain/test/domain/google_cloud_platform_test.exs @@ -0,0 +1,155 @@ +defmodule Domain.GoogleCloudPlatformTest do + use ExUnit.Case, async: true + import Domain.GoogleCloudPlatform + alias Domain.Mocks.GoogleCloudPlatform + + setup do + bypass = Bypass.open() + %{bypass: bypass} + end + + describe "fetch_and_cache_access_token/0" do + test "returns instance default account service token", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + + assert {:ok, token} = fetch_and_cache_access_token() + assert token + end + + test "caches the access token", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + assert {:ok, _token} = fetch_and_cache_access_token() + + Bypass.down(bypass) + assert {:ok, _token} = fetch_and_cache_access_token() + end + end + + describe "fetch_access_token/0" do + test "returns instance default account service token", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + + assert {:ok, token, token_expires_at} = fetch_access_token() + assert token + assert %DateTime{} = token_expires_at + end + + test "returns error on failure", %{bypass: bypass} do + Bypass.down(bypass) + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + + assert fetch_access_token() == + {:error, %Mint.TransportError{reason: :econnrefused}} + end + end + + describe "list_google_cloud_instances_by_label/3" do + test "returns list of nodes in all regions when access token is not set", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + GoogleCloudPlatform.mock_instances_list_endpoint(bypass) + + assert {:ok, nodes} = + list_google_cloud_instances_by_label( + "firezone-staging", + "cluster_name", + "firezone" + ) + + assert length(nodes) == 1 + + assert [ + %{ + "name" => "api-q3j6", + "zone" => + 
"https://www.googleapis.com/compute/v1/projects/firezone-staging/zones/us-east1-d", + "labels" => %{ + "application" => "api", + "cluster_name" => "firezone", + "container-vm" => "cos-105-17412-101-13", + "managed_by" => "terraform", + "version" => "0-0-1" + } + } + ] = nodes + end + + test "returns error when compute endpoint is down", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + + Bypass.down(bypass) + + GoogleCloudPlatform.override_endpoint_url( + :aggregated_list_endpoint_url, + "http://localhost:#{bypass.port}/" + ) + + assert list_google_cloud_instances_by_label("firezone-staging", "cluster_name", "firezone") == + {:error, %Mint.TransportError{reason: :econnrefused}} + + GoogleCloudPlatform.override_endpoint_url( + :token_endpoint_url, + "http://localhost:#{bypass.port}/" + ) + + assert list_google_cloud_instances_by_label("firezone-staging", "cluster_name", "firezone") == + {:error, %Mint.TransportError{reason: :econnrefused}} + end + end + + describe "sign_url/3" do + test "returns error when endpoint is not available", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + + GoogleCloudPlatform.override_endpoint_url( + :sign_endpoint_url, + "http://localhost:#{bypass.port}/" + ) + + Bypass.down(bypass) + + assert sign_url("logs", "clients/id/log.json.tar.gz", verb: "PUT") == + {:error, %Mint.TransportError{reason: :econnrefused}} + end + + test "returns error when endpoint returns an error", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo", %{"error" => "reason"}) + + assert sign_url("logs", "clients/id/log.json.tar.gz", verb: "PUT") == + {:error, %{"error" => "reason"}} + end + + test "returns signed url", %{bypass: bypass} do + fixed_datetime = ~U[2000-01-01 00:00:00.000000Z] + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + 
GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo") + + assert {:ok, signed_url} = + sign_url("logs", "clients/id/log.json.tar.gz", + verb: "PUT", + valid_from: fixed_datetime + ) + + assert {:ok, signed_uri} = URI.new(signed_url) + + assert signed_uri.scheme == "https" + assert signed_uri.host == "storage.googleapis.com" + assert signed_uri.path == "/logs/clients/id/log.json.tar.gz" + + assert URI.decode_query(signed_uri.query) == %{ + "X-Goog-Algorithm" => "GOOG4-RSA-SHA256", + "X-Goog-Credential" => "foo@iam.example.com/20000101/auto/storage/goog4_request", + "X-Goog-Date" => "20000101T000000Z", + "X-Goog-Expires" => "604800", + "X-Goog-Signature" => "efdd75f1feb87fa75a71ee36e9bf1bd35f777fcdb9e7cd1f", + "X-Goog-SignedHeaders" => "host" + } + + assert_receive {:bypass_request, + %{request_path: "/service_accounts/foo@iam.example.com:signBlob"} = conn} + + assert {"authorization", "Bearer GCP_ACCESS_TOKEN"} in conn.req_headers + assert conn.method == "POST" + end + end +end diff --git a/elixir/apps/domain/test/domain/google_cloud_platform/url_signer_test.exs b/elixir/apps/domain/test/domain/google_cloud_platform/url_signer_test.exs new file mode 100644 index 000000000..d0fda93ba --- /dev/null +++ b/elixir/apps/domain/test/domain/google_cloud_platform/url_signer_test.exs @@ -0,0 +1,38 @@ +defmodule Domain.GoogleCloudPlatform.URLSignerTest do + use ExUnit.Case, async: true + import Domain.GoogleCloudPlatform.URLSigner + + describe "canonical_headers/1" do + test "generates valid canonical form of the headers" do + headers = [ + {"content-type", "text/plain"}, + {"host", "storage.googleapis.com"}, + {"x-goog-meta-reviewer", "jane"}, + {"x-goog-meta-reviewer", "john"} + ] + + assert canonical_headers = canonical_headers(headers) + assert is_binary(canonical_headers) + + assert String.split(canonical_headers, "\n") == + [ + "content-type:text/plain", + "host:storage.googleapis.com", + "x-goog-meta-reviewer:jane,john" + ] + end + end + + describe
"signed_headers/1" do + test "generates a semicolon-separated list of unique header names" do + headers = [ + {"content-type", "text/plain"}, + {"host", "storage.googleapis.com"}, + {"x-goog-meta-reviewer", "jane"}, + {"x-goog-meta-reviewer", "john"} + ] + + assert signed_headers(headers) == "content-type;host;x-goog-meta-reviewer" + end + end +end diff --git a/elixir/apps/domain/test/domain/instrumentation_test.exs b/elixir/apps/domain/test/domain/instrumentation_test.exs new file mode 100644 index 000000000..5c6d2abe8 --- /dev/null +++ b/elixir/apps/domain/test/domain/instrumentation_test.exs @@ -0,0 +1,32 @@ +defmodule Domain.InstrumentationTest do + use Domain.DataCase, async: true + import Domain.Instrumentation + alias Domain.Mocks.GoogleCloudPlatform + + describe "create_remote_log_sink/1" do + test "returns an error if feature is disabled" do + client = Fixtures.Clients.create_client() + + Domain.Config.put_env_override(Domain.Instrumentation, client_logs_enabled: false) + + assert create_remote_log_sink(client) == {:error, :disabled} + end + + test "returns a signed URL" do + bypass = Bypass.open() + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo") + + client = Fixtures.Clients.create_client() + + assert {:ok, signed_url} = create_remote_log_sink(client) + + assert signed_uri = URI.parse(signed_url) + assert signed_uri.scheme == "https" + assert signed_uri.host == "storage.googleapis.com" + + assert String.starts_with?(signed_uri.path, "/logs/clients/#{client.id}/") + assert String.ends_with?(signed_uri.path, ".json") + end + end +end diff --git a/elixir/apps/domain/test/domain/jobs/executors/global_test.exs b/elixir/apps/domain/test/domain/jobs/executors/global_test.exs index 8caba2fa6..11e9a7eba 100644 --- a/elixir/apps/domain/test/domain/jobs/executors/global_test.exs +++ b/elixir/apps/domain/test/domain/jobs/executors/global_test.exs @@ -10,8 +10,8 @@ defmodule
Domain.Jobs.Executors.GlobalTest do test "executes the handler on the interval" do assert {:ok, _pid} = start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()}) - assert_receive {:executed, _pid, time1} - assert_receive {:executed, _pid, time2} + assert_receive {:executed, _pid, time1}, 200 + assert_receive {:executed, _pid, time2}, 200 assert time1 < time2 end @@ -25,7 +25,7 @@ defmodule Domain.Jobs.Executors.GlobalTest do }) refute_receive {:executed, _pid, _time}, 50 - assert_receive {:executed, _pid, _time}, 200 + assert_receive {:executed, _pid, _time}, 400 end test "registers itself as a leader if there is no global name registered" do diff --git a/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex b/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex index 5ff3da391..6012f11e8 100644 --- a/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex +++ b/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex @@ -1,13 +1,8 @@ defmodule Domain.Mocks.GoogleCloudPlatform do def override_endpoint_url(endpoint, url) do - config = Domain.Config.fetch_env!(:domain, Domain.Cluster.GoogleComputeLabelsStrategy) - strategy_config = Keyword.put(config, endpoint, url) - - Domain.Config.put_env_override( - :domain, - Domain.Cluster.GoogleComputeLabelsStrategy, - strategy_config - ) + config = Domain.Config.fetch_env!(:domain, Domain.GoogleCloudPlatform) + config = Keyword.put(config, endpoint, url) + Domain.Config.put_env_override(:domain, Domain.GoogleCloudPlatform, config) end def mock_instance_metadata_token_endpoint(bypass, resp \\ nil) do @@ -23,7 +18,7 @@ defmodule Domain.Mocks.GoogleCloudPlatform do test_pid = self() - Bypass.expect(bypass, "GET", token_endpoint_path, fn conn -> + Bypass.stub(bypass, "GET", token_endpoint_path, fn conn -> conn = Plug.Conn.fetch_query_params(conn) send(test_pid, {:bypass_request, conn}) Plug.Conn.send_resp(conn, 200, Jason.encode!(resp)) @@ -37,6 +32,35 @@ defmodule 
Domain.Mocks.GoogleCloudPlatform do bypass end + def mock_sign_blob_endpoint(bypass, service_account_email, resp \\ nil) do + token_endpoint_path = "service_accounts/#{service_account_email}:signBlob" + + test_pid = self() + + Bypass.expect(bypass, "POST", token_endpoint_path, fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + send(test_pid, {:bypass_request, conn}) + {:ok, binary, conn} = Plug.Conn.read_body(conn) + %{"payload" => payload} = Jason.decode!(binary) + + resp = + resp || + %{ + "keyId" => Ecto.UUID.generate(), + "signedBlob" => Domain.Crypto.hash(:md5, service_account_email <> payload) + } + + Plug.Conn.send_resp(conn, 200, Jason.encode!(resp)) + end) + + override_endpoint_url( + :sign_endpoint_url, + "http://localhost:#{bypass.port}/service_accounts/" + ) + + bypass + end + def mock_instances_list_endpoint(bypass, resp \\ nil) do aggregated_instances_endpoint_path = "compute/v1/projects/firezone-staging/aggregated/instances" diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 803265b4a..e18b9a226 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -53,6 +53,24 @@ config :domain, Domain.Auth.Adapters.GoogleWorkspace.APIClient, endpoint: "https://admin.googleapis.com", finch_transport_opts: [] +config :domain, platform_adapter: nil + +config :domain, Domain.GoogleCloudPlatform, + token_endpoint_url: + "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token", + aggregated_list_endpoint_url: + "https://compute.googleapis.com/compute/v1/projects/${project_id}/aggregated/instances", + sign_endpoint_url: "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/", + cloud_storage_url: "https://storage.googleapis.com" + +config :domain, Domain.Cluster, + adapter: nil, + adapter_config: [] + +config :domain, Domain.Instrumentation, + client_logs_enabled: true, + client_logs_bucket: "logs" + ############################### ##### Web ##################### 
############################### @@ -134,20 +152,6 @@ config :api, external_trusted_proxies: [], private_clients: [%{__struct__: Postgrex.INET, address: {172, 28, 0, 0}, netmask: 16}] -############################### -##### Erlang Cluster ########## -############################### - -config :domain, Domain.Cluster.GoogleComputeLabelsStrategy, - token_endpoint_url: - "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token", - aggregated_list_endpoint_url: - "https://compute.googleapis.com/compute/v1/projects/${project_id}/aggregated/instances" - -config :domain, Domain.Cluster, - adapter: Domain.Cluster.Local, - adapter_config: [] - ############################### ##### Third-party configs ##### ############################### diff --git a/elixir/config/runtime.exs b/elixir/config/runtime.exs index 87685d1eb..6c6249171 100644 --- a/elixir/config/runtime.exs +++ b/elixir/config/runtime.exs @@ -51,6 +51,20 @@ if config_env() == :prod do config :domain, Domain.Auth.Adapters.GoogleWorkspace.APIClient, finch_transport_opts: compile_config!(:http_client_ssl_opts) + config :domain, platform_adapter: compile_config!(:platform_adapter) + + if platform_adapter = compile_config!(:platform_adapter) do + config :domain, platform_adapter, compile_config!(:platform_adapter_config) + end + + config :domain, Domain.Cluster, + adapter: compile_config!(:erlang_cluster_adapter), + adapter_config: compile_config!(:erlang_cluster_adapter_config) + + config :domain, Domain.Instrumentation, + client_logs_enabled: compile_config!(:instrumentation_client_logs_enabled), + client_logs_bucket: compile_config!(:instrumentation_client_logs_bucket) + ############################### ##### Web ##################### ############################### @@ -108,14 +122,6 @@ if config_env() == :prod do external_trusted_proxies: compile_config!(:phoenix_external_trusted_proxies), private_clients: compile_config!(:phoenix_private_clients) - ############################### - 
##### Erlang Cluster ########## - ############################### - - config :domain, Domain.Cluster, - adapter: compile_config!(:erlang_cluster_adapter), - adapter_config: compile_config!(:erlang_cluster_adapter_config) - ############################### ##### Third-party configs ##### ############################### diff --git a/elixir/config/test.exs b/elixir/config/test.exs index 42a645c0d..a8387793d 100644 --- a/elixir/config/test.exs +++ b/elixir/config/test.exs @@ -22,6 +22,12 @@ config :domain, Domain.Telemetry, enabled: false config :domain, Domain.ConnectivityChecks, enabled: false +config :domain, platform_adapter: Domain.GoogleCloudPlatform + +config :domain, Domain.GoogleCloudPlatform, + project_id: "fz-test", + service_account_email: "foo@iam.example.com" + ############################### ##### Web ##################### ############################### diff --git a/terraform/environments/staging/main.tf b/terraform/environments/staging/main.tf index c856a0492..27f3f53de 100644 --- a/terraform/environments/staging/main.tf +++ b/terraform/environments/staging/main.tf @@ -232,6 +232,47 @@ resource "google_sql_database" "firezone" { instance = module.google-cloud-sql.master_instance_name } + +resource "google_storage_bucket" "client-logs" { + project = module.google-cloud-project.project.project_id + name = "${module.google-cloud-project.project.project_id}-client-logs" + + location = "US" + + lifecycle_rule { + condition { + age = 3 + } + + action { + type = "Delete" + } + } + + lifecycle_rule { + condition { + age = 1 + } + + action { + type = "AbortIncompleteMultipartUpload" + } + } + + logging { + log_bucket = true + log_object_prefix = "firezone.dev/clients" + } + + public_access_prevention = "enforced" + uniform_bucket_level_access = true + + lifecycle { + prevent_destroy = true + ignore_changes = [] + } +} + locals { cluster = { name = "firezone" @@ -338,6 +379,14 @@ locals { name = "TELEMETRY_ENABLED" value = "false" }, + { + name =
"INSTRUMENTATION_CLIENT_LOGS_ENABLED" + value = "true" + }, + { + name = "INSTRUMENTATION_CLIENT_LOGS_BUCKET" + value = google_storage_bucket.client-logs.name + }, # Emails { name = "OUTBOUND_EMAIL_ADAPTER" @@ -490,6 +539,35 @@ module "api" { application_labels = { "cluster_name" = local.cluster.name } + + application_token_scopes = [ + "https://www.googleapis.com/auth/cloud-platform" + ] +} + +## Allow API nodes to sign URLs for Google Cloud Storage +resource "google_storage_bucket_iam_member" "client-logs-object-admin" { + bucket = google_storage_bucket.client-logs.name + role = "roles/storage.objectAdmin" + member = "serviceAccount:${module.api.service_account.email}" +} + +resource "google_project_iam_custom_role" "sign-urls" { + project = module.google-cloud-project.project.project_id + + title = "Sign URLs for Google Cloud Storage" + + role_id = "iam.sign_urls" + + permissions = [ + "iam.serviceAccounts.signBlob" + ] +} + +resource "google_project_iam_member" "sign-urls" { + project = module.google-cloud-project.project.project_id + role = "projects/${module.google-cloud-project.project.project_id}/roles/${google_project_iam_custom_role.sign-urls.role_id}" + member = "serviceAccount:${module.api.service_account.email}" } # Erlang Cluster @@ -660,6 +738,7 @@ resource "google_compute_firewall" "ssh-ipv4" { source_ranges = ["0.0.0.0/0"] target_tags = concat(module.web.target_tags, module.api.target_tags) } + resource "google_compute_firewall" "ssh-ipv6" { project = module.google-cloud-project.project.project_id @@ -681,7 +760,7 @@ resource "google_compute_firewall" "ssh-ipv6" { ports = [22] } - source_ranges = ["::0/0"] + source_ranges = ["::/0"] target_tags = concat(module.web.target_tags, module.api.target_tags) } @@ -735,6 +814,6 @@ resource "google_compute_firewall" "relays-ssh-ipv6" { ports = [22] } - source_ranges = ["::0/0"] + source_ranges = ["::/0"] target_tags = module.relays[0].target_tags } diff --git
a/terraform/modules/elixir-app/main.tf b/terraform/modules/elixir-app/main.tf index 47e7d5c9f..719b9df0d 100644 --- a/terraform/modules/elixir-app/main.tf +++ b/terraform/modules/elixir-app/main.tf @@ -36,6 +36,17 @@ locals { { name = "OTEL_RESOURCE_ATTRIBUTES" value = "application.name=${local.application_name}" + }, + { + name = "PLATFORM_ADAPTER" + value = "Elixir.Domain.GoogleCloudPlatform" + }, + { + name = "PLATFORM_ADAPTER_CONFIG" + value = jsonencode({ + project_id = var.project_id + service_account_email = google_service_account.application.email + }) } ], var.application_environment_variables) @@ -168,7 +179,7 @@ resource "google_compute_instance_template" "application" { service_account { email = google_service_account.application.email - scopes = [ + scopes = concat([ # Those are default scopes "https://www.googleapis.com/auth/devstorage.read_only", "https://www.googleapis.com/auth/logging.write", @@ -178,7 +189,7 @@ resource "google_compute_instance_template" "application" { "https://www.googleapis.com/auth/trace.append", # Required to discover the other instances in the Erlang Cluster "https://www.googleapis.com/auth/compute.readonly" - ] + ], var.application_token_scopes) } shielded_instance_config { @@ -622,7 +633,7 @@ resource "google_compute_firewall" "egress-ipv6" { direction = "EGRESS" target_tags = ["app-${local.application_name}"] - destination_ranges = ["::0/0"] + destination_ranges = ["::/0"] allow { protocol = "all" diff --git a/terraform/modules/elixir-app/variables.tf b/terraform/modules/elixir-app/variables.tf index 0adf18386..141a29028 100644 --- a/terraform/modules/elixir-app/variables.tf +++ b/terraform/modules/elixir-app/variables.tf @@ -212,6 +212,14 @@ variable "application_labels" { description = "Labels to add to all created by this module resources." 
} +variable "application_token_scopes" { + type = list(string) + nullable = false + default = [] + + description = "Any extra OAuth2 token scopes granted to the token of the default service account." +} + variable "application_dns_tld" { type = string nullable = false diff --git a/terraform/modules/relay-app/main.tf b/terraform/modules/relay-app/main.tf index e2d9bb460..b86ad5d95 100644 --- a/terraform/modules/relay-app/main.tf +++ b/terraform/modules/relay-app/main.tf @@ -394,7 +394,7 @@ resource "google_compute_firewall" "stun-turn-ipv6" { name = "${local.application_name}-firewall-lb-to-instances-ipv6" network = google_compute_network.network.self_link - source_ranges = ["::0/0"] + source_ranges = ["::/0"] target_tags = ["app-${local.application_name}"] allow { @@ -448,7 +448,7 @@ resource "google_compute_firewall" "ingress-ipv6" { direction = "INGRESS" target_tags = ["app-${local.application_name}"] - source_ranges = ["::0/0"] + source_ranges = ["::/0"] allow { protocol = "udp" @@ -479,7 +479,7 @@ resource "google_compute_firewall" "egress-ipv6" { direction = "EGRESS" target_tags = ["app-${local.application_name}"] - destination_ranges = ["::0/0"] + destination_ranges = ["::/0"] allow { protocol = "udp"