Allow client logs and messages instrumentation (#2086)

Closes #2019
Andrew Dryga
2023-09-18 15:03:51 -06:00
committed by GitHub
parent 68f2bac3a1
commit 9281b7fede
37 changed files with 1241 additions and 551 deletions

View File

@@ -309,8 +309,6 @@ services:
EXTERNAL_URL: http://localhost:8081/
# Erlang
ERLANG_DISTRIBUTION_PORT: 9000
ERLANG_CLUSTER_ADAPTER: "Elixir.Domain.Cluster.Local"
ERLANG_CLUSTER_ADAPTER_CONFIG: '{}'
RELEASE_COOKIE: "NksuBhJFBhjHD1uUa9mDOHV"
RELEASE_HOSTNAME: "mix.cluster.local"
RELEASE_NAME: "mix"

View File

@@ -1,40 +1,51 @@
defmodule API.Client.Channel do
use API, :channel
alias API.Client.Views
alias Domain.Instrumentation
alias Domain.{Clients, Resources, Gateways, Relays}
require Logger
require OpenTelemetry.Tracer
@impl true
def join("client", _payload, socket) do
expires_in =
DateTime.diff(socket.assigns.subject.expires_at, DateTime.utc_now(), :millisecond)
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
if expires_in > 0 do
Process.send_after(self(), :token_expired, expires_in)
send(self(), :after_join)
{:ok, socket}
else
{:error, %{"reason" => "token_expired"}}
expires_in =
DateTime.diff(socket.assigns.subject.expires_at, DateTime.utc_now(), :millisecond)
if expires_in > 0 do
Process.send_after(self(), :token_expired, expires_in)
send(self(), {:after_join, opentelemetry_ctx})
{:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)}
else
{:error, %{"reason" => "token_expired"}}
end
end
end
@impl true
def handle_info(:after_join, socket) do
API.Endpoint.subscribe("client:#{socket.assigns.client.id}")
:ok = Clients.connect_client(socket.assigns.client)
def handle_info({:after_join, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do
API.Endpoint.subscribe("client:#{socket.assigns.client.id}")
:ok = Clients.connect_client(socket.assigns.client)
{:ok, resources} = Domain.Resources.list_resources(socket.assigns.subject)
{:ok, resources} = Domain.Resources.list_resources(socket.assigns.subject)
:ok =
push(socket, "init", %{
resources: Views.Resource.render_many(resources),
interface: Views.Interface.render(socket.assigns.client)
})
:ok =
push(socket, "init", %{
resources: Views.Resource.render_many(resources),
interface: Views.Interface.render(socket.assigns.client)
})
{:noreply, socket}
{:noreply, socket}
end
end
def handle_info(:token_expired, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.add_event("token_expired", %{})
push(socket, "token_expired", %{})
{:stop, :token_expired, socket}
end
@@ -42,9 +53,13 @@ defmodule API.Client.Channel do
# This message is sent by the gateway when it is ready
# to accept the connection from the client
def handle_info(
{:connect, socket_ref, resource_id, gateway_public_key, rtc_session_description},
{:connect, socket_ref, resource_id, gateway_public_key, rtc_session_description,
opentelemetry_ctx},
socket
) do
OpenTelemetry.Tracer.set_current_span(opentelemetry_ctx)
OpenTelemetry.Tracer.add_event("connect", %{resource_id: resource_id})
reply(
socket_ref,
{:ok,
@@ -60,6 +75,9 @@ defmodule API.Client.Channel do
end
def handle_info({:resource_added, resource_id}, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.add_event("resource_added", %{resource_id: resource_id})
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do
push(socket, "resource_added", Views.Resource.render(resource))
end
@@ -68,6 +86,9 @@ defmodule API.Client.Channel do
end
def handle_info({:resource_updated, resource_id}, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id})
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject) do
push(socket, "resource_updated", Views.Resource.render(resource))
end
@@ -76,35 +97,56 @@ defmodule API.Client.Channel do
end
def handle_info({:resource_removed, resource_id}, socket) do
OpenTelemetry.Tracer.set_current_span(socket.assigns.opentelemetry_ctx)
OpenTelemetry.Tracer.add_event("resource_updated", %{resource_id: resource_id})
push(socket, "resource_removed", resource_id)
{:noreply, socket}
end
def handle_in("create_log_sink", _attrs, socket) do
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "create_log_sink", %{} do
case Instrumentation.create_remote_log_sink(socket.assigns.client) do
{:ok, signed_url} -> {:reply, {:ok, signed_url}, socket}
{:error, :disabled} -> {:reply, {:error, :disabled}, socket}
end
end
end
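In distilled form, a connected client asks for a sink and then uploads its log archive out of band; the channel-test helpers used later in this commit exercise the handler like this (a sketch, not part of the diff):

ref = push(socket, "create_log_sink", %{})
assert_reply ref, :ok, signed_url
# the client then HTTP PUTs its logs to signed_url, a time-limited storage.googleapis.com URL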
@impl true
def handle_in("prepare_connection", %{"resource_id" => resource_id} = attrs, socket) do
connected_gateway_ids = Map.get(attrs, "connected_gateway_ids", [])
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx,
"prepare_connection",
attrs do
connected_gateway_ids = Map.get(attrs, "connected_gateway_ids", [])
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# TODO:
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, [_ | _] = gateways} <-
Gateways.list_connected_gateways_for_resource(resource),
{:ok, [_ | _] = relays} <- Relays.list_connected_relays_for_resource(resource) do
gateway = Gateways.load_balance_gateways(gateways, connected_gateway_ids)
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# TODO:
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, [_ | _] = gateways} <-
Gateways.list_connected_gateways_for_resource(resource),
{:ok, [_ | _] = relays} <- Relays.list_connected_relays_for_resource(resource) do
gateway = Gateways.load_balance_gateways(gateways, connected_gateway_ids)
reply =
{:ok,
%{
relays: Views.Relay.render_many(relays, socket.assigns.subject.expires_at),
resource_id: resource_id,
gateway_id: gateway.id,
gateway_remote_ip: gateway.last_seen_remote_ip
}}
reply =
{:ok,
%{
relays: Views.Relay.render_many(relays, socket.assigns.subject.expires_at),
resource_id: resource_id,
gateway_id: gateway.id,
gateway_remote_ip: gateway.last_seen_remote_ip
}}
{:reply, reply, socket}
else
{:ok, []} -> {:reply, {:error, :offline}, socket}
{:error, :not_found} -> {:reply, {:error, :not_found}, socket}
{:reply, reply, socket}
else
{:ok, []} ->
OpenTelemetry.Tracer.set_status(:error, "offline")
{:reply, {:error, :offline}, socket}
{:error, :not_found} ->
OpenTelemetry.Tracer.set_status(:error, "not_found")
{:reply, {:error, :not_found}, socket}
end
end
end
@@ -115,28 +157,37 @@ defmodule API.Client.Channel do
%{
"gateway_id" => gateway_id,
"resource_id" => resource_id
},
} = attrs,
socket
) do
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
:ok =
API.Gateway.Channel.broadcast(
gateway,
{:allow_access,
%{
client_id: socket.assigns.client.id,
resource_id: resource.id,
authorization_expires_at: socket.assigns.subject.expires_at
}}
)
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "reuse_connection", attrs do
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
{:noreply, socket}
else
{:error, :not_found} -> {:reply, {:error, :not_found}, socket}
false -> {:reply, {:error, :offline}, socket}
:ok =
API.Gateway.Channel.broadcast(
gateway,
{:allow_access,
%{
client_id: socket.assigns.client.id,
resource_id: resource.id,
authorization_expires_at: socket.assigns.subject.expires_at
}, opentelemetry_ctx}
)
{:noreply, socket}
else
{:error, :not_found} ->
OpenTelemetry.Tracer.set_status(:error, "not_found")
{:reply, {:error, :not_found}, socket}
false ->
OpenTelemetry.Tracer.set_status(:error, "offline")
{:reply, {:error, :offline}, socket}
end
end
end
@@ -151,27 +202,40 @@ defmodule API.Client.Channel do
},
socket
) do
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
:ok =
API.Gateway.Channel.broadcast(
gateway,
{:request_connection, {self(), socket_ref(socket)},
%{
client_id: socket.assigns.client.id,
resource_id: resource.id,
authorization_expires_at: socket.assigns.subject.expires_at,
client_rtc_session_description: client_rtc_session_description,
client_preshared_key: preshared_key
}}
)
ctx_attrs = %{gateway_id: gateway_id, resource_id: resource_id}
{:noreply, socket}
else
{:error, :not_found} -> {:reply, {:error, :not_found}, socket}
false -> {:reply, {:error, :offline}, socket}
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx,
"request_connection",
ctx_attrs do
with {:ok, resource} <- Resources.fetch_resource_by_id(resource_id, socket.assigns.subject),
# :ok = Resource.authorize(resource, socket.assigns.subject),
{:ok, gateway} <- Gateways.fetch_gateway_by_id(gateway_id, socket.assigns.subject),
true <- Gateways.gateway_can_connect_to_resource?(gateway, resource) do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
:ok =
API.Gateway.Channel.broadcast(
gateway,
{:request_connection, {self(), socket_ref(socket)},
%{
client_id: socket.assigns.client.id,
resource_id: resource.id,
authorization_expires_at: socket.assigns.subject.expires_at,
client_rtc_session_description: client_rtc_session_description,
client_preshared_key: preshared_key
}, opentelemetry_ctx}
)
{:noreply, socket}
else
{:error, :not_found} ->
OpenTelemetry.Tracer.set_status(:error, "not_found")
{:reply, {:error, :not_found}, socket}
false ->
OpenTelemetry.Tracer.set_status(:error, "offline")
{:reply, {:error, :offline}, socket}
end
end
end
end
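Distilled, the pattern this channel (and the gateway channel below) now follows is: capture the current span context before sending a cross-process message, ship it alongside the payload, and resume the trace in the receiving handler. A minimal sketch with a hypothetical event name:

# sender side: attach the current span context to the message
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(pid, {:some_event, payload, opentelemetry_ctx})

# receiver side: resume the same trace inside a child span
def handle_info({:some_event, payload, opentelemetry_ctx}, socket) do
  OpenTelemetry.Tracer.with_span opentelemetry_ctx, "some_event", %{} do
    # handle the payload, add events, then reply as usual
    {:noreply, socket}
  end
end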

View File

@@ -2,6 +2,7 @@ defmodule API.Client.Socket do
use Phoenix.Socket
alias Domain.{Auth, Clients}
require Logger
require OpenTelemetry.Tracer
## Channels
@@ -11,29 +12,36 @@ defmodule API.Client.Socket do
@impl true
def connect(%{"token" => token} = attrs, socket, connect_info) do
%{
user_agent: user_agent,
x_headers: x_headers,
peer_data: peer_data
} = connect_info
:otel_propagator_text_map.extract(connect_info.trace_context_headers)
real_ip = API.Sockets.real_ip(x_headers, peer_data)
OpenTelemetry.Tracer.with_span "connect" do
%{
user_agent: user_agent,
x_headers: x_headers,
peer_data: peer_data
} = connect_info
with {:ok, subject} <- Auth.sign_in(token, user_agent, real_ip),
{:ok, client} <- Clients.upsert_client(attrs, subject) do
socket =
socket
|> assign(:subject, subject)
|> assign(:client, client)
real_ip = API.Sockets.real_ip(x_headers, peer_data)
{:ok, socket}
else
{:error, :unauthorized} ->
{:error, :invalid_token}
with {:ok, subject} <- Auth.sign_in(token, user_agent, real_ip),
{:ok, client} <- Clients.upsert_client(attrs, subject) do
socket =
socket
|> assign(:subject, subject)
|> assign(:client, client)
|> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx())
{:error, reason} ->
Logger.debug("Error connecting client websocket: #{inspect(reason)}")
{:error, reason}
{:ok, socket}
else
{:error, :unauthorized} ->
OpenTelemetry.Tracer.set_status(:error, "unauthorized")
{:error, :invalid_token}
{:error, reason} ->
OpenTelemetry.Tracer.set_status(:error, inspect(reason))
Logger.debug("Error connecting client websocket: #{inspect(reason)}")
{:error, reason}
end
end
end
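The same propagation is applied at socket connect time for all three sockets (client, gateway, relay); distilled, and assuming it runs inside a `connect/3` callback with a hypothetical `authenticate/3` helper:

def connect(attrs, socket, connect_info) do
  # resume the trace started by the caller via the W3C trace context headers
  :otel_propagator_text_map.extract(connect_info.trace_context_headers)

  OpenTelemetry.Tracer.with_span "connect" do
    with {:ok, socket} <- authenticate(attrs, socket, connect_info) do
      # remember the span context so channel handlers can attach to the same trace
      {:ok, assign(socket, :opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx())}
    end
  end
end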

View File

@@ -3,6 +3,7 @@ defmodule API.Gateway.Channel do
alias API.Gateway.Views
alias Domain.{Clients, Resources, Relays, Gateways}
require Logger
require OpenTelemetry.Tracer
def broadcast(%Gateways.Gateway{} = gateway, payload) do
Logger.debug("Gateway message is being dispatched", gateway_id: gateway.id)
@@ -11,83 +12,103 @@ defmodule API.Gateway.Channel do
@impl true
def join("gateway", _payload, socket) do
send(self(), :after_join)
socket = assign(socket, :refs, %{})
{:ok, socket}
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(self(), {:after_join, opentelemetry_ctx})
socket = assign(socket, :refs, %{})
{:ok, socket}
end
end
@impl true
def handle_info(:after_join, socket) do
:ok = Gateways.connect_gateway(socket.assigns.gateway)
:ok = API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}")
def handle_info({:after_join, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do
:ok = Gateways.connect_gateway(socket.assigns.gateway)
:ok = API.Endpoint.subscribe("gateway:#{socket.assigns.gateway.id}")
push(socket, "init", %{
interface: Views.Interface.render(socket.assigns.gateway),
# TODO: move to settings
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true
})
push(socket, "init", %{
interface: Views.Interface.render(socket.assigns.gateway),
# TODO: move to settings
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true
})
{:noreply, socket}
{:noreply, socket}
end
end
def handle_info({:allow_access, attrs}, socket) do
%{
client_id: client_id,
resource_id: resource_id,
authorization_expires_at: authorization_expires_at
} = attrs
def handle_info({:allow_access, attrs, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "allow_access", %{} do
%{
client_id: client_id,
resource_id: resource_id,
authorization_expires_at: authorization_expires_at
} = attrs
resource = Resources.fetch_resource_by_id!(resource_id)
resource = Resources.fetch_resource_by_id!(resource_id)
push(socket, "allow_access", %{
client_id: client_id,
resource: Views.Resource.render(resource),
expires_at: DateTime.to_unix(authorization_expires_at, :second)
})
push(socket, "allow_access", %{
client_id: client_id,
resource: Views.Resource.render(resource),
expires_at: DateTime.to_unix(authorization_expires_at, :second)
})
{:noreply, socket}
{:noreply, socket}
end
end
def handle_info({:request_connection, {channel_pid, socket_ref}, attrs}, socket) do
%{
client_id: client_id,
resource_id: resource_id,
authorization_expires_at: authorization_expires_at,
client_rtc_session_description: rtc_session_description,
client_preshared_key: preshared_key
} = attrs
def handle_info(
{:request_connection, {channel_pid, socket_ref}, attrs, opentelemetry_ctx},
socket
) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "request_connection", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
Logger.debug("Gateway received connection request message",
client_id: client_id,
resource_id: resource_id
)
%{
client_id: client_id,
resource_id: resource_id,
authorization_expires_at: authorization_expires_at,
client_rtc_session_description: rtc_session_description,
client_preshared_key: preshared_key
} = attrs
client = Clients.fetch_client_by_id!(client_id, preload: [:actor])
resource = Resources.fetch_resource_by_id!(resource_id)
{:ok, relays} = Relays.list_connected_relays_for_resource(resource)
Logger.debug("Gateway received connection request message",
client_id: client_id,
resource_id: resource_id
)
ref = Ecto.UUID.generate()
client = Clients.fetch_client_by_id!(client_id, preload: [:actor])
resource = Resources.fetch_resource_by_id!(resource_id)
{:ok, relays} = Relays.list_connected_relays_for_resource(resource)
push(socket, "request_connection", %{
ref: ref,
actor: Views.Actor.render(client.actor),
relays: Views.Relay.render_many(relays, authorization_expires_at),
resource: Views.Resource.render(resource),
client: Views.Client.render(client, rtc_session_description, preshared_key),
expires_at: DateTime.to_unix(authorization_expires_at, :second)
})
ref = Ecto.UUID.generate()
Logger.debug("Awaiting gateway connection_ready message",
client_id: client_id,
resource_id: resource_id,
ref: ref
)
push(socket, "request_connection", %{
ref: ref,
actor: Views.Actor.render(client.actor),
relays: Views.Relay.render_many(relays, authorization_expires_at),
resource: Views.Resource.render(resource),
client: Views.Client.render(client, rtc_session_description, preshared_key),
expires_at: DateTime.to_unix(authorization_expires_at, :second)
})
refs = Map.put(socket.assigns.refs, ref, {channel_pid, socket_ref, resource_id})
socket = assign(socket, :refs, refs)
Logger.debug("Awaiting gateway connection_ready message",
client_id: client_id,
resource_id: resource_id,
ref: ref
)
{:noreply, socket}
refs =
Map.put(
socket.assigns.refs,
ref,
{channel_pid, socket_ref, resource_id, opentelemetry_ctx}
)
socket = assign(socket, :refs, refs)
{:noreply, socket}
end
end
@impl true
@@ -99,22 +120,26 @@ defmodule API.Gateway.Channel do
},
socket
) do
{{channel_pid, socket_ref, resource_id}, refs} = Map.pop(socket.assigns.refs, ref)
socket = assign(socket, :refs, refs)
{{channel_pid, socket_ref, resource_id, opentelemetry_ctx}, refs} =
Map.pop(socket.assigns.refs, ref)
send(
channel_pid,
{:connect, socket_ref, resource_id, socket.assigns.gateway.public_key,
rtc_session_description}
)
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "connection_ready", %{} do
socket = assign(socket, :refs, refs)
Logger.debug("Gateway replied to the Client with :connect message",
resource_id: resource_id,
channel_pid: inspect(channel_pid),
ref: ref
)
send(
channel_pid,
{:connect, socket_ref, resource_id, socket.assigns.gateway.public_key,
rtc_session_description, opentelemetry_ctx}
)
{:reply, :ok, socket}
Logger.debug("Gateway replied to the Client with :connect message",
resource_id: resource_id,
channel_pid: inspect(channel_pid),
ref: ref
)
{:reply, :ok, socket}
end
end
# def handle_in("metrics", params, socket) do

View File

@@ -2,6 +2,7 @@ defmodule API.Gateway.Socket do
use Phoenix.Socket
alias Domain.Gateways
require Logger
require OpenTelemetry.Tracer
## Channels
@@ -11,34 +12,41 @@ defmodule API.Gateway.Socket do
@impl true
def connect(%{"token" => encrypted_secret} = attrs, socket, connect_info) do
%{
user_agent: user_agent,
x_headers: x_headers,
peer_data: peer_data
} = connect_info
:otel_propagator_text_map.extract(connect_info.trace_context_headers)
real_ip = API.Sockets.real_ip(x_headers, peer_data)
OpenTelemetry.Tracer.with_span "connect" do
%{
user_agent: user_agent,
x_headers: x_headers,
peer_data: peer_data
} = connect_info
attrs =
attrs
|> Map.take(~w[external_id name_suffix public_key])
|> Map.put("last_seen_user_agent", user_agent)
|> Map.put("last_seen_remote_ip", real_ip)
real_ip = API.Sockets.real_ip(x_headers, peer_data)
with {:ok, token} <- Gateways.authorize_gateway(encrypted_secret),
{:ok, gateway} <- Gateways.upsert_gateway(token, attrs) do
socket =
socket
|> assign(:gateway, gateway)
attrs =
attrs
|> Map.take(~w[external_id name_suffix public_key])
|> Map.put("last_seen_user_agent", user_agent)
|> Map.put("last_seen_remote_ip", real_ip)
{:ok, socket}
else
{:error, :invalid_token} ->
{:error, :invalid_token}
with {:ok, token} <- Gateways.authorize_gateway(encrypted_secret),
{:ok, gateway} <- Gateways.upsert_gateway(token, attrs) do
socket =
socket
|> assign(:gateway, gateway)
|> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx())
{:error, reason} ->
Logger.debug("Error connecting gateway websocket: #{inspect(reason)}")
{:error, reason}
{:ok, socket}
else
{:error, :invalid_token} ->
OpenTelemetry.Tracer.set_status(:error, "invalid_token")
{:error, :invalid_token}
{:error, reason} ->
OpenTelemetry.Tracer.set_status(:error, inspect(reason))
Logger.debug("Error connecting gateway websocket: #{inspect(reason)}")
{:error, reason}
end
end
end

View File

@@ -1,18 +1,24 @@
defmodule API.Relay.Channel do
use API, :channel
alias Domain.Relays
require OpenTelemetry.Tracer
@impl true
def join("relay", %{"stamp_secret" => stamp_secret}, socket) do
send(self(), {:after_join, stamp_secret})
{:ok, socket}
OpenTelemetry.Tracer.with_span socket.assigns.opentelemetry_ctx, "join", %{} do
opentelemetry_ctx = OpenTelemetry.Tracer.current_span_ctx()
send(self(), {:after_join, stamp_secret, opentelemetry_ctx})
{:ok, assign(socket, opentelemetry_ctx: opentelemetry_ctx)}
end
end
@impl true
def handle_info({:after_join, stamp_secret}, socket) do
API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}")
push(socket, "init", %{})
:ok = Relays.connect_relay(socket.assigns.relay, stamp_secret)
{:noreply, socket}
def handle_info({:after_join, stamp_secret, opentelemetry_ctx}, socket) do
OpenTelemetry.Tracer.with_span opentelemetry_ctx, "after_join", %{} do
API.Endpoint.subscribe("relay:#{socket.assigns.relay.id}")
push(socket, "init", %{})
:ok = Relays.connect_relay(socket.assigns.relay, stamp_secret)
{:noreply, socket}
end
end
end

View File

@@ -2,6 +2,7 @@ defmodule API.Relay.Socket do
use Phoenix.Socket
alias Domain.Relays
require Logger
require OpenTelemetry.Tracer
## Channels
@@ -11,34 +12,41 @@ defmodule API.Relay.Socket do
@impl true
def connect(%{"token" => encrypted_secret} = attrs, socket, connect_info) do
%{
user_agent: user_agent,
x_headers: x_headers,
peer_data: peer_data
} = connect_info
:otel_propagator_text_map.extract(connect_info.trace_context_headers)
real_ip = API.Sockets.real_ip(x_headers, peer_data)
OpenTelemetry.Tracer.with_span "connect" do
%{
user_agent: user_agent,
x_headers: x_headers,
peer_data: peer_data
} = connect_info
attrs =
attrs
|> Map.take(~w[ipv4 ipv6])
|> Map.put("last_seen_user_agent", user_agent)
|> Map.put("last_seen_remote_ip", real_ip)
real_ip = API.Sockets.real_ip(x_headers, peer_data)
with {:ok, token} <- Relays.authorize_relay(encrypted_secret),
{:ok, relay} <- Relays.upsert_relay(token, attrs) do
socket =
socket
|> assign(:relay, relay)
attrs =
attrs
|> Map.take(~w[ipv4 ipv6])
|> Map.put("last_seen_user_agent", user_agent)
|> Map.put("last_seen_remote_ip", real_ip)
{:ok, socket}
else
{:error, :invalid_token} ->
{:error, :invalid_token}
with {:ok, token} <- Relays.authorize_relay(encrypted_secret),
{:ok, relay} <- Relays.upsert_relay(token, attrs) do
socket =
socket
|> assign(:relay, relay)
|> assign(:opentelemetry_ctx, OpenTelemetry.Tracer.current_span_ctx())
{:error, reason} ->
Logger.debug("Error connecting relay websocket: #{inspect(reason)}")
{:error, reason}
{:ok, socket}
else
{:error, :invalid_token} ->
OpenTelemetry.Tracer.set_status(:error, "invalid_token")
{:error, :invalid_token}
{:error, reason} ->
OpenTelemetry.Tracer.set_status(:error, inspect(reason))
Logger.debug("Error connecting relay websocket: #{inspect(reason)}")
{:error, reason}
end
end
end

View File

@@ -1,5 +1,6 @@
defmodule API.Client.ChannelTest do
use API.ChannelCase
alias Domain.Mocks.GoogleCloudPlatform
setup do
account = Fixtures.Accounts.create_account()
@@ -31,6 +32,7 @@ defmodule API.Client.ChannelTest do
{:ok, _reply, socket} =
API.Client.Socket
|> socket("client:#{client.id}", %{
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"),
client: client,
subject: subject
})
@@ -68,6 +70,7 @@ defmodule API.Client.ChannelTest do
{:ok, _reply, _socket} =
API.Client.Socket
|> socket("client:#{client.id}", %{
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test"),
client: client,
subject: subject
})
@@ -109,6 +112,34 @@ defmodule API.Client.ChannelTest do
end
end
describe "handle_in/3 create_log_sink" do
test "returns error when feature is disabled", %{socket: socket} do
Domain.Config.put_env_override(Domain.Instrumentation, client_logs_enabled: false)
ref = push(socket, "create_log_sink", %{})
assert_reply ref, :error, :disabled
end
test "returns a signed URL which can be used to upload the logs", %{
socket: socket,
client: client
} do
bypass = Bypass.open()
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo")
ref = push(socket, "create_log_sink", %{})
assert_reply ref, :ok, signed_url
assert signed_uri = URI.parse(signed_url)
assert signed_uri.scheme == "https"
assert signed_uri.host == "storage.googleapis.com"
assert String.starts_with?(signed_uri.path, "/logs/clients/#{client.id}/")
assert String.ends_with?(signed_uri.path, ".json")
end
end
describe "handle_in/3 prepare_connection" do
test "returns error when resource is not found", %{socket: socket} do
ref = push(socket, "prepare_connection", %{"resource_id" => Ecto.UUID.generate()})
@@ -290,7 +321,7 @@ defmodule API.Client.ChannelTest do
push(socket, "reuse_connection", attrs)
assert_receive {:allow_access, payload}
assert_receive {:allow_access, payload, _opentelemetry_ctx}
assert %{
resource_id: ^resource_id,
@@ -384,7 +415,7 @@ defmodule API.Client.ChannelTest do
ref = push(socket, "request_connection", attrs)
assert_receive {:request_connection, {channel_pid, socket_ref}, payload}
assert_receive {:request_connection, {channel_pid, socket_ref}, payload, _opentelemetry_ctx}
assert %{
resource_id: ^resource_id,
@@ -396,7 +427,11 @@ defmodule API.Client.ChannelTest do
assert authorization_expires_at == socket.assigns.subject.expires_at
send(channel_pid, {:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD"})
send(
channel_pid,
{:connect, socket_ref, resource.id, gateway.public_key, "FULL_RTC_SD",
OpenTelemetry.Tracer.start_span("connect")}
)
assert_reply ref, :ok, %{
resource_id: ^resource_id,

View File

@@ -7,7 +7,8 @@ defmodule API.Client.SocketTest do
@connect_info %{
user_agent: "iOS/12.7 (iPhone) connlib/0.1.1",
peer_data: %{address: {189, 172, 73, 001}},
x_headers: [{"x-forwarded-for", "189.172.73.153"}]
x_headers: [{"x-forwarded-for", "189.172.73.153"}],
trace_context_headers: []
}
describe "connect/3" do
@@ -62,7 +63,8 @@ defmodule API.Client.SocketTest do
%{
user_agent: subject.context.user_agent,
peer_data: %{address: subject.context.remote_ip},
x_headers: []
x_headers: [],
trace_context_headers: []
}
end

View File

@@ -17,7 +17,10 @@ defmodule API.Gateway.ChannelTest do
{:ok, _, socket} =
API.Gateway.Socket
|> socket("gateway:#{gateway.id}", %{gateway: gateway})
|> socket("gateway:#{gateway.id}", %{
gateway: gateway,
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test")
})
|> subscribe_and_join(API.Gateway.Channel, "gateway")
relay = Fixtures.Relays.create_relay(account: account)
@@ -67,6 +70,7 @@ defmodule API.Gateway.ChannelTest do
socket: socket
} do
expires_at = DateTime.utc_now() |> DateTime.add(30, :second)
opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test")
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
@@ -78,7 +82,7 @@ defmodule API.Gateway.ChannelTest do
client_id: client.id,
resource_id: resource.id,
authorization_expires_at: expires_at
}}
}, opentelemetry_ctx}
)
assert_push "allow_access", payload
@@ -115,6 +119,8 @@ defmodule API.Gateway.ChannelTest do
preshared_key = "PSK"
rtc_session_description = "RTC_SD"
opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test")
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
@@ -127,7 +133,7 @@ defmodule API.Gateway.ChannelTest do
authorization_expires_at: expires_at,
client_rtc_session_description: rtc_session_description,
client_preshared_key: preshared_key
}}
}, opentelemetry_ctx}
)
assert_push "request_connection", payload
@@ -217,6 +223,8 @@ defmodule API.Gateway.ChannelTest do
gateway_public_key = gateway.public_key
rtc_session_description = "RTC_SD"
opentelemetry_ctx = OpenTelemetry.Tracer.start_span("test")
stamp_secret = Ecto.UUID.generate()
:ok = Domain.Relays.connect_relay(relay, stamp_secret)
@@ -229,7 +237,7 @@ defmodule API.Gateway.ChannelTest do
authorization_expires_at: expires_at,
client_rtc_session_description: rtc_session_description,
client_preshared_key: preshared_key
}}
}, opentelemetry_ctx}
)
assert_push "request_connection", %{ref: ref}
@@ -243,7 +251,7 @@ defmodule API.Gateway.ChannelTest do
assert_reply push_ref, :ok
assert_receive {:connect, ^socket_ref, resource_id, ^gateway_public_key,
^rtc_session_description}
^rtc_session_description, _opentelemetry_ctx}
assert resource_id == resource.id
end

View File

@@ -9,7 +9,8 @@ defmodule API.Gateway.SocketTest do
@connect_info %{
user_agent: "iOS/12.7 (iPhone) connlib/#{@connlib_version}",
peer_data: %{address: {189, 172, 73, 001}},
x_headers: [{"x-forwarded-for", "189.172.73.153"}]
x_headers: [{"x-forwarded-for", "189.172.73.153"}],
trace_context_headers: []
}
describe "connect/3" do

View File

@@ -8,7 +8,10 @@ defmodule API.Relay.ChannelTest do
{:ok, _, socket} =
API.Relay.Socket
|> socket("relay:#{relay.id}", %{relay: relay})
|> socket("relay:#{relay.id}", %{
relay: relay,
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test")
})
|> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret})
%{relay: relay, socket: socket}
@@ -30,7 +33,10 @@ defmodule API.Relay.ChannelTest do
{:ok, _, _socket} =
API.Relay.Socket
|> socket("relay:#{relay.id}", %{relay: relay})
|> socket("relay:#{relay.id}", %{
relay: relay,
opentelemetry_ctx: OpenTelemetry.Tracer.start_span("test")
})
|> subscribe_and_join(API.Relay.Channel, "relay", %{stamp_secret: stamp_secret})
presence = Domain.Relays.Presence.list("relays")

View File

@@ -9,7 +9,8 @@ defmodule API.Relay.SocketTest do
@connect_info %{
user_agent: "iOS/12.7 (iPhone) connlib/#{@connlib_version}",
peer_data: %{address: {189, 172, 73, 001}},
x_headers: [{"x-forwarded-for", "189.172.73.153"}]
x_headers: [{"x-forwarded-for", "189.172.73.153"}],
trace_context_headers: []
}
describe "connect/3" do

View File

@@ -15,6 +15,7 @@ defmodule API.ChannelCase do
import API.ChannelCase
alias Domain.Repo
alias Domain.Fixtures
require OpenTelemetry.Tracer
# The default endpoint for testing
@endpoint API.Endpoint

View File

@@ -22,21 +22,23 @@ defmodule Domain.Application do
def children do
[
# Infrastructure services
# Core services
Domain.Repo,
{Phoenix.PubSub, name: Domain.PubSub},
# Infrastructure services
# Note: only one of the platform adapters will actually be started.
Domain.GoogleCloudPlatform,
Domain.Cluster,
# Application
Domain.Auth,
Domain.Relays,
Domain.Gateways,
Domain.Clients,
Domain.Clients
# Observability
# Domain.Telemetry
# Erlang Clustering
Domain.Cluster
]
end
end

View File

@@ -10,7 +10,6 @@ defmodule Domain.Cluster do
config = Domain.Config.fetch_env!(:domain, __MODULE__)
adapter = Keyword.fetch!(config, :adapter)
adapter_config = Keyword.fetch!(config, :adapter_config)
pool_opts = Domain.Config.fetch_env!(:domain, :http_client_ssl_opts)
topology_config = [
default: [
@@ -19,12 +18,8 @@ defmodule Domain.Cluster do
]
]
shared_children = [
{Finch, name: __MODULE__.Finch, pools: %{default: pool_opts}}
]
children =
if adapter != Domain.Cluster.Local do
if adapter do
[
{Cluster.Supervisor, [topology_config, [name: __MODULE__]]}
]
@@ -32,6 +27,6 @@ defmodule Domain.Cluster do
[]
end
Supervisor.init(shared_children ++ children, strategy: :rest_for_one)
Supervisor.init(children, strategy: :rest_for_one)
end
end

View File

@@ -21,22 +21,21 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
defmodule Meta do
@type t :: %{
access_token: String.t(),
access_token_expires_at: DateTime.t(),
nodes: MapSet.t()
}
defstruct access_token: nil,
access_token_expires_at: nil,
nodes: MapSet.new()
defstruct nodes: MapSet.new()
end
@default_polling_interval 5_000
@default_polling_interval :timer.seconds(10)
def start_link(args), do: GenServer.start_link(__MODULE__, args)
@impl true
def init([%State{} = state]) do
unless Domain.GoogleCloudPlatform.enabled?(),
do: raise("Google Cloud Platform clustering strategy requires GoogleCloudPlatform to be enabled")
{:ok, %{state | meta: %Meta{}}, {:continue, :start}}
end
@@ -103,63 +102,12 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
%State{state | meta: %{state.meta | nodes: new_nodes}}
end
@doc false
# We use Google Compute Engine metadata server to fetch the node access token,
# it will have scopes declared in the instance template but actual permissions
# are limited by the service account attached to it.
def refresh_access_token(state) do
config = fetch_config!()
token_endpoint_url = Keyword.fetch!(config, :token_endpoint_url)
request = Finch.build(:get, token_endpoint_url, [{"Metadata-Flavor", "Google"}])
case Finch.request(request, Domain.Cluster.Finch) do
{:ok, %Finch.Response{status: 200, body: response}} ->
%{"access_token" => access_token, "expires_in" => expires_in} = Jason.decode!(response)
access_token_expires_at = DateTime.utc_now() |> DateTime.add(expires_in - 1, :second)
{:ok,
%{
state
| meta: %{
state.meta
| access_token: access_token,
access_token_expires_at: access_token_expires_at
}
}}
{:ok, response} ->
Cluster.Logger.warn(state.topology, "Can't fetch instance metadata: #{inspect(response)}")
{:error, {response.status, response.body}}
{:error, reason} ->
Cluster.Logger.warn(state.topology, "Can not fetch instance metadata: #{inspect(reason)}")
{:error, reason}
end
end
defp maybe_refresh_access_token(state) do
cond do
is_nil(state.meta.access_token) ->
refresh_access_token(state)
is_nil(state.meta.access_token_expires_at) ->
refresh_access_token(state)
DateTime.diff(state.meta.access_token_expires_at, DateTime.utc_now()) > 0 ->
{:ok, state}
true ->
refresh_access_token(state)
end
end
@doc false
# We use Google Compute API to fetch the list of instances in all regions of a project,
# instances are filtered by cluster name and status, and then we use this instance labels
# to figure out the actual node name (which is set in `rel/env.sh.eex` by also reading node metadata).
def fetch_nodes(state, remaining_retry_count \\ 3) do
with {:ok, state} <- maybe_refresh_access_token(state),
{:ok, nodes} <- fetch_google_cloud_instances(state) do
with {:ok, nodes} <- list_google_cloud_cluster_nodes(state) do
{:ok, nodes, state}
else
{:error, %{"error" => %{"code" => 401}} = reason} ->
@@ -171,7 +119,6 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
if remaining_retry_count == 0 do
{:error, reason}
else
{:ok, state} = refresh_access_token(state)
fetch_nodes(state, remaining_retry_count - 1)
end
@@ -191,41 +138,20 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
end
end
defp fetch_google_cloud_instances(state) do
defp list_google_cloud_cluster_nodes(state) do
project_id = Keyword.fetch!(state.config, :project_id)
cluster_name = Keyword.fetch!(state.config, :cluster_name)
cluster_name_label = Keyword.get(state.config, :cluster_name_label, "cluster_name")
node_name_label = Keyword.get(state.config, :node_name_label, "application")
aggregated_list_endpoint_url =
fetch_config!()
|> Keyword.fetch!(:aggregated_list_endpoint_url)
|> String.replace("${project_id}", project_id)
filter = "labels.#{cluster_name_label}=#{cluster_name} AND status=RUNNING"
query = URI.encode_query(%{"filter" => filter})
request =
Finch.build(:get, aggregated_list_endpoint_url <> "?" <> query, [
{"Authorization", "Bearer #{state.meta.access_token}"}
])
with {:ok, %Finch.Response{status: 200, body: response}} <-
Finch.request(request, Domain.Cluster.Finch),
{:ok, %{"items" => items}} <- Jason.decode(response) do
with {:ok, instances} <-
Domain.GoogleCloudPlatform.list_google_cloud_instances_by_label(
project_id,
cluster_name_label,
cluster_name
) do
nodes =
items
|> Enum.flat_map(fn
{_zone, %{"instances" => instances}} ->
instances
{_zone, %{"warning" => %{"code" => "NO_RESULTS_ON_PAGE"}}} ->
[]
end)
|> Enum.filter(fn
%{"status" => "RUNNING", "labels" => %{^cluster_name_label => ^cluster_name}} -> true
%{"status" => _status, "labels" => _labels} -> false
end)
instances
|> Enum.map(fn %{"zone" => zone, "name" => name, "labels" => labels} ->
release_name = Map.fetch!(labels, node_name_label)
zone = String.split(zone, "/") |> List.last()
@@ -235,22 +161,9 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
end)
{:ok, nodes}
else
{:ok, %Finch.Response{status: status, body: body}} ->
{:error, {status, body}}
{:ok, map} ->
{:error, map}
{:error, reason} ->
{:error, reason}
end
end
defp fetch_config! do
Domain.Config.fetch_env!(:domain, __MODULE__)
end
defp polling_interval(%State{config: config}) do
Keyword.get(config, :polling_interval, @default_polling_interval)
end

View File

@@ -63,6 +63,11 @@ defmodule Domain.Config.Definitions do
:database_ssl_opts,
:database_parameters
]},
{"Cloud Platform",
[
:platform_adapter,
:platform_adapter_config
]},
{"Erlang Cluster",
[
:erlang_cluster_adapter,
@@ -114,6 +119,11 @@ defmodule Domain.Config.Definitions do
:outbound_email_adapter,
:outbound_email_adapter_opts
]},
{"Instrumentation",
[
:instrumentation_client_logs_enabled,
:instrumentation_client_logs_bucket
]},
{"Telemetry",
[
:telemetry_enabled,
@@ -278,14 +288,39 @@ defmodule Domain.Config.Definitions do
dump: &Dumper.keyword/1
)
##############################################
## Platform
##############################################
@doc """
Cloud platform on which Firezone runs, used to unlock
platform-specific features (logging, tracing, monitoring, clustering).
"""
defconfig(
:platform_adapter,
{:parameterized, Ecto.Enum,
Ecto.Enum.init(
values: [
Elixir.Domain.GoogleCloudPlatform
]
)},
default: nil
)
@doc """
Config for the platform adapter.
"""
defconfig(:platform_adapter_config, :map,
default: [],
dump: &Dumper.keyword/1
)
##############################################
## Erlang Cluster
##############################################
@doc """
An adapter that will be used to discover and connect nodes to the Erlang cluster.
Set to `nil` to disable clustering.
"""
defconfig(
:erlang_cluster_adapter,
@@ -295,18 +330,17 @@ defmodule Domain.Config.Definitions do
Elixir.Cluster.Strategy.LocalEpmd,
Elixir.Cluster.Strategy.Epmd,
Elixir.Cluster.Strategy.Gossip,
Elixir.Domain.Cluster.GoogleComputeLabelsStrategy,
Domain.Cluster.Local
Elixir.Domain.Cluster.GoogleComputeLabelsStrategy
]
)},
default: Domain.Cluster.Local
default: nil
)
@doc """
Config for the Erlang cluster adapter.
"""
defconfig(:erlang_cluster_adapter_config, :map,
default: [],
default: %{},
dump: fn map ->
keyword = Dumper.keyword(map)
@@ -457,6 +491,22 @@ defmodule Domain.Config.Definitions do
@doc """
Enable or disable collecting client logs and uploading them to the instrumentation bucket.
"""
defconfig(:instrumentation_client_logs_enabled, :boolean, default: true)
@doc """
Name of the bucket to store client-, relay- and gateway-submitted instrumentation logs in.
"""
defconfig(:instrumentation_client_logs_bucket, :string, default: "logs")
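A minimal sketch of the application environment these definitions feed; the wiring from the keys above into release config is assumed, but the entries below match what `Domain.GoogleCloudPlatform` and `Domain.Instrumentation` read at runtime:

import Config

config :domain, platform_adapter: Domain.GoogleCloudPlatform

config :domain, Domain.Instrumentation,
  client_logs_enabled: true,
  client_logs_bucket: "logs"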
##############################################
## Telemetry
##############################################
@doc """
Enable or disable the Firezone telemetry collection.
For more details see https://docs.firezone.dev/reference/telemetry/.
"""
defconfig(:telemetry_enabled, :boolean, default: true)

View File

@@ -35,6 +35,7 @@ defmodule Domain.Config.Resolver do
if Mix.env() == :test do
def fetch_process_env(pdict_key) do
with :error <- fetch_process_value(pdict_key),
:error <- fetch_process_value(Process.get(:last_caller_pid), pdict_key),
:error <- fetch_process_value(get_last_pid_from_pdict_list(:"$ancestors"), pdict_key),
:error <- fetch_process_value(get_last_pid_from_pdict_list(:"$callers"), pdict_key) do
:error

View File

@@ -61,6 +61,12 @@ defmodule Domain.Crypto do
defp replace_ambiguous_characters(<<char::utf8, rest::binary>>, acc),
do: replace_ambiguous_characters(rest, <<acc::binary, char::utf8>>)
def hash(type, value) do
:crypto.hash(type, value)
|> Base.encode16()
|> String.downcase()
end
def hash(value), do: Argon2.hash_pwd_salt(value)
def equal?(token, hash) when is_nil(token) or is_nil(hash), do: Argon2.no_user_verify()
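The new `hash/2` above hex-encodes the raw digest in lowercase; it is used by the URL signer added below to hash the canonical request. A quick illustrative call:

iex> Domain.Crypto.hash(:sha256, "hello")
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"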

View File

@@ -0,0 +1,139 @@
defmodule Domain.GoogleCloudPlatform do
use Supervisor
alias Domain.GoogleCloudPlatform.{Instance, URLSigner}
require Logger
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__.Supervisor)
end
@impl true
def init(_opts) do
if enabled?() do
pool_opts = Domain.Config.fetch_env!(:domain, :http_client_ssl_opts)
children = [
{Finch, name: __MODULE__.Finch, pools: %{default: pool_opts}},
Instance
]
Supervisor.init(children, strategy: :rest_for_one)
else
:ignore
end
end
def enabled? do
Application.fetch_env!(:domain, :platform_adapter) == __MODULE__
end
def fetch_and_cache_access_token do
Domain.GoogleCloudPlatform.Instance.fetch_access_token()
end
# We use the Google Compute Engine metadata server to fetch the node access token.
# The token carries the scopes declared in the instance template, but the actual permissions
# are limited by the service account attached to the instance.
def fetch_access_token do
config = fetch_config!()
token_endpoint_url = Keyword.fetch!(config, :token_endpoint_url)
request = Finch.build(:get, token_endpoint_url, [{"Metadata-Flavor", "Google"}])
case Finch.request(request, __MODULE__.Finch) do
{:ok, %Finch.Response{status: 200, body: response}} ->
%{"access_token" => access_token, "expires_in" => expires_in} = Jason.decode!(response)
access_token_expires_at = DateTime.utc_now() |> DateTime.add(expires_in - 1, :second)
{:ok, access_token, access_token_expires_at}
{:ok, response} ->
Logger.error("Can't fetch instance token", reason: inspect(response))
{:error, {response.status, response.body}}
{:error, reason} ->
Logger.error("Can not fetch instance token", reason: inspect(reason))
{:error, reason}
end
end
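For reference, the metadata token endpoint is expected to answer with a standard OAuth token payload of roughly this shape (illustrative values), which is what the `Jason.decode!/1` call above relies on:

%{"access_token" => "GCP_ACCESS_TOKEN", "expires_in" => 3599, "token_type" => "Bearer"}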
def list_google_cloud_instances_by_label(project_id, label, value) do
aggregated_list_endpoint_url =
fetch_config!()
|> Keyword.fetch!(:aggregated_list_endpoint_url)
|> String.replace("${project_id}", project_id)
filter = "labels.#{label}=#{value} AND status=RUNNING"
query = URI.encode_query(%{"filter" => filter})
url = aggregated_list_endpoint_url <> "?" <> query
with {:ok, access_token} <- fetch_and_cache_access_token(),
request = Finch.build(:get, url, [{"Authorization", "Bearer #{access_token}"}]),
{:ok, %Finch.Response{status: 200, body: response}} <-
Finch.request(request, __MODULE__.Finch),
{:ok, %{"items" => items}} <- Jason.decode(response) do
instances =
items
|> Enum.flat_map(fn
{_zone, %{"instances" => instances}} ->
instances
{_zone, %{"warning" => %{"code" => "NO_RESULTS_ON_PAGE"}}} ->
[]
end)
|> Enum.filter(fn
%{"status" => "RUNNING", "labels" => %{^label => ^value}} -> true
%{"status" => _status, "labels" => _labels} -> false
end)
{:ok, instances}
else
{:ok, %Finch.Response{status: status, body: body}} ->
{:error, {status, body}}
{:ok, map} ->
{:error, map}
{:error, reason} ->
{:error, reason}
end
end
@doc """
Signs a URL which can be used to write or read a file in a Google Cloud Storage bucket.
The signature is produced via the IAM `signBlob` endpoint (V4 signed URLs, GOOG4-RSA-SHA256),
so each signature requires one request to the Google API.
## Available options
* `:verb` - HTTP verb the signed URL will authorize ("PUT", "GET", "HEAD"). Default: `"GET"`.
* `:expires_in` - time in seconds after which the signed URL expires. Default: 604800 (7 days).
* `:headers` - additional headers to enforce, e.g. `Content-Type`, so that requests made with the
signed URL must carry that exact header (typically used when `:verb` is `"PUT"`).
"""
def sign_url(bucket, filename, opts \\ []) do
with {:ok, service_account_access_token} <- fetch_and_cache_access_token() do
config = fetch_config!()
service_account_email = Keyword.fetch!(config, :service_account_email)
sign_endpoint_url = Keyword.fetch!(config, :sign_endpoint_url)
cloud_storage_url = Keyword.fetch!(config, :cloud_storage_url)
opts =
opts
|> Keyword.put_new(:sign_endpoint_url, sign_endpoint_url)
|> Keyword.put_new(:cloud_storage_url, cloud_storage_url)
URLSigner.sign_url(
service_account_email,
service_account_access_token,
bucket,
filename,
opts
)
end
end
defp fetch_config! do
Domain.Config.fetch_env!(:domain, __MODULE__)
end
end
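A hedged usage sketch of the module above (illustrative values; `client_id` and `now` are assumed bindings, and the node must be able to reach the metadata server or the test mocks):

# sign a one-week PUT URL in the "logs" bucket, as Domain.Instrumentation does below
{:ok, signed_url} =
  Domain.GoogleCloudPlatform.sign_url("logs", "clients/#{client_id}/#{now}.json", verb: "PUT")

# discover running cluster peers by label, as the clustering strategy now does
{:ok, instances} =
  Domain.GoogleCloudPlatform.list_google_cloud_instances_by_label(
    "firezone-staging",
    "cluster_name",
    "firezone"
  )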

View File

@@ -0,0 +1,65 @@
defmodule Domain.GoogleCloudPlatform.Instance do
use GenServer
alias Domain.GoogleCloudPlatform
def start_link(_opts) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
@impl true
def init(nil) do
{:ok, %{access_token: nil, access_token_expires_at: nil}}
end
def fetch_access_token do
callers = Process.get(:"$callers") || []
last_caller = List.first(callers) || self()
metadata = Logger.metadata()
GenServer.call(__MODULE__, {:fetch_access_token, last_caller, metadata})
end
@impl true
def handle_call({:fetch_access_token, last_caller, metadata}, _from, state) do
# Propagate logger metadata
Logger.metadata(metadata)
# Allows GenServer to find the caller process for pdict config overrides
Process.put(:last_caller_pid, last_caller)
case maybe_refresh_access_token(state) do
{:ok, access_token, access_token_expires_at} ->
state = %{
state
| access_token: access_token,
access_token_expires_at: access_token_expires_at
}
{:reply, {:ok, access_token}, state}
{:error, reason} ->
state = %{
state
| access_token: nil,
access_token_expires_at: nil
}
{:reply, {:error, reason}, state}
end
end
defp maybe_refresh_access_token(state) do
cond do
is_nil(state.access_token) ->
GoogleCloudPlatform.fetch_access_token()
is_nil(state.access_token_expires_at) ->
GoogleCloudPlatform.fetch_access_token()
DateTime.diff(state.access_token_expires_at, DateTime.utc_now()) > 0 ->
{:ok, state.access_token, state.access_token_expires_at}
true ->
GoogleCloudPlatform.fetch_access_token()
end
end
end

View File

@@ -0,0 +1,110 @@
defmodule Domain.GoogleCloudPlatform.URLSigner do
def sign_url(oauth_identity, oauth_access_token, bucket, filename, opts \\ []) do
sign_endpoint_url = Keyword.fetch!(opts, :sign_endpoint_url)
sign_endpoint_url = sign_endpoint_url <> oauth_identity <> ":signBlob"
cloud_storage_url = Keyword.fetch!(opts, :cloud_storage_url)
cloud_storage_host = URI.parse(cloud_storage_url).host
valid_from = Keyword.get(opts, :valid_from, DateTime.utc_now())
valid_from = DateTime.truncate(valid_from, :second)
valid_from_date = DateTime.to_date(valid_from)
verb = Keyword.get(opts, :verb, "GET")
expires_in = Keyword.get(opts, :expires_in, 60 * 60 * 24 * 7)
headers = Keyword.get(opts, :headers, [])
headers = prepare_headers(headers, cloud_storage_host)
canonical_headers = canonical_headers(headers)
signed_headers = signed_headers(headers)
path = Path.join("/", Path.join(bucket, filename))
credential_scope = "#{Date.to_iso8601(valid_from_date, :basic)}/auto/storage/goog4_request"
canonical_query_string =
[
{"X-Goog-Algorithm", "GOOG4-RSA-SHA256"},
{"X-Goog-Credential", "#{oauth_identity}/#{credential_scope}"},
{"X-Goog-Date", DateTime.to_iso8601(valid_from, :basic)},
{"X-Goog-SignedHeaders", signed_headers},
{"X-Goog-Expires", expires_in}
]
|> Enum.sort()
|> URI.encode_query(:rfc3986)
canonical_request =
[
verb,
path,
canonical_query_string,
canonical_headers,
"",
signed_headers,
"UNSIGNED-PAYLOAD"
]
|> Enum.join("\n")
string_to_sign =
[
"GOOG4-RSA-SHA256",
DateTime.to_iso8601(valid_from, :basic),
"#{Date.to_iso8601(valid_from_date, :basic)}/auto/storage/goog4_request",
Domain.Crypto.hash(:sha256, canonical_request)
]
|> Enum.join("\n")
|> Base.encode64()
request =
Finch.build(
:post,
sign_endpoint_url,
[{"Authorization", "Bearer #{oauth_access_token}"}],
Jason.encode!(%{"payload" => string_to_sign})
)
with {:ok, %Finch.Response{status: 200, body: response}} <-
Finch.request(request, Domain.GoogleCloudPlatform.Finch),
{:ok, %{"signedBlob" => signature}} <- Jason.decode(response) do
signature =
signature
|> Base.decode64!()
|> Base.encode16()
|> String.downcase()
{:ok,
"https://#{cloud_storage_host}#{path}?#{canonical_query_string}&X-Goog-Signature=#{signature}"}
else
{:ok, %Finch.Response{status: status, body: body}} ->
{:error, {status, body}}
{:ok, map} ->
{:error, map}
{:error, reason} ->
{:error, reason}
end
end
defp prepare_headers(headers, host) do
headers = [host: host] ++ headers
headers
|> Enum.map(fn {k, v} -> {k |> to_string() |> String.downcase(), v} end)
|> Enum.sort(fn {k1, _}, {k2, _} -> k1 <= k2 end)
end
@doc false
def canonical_headers(headers) do
headers
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
|> Enum.map_join("\n", fn {k, v} -> "#{k}:#{Enum.join(v, ",")}" end)
end
def signed_headers(headers) do
headers
|> Enum.map(&elem(&1, 0))
|> Enum.uniq()
|> Enum.join(";")
end
end

View File

@@ -0,0 +1,27 @@
defmodule Domain.Instrumentation do
alias Domain.Clients
alias Domain.GoogleCloudPlatform
def create_remote_log_sink(%Clients.Client{} = client) do
config = config!()
enabled? = Keyword.fetch!(config, :client_logs_enabled)
if enabled? and GoogleCloudPlatform.enabled?() do
now = DateTime.utc_now() |> DateTime.to_iso8601()
bucket =
Application.fetch_env!(:domain, __MODULE__)
|> Keyword.fetch!(:client_logs_bucket)
filename = "clients/#{client.id}/#{now}-#{System.unique_integer([:positive])}.json"
GoogleCloudPlatform.sign_url(bucket, filename, verb: "PUT")
else
{:error, :disabled}
end
end
defp config! do
Domain.Config.fetch_env!(:domain, __MODULE__)
end
end

View File

@@ -5,39 +5,6 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategyTest do
alias Cluster.Strategy.State
alias Domain.Mocks.GoogleCloudPlatform
describe "refresh_access_token/1" do
test "returns access token" do
bypass = Bypass.open()
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
state = %State{meta: %Meta{}}
assert {:ok, state} = refresh_access_token(state)
assert state.meta.access_token == "GCP_ACCESS_TOKEN"
expected_access_token_expires_at = DateTime.add(DateTime.utc_now(), 3595, :second)
assert DateTime.diff(state.meta.access_token_expires_at, expected_access_token_expires_at) in -2..2
assert_receive {:bypass_request, conn}
assert {"metadata-flavor", "Google"} in conn.req_headers
end
test "returns error when endpoint is not available" do
bypass = Bypass.open()
Bypass.down(bypass)
GoogleCloudPlatform.override_endpoint_url(
:token_endpoint_url,
"http://localhost:#{bypass.port}/"
)
state = %State{meta: %Meta{}}
assert refresh_access_token(state) ==
{:error, %Mint.TransportError{reason: :econnrefused}}
end
end
describe "fetch_nodes/1" do
test "returns list of nodes in all regions when access token is not set" do
bypass = Bypass.open()
@@ -52,130 +19,11 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategyTest do
]
}
assert {:ok, nodes, state} = fetch_nodes(state)
assert {:ok, nodes, _state} = fetch_nodes(state)
assert nodes == [
:"api@api-q3j6.us-east1-d.c.firezone-staging.internal"
]
assert state.meta.access_token
assert state.meta.access_token_expires_at
end
test "returns list of nodes when token is not expired" do
bypass = Bypass.open()
GoogleCloudPlatform.mock_instances_list_endpoint(bypass)
state = %State{
meta: %Meta{
access_token: "ACCESS_TOKEN",
access_token_expires_at: DateTime.utc_now() |> DateTime.add(5, :second)
},
config: [
project_id: "firezone-staging",
cluster_name: "firezone",
backoff_interval: 1
]
}
assert {:ok, nodes, ^state} = fetch_nodes(state)
assert nodes == [
:"api@api-q3j6.us-east1-d.c.firezone-staging.internal"
]
assert_receive {:bypass_request, conn}
assert {"authorization", "Bearer ACCESS_TOKEN"} in conn.req_headers
end
test "returns error when compute endpoint is down" do
bypass = Bypass.open()
Bypass.down(bypass)
GoogleCloudPlatform.override_endpoint_url(
:aggregated_list_endpoint_url,
"http://localhost:#{bypass.port}/"
)
state = %State{
meta: %Meta{
access_token: "ACCESS_TOKEN",
access_token_expires_at: DateTime.utc_now() |> DateTime.add(5, :second)
},
config: [
project_id: "firezone-staging",
cluster_name: "firezone",
backoff_interval: 1
]
}
assert fetch_nodes(state) == {:error, %Mint.TransportError{reason: :econnrefused}}
GoogleCloudPlatform.override_endpoint_url(
:token_endpoint_url,
"http://localhost:#{bypass.port}/"
)
state = %State{
meta: %Meta{},
config: [
project_id: "firezone-staging",
cluster_name: "firezone",
backoff_interval: 1
]
}
assert fetch_nodes(state) == {:error, %Mint.TransportError{reason: :econnrefused}}
end
test "refreshes the access token if it expired" do
bypass = Bypass.open()
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.mock_instances_list_endpoint(bypass)
state = %State{
meta: %Meta{
access_token: "ACCESS_TOKEN",
access_token_expires_at: DateTime.utc_now() |> DateTime.add(-5, :second)
},
config: [
project_id: "firezone-staging",
cluster_name: "firezone",
backoff_interval: 1
]
}
assert {:ok, _nodes, updated_state} = fetch_nodes(state)
assert updated_state.meta.access_token != state.meta.access_token
assert updated_state.meta.access_token_expires_at != state.meta.access_token_expires_at
end
test "refreshes the access token if it became invalid even through did not expire" do
resp = %{
"error" => %{
"code" => 401,
"status" => "UNAUTHENTICATED"
}
}
bypass = Bypass.open()
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.mock_instances_list_endpoint(bypass, resp)
state = %State{
meta: %Meta{
access_token: "ACCESS_TOKEN",
access_token_expires_at: DateTime.utc_now() |> DateTime.add(5, :second)
},
config: [
project_id: "firezone-staging",
cluster_name: "firezone",
backoff_interval: 1
]
}
assert {:error, _reason} = fetch_nodes(state)
end
end
end

View File

@@ -0,0 +1,155 @@
defmodule Domain.GoogleCloudPlatformTest do
use ExUnit.Case, async: true
import Domain.GoogleCloudPlatform
alias Domain.Mocks.GoogleCloudPlatform
setup do
bypass = Bypass.open()
%{bypass: bypass}
end
describe "fetch_and_cache_access_token/0" do
test "returns instance default account service token", %{bypass: bypass} do
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
assert {:ok, token} = fetch_and_cache_access_token()
assert token
end
test "caches the access token", %{bypass: bypass} do
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
assert {:ok, _token} = fetch_and_cache_access_token()
Bypass.down(bypass)
assert {:ok, _token} = fetch_and_cache_access_token()
end
end
describe "fetch_access_token/0" do
test "returns instance default account service token", %{bypass: bypass} do
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
assert {:ok, token, token_expires_at} = fetch_access_token()
assert token
assert %DateTime{} = token_expires_at
end
test "returns error on failure", %{bypass: bypass} do
Bypass.down(bypass)
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
assert fetch_access_token() ==
{:error, %Mint.TransportError{reason: :econnrefused}}
end
end
describe "list_google_cloud_instances_by_label/3" do
test "returns list of nodes in all regions when access token is not set", %{bypass: bypass} do
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.mock_instances_list_endpoint(bypass)
assert {:ok, nodes} =
list_google_cloud_instances_by_label(
"firezone-staging",
"cluster_name",
"firezone"
)
assert length(nodes) == 1
assert [
%{
"name" => "api-q3j6",
"zone" =>
"https://www.googleapis.com/compute/v1/projects/firezone-staging/zones/us-east1-d",
"labels" => %{
"application" => "api",
"cluster_name" => "firezone",
"container-vm" => "cos-105-17412-101-13",
"managed_by" => "terraform",
"version" => "0-0-1"
}
}
] = nodes
end
test "returns error when compute endpoint is down", %{bypass: bypass} do
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
Bypass.down(bypass)
GoogleCloudPlatform.override_endpoint_url(
:aggregated_list_endpoint_url,
"http://localhost:#{bypass.port}/"
)
assert list_google_cloud_instances_by_label("firezone-staging", "cluster_name", "firezone") ==
{:error, %Mint.TransportError{reason: :econnrefused}}
GoogleCloudPlatform.override_endpoint_url(
:token_endpoint_url,
"http://localhost:#{bypass.port}/"
)
assert list_google_cloud_instances_by_label("firezone-staging", "cluster_name", "firezone") ==
{:error, %Mint.TransportError{reason: :econnrefused}}
end
end
describe "sign_url/3" do
test "returns error when endpoint is not available", %{bypass: bypass} do
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.override_endpoint_url(
:sign_endpoint_url,
"http://localhost:#{bypass.port}/"
)
Bypass.down(bypass)
assert sign_url("logs", "clients/id/log.json.tar.gz", verb: "PUT") ==
{:error, %Mint.TransportError{reason: :econnrefused}}
end
test "returns error when endpoint returns an error", %{bypass: bypass} do
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo", %{"error" => "reason"})
assert sign_url("logs", "clients/id/log.json.tar.gz", verb: "PUT") ==
{:error, %{"error" => "reason"}}
end
test "returns signed url", %{bypass: bypass} do
fixed_datetime = ~U[2000-01-01 00:00:00.000000Z]
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo")
assert {:ok, signed_url} =
sign_url("logs", "clients/id/log.json.tar.gz",
verb: "PUT",
valid_from: fixed_datetime
)
assert {:ok, signed_uri} = URI.new(signed_url)
assert signed_uri.scheme == "https"
assert signed_uri.host == "storage.googleapis.com"
assert signed_uri.path == "/logs/clients/id/log.json.tar.gz"
assert URI.decode_query(signed_uri.query) == %{
"X-Goog-Algorithm" => "GOOG4-RSA-SHA256",
"X-Goog-Credential" => "foo@iam.example.com/20000101/auto/storage/goog4_request",
"X-Goog-Date" => "20000101T000000Z",
"X-Goog-Expires" => "604800",
"X-Goog-Signature" => "efdd75f1feb87fa75a71ee36e9bf1bd35f777fcdb9e7cd1f",
"X-Goog-SignedHeaders" => "host"
}
assert_receive {:bypass_request,
%{request_path: "/service_accounts/foo@iam.example.com:signBlob"} = conn}
assert {"authorization", "Bearer GCP_ACCESS_TOKEN"} in conn.req_headers
assert conn.method == "POST"
end
end
end
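The signed URL asserted above is an ordinary V4-signed Cloud Storage URL, so a client only needs a plain HTTP PUT to upload its log archive. A minimal sketch of such an upload, assuming Finch is available (finch_transport_opts already appears in the project config) and using a hypothetical LogUploader pool name:

defmodule LogUpload do
  # Sketch only: PUT a log archive to a URL produced by sign_url/3.
  # Assumes a Finch pool started elsewhere, e.g. `{Finch, name: LogUploader}`.
  def put(signed_url, body) do
    :put
    |> Finch.build(signed_url, [{"content-type", "application/octet-stream"}], body)
    |> Finch.request(LogUploader)
    |> case do
      {:ok, %Finch.Response{status: status}} when status in 200..299 -> :ok
      {:ok, %Finch.Response{status: status, body: resp_body}} -> {:error, {status, resp_body}}
      {:error, reason} -> {:error, reason}
    end
  end
end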

View File

@@ -0,0 +1,38 @@
defmodule Domain.GoogleCloudPlatform.URLSignerTest do
use ExUnit.Case, async: true
import Domain.GoogleCloudPlatform.URLSigner
describe "canonical_headers/1" do
test "generates valid canonical form of the headers" do
headers = [
{"content-type", "text/plain"},
{"host", "storage.googleapis.com"},
{"x-goog-meta-reviewer", "jane"},
{"x-goog-meta-reviewer", "john"}
]
assert canonical_headers = canonical_headers(headers)
assert is_binary(canonical_headers)
assert String.split(canonical_headers, "\n") ==
[
"content-type:text/plain",
"host:storage.googleapis.com",
"x-goog-meta-reviewer:jane,john"
]
end
end
describe "signed_headers/1" do
test "generates valid canonical form of the headers" do
headers = [
{"content-type", "text/plain"},
{"host", "storage.googleapis.com"},
{"x-goog-meta-reviewer", "jane"},
{"x-goog-meta-reviewer", "john"}
]
assert signed_headers(headers) == "content-type;host;x-goog-meta-reviewer"
end
end
end
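The two helpers under test follow the Cloud Storage V4 signing conventions: header names are lowercased and sorted, repeated headers have their values joined with commas, and the signed-header list is the semicolon-separated set of names. A minimal sketch of implementations that would satisfy the assertions above; the production module may differ:

defmodule URLSignerSketch do
  # Sketch only: lowercase, sort, and fold duplicate header names (V4 canonical form).
  def canonical_headers(headers) do
    headers
    |> Enum.map(fn {name, value} -> {String.downcase(name), String.trim(value)} end)
    |> Enum.group_by(fn {name, _value} -> name end, fn {_name, value} -> value end)
    |> Enum.sort_by(fn {name, _values} -> name end)
    |> Enum.map_join("\n", fn {name, values} -> "#{name}:#{Enum.join(values, ",")}" end)
  end

  # Sketch only: semicolon-separated, sorted, de-duplicated header names.
  def signed_headers(headers) do
    headers
    |> Enum.map(fn {name, _value} -> String.downcase(name) end)
    |> Enum.uniq()
    |> Enum.sort()
    |> Enum.join(";")
  end
end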

View File

@@ -0,0 +1,32 @@
defmodule Domain.InstrumentationTest do
use Domain.DataCase, async: true
import Domain.Instrumentation
alias Domain.Mocks.GoogleCloudPlatform
describe "create_remote_log_sink/1" do
test "returns an error if feature is disabled" do
client = Fixtures.Clients.create_client()
Domain.Config.put_env_override(Domain.Instrumentation, client_logs_enabled: false)
assert create_remote_log_sink(client) == {:error, :disabled}
end
test "returns a signed URL" do
bypass = Bypass.open()
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
GoogleCloudPlatform.mock_sign_blob_endpoint(bypass, "foo")
client = Fixtures.Clients.create_client()
assert {:ok, signed_url} = create_remote_log_sink(client)
assert signed_uri = URI.parse(signed_url)
assert signed_uri.scheme == "https"
assert signed_uri.host == "storage.googleapis.com"
assert String.starts_with?(signed_uri.path, "/logs/clients/#{client.id}/")
assert String.ends_with?(signed_uri.path, ".json")
end
end
end
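For context, the two tests pin down the observable behaviour of create_remote_log_sink/1: it must refuse when client_logs_enabled is false and otherwise return a PUT-signed URL under the configured bucket and the client's id. A minimal sketch consistent with that behaviour, with the timestamp-based filename being an assumption (the tests only require a ".json" suffix):

defmodule Domain.InstrumentationSketch do
  # Sketch only: shape implied by the tests above; the real module may differ.
  def create_remote_log_sink(client) do
    config = Domain.Config.fetch_env!(:domain, Domain.Instrumentation)

    if Keyword.fetch!(config, :client_logs_enabled) do
      bucket = Keyword.fetch!(config, :client_logs_bucket)
      # Hypothetical filename scheme; any unique ".json" object name would satisfy the tests.
      filename = "#{DateTime.to_unix(DateTime.utc_now())}.json"
      Domain.GoogleCloudPlatform.sign_url(bucket, "clients/#{client.id}/#{filename}", verb: "PUT")
    else
      {:error, :disabled}
    end
  end
end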

View File

@@ -10,8 +10,8 @@ defmodule Domain.Jobs.Executors.GlobalTest do
test "executes the handler on the interval" do
assert {:ok, _pid} = start_link({{__MODULE__, :send_test_message}, 25, test_pid: self()})
assert_receive {:executed, _pid, time1}
assert_receive {:executed, _pid, time2}
assert_receive {:executed, _pid, time1}, 200
assert_receive {:executed, _pid, time2}, 200
assert time1 < time2
end
@@ -25,7 +25,7 @@ defmodule Domain.Jobs.Executors.GlobalTest do
})
refute_receive {:executed, _pid, _time}, 50
assert_receive {:executed, _pid, _time}, 200
assert_receive {:executed, _pid, _time}, 400
end
test "registers itself as a leader if there is no global name registered" do

View File

@@ -1,13 +1,8 @@
defmodule Domain.Mocks.GoogleCloudPlatform do
def override_endpoint_url(endpoint, url) do
config = Domain.Config.fetch_env!(:domain, Domain.Cluster.GoogleComputeLabelsStrategy)
strategy_config = Keyword.put(config, endpoint, url)
Domain.Config.put_env_override(
:domain,
Domain.Cluster.GoogleComputeLabelsStrategy,
strategy_config
)
config = Domain.Config.fetch_env!(:domain, Domain.GoogleCloudPlatform)
config = Keyword.put(config, endpoint, url)
Domain.Config.put_env_override(:domain, Domain.GoogleCloudPlatform, config)
end
def mock_instance_metadata_token_endpoint(bypass, resp \\ nil) do
@@ -23,7 +18,7 @@ defmodule Domain.Mocks.GoogleCloudPlatform do
test_pid = self()
Bypass.expect(bypass, "GET", token_endpoint_path, fn conn ->
Bypass.stub(bypass, "GET", token_endpoint_path, fn conn ->
conn = Plug.Conn.fetch_query_params(conn)
send(test_pid, {:bypass_request, conn})
Plug.Conn.send_resp(conn, 200, Jason.encode!(resp))
@@ -37,6 +32,35 @@ defmodule Domain.Mocks.GoogleCloudPlatform do
bypass
end
def mock_sign_blob_endpoint(bypass, service_account_email, resp \\ nil) do
sign_blob_endpoint_path = "service_accounts/#{service_account_email}:signBlob"
test_pid = self()
Bypass.expect(bypass, "POST", sign_blob_endpoint_path, fn conn ->
conn = Plug.Conn.fetch_query_params(conn)
send(test_pid, {:bypass_request, conn})
{:ok, binary, conn} = Plug.Conn.read_body(conn)
%{"payload" => payload} = Jason.decode!(binary)
resp =
resp ||
%{
"keyId" => Ecto.UUID.generate(),
"signedBlob" => Domain.Crypto.hash(:md5, service_account_email <> payload)
}
Plug.Conn.send_resp(conn, 200, Jason.encode!(resp))
end)
override_endpoint_url(
:sign_endpoint_url,
"http://localhost:#{bypass.port}/service_accounts/"
)
bypass
end
def mock_instances_list_endpoint(bypass, resp \\ nil) do
aggregated_instances_endpoint_path =
"compute/v1/projects/firezone-staging/aggregated/instances"

View File

@@ -53,6 +53,24 @@ config :domain, Domain.Auth.Adapters.GoogleWorkspace.APIClient,
endpoint: "https://admin.googleapis.com",
finch_transport_opts: []
config :domain, platform_adapter: nil
config :domain, Domain.GoogleCloudPlatform,
token_endpoint_url:
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token",
aggregated_list_endpoint_url:
"https://compute.googleapis.com/compute/v1/projects/${project_id}/aggregated/instances",
sign_endpoint_url: "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/",
cloud_storage_url: "https://storage.googleapis.com"
config :domain, Domain.Cluster,
adapter: nil,
adapter_config: []
config :domain, Domain.Instrumentation,
client_logs_enabled: true,
client_logs_bucket: "logs"
###############################
##### Web #####################
###############################
@@ -134,20 +152,6 @@ config :api,
external_trusted_proxies: [],
private_clients: [%{__struct__: Postgrex.INET, address: {172, 28, 0, 0}, netmask: 16}]
###############################
##### Erlang Cluster ##########
###############################
config :domain, Domain.Cluster.GoogleComputeLabelsStrategy,
token_endpoint_url:
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token",
aggregated_list_endpoint_url:
"https://compute.googleapis.com/compute/v1/projects/${project_id}/aggregated/instances"
config :domain, Domain.Cluster,
adapter: Domain.Cluster.Local,
adapter_config: []
###############################
##### Third-party configs #####
###############################

View File

@@ -51,6 +51,20 @@ if config_env() == :prod do
config :domain, Domain.Auth.Adapters.GoogleWorkspace.APIClient,
finch_transport_opts: compile_config!(:http_client_ssl_opts)
config :domain, platform_adapter: compile_config!(:platform_adapter)
if platform_adapter = compile_config!(:platform_adapter) do
config :domain, platform_adapter, compile_config!(:platform_adapter_config)
end
config :domain, Domain.Cluster,
adapter: compile_config!(:erlang_cluster_adapter),
adapter_config: compile_config!(:erlang_cluster_adapter_config)
config :domain, Domain.Instrumentation,
client_logs_enabled: compile_config!(:instrumentation_client_logs_enabled),
client_logs_bucket: compile_config!(:instrumentation_client_logs_bucket)
###############################
##### Web #####################
###############################
@@ -108,14 +122,6 @@ if config_env() == :prod do
external_trusted_proxies: compile_config!(:phoenix_external_trusted_proxies),
private_clients: compile_config!(:phoenix_private_clients)
###############################
##### Erlang Cluster ##########
###############################
config :domain, Domain.Cluster,
adapter: compile_config!(:erlang_cluster_adapter),
adapter_config: compile_config!(:erlang_cluster_adapter_config)
###############################
##### Third-party configs #####
###############################

View File

@@ -22,6 +22,12 @@ config :domain, Domain.Telemetry, enabled: false
config :domain, Domain.ConnectivityChecks, enabled: false
config :domain, platform_adapter: Domain.GoogleCloudPlatform
config :domain, Domain.GoogleCloudPlatform,
project_id: "fz-test",
service_account_email: "foo@iam.example.com"
###############################
##### Web #####################
###############################

View File

@@ -232,6 +232,47 @@ resource "google_sql_database" "firezone" {
instance = module.google-cloud-sql.master_instance_name
}
resource "google_storage_bucket" "client-logs" {
project = module.google-cloud-project.project.project_id
name = "${module.google-cloud-project.project.project_id}-client-logs"
location = "US"
lifecycle_rule {
condition {
age = 3
}
action {
type = "Delete"
}
}
lifecycle_rule {
condition {
age = 1
}
action {
type = "AbortIncompleteMultipartUpload"
}
}
logging {
log_bucket = true
log_object_prefix = "firezone.dev/clients"
}
public_access_prevention = "enforced"
uniform_bucket_level_access = true
lifecycle {
prevent_destroy = true
ignore_changes = []
}
}
locals {
cluster = {
name = "firezone"
@@ -338,6 +379,14 @@ locals {
name = "TELEMETRY_ENABLED"
value = "false"
},
{
name = "INSTRUMENTATION_CLIENT_LOGS_ENABLED"
value = "true"
},
{
name = "INSTRUMENTATION_CLIENT_LOGS_BUCKET"
value = google_storage_bucket.client-logs.name
},
# Emails
{
name = "OUTBOUND_EMAIL_ADAPTER"
@@ -490,6 +539,35 @@ module "api" {
application_labels = {
"cluster_name" = local.cluster.name
}
application_token_scopes = [
"https://www.googleapis.com/auth/cloud-platform"
]
}
## Allow API nodes to manage client log objects and sign URLs for Google Cloud Storage
resource "google_storage_bucket_iam_member" "client-logs-object-admin" {
bucket = google_storage_bucket.client-logs.name
role = "roles/storage.objectAdmin"
member = "serviceAccount:${module.api.service_account.email}"
}
resource "google_project_iam_custom_role" "sign-urls" {
project = module.google-cloud-project.project.project_id
title = "Sign URLs for Google Cloud Storage"
role_id = "iam.sign_urls"
permissions = [
"iam.serviceAccounts.signBlob"
]
}
resource "google_project_iam_member" "sign-urls" {
project = module.google-cloud-project.project.project_id
role = "projects/${module.google-cloud-project.project.project_id}/roles/${google_project_iam_custom_role.sign-urls.role_id}"
member = "serviceAccount:${module.api.service_account.email}"
}
# Erlang Cluster
@@ -660,6 +738,7 @@ resource "google_compute_firewall" "ssh-ipv4" {
source_ranges = ["0.0.0.0/0"]
target_tags = concat(module.web.target_tags, module.api.target_tags)
}
resource "google_compute_firewall" "ssh-ipv6" {
project = module.google-cloud-project.project.project_id
@@ -681,7 +760,7 @@ resource "google_compute_firewall" "ssh-ipv6" {
ports = [22]
}
source_ranges = ["::0/0"]
source_ranges = ["::/0"]
target_tags = concat(module.web.target_tags, module.api.target_tags)
}
@@ -735,6 +814,6 @@ resource "google_compute_firewall" "relays-ssh-ipv6" {
ports = [22]
}
source_ranges = ["::0/0"]
source_ranges = ["::/0"]
target_tags = module.relays[0].target_tags
}

View File

@@ -36,6 +36,17 @@ locals {
{
name = "OTEL_RESOURCE_ATTRIBUTES"
value = "application.name=${local.application_name}"
},
{
name = "PLATFORM_ADAPTER"
value = "Elixir.Domain.GoogleCloudPlatform"
},
{
name = "PLATFORM_ADAPTER_CONFIG"
value = jsonencode({
project_id = var.project_id
service_account_email = google_service_account.application.email
})
}
], var.application_environment_variables)
@@ -168,7 +179,7 @@ resource "google_compute_instance_template" "application" {
service_account {
email = google_service_account.application.email
scopes = [
scopes = concat([
# Those are default scopes
"https://www.googleapis.com/auth/devstorage.read_only",
"https://www.googleapis.com/auth/logging.write",
@@ -178,7 +189,7 @@ resource "google_compute_instance_template" "application" {
"https://www.googleapis.com/auth/trace.append",
# Required to discover the other instances in the Erlang Cluster
"https://www.googleapis.com/auth/compute.readonly"
]
], var.application_token_scopes)
}
shielded_instance_config {
@@ -622,7 +633,7 @@ resource "google_compute_firewall" "egress-ipv6" {
direction = "EGRESS"
target_tags = ["app-${local.application_name}"]
destination_ranges = ["::0/0"]
destination_ranges = ["::/0"]
allow {
protocol = "all"

View File

@@ -212,6 +212,14 @@ variable "application_labels" {
description = "Labels to add to all created by this module resources."
}
variable "application_token_scopes" {
type = list(string)
nullable = false
default = []
description = "Any extra oAuth2 token scopes granted to the token of default service account."
}
variable "application_dns_tld" {
type = string
nullable = false

View File

@@ -394,7 +394,7 @@ resource "google_compute_firewall" "stun-turn-ipv6" {
name = "${local.application_name}-firewall-lb-to-instances-ipv6"
network = google_compute_network.network.self_link
source_ranges = ["::0/0"]
source_ranges = ["::/0"]
target_tags = ["app-${local.application_name}"]
allow {
@@ -448,7 +448,7 @@ resource "google_compute_firewall" "ingress-ipv6" {
direction = "INGRESS"
target_tags = ["app-${local.application_name}"]
source_ranges = ["::0/0"]
source_ranges = ["::/0"]
allow {
protocol = "udp"
@@ -479,7 +479,7 @@ resource "google_compute_firewall" "egress-ipv6" {
direction = "EGRESS"
target_tags = ["app-${local.application_name}"]
destination_ranges = ["::0/0"]
destination_ranges = ["::/0"]
allow {
protocol = "udp"