mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
chore(portal): Send metrics to Google Cloud Monitoring (#4564)
This commit is contained in:
@@ -7,7 +7,6 @@ defmodule API.Application do
|
||||
_ = OpentelemetryPhoenix.setup(adapter: :cowboy2)
|
||||
|
||||
children = [
|
||||
API.Telemetry,
|
||||
API.Endpoint
|
||||
]
|
||||
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
defmodule API.Telemetry do
  @moduledoc """
  Supervises the telemetry poller for the API application and defines
  the `Telemetry.Metrics` definitions it exposes to reporters.
  """
  use Supervisor
  import Telemetry.Metrics

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    children = [
      # Telemetry poller will execute the given period measurements
      # every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics
      {:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
      # Add reporters as children of your supervision tree.
      # {Telemetry.Metrics.ConsoleReporter, metrics: metrics()}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  @doc """
  Returns the list of metric definitions consumed by the configured reporter.

  Durations are converted from native time units to milliseconds.
  """
  def metrics do
    [
      # Phoenix Metrics
      summary("phoenix.endpoint.start.system_time",
        unit: {:native, :millisecond}
      ),
      summary("phoenix.endpoint.stop.duration",
        unit: {:native, :millisecond}
      ),
      summary("phoenix.router_dispatch.start.system_time",
        tags: [:route],
        unit: {:native, :millisecond}
      ),
      summary("phoenix.router_dispatch.exception.duration",
        tags: [:route],
        unit: {:native, :millisecond}
      ),
      summary("phoenix.router_dispatch.stop.duration",
        tags: [:route],
        unit: {:native, :millisecond}
      ),
      summary("phoenix.socket_connected.duration",
        unit: {:native, :millisecond}
      ),
      summary("phoenix.channel_join.duration",
        unit: {:native, :millisecond}
      ),
      summary("phoenix.channel_handled_in.duration",
        tags: [:event],
        unit: {:native, :millisecond}
      ),

      # VM Metrics
      summary("vm.memory.total", unit: {:byte, :kilobyte}),
      summary("vm.total_run_queue_lengths.total"),
      summary("vm.total_run_queue_lengths.cpu"),
      summary("vm.total_run_queue_lengths.io")
    ]
  end

  # XXX: We might want to contribute initial_timeout to telemetry_poller
  # so that we can remove our custom gen_servers that do long polling.
  defp periodic_measurements do
    [
      # A module, function and arguments to be invoked periodically.
      # This function must call :telemetry.execute/3 and a metric must be added above.
      # {API, :count_users, []}
    ]
  end
end
|
||||
@@ -167,7 +167,13 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do
|
||||
node_name
|
||||
end)
|
||||
|
||||
Logger.debug("Found #{length(nodes)}", module: __MODULE__, nodes: Enum.join(nodes, ", "))
|
||||
count = length(nodes)
|
||||
|
||||
:telemetry.execute([:domain, :cluster], %{
|
||||
discovered_nodes_count: count
|
||||
})
|
||||
|
||||
Logger.debug("Found #{count}", module: __MODULE__, nodes: Enum.join(nodes, ", "))
|
||||
|
||||
{:ok, nodes}
|
||||
end
|
||||
|
||||
@@ -120,7 +120,8 @@ defmodule Domain.Config.Definitions do
|
||||
{"Instrumentation",
|
||||
[
|
||||
:instrumentation_client_logs_enabled,
|
||||
:instrumentation_client_logs_bucket
|
||||
:instrumentation_client_logs_bucket,
|
||||
:telemetry_metrics_reporter
|
||||
]}
|
||||
]
|
||||
end
|
||||
@@ -453,6 +454,29 @@ defmodule Domain.Config.Definitions do
|
||||
"""
|
||||
defconfig(:instrumentation_client_logs_bucket, :string, default: "logs")
|
||||
|
||||
@doc """
|
||||
Reporter to use for sending metrics to the telemetry backend.
|
||||
"""
|
||||
defconfig(
|
||||
:telemetry_metrics_reporter,
|
||||
{:parameterized, Ecto.Enum,
|
||||
Ecto.Enum.init(
|
||||
values: [
|
||||
Telemetry.Metrics.ConsoleReporter,
|
||||
Domain.Telemetry.GoogleCloudMetricsReporter
|
||||
]
|
||||
)},
|
||||
default: nil
|
||||
)
|
||||
|
||||
@doc """
|
||||
Configuration for the telemetry metrics reporter.
|
||||
"""
|
||||
defconfig(:telemetry_metrics_reporter_opts, :map,
|
||||
default: %{},
|
||||
dump: &Dumper.keyword/1
|
||||
)
|
||||
|
||||
##############################################
|
||||
## Gateways
|
||||
##############################################
|
||||
|
||||
@@ -36,8 +36,12 @@ defmodule Domain.GoogleCloudPlatform do
|
||||
# are limited by the service account attached to it.
|
||||
def fetch_access_token do
|
||||
config = fetch_config!()
|
||||
token_endpoint_url = Keyword.fetch!(config, :token_endpoint_url)
|
||||
request = Finch.build(:get, token_endpoint_url, [{"Metadata-Flavor", "Google"}])
|
||||
metadata_endpoint_url = Keyword.fetch!(config, :metadata_endpoint_url)
|
||||
|
||||
request =
|
||||
Finch.build(:get, metadata_endpoint_url <> "/instance/service-accounts/default/token", [
|
||||
{"Metadata-Flavor", "Google"}
|
||||
])
|
||||
|
||||
case Finch.request(request, __MODULE__.Finch) do
|
||||
{:ok, %Finch.Response{status: 200, body: response}} ->
|
||||
@@ -55,6 +59,52 @@ defmodule Domain.GoogleCloudPlatform do
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_instance_id do
|
||||
config = fetch_config!()
|
||||
metadata_endpoint_url = Keyword.fetch!(config, :metadata_endpoint_url)
|
||||
|
||||
request =
|
||||
Finch.build(:get, metadata_endpoint_url <> "/instance/id", [
|
||||
{"Metadata-Flavor", "Google"}
|
||||
])
|
||||
|
||||
case Finch.request(request, __MODULE__.Finch) do
|
||||
{:ok, %Finch.Response{status: 200, body: instance_id}} ->
|
||||
{:ok, instance_id}
|
||||
|
||||
{:ok, response} ->
|
||||
Logger.error("Can't fetch instance ID", reason: inspect(response))
|
||||
{:error, {response.status, response.body}}
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("Can't fetch instance ID", reason: inspect(reason))
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_instance_zone do
|
||||
config = fetch_config!()
|
||||
metadata_endpoint_url = Keyword.fetch!(config, :metadata_endpoint_url)
|
||||
|
||||
request =
|
||||
Finch.build(:get, metadata_endpoint_url <> "/instance/zone", [
|
||||
{"Metadata-Flavor", "Google"}
|
||||
])
|
||||
|
||||
case Finch.request(request, __MODULE__.Finch) do
|
||||
{:ok, %Finch.Response{status: 200, body: zone}} ->
|
||||
{:ok, zone |> String.split("/") |> List.last()}
|
||||
|
||||
{:ok, response} ->
|
||||
Logger.error("Can't fetch instance zone", reason: inspect(response))
|
||||
{:error, {response.status, response.body}}
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("Can't fetch instance zone", reason: inspect(reason))
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
def list_google_cloud_instances_by_labels(project_id, label_values) do
|
||||
aggregated_list_endpoint_url =
|
||||
fetch_config!()
|
||||
@@ -132,6 +182,39 @@ defmodule Domain.GoogleCloudPlatform do
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Sends metrics to Google Cloud Monitoring API.
|
||||
"""
|
||||
def send_metrics(project_id, time_series) do
|
||||
cloud_metrics_endpoint_url =
|
||||
fetch_config!()
|
||||
|> Keyword.fetch!(:cloud_metrics_endpoint_url)
|
||||
|> String.replace("${project_id}", project_id)
|
||||
|
||||
body = Jason.encode!(%{"timeSeries" => time_series})
|
||||
|
||||
with {:ok, access_token} <- fetch_and_cache_access_token(),
|
||||
request =
|
||||
Finch.build(
|
||||
:post,
|
||||
cloud_metrics_endpoint_url,
|
||||
[
|
||||
{"Content-Type", "application/json"},
|
||||
{"Authorization", "Bearer #{access_token}"}
|
||||
],
|
||||
body
|
||||
),
|
||||
{:ok, %{status: 200}} <- Finch.request(request, __MODULE__.Finch) do
|
||||
:ok
|
||||
else
|
||||
{:ok, %{status: status, body: body}} ->
|
||||
{:error, {status, body}}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp fetch_config! do
|
||||
Domain.Config.fetch_env!(:domain, __MODULE__)
|
||||
end
|
||||
|
||||
@@ -16,6 +16,14 @@ defmodule Domain.Relays do
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
end
|
||||
|
||||
def send_metrics do
|
||||
count = global_groups_presence_topic() |> Presence.list() |> Enum.count()
|
||||
|
||||
:telemetry.execute([:domain, :relays], %{
|
||||
online_relays_count: count
|
||||
})
|
||||
end
|
||||
|
||||
def fetch_group_by_id(id, %Auth.Subject{} = subject, opts \\ []) do
|
||||
with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_relays_permission()),
|
||||
true <- Repo.valid_uuid?(id) do
|
||||
|
||||
@@ -9,6 +9,8 @@ defmodule Domain.Telemetry do
|
||||
|
||||
@impl true
|
||||
def init(_arg) do
|
||||
config = Domain.Config.fetch_env!(:domain, __MODULE__)
|
||||
|
||||
children = [
|
||||
# We start a /healthz endpoint that is used for liveness probes
|
||||
{Bandit, plug: Telemetry.HealthzPlug, scheme: :http, port: 4000},
|
||||
@@ -16,35 +18,72 @@ defmodule Domain.Telemetry do
|
||||
# Telemetry poller will execute the given period measurements
|
||||
# every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics
|
||||
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
|
||||
# Add reporters as children of your supervision tree.
|
||||
# {Telemetry.Metrics.ConsoleReporter, metrics: metrics()}
|
||||
]
|
||||
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
reporter_children =
|
||||
if metrics_reporter = Keyword.get(config, :metrics_reporter) do
|
||||
[{metrics_reporter, metrics: metrics()}]
|
||||
else
|
||||
[]
|
||||
end
|
||||
|
||||
Supervisor.init(children ++ reporter_children, strategy: :one_for_one)
|
||||
end
|
||||
|
||||
def metrics do
|
||||
[
|
||||
# Database Metrics
|
||||
summary("domain.repo.query.total_time", unit: {:native, :millisecond}),
|
||||
distribution("domain.repo.query.total_time", unit: {:native, :millisecond}),
|
||||
summary("domain.repo.query.decode_time", unit: {:native, :millisecond}),
|
||||
summary("domain.repo.query.query_time", unit: {:native, :millisecond}),
|
||||
summary("domain.repo.query.query_time", tags: [:query], unit: {:native, :millisecond}),
|
||||
summary("domain.repo.query.queue_time", unit: {:native, :millisecond}),
|
||||
summary("domain.repo.query.idle_time", unit: {:native, :millisecond}),
|
||||
|
||||
# Phoenix Metrics
|
||||
summary("phoenix.endpoint.start.system_time",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.endpoint.stop.duration",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.router_dispatch.start.system_time",
|
||||
tags: [:route],
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.router_dispatch.exception.duration",
|
||||
tags: [:route],
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.router_dispatch.stop.duration",
|
||||
tags: [:route],
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.socket_connected.duration",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.channel_join.duration",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.channel_handled_in.duration",
|
||||
tags: [:event],
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
|
||||
# VM Metrics
|
||||
summary("vm.memory.total", unit: {:byte, :kilobyte}),
|
||||
summary("vm.total_run_queue_lengths.total"),
|
||||
summary("vm.total_run_queue_lengths.cpu"),
|
||||
summary("vm.total_run_queue_lengths.io")
|
||||
summary("vm.total_run_queue_lengths.io"),
|
||||
|
||||
# Application metrics
|
||||
last_value("domain.relays.online_relays_count"),
|
||||
last_value("domain.metrics.discovered_nodes_count")
|
||||
]
|
||||
end
|
||||
|
||||
defp periodic_measurements do
|
||||
[
|
||||
# A module, function and arguments to be invoked periodically.
|
||||
# This function must call :telemetry.execute/3 and a metric must be added above.
|
||||
# {Web, :count_users, []}
|
||||
{Domain.Relays, :send_metrics, []}
|
||||
]
|
||||
end
|
||||
end
|
||||
|
||||
@@ -0,0 +1,458 @@
|
||||
defmodule Domain.Telemetry.GoogleCloudMetricsReporter do
  @moduledoc """
  This module implements a Telemetry reporter that sends metrics to Google Cloud Monitoring API,
  with a *best effort* approach. It means that if the reporter fails to send metrics to the API,
  it will not retry, but it will keep trying to send new metrics.
  """
  use GenServer
  alias Telemetry.Metrics
  alias Domain.GoogleCloudPlatform
  require Logger

  # Maximum number of metrics a buffer can hold,
  # after this count they will be delivered or flushed right away.
  @buffer_size 100

  # Maximum time in milliseconds to wait before flushing the buffer
  # in case it did not reach the @buffer_size limit within the flush interval
  @flush_interval :timer.seconds(10)

  def start_link(opts) do
    project_id =
      Application.fetch_env!(:domain, __MODULE__)
      |> Keyword.fetch!(:project_id)

    metrics = Keyword.fetch!(opts, :metrics)
    GenServer.start_link(__MODULE__, {metrics, project_id})
  end

  @impl true
  def init({metrics, project_id}) do
    # Trap exits so terminate/2 runs and we can detach the telemetry handlers.
    Process.flag(:trap_exit, true)
    groups = Enum.group_by(metrics, & &1.event_name)

    node_name =
      System.get_env("RELEASE_NODE") ||
        Node.self()

    application_version =
      System.get_env("RELEASE_VERSION") ||
        to_string(Application.spec(:domain, :vsn))

    labels = %{node_name: node_name, application_version: application_version}

    # NOTE(review): these calls hit the GCE metadata endpoint and crash init
    # if it is unavailable; acceptable since the reporter is opt-in on GCP.
    {:ok, instance_id} = GoogleCloudPlatform.fetch_instance_id()
    {:ok, zone} = GoogleCloudPlatform.fetch_instance_zone()

    resource = %{
      "type" => "gce_instance",
      "labels" => %{
        "project_id" => project_id,
        "instance_id" => instance_id,
        "zone" => zone
      }
    }

    # Attach one handler per event; handlers forward compressed metrics
    # back to this process, which aggregates and flushes them.
    events =
      for {event, metrics} <- groups do
        id = {__MODULE__, event, self()}
        :telemetry.attach(id, event, &__MODULE__.handle_event/4, {metrics, self()})
        event
      end

    Process.send_after(self(), :flush, @flush_interval)

    {:ok, {events, project_id, {resource, labels}, {0, %{}}}}
  end

  @impl true
  def terminate(_, {events, _project_id, _resource_and_labels, {_buffer_size, _buffer}}) do
    for event <- events do
      :telemetry.detach({__MODULE__, event, self()})
    end

    :ok
  end

  ## Metrics aggregation and delivery

  @impl true
  def handle_info(
        {:compressed_metrics, compressed},
        {events, project_id, {resource, labels}, {buffer_size, buffer}}
      ) do
    # Fold each incoming sample into the buffer; buffer/4 returns
    # {increment, new_value} so the "get" slot counts newly-created keys.
    {buffer_size, buffer} =
      Enum.reduce(
        compressed,
        {buffer_size, buffer},
        fn {schema, name, tags, at, measurement, unit}, {buffer_size, buffer} ->
          {increment, buffer} =
            Map.get_and_update(buffer, {schema, name, tags, unit}, fn prev_value ->
              buffer(schema, prev_value, at, measurement)
            end)

          {buffer_size + increment, buffer}
        end
      )

    # Flush eagerly once the buffer is full instead of waiting for the timer.
    {buffer_size, buffer} =
      if buffer_size >= @buffer_size do
        flush(project_id, resource, labels, {buffer_size, buffer})
      else
        {buffer_size, buffer}
      end

    {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}}
  end

  def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_size, buffer}}) do
    {buffer_size, buffer} = flush(project_id, resource, labels, {buffer_size, buffer})
    Process.send_after(self(), :flush, @flush_interval)
    {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}}
  end

  # counts the total number of emitted events
  defp buffer(Metrics.Counter, nil, at, _measurement) do
    {1, {at, at, 1}}
  end

  defp buffer(Metrics.Counter, {started_at, _ended_at, num}, ended_at, _measurement) do
    {0, {started_at, ended_at, num + 1}}
  end

  # builds a histogram of selected measurement
  defp buffer(Metrics.Distribution, nil, at, measurement) do
    buckets = init_buckets() |> update_buckets(measurement)
    {1, {at, at, {1, measurement, measurement, measurement, 0, buckets}}}
  end

  defp buffer(
         Metrics.Distribution,
         {started_at, _ended_at, {count, sum, min, max, squared_deviation_sum, buckets}},
         ended_at,
         measurement
       ) do
    count = count + 1
    sum = sum + measurement
    min = min(min, measurement)
    max = max(max, measurement)
    mean = sum / count
    deviation = measurement - mean
    squared_deviation_sum = squared_deviation_sum + :math.pow(deviation, 2)
    buckets = update_buckets(buckets, measurement)

    {0, {started_at, ended_at, {count, sum, min, max, squared_deviation_sum, buckets}}}
  end

  # keeps track of the sum of selected measurement
  defp buffer(Metrics.Sum, nil, at, measurement) do
    {1, {at, at, measurement}}
  end

  defp buffer(Metrics.Sum, {started_at, _ended_at, sum}, ended_at, measurement) do
    {0, {started_at, ended_at, sum + measurement}}
  end

  # calculating statistics of the selected measurement, like maximum, mean, percentiles etc
  # since google does not support more than one metric point per 5 seconds we must aggregate them
  defp buffer(Metrics.Summary, nil, at, measurement) do
    buckets = init_buckets() |> update_buckets(measurement)
    {1, {at, at, {1, measurement, measurement, measurement, 0, buckets}}}
  end

  defp buffer(
         Metrics.Summary,
         {started_at, _ended_at, {count, sum, min, max, squared_deviation_sum, buckets}},
         ended_at,
         measurement
       ) do
    count = count + 1
    sum = sum + measurement
    min = min(min, measurement)
    max = max(max, measurement)
    mean = sum / count
    deviation = measurement - mean
    squared_deviation_sum = squared_deviation_sum + :math.pow(deviation, 2)
    buckets = update_buckets(buckets, measurement)

    {0, {started_at, ended_at, {count, sum, min, max, squared_deviation_sum, buckets}}}
  end

  # holding the value of the selected measurement from the most recent event
  defp buffer(Metrics.LastValue, nil, at, measurement) do
    {1, {at, at, measurement}}
  end

  defp buffer(Metrics.LastValue, {started_at, _ended_at, _measurement}, ended_at, measurement) do
    {0, {started_at, ended_at, measurement}}
  end

  defp init_buckets do
    # add an underflow bucket
    %{0 => 0}
  end

  # We use exponential bucketing for histograms and distributions
  defp update_buckets(buckets, measurement) do
    # Determine the nearest power of 2 for the measurement
    power_of_2 =
      if measurement <= 0 do
        0
      else
        :math.pow(2, :math.ceil(:math.log2(measurement)))
      end

    # put measurement into this bucket
    Map.update(buckets, trunc(power_of_2), 1, &(&1 + 1))
  end

  defp bucket_counts(buckets) do
    value_buckets =
      buckets
      |> Enum.sort_by(&elem(&1, 0))
      |> Enum.map(fn {_bucket, count} ->
        count
      end)

    # append an overflow bucket which will be empty
    value_buckets ++ [0]
  end

  # Formats the whole buffer into Cloud Monitoring time series, submits them
  # in chunks of 200 (the API limit per request), and resets the buffer.
  # Failures are logged and dropped (best effort, no retries).
  defp flush(project_id, resource, labels, {buffer_size, buffer}) do
    buffer
    |> Enum.flat_map(fn {{schema, name, tags, unit}, measurements} ->
      labels = Map.merge(labels, tags)
      format_time_series(schema, name, labels, resource, measurements, unit)
    end)
    |> Enum.chunk_every(200)
    |> Enum.each(fn time_series ->
      case GoogleCloudPlatform.send_metrics(project_id, time_series) do
        :ok ->
          :ok

        {:error, reason} ->
          Logger.warning("Failed to send metrics to Google Cloud Monitoring API",
            reason: inspect(reason),
            count: buffer_size
          )
      end
    end)

    {0, %{}}
  end

  defp format_time_series(Metrics.Counter, name, labels, resource, measurements, unit) do
    {started_at, ended_at, count} = measurements

    [
      %{
        metric: %{
          type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/count",
          labels: labels
        },
        resource: resource,
        unit: to_string(unit),
        metricKind: "CUMULATIVE",
        valueType: "INT64",
        points: [
          %{
            interval: %{"startTime" => started_at, "endTime" => ended_at},
            value: %{"int64Value" => count}
          }
        ]
      }
    ]
  end

  # builds a histogram of selected measurement
  defp format_time_series(Metrics.Distribution, name, labels, resource, measurements, unit) do
    {started_at, ended_at, {count, sum, _min, _max, squared_deviation_sum, buckets}} =
      measurements

    [
      %{
        metric: %{
          type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/distribution",
          labels: labels
        },
        resource: resource,
        unit: to_string(unit),
        metricKind: "CUMULATIVE",
        valueType: "DISTRIBUTION",
        points: [
          %{
            interval: %{"startTime" => started_at, "endTime" => ended_at},
            value: %{
              "distributionValue" => %{
                "count" => count,
                "mean" => sum / count,
                "sumOfSquaredDeviation" => squared_deviation_sum,
                "bucketOptions" => %{
                  "exponentialBuckets" => %{
                    "numFiniteBuckets" => Enum.count(buckets),
                    "growthFactor" => 2,
                    "scale" => 1
                  }
                },
                "bucketCounts" => bucket_counts(buckets)
              }
            }
          }
        ]
      }
    ]
  end

  # keeps track of the sum of selected measurement
  defp format_time_series(Metrics.Sum, name, labels, resource, measurements, unit) do
    {started_at, ended_at, sum} = measurements

    [
      %{
        metric: %{
          type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/sum",
          labels: labels
        },
        resource: resource,
        unit: to_string(unit),
        metricKind: "CUMULATIVE",
        valueType: "DOUBLE",
        points: [
          %{
            interval: %{"startTime" => started_at, "endTime" => ended_at},
            value: %{"doubleValue" => sum}
          }
        ]
      }
    ]
  end

  # calculating statistics of the selected measurement, like maximum, mean, percentiles etc
  defp format_time_series(Metrics.Summary, name, labels, resource, measurements, unit) do
    {started_at, ended_at, {count, sum, min, max, squared_deviation_sum, buckets}} = measurements

    [
      %{
        metric: %{
          type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/summary",
          labels: labels
        },
        resource: resource,
        unit: to_string(unit),
        metricKind: "CUMULATIVE",
        valueType: "DISTRIBUTION",
        points: [
          %{
            interval: %{"startTime" => started_at, "endTime" => ended_at},
            value: %{
              "distributionValue" => %{
                "count" => count,
                "mean" => sum / count,
                "sumOfSquaredDeviation" => squared_deviation_sum,
                "bucketOptions" => %{
                  "exponentialBuckets" => %{
                    "numFiniteBuckets" => Enum.count(buckets),
                    "growthFactor" => 2,
                    "scale" => 1
                  }
                },
                "bucketCounts" => bucket_counts(buckets)
              }
            }
          }
        ]
      },
      %{
        metric: %{
          type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/min",
          labels: labels
        },
        resource: resource,
        unit: to_string(unit),
        metricKind: "CUMULATIVE",
        valueType: "DOUBLE",
        points: [
          %{
            interval: %{"startTime" => started_at, "endTime" => ended_at},
            value: %{"doubleValue" => min}
          }
        ]
      },
      %{
        metric: %{
          type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/max",
          labels: labels
        },
        resource: resource,
        unit: to_string(unit),
        metricKind: "CUMULATIVE",
        valueType: "DOUBLE",
        points: [
          %{
            interval: %{"startTime" => started_at, "endTime" => ended_at},
            value: %{"doubleValue" => max}
          }
        ]
      }
    ]
  end

  # holding the value of the selected measurement from the most recent event
  defp format_time_series(Metrics.LastValue, name, labels, resource, measurements, unit) do
    {started_at, ended_at, last_value} = measurements

    [
      %{
        metric: %{
          type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/last_value",
          labels: labels
        },
        resource: resource,
        unit: to_string(unit),
        metricKind: "CUMULATIVE",
        valueType: "DOUBLE",
        points: [
          %{
            interval: %{"startTime" => started_at, "endTime" => ended_at},
            value: %{"doubleValue" => last_value}
          }
        ]
      }
    ]
  end

  ## Telemetry handlers

  @doc false
  def handle_event(_event_name, measurements, metadata, {metrics, aggregator_pid}) do
    now = DateTime.utc_now() |> DateTime.to_iso8601()

    # Compress each emitted event into a small tuple and forward it to the
    # aggregator process, keeping work inside the telemetry handler minimal.
    compressed_metrics =
      for %schema{} = metric <- metrics,
          keep?(metric, metadata),
          measurement = extract_measurement(metric, measurements, metadata) do
        tags = extract_tags(metric, metadata)
        {schema, metric.name, tags, now, measurement, metric.unit}
      end

    send(aggregator_pid, {:compressed_metrics, compressed_metrics})

    :ok
  end

  defp keep?(%{keep: nil}, _metadata), do: true
  defp keep?(metric, metadata), do: metric.keep.(metadata)

  defp extract_measurement(metric, measurements, metadata) do
    case metric.measurement do
      fun when is_function(fun, 2) -> fun.(measurements, metadata)
      fun when is_function(fun, 1) -> fun.(measurements)
      key -> measurements[key]
    end
  end

  defp extract_tags(metric, metadata) do
    tag_values = metric.tag_values.(metadata)
    Map.take(tag_values, metric.tags)
  end
end
|
||||
@@ -98,7 +98,7 @@ defmodule Domain.GoogleCloudPlatformTest do
|
||||
}) == {:error, %Mint.TransportError{reason: :econnrefused}}
|
||||
|
||||
GoogleCloudPlatform.override_endpoint_url(
|
||||
:token_endpoint_url,
|
||||
:metadata_endpoint_url,
|
||||
"http://localhost:#{bypass.port}/"
|
||||
)
|
||||
|
||||
@@ -164,4 +164,71 @@ defmodule Domain.GoogleCloudPlatformTest do
|
||||
assert conn.method == "POST"
|
||||
end
|
||||
end
|
||||
|
||||
describe "send_metrics/3" do
|
||||
test "returns list of nodes in all regions when access token is not set", %{bypass: bypass} do
|
||||
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
|
||||
GoogleCloudPlatform.mock_metrics_submit_endpoint(bypass)
|
||||
|
||||
time_series = [
|
||||
%{
|
||||
"metric" => %{
|
||||
"type" => "custom.googleapis.com/my_metric",
|
||||
"labels" => %{
|
||||
"my_label" => "my_value"
|
||||
}
|
||||
},
|
||||
"resource" => %{
|
||||
"type" => "gce_instance",
|
||||
"labels" => %{
|
||||
"project_id" => "firezone-staging",
|
||||
"instance_id" => "1234567890123456789",
|
||||
"zone" => "us-central1-f"
|
||||
}
|
||||
},
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => "2024-04-05T10:00:00-04:00"
|
||||
},
|
||||
"value" => %{
|
||||
"doubleValue" => 123.45
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
assert send_metrics("firezone-staging", time_series) == :ok
|
||||
|
||||
assert_receive {:bypass_request, conn, body}
|
||||
|
||||
assert conn.request_path == "/v3/projects/firezone-staging/timeSeries"
|
||||
assert body == %{"timeSeries" => time_series}
|
||||
end
|
||||
|
||||
test "returns error when compute endpoint is down", %{bypass: bypass} do
|
||||
GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass)
|
||||
|
||||
Bypass.down(bypass)
|
||||
|
||||
GoogleCloudPlatform.override_endpoint_url(
|
||||
:cloud_metrics_endpoint_url,
|
||||
"http://localhost:#{bypass.port}/"
|
||||
)
|
||||
|
||||
assert send_metrics("firezone-staging", %{
|
||||
"cluster_name" => "firezone"
|
||||
}) == {:error, %Mint.TransportError{reason: :econnrefused}}
|
||||
|
||||
GoogleCloudPlatform.override_endpoint_url(
|
||||
:metadata_endpoint_url,
|
||||
"http://localhost:#{bypass.port}/"
|
||||
)
|
||||
|
||||
assert send_metrics("firezone-staging", %{
|
||||
"cluster_name" => "firezone"
|
||||
}) == {:error, %Mint.TransportError{reason: :econnrefused}}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -0,0 +1,493 @@
|
||||
defmodule Domain.Telemetry.GoogleCloudMetricsReporterTest do
|
||||
use ExUnit.Case, async: true
|
||||
import Domain.Telemetry.GoogleCloudMetricsReporter
|
||||
alias Domain.Mocks.GoogleCloudPlatform
|
||||
|
||||
describe "handle_info/2 for :compressed_metrics" do
|
||||
test "aggregates and delivers Metrics.Counter metrics" do
|
||||
Bypass.open()
|
||||
|> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
|
||||
|> GoogleCloudPlatform.mock_metrics_submit_endpoint()
|
||||
|
||||
now = DateTime.utc_now()
|
||||
one_minute_ago = DateTime.add(now, -1, :minute)
|
||||
two_minutes_ago = DateTime.add(now, -2, :minute)
|
||||
|
||||
tags = {%{type: "test"}, %{app: "myapp"}}
|
||||
|
||||
assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, two_minutes_ago, 1,
|
||||
:request}
|
||||
]},
|
||||
{[], "proj", tags, {0, %{}}}
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, two_minutes_ago, 1}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1.1,
|
||||
:request}
|
||||
]},
|
||||
state
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, one_minute_ago, 2}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state)
|
||||
|
||||
assert_receive {:bypass_request, _conn, body}
|
||||
|
||||
assert body == %{
|
||||
"timeSeries" => [
|
||||
%{
|
||||
"metric" => %{
|
||||
"type" => "custom.googleapis.com/elixir/foo/count",
|
||||
"labels" => %{"foo" => "bar", "app" => "myapp"}
|
||||
},
|
||||
"resource" => %{"type" => "test"},
|
||||
"unit" => "request",
|
||||
"metricKind" => "CUMULATIVE",
|
||||
"valueType" => "INT64",
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => DateTime.to_iso8601(one_minute_ago),
|
||||
"startTime" => DateTime.to_iso8601(two_minutes_ago)
|
||||
},
|
||||
"value" => %{"int64Value" => 2}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
end
|
||||
|
||||
test "aggregates and delivers Metrics.Distribution metrics" do
|
||||
Bypass.open()
|
||||
|> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
|
||||
|> GoogleCloudPlatform.mock_metrics_submit_endpoint()
|
||||
|
||||
now = DateTime.utc_now()
|
||||
one_minute_ago = DateTime.add(now, -1, :minute)
|
||||
two_minutes_ago = DateTime.add(now, -2, :minute)
|
||||
|
||||
tags = {%{type: "test"}, %{app: "myapp"}}
|
||||
|
||||
assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, two_minutes_ago,
|
||||
5.5, :request}
|
||||
]},
|
||||
{[], "proj", tags, {0, %{}}}
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, two_minutes_ago, {1, 5.5, 5.5, 5.5, 0, %{0 => 0, 8 => 1}}}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, one_minute_ago,
|
||||
11.3, :request}
|
||||
]},
|
||||
state
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, one_minute_ago,
|
||||
{2, 16.8, 5.5, 11.3, 8.410000000000002, %{0 => 0, 8 => 1, 16 => 1}}}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, one_minute_ago,
|
||||
-1, :request}
|
||||
]},
|
||||
state
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, one_minute_ago,
|
||||
{3, 15.8, -1, 11.3, 47.681111111111115, %{0 => 1, 8 => 1, 16 => 1}}}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state)
|
||||
|
||||
assert_receive {:bypass_request, _conn, body}
|
||||
|
||||
assert body == %{
|
||||
"timeSeries" => [
|
||||
%{
|
||||
"metric" => %{
|
||||
"type" => "custom.googleapis.com/elixir/foo/distribution",
|
||||
"labels" => %{"foo" => "bar", "app" => "myapp"}
|
||||
},
|
||||
"resource" => %{"type" => "test"},
|
||||
"unit" => "request",
|
||||
"metricKind" => "CUMULATIVE",
|
||||
"valueType" => "DISTRIBUTION",
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => DateTime.to_iso8601(one_minute_ago),
|
||||
"startTime" => DateTime.to_iso8601(two_minutes_ago)
|
||||
},
|
||||
"value" => %{
|
||||
"distributionValue" => %{
|
||||
"count" => 3,
|
||||
"mean" => 5.266666666666667,
|
||||
"sumOfSquaredDeviation" => 47.681111111111115,
|
||||
"bucketCounts" => [1, 1, 1, 0],
|
||||
"bucketOptions" => %{
|
||||
"exponentialBuckets" => %{
|
||||
"growthFactor" => 2,
|
||||
"numFiniteBuckets" => 3,
|
||||
"scale" => 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
end
|
||||
|
||||
test "aggregates and delivers Metrics.Sum metrics" do
|
||||
Bypass.open()
|
||||
|> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
|
||||
|> GoogleCloudPlatform.mock_metrics_submit_endpoint()
|
||||
|
||||
now = DateTime.utc_now()
|
||||
one_minute_ago = DateTime.add(now, -1, :minute)
|
||||
two_minutes_ago = DateTime.add(now, -2, :minute)
|
||||
|
||||
tags = {%{type: "test"}, %{app: "myapp"}}
|
||||
|
||||
assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, two_minutes_ago, 1,
|
||||
:request}
|
||||
]},
|
||||
{[], "proj", tags, {0, %{}}}
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, two_minutes_ago, 1}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, one_minute_ago, 2.19,
|
||||
:request}
|
||||
]},
|
||||
state
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, one_minute_ago, 3.19}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state)
|
||||
|
||||
assert_receive {:bypass_request, _conn, body}
|
||||
|
||||
assert body == %{
|
||||
"timeSeries" => [
|
||||
%{
|
||||
"metric" => %{
|
||||
"type" => "custom.googleapis.com/elixir/foo/sum",
|
||||
"labels" => %{"foo" => "bar", "app" => "myapp"}
|
||||
},
|
||||
"resource" => %{"type" => "test"},
|
||||
"unit" => "request",
|
||||
"metricKind" => "CUMULATIVE",
|
||||
"valueType" => "DOUBLE",
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => DateTime.to_iso8601(one_minute_ago),
|
||||
"startTime" => DateTime.to_iso8601(two_minutes_ago)
|
||||
},
|
||||
"value" => %{"doubleValue" => 3.19}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
end
|
||||
|
||||
test "aggregates and delivers Metrics.Summary metrics" do
|
||||
Bypass.open()
|
||||
|> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
|
||||
|> GoogleCloudPlatform.mock_metrics_submit_endpoint()
|
||||
|
||||
now = DateTime.utc_now()
|
||||
one_minute_ago = DateTime.add(now, -1, :minute)
|
||||
two_minutes_ago = DateTime.add(now, -2, :minute)
|
||||
|
||||
tags = {%{type: "test"}, %{app: "myapp"}}
|
||||
|
||||
assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, two_minutes_ago, 5.5,
|
||||
:request}
|
||||
]},
|
||||
{[], "proj", tags, {0, %{}}}
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, two_minutes_ago, {1, 5.5, 5.5, 5.5, 0, %{0 => 0, 8 => 1}}}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, one_minute_ago, 11.3,
|
||||
:request}
|
||||
]},
|
||||
state
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, one_minute_ago,
|
||||
{2, 16.8, 5.5, 11.3, 8.410000000000002, %{0 => 0, 8 => 1, 16 => 1}}}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state)
|
||||
|
||||
assert_receive {:bypass_request, _conn, body}
|
||||
|
||||
assert body == %{
|
||||
"timeSeries" => [
|
||||
%{
|
||||
"metric" => %{
|
||||
"type" => "custom.googleapis.com/elixir/foo/summary",
|
||||
"labels" => %{"foo" => "bar", "app" => "myapp"}
|
||||
},
|
||||
"resource" => %{"type" => "test"},
|
||||
"unit" => "request",
|
||||
"metricKind" => "CUMULATIVE",
|
||||
"valueType" => "DISTRIBUTION",
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => DateTime.to_iso8601(one_minute_ago),
|
||||
"startTime" => DateTime.to_iso8601(two_minutes_ago)
|
||||
},
|
||||
"value" => %{
|
||||
"distributionValue" => %{
|
||||
"count" => 2,
|
||||
"mean" => 8.4,
|
||||
"sumOfSquaredDeviation" => 8.410000000000002,
|
||||
"bucketCounts" => [0, 1, 1, 0],
|
||||
"bucketOptions" => %{
|
||||
"exponentialBuckets" => %{
|
||||
"growthFactor" => 2,
|
||||
"numFiniteBuckets" => 3,
|
||||
"scale" => 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
%{
|
||||
"metric" => %{
|
||||
"labels" => %{"app" => "myapp", "foo" => "bar"},
|
||||
"type" => "custom.googleapis.com/elixir/foo/min"
|
||||
},
|
||||
"metricKind" => "CUMULATIVE",
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => DateTime.to_iso8601(one_minute_ago),
|
||||
"startTime" => DateTime.to_iso8601(two_minutes_ago)
|
||||
},
|
||||
"value" => %{"doubleValue" => 5.5}
|
||||
}
|
||||
],
|
||||
"resource" => %{"type" => "test"},
|
||||
"unit" => "request",
|
||||
"valueType" => "DOUBLE"
|
||||
},
|
||||
%{
|
||||
"metric" => %{
|
||||
"labels" => %{"app" => "myapp", "foo" => "bar"},
|
||||
"type" => "custom.googleapis.com/elixir/foo/max"
|
||||
},
|
||||
"metricKind" => "CUMULATIVE",
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => DateTime.to_iso8601(one_minute_ago),
|
||||
"startTime" => DateTime.to_iso8601(two_minutes_ago)
|
||||
},
|
||||
"value" => %{"doubleValue" => 11.3}
|
||||
}
|
||||
],
|
||||
"resource" => %{"type" => "test"},
|
||||
"unit" => "request",
|
||||
"valueType" => "DOUBLE"
|
||||
}
|
||||
]
|
||||
}
|
||||
end
|
||||
|
||||
test "aggregates and delivers Metrics.LastValue metrics" do
|
||||
Bypass.open()
|
||||
|> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
|
||||
|> GoogleCloudPlatform.mock_metrics_submit_endpoint()
|
||||
|
||||
now = DateTime.utc_now()
|
||||
one_minute_ago = DateTime.add(now, -1, :minute)
|
||||
two_minutes_ago = DateTime.add(now, -2, :minute)
|
||||
|
||||
tags = {%{type: "test"}, %{app: "myapp"}}
|
||||
|
||||
assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, two_minutes_ago, 1,
|
||||
:request}
|
||||
]},
|
||||
{[], "proj", tags, {0, %{}}}
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, two_minutes_ago, 1}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[
|
||||
{Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, one_minute_ago, -1,
|
||||
:request}
|
||||
]},
|
||||
state
|
||||
)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, :request} =>
|
||||
{two_minutes_ago, one_minute_ago, -1}
|
||||
}
|
||||
|
||||
assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state)
|
||||
|
||||
assert_receive {:bypass_request, _conn, body}
|
||||
|
||||
assert body == %{
|
||||
"timeSeries" => [
|
||||
%{
|
||||
"metric" => %{
|
||||
"type" => "custom.googleapis.com/elixir/foo/last_value",
|
||||
"labels" => %{"foo" => "bar", "app" => "myapp"}
|
||||
},
|
||||
"resource" => %{"type" => "test"},
|
||||
"unit" => "request",
|
||||
"metricKind" => "CUMULATIVE",
|
||||
"valueType" => "DOUBLE",
|
||||
"points" => [
|
||||
%{
|
||||
"interval" => %{
|
||||
"endTime" => DateTime.to_iso8601(one_minute_ago),
|
||||
"startTime" => DateTime.to_iso8601(two_minutes_ago)
|
||||
},
|
||||
"value" => %{"doubleValue" => -1}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
end
|
||||
|
||||
test "submits the metrics to Google Cloud when buffer is filled" do
|
||||
Bypass.open()
|
||||
|> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
|
||||
|> GoogleCloudPlatform.mock_metrics_submit_endpoint()
|
||||
|
||||
now = DateTime.utc_now()
|
||||
tags = {%{type: "test"}, %{app: "myapp"}}
|
||||
|
||||
{_, _, _, {buffer_size, buffer}} =
|
||||
Enum.reduce(1..101, {[], "proj", tags, {0, %{}}}, fn i, state ->
|
||||
{:noreply, state} =
|
||||
handle_info(
|
||||
{:compressed_metrics,
|
||||
[{Telemetry.Metrics.Counter, [:foo, i], %{}, now, i, :request}]},
|
||||
state
|
||||
)
|
||||
|
||||
state
|
||||
end)
|
||||
|
||||
assert buffer_size == 1
|
||||
|
||||
assert buffer == %{
|
||||
{Telemetry.Metrics.Counter, [:foo, 101], %{}, :request} => {now, now, 1}
|
||||
}
|
||||
|
||||
assert_receive {:bypass_request, _conn, %{"timeSeries" => time_series}}
|
||||
assert length(time_series) == 100
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -5,8 +5,40 @@ defmodule Domain.Mocks.GoogleCloudPlatform do
|
||||
Domain.Config.put_env_override(:domain, Domain.GoogleCloudPlatform, config)
|
||||
end
|
||||
|
||||
def mock_instance_metadata_id_endpoint(bypass, id \\ Ecto.UUID.generate()) do
|
||||
token_endpoint_path = "/instance/id"
|
||||
|
||||
test_pid = self()
|
||||
|
||||
Bypass.stub(bypass, "GET", token_endpoint_path, fn conn ->
|
||||
conn = Plug.Conn.fetch_query_params(conn)
|
||||
send(test_pid, {:bypass_request, conn})
|
||||
Plug.Conn.send_resp(conn, 200, id)
|
||||
end)
|
||||
|
||||
override_endpoint_url(:metadata_endpoint_url, "http://localhost:#{bypass.port}/")
|
||||
|
||||
bypass
|
||||
end
|
||||
|
||||
def mock_instance_metadata_zone_endpoint(bypass, zone \\ "projects/001001/zones/us-east-1") do
|
||||
token_endpoint_path = "/instance/zone"
|
||||
|
||||
test_pid = self()
|
||||
|
||||
Bypass.stub(bypass, "GET", token_endpoint_path, fn conn ->
|
||||
conn = Plug.Conn.fetch_query_params(conn)
|
||||
send(test_pid, {:bypass_request, conn})
|
||||
Plug.Conn.send_resp(conn, 200, zone)
|
||||
end)
|
||||
|
||||
override_endpoint_url(:metadata_endpoint_url, "http://localhost:#{bypass.port}/")
|
||||
|
||||
bypass
|
||||
end
|
||||
|
||||
def mock_instance_metadata_token_endpoint(bypass, resp \\ nil) do
|
||||
token_endpoint_path = "computeMetadata/v1/instance/service-accounts/default/token"
|
||||
token_endpoint_path = "/instance/service-accounts/default/token"
|
||||
|
||||
resp =
|
||||
resp ||
|
||||
@@ -24,10 +56,7 @@ defmodule Domain.Mocks.GoogleCloudPlatform do
|
||||
Plug.Conn.send_resp(conn, 200, Jason.encode!(resp))
|
||||
end)
|
||||
|
||||
override_endpoint_url(
|
||||
:token_endpoint_url,
|
||||
"http://localhost:#{bypass.port}/#{token_endpoint_path}"
|
||||
)
|
||||
override_endpoint_url(:metadata_endpoint_url, "http://localhost:#{bypass.port}/")
|
||||
|
||||
bypass
|
||||
end
|
||||
@@ -180,4 +209,25 @@ defmodule Domain.Mocks.GoogleCloudPlatform do
|
||||
|
||||
bypass
|
||||
end
|
||||
|
||||
def mock_metrics_submit_endpoint(bypass) do
|
||||
metrics_endpoint_path = "v3/projects/firezone-staging/timeSeries"
|
||||
|
||||
test_pid = self()
|
||||
|
||||
Bypass.expect(bypass, "POST", metrics_endpoint_path, fn conn ->
|
||||
conn = Plug.Conn.fetch_query_params(conn)
|
||||
{:ok, binary, conn} = Plug.Conn.read_body(conn)
|
||||
body = Jason.decode!(binary)
|
||||
send(test_pid, {:bypass_request, conn, body})
|
||||
Plug.Conn.send_resp(conn, 200, Jason.encode!(%{}))
|
||||
end)
|
||||
|
||||
override_endpoint_url(
|
||||
:cloud_metrics_endpoint_url,
|
||||
"http://localhost:#{bypass.port}/#{metrics_endpoint_path}"
|
||||
)
|
||||
|
||||
bypass
|
||||
end
|
||||
end
|
||||
|
||||
@@ -9,8 +9,7 @@ defmodule Web.Application do
|
||||
|
||||
children = [
|
||||
Web.Mailer,
|
||||
Web.Endpoint,
|
||||
Web.Telemetry
|
||||
Web.Endpoint
|
||||
]
|
||||
|
||||
opts = [strategy: :one_for_one, name: Web.Supervisor]
|
||||
|
||||
@@ -1,63 +0,0 @@
|
||||
defmodule Web.Telemetry do
|
||||
use Supervisor
|
||||
import Telemetry.Metrics
|
||||
|
||||
def start_link(arg) do
|
||||
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(_arg) do
|
||||
children = [
|
||||
# Telemetry poller will execute the given period measurements
|
||||
# every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics
|
||||
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
|
||||
# Add reporters as children of your supervision tree.
|
||||
# {Telemetry.Metrics.ConsoleReporter, metrics: metrics()}
|
||||
]
|
||||
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
end
|
||||
|
||||
def metrics do
|
||||
[
|
||||
# Phoenix Metrics
|
||||
summary("phoenix.endpoint.start.system_time",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.endpoint.stop.duration",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.router_dispatch.start.system_time",
|
||||
tags: [:route],
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.router_dispatch.exception.duration",
|
||||
tags: [:route],
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.router_dispatch.stop.duration",
|
||||
tags: [:route],
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.socket_connected.duration",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.channel_join.duration",
|
||||
unit: {:native, :millisecond}
|
||||
),
|
||||
summary("phoenix.channel_handled_in.duration",
|
||||
tags: [:event],
|
||||
unit: {:native, :millisecond}
|
||||
)
|
||||
]
|
||||
end
|
||||
|
||||
defp periodic_measurements do
|
||||
[
|
||||
# A module, function and arguments to be invoked periodically.
|
||||
# This function must call :telemetry.execute/3 and a metric must be added above.
|
||||
# {Web, :count_users, []}
|
||||
]
|
||||
end
|
||||
end
|
||||
@@ -37,9 +37,7 @@ config :domain, Domain.Gateways,
|
||||
gateway_ipv4_masquerade: true,
|
||||
gateway_ipv6_masquerade: true
|
||||
|
||||
config :domain, Domain.Telemetry,
|
||||
enabled: true,
|
||||
id: "firezone-dev"
|
||||
config :domain, Domain.Telemetry, metrics_reporter: nil
|
||||
|
||||
config :domain, Domain.Auth.Adapters.GoogleWorkspace.APIClient,
|
||||
endpoint: "https://admin.googleapis.com",
|
||||
@@ -64,10 +62,11 @@ config :domain, Domain.Billing,
|
||||
config :domain, platform_adapter: nil
|
||||
|
||||
config :domain, Domain.GoogleCloudPlatform,
|
||||
token_endpoint_url:
|
||||
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token",
|
||||
metadata_endpoint_url: "http://metadata.google.internal/computeMetadata/v1",
|
||||
aggregated_list_endpoint_url:
|
||||
"https://compute.googleapis.com/compute/v1/projects/${project_id}/aggregated/instances",
|
||||
cloud_metrics_endpoint_url:
|
||||
"https://monitoring.googleapis.com/v3/projects/${project_id}/timeSeries",
|
||||
sign_endpoint_url: "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/",
|
||||
cloud_storage_url: "https://storage.googleapis.com"
|
||||
|
||||
|
||||
@@ -173,6 +173,12 @@ if config_env() == :prod do
|
||||
otlp_endpoint: System.get_env("OTLP_ENDPOINT")
|
||||
end
|
||||
|
||||
config :domain, Domain.Telemetry, metrics_reporter: compile_config!(:telemetry_metrics_reporter)
|
||||
|
||||
if telemetry_metrics_reporter = compile_config!(:telemetry_metrics_reporter) do
|
||||
config :domain, telemetry_metrics_reporter, compile_config!(:telemetry_metrics_reporter_opts)
|
||||
end
|
||||
|
||||
config :domain,
|
||||
http_client_ssl_opts: compile_config!(:http_client_ssl_opts)
|
||||
|
||||
|
||||
@@ -24,9 +24,9 @@ config :domain, Domain.ConnectivityChecks, enabled: false
|
||||
|
||||
config :domain, platform_adapter: Domain.GoogleCloudPlatform
|
||||
|
||||
config :domain, Domain.GoogleCloudPlatform,
|
||||
project_id: "fz-test",
|
||||
service_account_email: "foo@iam.example.com"
|
||||
config :domain, Domain.GoogleCloudPlatform, service_account_email: "foo@iam.example.com"
|
||||
|
||||
config :domain, Domain.Telemetry.GoogleCloudMetricsReporter, project_id: "fz-test"
|
||||
|
||||
###############################
|
||||
##### Web #####################
|
||||
|
||||
@@ -26,9 +26,9 @@ export RELEASE_DISTRIBUTION=name
|
||||
#
|
||||
# Having a valid DNS record is important to remotely connect to a running Erlang node.
|
||||
if [[ "${RELEASE_HOST_DISCOVERY_METHOD}" == "gce_metadata" ]]; then
|
||||
GCP_PROJECT_ID=$(curl "http://metadata.google.internal/computeMetadata/v1/project/project-id" -H "Metadata-Flavor: Google" -s)
|
||||
GCP_INSTANCE_NAME=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google" -s)
|
||||
GCP_INSTANCE_ZONE=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/zone" -H "Metadata-Flavor: Google" -s | sed 's:.*/::')
|
||||
export GCP_PROJECT_ID=$(curl "http://metadata.google.internal/computeMetadata/v1/project/project-id" -H "Metadata-Flavor: Google" -s)
|
||||
export GCP_INSTANCE_NAME=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google" -s)
|
||||
export GCP_INSTANCE_ZONE=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/zone" -H "Metadata-Flavor: Google" -s | sed 's:.*/::')
|
||||
RELEASE_HOSTNAME="$GCP_INSTANCE_NAME.$GCP_INSTANCE_ZONE.c.${GCP_PROJECT_ID}.internal"
|
||||
else
|
||||
RELEASE_HOSTNAME=${RELEASE_HOSTNAME:-127.0.0.1}
|
||||
|
||||
@@ -37,6 +37,16 @@ locals {
|
||||
name = "OTEL_RESOURCE_ATTRIBUTES"
|
||||
value = "application.name=${local.application_name}"
|
||||
},
|
||||
{
|
||||
name = "TELEMETRY_METRICS_REPORTER"
|
||||
value = "Domain.Telemetry.GoogleCloudMetricsReporter"
|
||||
},
|
||||
{
|
||||
name = "TELEMETRY_METRICS_REPORTER_OPTS"
|
||||
value = jsonencode({
|
||||
project_id = var.project_id
|
||||
})
|
||||
},
|
||||
{
|
||||
name = "PLATFORM_ADAPTER"
|
||||
value = "Elixir.Domain.GoogleCloudPlatform"
|
||||
|
||||
@@ -53,8 +53,8 @@ write_files:
|
||||
[Service]
|
||||
TimeoutStartSec=0
|
||||
Restart=always
|
||||
ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.90.1
|
||||
ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.90.1
|
||||
ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.97.0
|
||||
ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.97.0
|
||||
ExecStop=/usr/bin/docker stop otel-collector
|
||||
ExecStopPost=/usr/bin/docker rm otel-collector
|
||||
|
||||
|
||||
@@ -83,8 +83,8 @@ write_files:
|
||||
[Service]
|
||||
TimeoutStartSec=0
|
||||
Restart=always
|
||||
ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.90.1
|
||||
ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.90.1
|
||||
ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.97.0
|
||||
ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.97.0
|
||||
ExecStop=/usr/bin/docker stop otel-collector
|
||||
ExecStopPost=/usr/bin/docker rm otel-collector
|
||||
|
||||
|
||||
@@ -83,8 +83,8 @@ write_files:
|
||||
[Service]
|
||||
TimeoutStartSec=0
|
||||
Restart=always
|
||||
ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.90.1
|
||||
ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.90.1
|
||||
ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.97.0
|
||||
ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.97.0
|
||||
ExecStop=/usr/bin/docker stop otel-collector
|
||||
ExecStopPost=/usr/bin/docker rm otel-collector
|
||||
|
||||
|
||||
Reference in New Issue
Block a user