From b0590fa532ad20d994ea87eb5f61486fefcfe42e Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Wed, 10 Apr 2024 13:04:59 -0600 Subject: [PATCH] chore(portal): Send metrics to Google Cloud Monitoring (#4564) --- elixir/apps/api/lib/api/application.ex | 1 - elixir/apps/api/lib/api/telemetry.ex | 71 --- .../cluster/google_compute_labels_strategy.ex | 8 +- .../domain/lib/domain/config/definitions.ex | 26 +- .../lib/domain/google_cloud_platform.ex | 87 +++- elixir/apps/domain/lib/domain/relays.ex | 8 + elixir/apps/domain/lib/domain/telemetry.ex | 57 +- .../google_cloud_metrics_reporter.ex | 458 ++++++++++++++++ .../domain/google_cloud_platform_test.exs | 69 ++- .../google_cloud_metrics_reporter_test.exs | 493 ++++++++++++++++++ .../support/mocks/google_cloud_platform.ex | 60 ++- elixir/apps/web/lib/web/application.ex | 3 +- elixir/apps/web/lib/web/telemetry.ex | 63 --- elixir/config/config.exs | 9 +- elixir/config/runtime.exs | 6 + elixir/config/test.exs | 6 +- elixir/rel/env.sh.eex | 6 +- .../modules/google-cloud/apps/elixir/main.tf | 10 + .../apps/elixir/templates/cloud-init.yaml | 4 +- .../templates/cloud-init.yaml | 4 +- .../apps/relay/templates/cloud-init.yaml | 4 +- 21 files changed, 1280 insertions(+), 173 deletions(-) delete mode 100644 elixir/apps/api/lib/api/telemetry.ex create mode 100644 elixir/apps/domain/lib/domain/telemetry/google_cloud_metrics_reporter.ex create mode 100644 elixir/apps/domain/test/domain/telemetry/google_cloud_metrics_reporter_test.exs delete mode 100644 elixir/apps/web/lib/web/telemetry.ex diff --git a/elixir/apps/api/lib/api/application.ex b/elixir/apps/api/lib/api/application.ex index 18b1a7ce4..e83082974 100644 --- a/elixir/apps/api/lib/api/application.ex +++ b/elixir/apps/api/lib/api/application.ex @@ -7,7 +7,6 @@ defmodule API.Application do _ = OpentelemetryPhoenix.setup(adapter: :cowboy2) children = [ - API.Telemetry, API.Endpoint ] diff --git a/elixir/apps/api/lib/api/telemetry.ex b/elixir/apps/api/lib/api/telemetry.ex deleted file mode 100644 index c8c453407..000000000 --- a/elixir/apps/api/lib/api/telemetry.ex +++ /dev/null @@ -1,71 +0,0 @@ -defmodule API.Telemetry do - use Supervisor - import Telemetry.Metrics - - def start_link(arg) do - Supervisor.start_link(__MODULE__, arg, name: __MODULE__) - end - - @impl true - def init(_arg) do - children = [ - # Telemetry poller will execute the given period measurements - # every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics - {:telemetry_poller, measurements: periodic_measurements(), period: 10_000} - # Add reporters as children of your supervision tree. - # {Telemetry.Metrics.ConsoleReporter, metrics: metrics()} - ] - - Supervisor.init(children, strategy: :one_for_one) - end - - def metrics do - [ - # Phoenix Metrics - summary("phoenix.endpoint.start.system_time", - unit: {:native, :millisecond} - ), - summary("phoenix.endpoint.stop.duration", - unit: {:native, :millisecond} - ), - summary("phoenix.router_dispatch.start.system_time", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("phoenix.router_dispatch.exception.duration", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("phoenix.router_dispatch.stop.duration", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("phoenix.socket_connected.duration", - unit: {:native, :millisecond} - ), - summary("phoenix.channel_join.duration", - unit: {:native, :millisecond} - ), - summary("phoenix.channel_handled_in.duration", - tags: [:event], - unit: {:native, :millisecond} - ), - - # VM Metrics - summary("vm.memory.total", unit: {:byte, :kilobyte}), - summary("vm.total_run_queue_lengths.total"), - summary("vm.total_run_queue_lengths.cpu"), - summary("vm.total_run_queue_lengths.io") - ] - end - - # XXX: We might want to contribute initial_timeout to telemetry_poller - # so that we can remove out custom gen_servers that do long polling. - defp periodic_measurements do - [ - # A module, function and arguments to be invoked periodically. - # This function must call :telemetry.execute/3 and a metric must be added above. - # {API, :count_users, []} - ] - end -end diff --git a/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex b/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex index d315fa906..234e4ed31 100644 --- a/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex +++ b/elixir/apps/domain/lib/domain/cluster/google_compute_labels_strategy.ex @@ -167,7 +167,13 @@ defmodule Domain.Cluster.GoogleComputeLabelsStrategy do node_name end) - Logger.debug("Found #{length(nodes)}", module: __MODULE__, nodes: Enum.join(nodes, ", ")) + count = length(nodes) + + :telemetry.execute([:domain, :cluster], %{ + discovered_nodes_count: count + }) + + Logger.debug("Found #{count}", module: __MODULE__, nodes: Enum.join(nodes, ", ")) {:ok, nodes} end diff --git a/elixir/apps/domain/lib/domain/config/definitions.ex b/elixir/apps/domain/lib/domain/config/definitions.ex index 936e7916e..7b0f3ea7e 100644 --- a/elixir/apps/domain/lib/domain/config/definitions.ex +++ b/elixir/apps/domain/lib/domain/config/definitions.ex @@ -120,7 +120,8 @@ defmodule Domain.Config.Definitions do {"Instrumentation", [ :instrumentation_client_logs_enabled, - :instrumentation_client_logs_bucket + :instrumentation_client_logs_bucket, + :telemetry_metrics_reporter ]} ] end @@ -453,6 +454,29 @@ defmodule Domain.Config.Definitions do """ defconfig(:instrumentation_client_logs_bucket, :string, default: "logs") + @doc """ + Reporter to use for sending metrics to the telemetry backend. + """ + defconfig( + :telemetry_metrics_reporter, + {:parameterized, Ecto.Enum, + Ecto.Enum.init( + values: [ + Telemetry.Metrics.ConsoleReporter, + Domain.Telemetry.GoogleCloudMetricsReporter + ] + )}, + default: nil + ) + + @doc """ + Configuration for the telemetry metrics reporter. + """ + defconfig(:telemetry_metrics_reporter_opts, :map, + default: %{}, + dump: &Dumper.keyword/1 + ) + ############################################## ## Gateways ############################################## diff --git a/elixir/apps/domain/lib/domain/google_cloud_platform.ex b/elixir/apps/domain/lib/domain/google_cloud_platform.ex index 375ef0844..b346db704 100644 --- a/elixir/apps/domain/lib/domain/google_cloud_platform.ex +++ b/elixir/apps/domain/lib/domain/google_cloud_platform.ex @@ -36,8 +36,12 @@ defmodule Domain.GoogleCloudPlatform do # are limited by the service account attached to it. def fetch_access_token do config = fetch_config!() - token_endpoint_url = Keyword.fetch!(config, :token_endpoint_url) - request = Finch.build(:get, token_endpoint_url, [{"Metadata-Flavor", "Google"}]) + metadata_endpoint_url = Keyword.fetch!(config, :metadata_endpoint_url) + + request = + Finch.build(:get, metadata_endpoint_url <> "/instance/service-accounts/default/token", [ + {"Metadata-Flavor", "Google"} + ]) case Finch.request(request, __MODULE__.Finch) do {:ok, %Finch.Response{status: 200, body: response}} -> @@ -55,6 +59,52 @@ defmodule Domain.GoogleCloudPlatform do end end + def fetch_instance_id do + config = fetch_config!() + metadata_endpoint_url = Keyword.fetch!(config, :metadata_endpoint_url) + + request = + Finch.build(:get, metadata_endpoint_url <> "/instance/id", [ + {"Metadata-Flavor", "Google"} + ]) + + case Finch.request(request, __MODULE__.Finch) do + {:ok, %Finch.Response{status: 200, body: instance_id}} -> + {:ok, instance_id} + + {:ok, response} -> + Logger.error("Can't fetch instance ID", reason: inspect(response)) + {:error, {response.status, response.body}} + + {:error, reason} -> + Logger.error("Can't fetch instance ID", reason: inspect(reason)) + {:error, reason} + end + end + + def fetch_instance_zone do + config = fetch_config!() + metadata_endpoint_url = Keyword.fetch!(config, :metadata_endpoint_url) + + request = + Finch.build(:get, metadata_endpoint_url <> "/instance/zone", [ + {"Metadata-Flavor", "Google"} + ]) + + case Finch.request(request, __MODULE__.Finch) do + {:ok, %Finch.Response{status: 200, body: zone}} -> + {:ok, zone |> String.split("/") |> List.last()} + + {:ok, response} -> + Logger.error("Can't fetch instance zone", reason: inspect(response)) + {:error, {response.status, response.body}} + + {:error, reason} -> + Logger.error("Can't fetch instance zone", reason: inspect(reason)) + {:error, reason} + end + end + def list_google_cloud_instances_by_labels(project_id, label_values) do aggregated_list_endpoint_url = fetch_config!() @@ -132,6 +182,39 @@ defmodule Domain.GoogleCloudPlatform do end end + @doc """ + Sends metrics to Google Cloud Monitoring API. + """ + def send_metrics(project_id, time_series) do + cloud_metrics_endpoint_url = + fetch_config!() + |> Keyword.fetch!(:cloud_metrics_endpoint_url) + |> String.replace("${project_id}", project_id) + + body = Jason.encode!(%{"timeSeries" => time_series}) + + with {:ok, access_token} <- fetch_and_cache_access_token(), + request = + Finch.build( + :post, + cloud_metrics_endpoint_url, + [ + {"Content-Type", "application/json"}, + {"Authorization", "Bearer #{access_token}"} + ], + body + ), + {:ok, %{status: 200}} <- Finch.request(request, __MODULE__.Finch) do + :ok + else + {:ok, %{status: status, body: body}} -> + {:error, {status, body}} + + {:error, reason} -> + {:error, reason} + end + end + defp fetch_config! do Domain.Config.fetch_env!(:domain, __MODULE__) end diff --git a/elixir/apps/domain/lib/domain/relays.ex b/elixir/apps/domain/lib/domain/relays.ex index 9bcecb0a8..e44b4d469 100644 --- a/elixir/apps/domain/lib/domain/relays.ex +++ b/elixir/apps/domain/lib/domain/relays.ex @@ -16,6 +16,14 @@ defmodule Domain.Relays do Supervisor.init(children, strategy: :one_for_one) end + def send_metrics do + count = global_groups_presence_topic() |> Presence.list() |> Enum.count() + + :telemetry.execute([:domain, :relays], %{ + online_relays_count: count + }) + end + def fetch_group_by_id(id, %Auth.Subject{} = subject, opts \\ []) do with :ok <- Auth.ensure_has_permissions(subject, Authorizer.manage_relays_permission()), true <- Repo.valid_uuid?(id) do diff --git a/elixir/apps/domain/lib/domain/telemetry.ex b/elixir/apps/domain/lib/domain/telemetry.ex index db57784f2..277caed69 100644 --- a/elixir/apps/domain/lib/domain/telemetry.ex +++ b/elixir/apps/domain/lib/domain/telemetry.ex @@ -9,6 +9,8 @@ defmodule Domain.Telemetry do @impl true def init(_arg) do + config = Domain.Config.fetch_env!(:domain, __MODULE__) + children = [ # We start a /healthz endpoint that is used for liveness probes {Bandit, plug: Telemetry.HealthzPlug, scheme: :http, port: 4000}, @@ -16,35 +18,72 @@ defmodule Domain.Telemetry do # Telemetry poller will execute the given period measurements # every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics {:telemetry_poller, measurements: periodic_measurements(), period: 10_000} - # Add reporters as children of your supervision tree. - # {Telemetry.Metrics.ConsoleReporter, metrics: metrics()} ] - Supervisor.init(children, strategy: :one_for_one) + reporter_children = + if metrics_reporter = Keyword.get(config, :metrics_reporter) do + [{metrics_reporter, metrics: metrics()}] + else + [] + end + + Supervisor.init(children ++ reporter_children, strategy: :one_for_one) end def metrics do [ # Database Metrics - summary("domain.repo.query.total_time", unit: {:native, :millisecond}), + distribution("domain.repo.query.total_time", unit: {:native, :millisecond}), summary("domain.repo.query.decode_time", unit: {:native, :millisecond}), - summary("domain.repo.query.query_time", unit: {:native, :millisecond}), + summary("domain.repo.query.query_time", tags: [:query], unit: {:native, :millisecond}), summary("domain.repo.query.queue_time", unit: {:native, :millisecond}), summary("domain.repo.query.idle_time", unit: {:native, :millisecond}), + # Phoenix Metrics + summary("phoenix.endpoint.start.system_time", + unit: {:native, :millisecond} + ), + summary("phoenix.endpoint.stop.duration", + unit: {:native, :millisecond} + ), + summary("phoenix.router_dispatch.start.system_time", + tags: [:route], + unit: {:native, :millisecond} + ), + summary("phoenix.router_dispatch.exception.duration", + tags: [:route], + unit: {:native, :millisecond} + ), + summary("phoenix.router_dispatch.stop.duration", + tags: [:route], + unit: {:native, :millisecond} + ), + summary("phoenix.socket_connected.duration", + unit: {:native, :millisecond} + ), + summary("phoenix.channel_join.duration", + unit: {:native, :millisecond} + ), + summary("phoenix.channel_handled_in.duration", + tags: [:event], + unit: {:native, :millisecond} + ), + # VM Metrics summary("vm.memory.total", unit: {:byte, :kilobyte}), summary("vm.total_run_queue_lengths.total"), summary("vm.total_run_queue_lengths.cpu"), - summary("vm.total_run_queue_lengths.io") + summary("vm.total_run_queue_lengths.io"), + + # Application metrics + last_value("domain.relays.online_relays_count"), + last_value("domain.metrics.discovered_nodes_count") ] end defp periodic_measurements do [ - # A module, function and arguments to be invoked periodically. - # This function must call :telemetry.execute/3 and a metric must be added above. - # {Web, :count_users, []} + {Domain.Relays, :send_metrics, []} ] end end diff --git a/elixir/apps/domain/lib/domain/telemetry/google_cloud_metrics_reporter.ex b/elixir/apps/domain/lib/domain/telemetry/google_cloud_metrics_reporter.ex new file mode 100644 index 000000000..ac2ad2d85 --- /dev/null +++ b/elixir/apps/domain/lib/domain/telemetry/google_cloud_metrics_reporter.ex @@ -0,0 +1,458 @@ +defmodule Domain.Telemetry.GoogleCloudMetricsReporter do + @doc """ + This module implement Telemetry reporter that sends metrics to Google Cloud Monitoring API, + with a *best effort* approach. It means that if the reporter fails to send metrics to the API, + it will not retry, but it will keep trying to send new metrics. + """ + use GenServer + alias Telemetry.Metrics + alias Domain.GoogleCloudPlatform + require Logger + + # Maximum number of metrics a buffer can hold, + # after this count they will be delivered or flushed right away. + @buffer_size 100 + + # Maximum time in seconds to wait before flushing the buffer + # in case it did not reach the @buffer_size limit within the flush interval + @flush_interval :timer.seconds(10) + + def start_link(opts) do + project_id = + Application.fetch_env!(:domain, __MODULE__) + |> Keyword.fetch!(:project_id) + + metrics = Keyword.fetch!(opts, :metrics) + GenServer.start_link(__MODULE__, {metrics, project_id}) + end + + @impl true + def init({metrics, project_id}) do + Process.flag(:trap_exit, true) + groups = Enum.group_by(metrics, & &1.event_name) + + node_name = + System.get_env("RELEASE_NODE") || + Node.self() + + application_version = + System.get_env("RELEASE_VERSION") || + to_string(Application.spec(:domain, :vsn)) + + labels = %{node_name: node_name, application_version: application_version} + + {:ok, instance_id} = GoogleCloudPlatform.fetch_instance_id() + {:ok, zone} = GoogleCloudPlatform.fetch_instance_zone() + + resource = %{ + "type" => "gce_instance", + "labels" => %{ + "project_id" => project_id, + "instance_id" => instance_id, + "zone" => zone + } + } + + events = + for {event, metrics} <- groups do + id = {__MODULE__, event, self()} + :telemetry.attach(id, event, &__MODULE__.handle_event/4, {metrics, self()}) + event + end + + Process.send_after(self(), :flush, @flush_interval) + + {:ok, {events, project_id, {resource, labels}, {0, %{}}}} + end + + @impl true + def terminate(_, {events, _project_id, _resource_and_labels, {_buffer_size, _buffer}}) do + for event <- events do + :telemetry.detach({__MODULE__, event, self()}) + end + + :ok + end + + ## Metrics aggregation and delivery + + @impl true + def handle_info( + {:compressed_metrics, compressed}, + {events, project_id, {resource, labels}, {buffer_size, buffer}} + ) do + {buffer_size, buffer} = + Enum.reduce( + compressed, + {buffer_size, buffer}, + fn {schema, name, tags, at, measurement, unit}, {buffer_size, buffer} -> + {increment, buffer} = + Map.get_and_update(buffer, {schema, name, tags, unit}, fn prev_value -> + buffer(schema, prev_value, at, measurement) + end) + + {buffer_size + increment, buffer} + end + ) + + {buffer_size, buffer} = + if buffer_size >= @buffer_size do + flush(project_id, resource, labels, {buffer_size, buffer}) + else + {buffer_size, buffer} + end + + {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}} + end + + def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_size, buffer}}) do + {buffer_size, buffer} = flush(project_id, resource, labels, {buffer_size, buffer}) + Process.send_after(self(), :flush, @flush_interval) + {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}} + end + + # counts the total number of emitted events + defp buffer(Metrics.Counter, nil, at, _measurement) do + {1, {at, at, 1}} + end + + defp buffer(Metrics.Counter, {started_at, _ended_at, num}, ended_at, _measurement) do + {0, {started_at, ended_at, num + 1}} + end + + # builds a histogram of selected measurement + defp buffer(Metrics.Distribution, nil, at, measurement) do + buckets = init_buckets() |> update_buckets(measurement) + {1, {at, at, {1, measurement, measurement, measurement, 0, buckets}}} + end + + defp buffer( + Metrics.Distribution, + {started_at, _ended_at, {count, sum, min, max, squared_deviation_sum, buckets}}, + ended_at, + measurement + ) do + count = count + 1 + sum = sum + measurement + min = min(min, measurement) + max = max(max, measurement) + mean = sum / count + deviation = measurement - mean + squared_deviation_sum = squared_deviation_sum + :math.pow(deviation, 2) + buckets = update_buckets(buckets, measurement) + + {0, {started_at, ended_at, {count, sum, min, max, squared_deviation_sum, buckets}}} + end + + # keeps track of the sum of selected measurement + defp buffer(Metrics.Sum, nil, at, measurement) do + {1, {at, at, measurement}} + end + + defp buffer(Metrics.Sum, {started_at, _ended_at, sum}, ended_at, measurement) do + {0, {started_at, ended_at, sum + measurement}} + end + + # calculating statistics of the selected measurement, like maximum, mean, percentiles etc + # since google does not support more than one metric point per 5 seconds we must aggregate them + defp buffer(Metrics.Summary, nil, at, measurement) do + buckets = init_buckets() |> update_buckets(measurement) + {1, {at, at, {1, measurement, measurement, measurement, 0, buckets}}} + end + + defp buffer( + Metrics.Summary, + {started_at, _ended_at, {count, sum, min, max, squared_deviation_sum, buckets}}, + ended_at, + measurement + ) do + count = count + 1 + sum = sum + measurement + min = min(min, measurement) + max = max(max, measurement) + mean = sum / count + deviation = measurement - mean + squared_deviation_sum = squared_deviation_sum + :math.pow(deviation, 2) + buckets = update_buckets(buckets, measurement) + + {0, {started_at, ended_at, {count, sum, min, max, squared_deviation_sum, buckets}}} + end + + # holding the value of the selected measurement from the most recent event + defp buffer(Metrics.LastValue, nil, at, measurement) do + {1, {at, at, measurement}} + end + + defp buffer(Metrics.LastValue, {started_at, _ended_at, _measurement}, ended_at, measurement) do + {0, {started_at, ended_at, measurement}} + end + + defp init_buckets do + # add an underflow bucket + %{0 => 0} + end + + # We use exponential bucketing for histograms and distributions + defp update_buckets(buckets, measurement) do + # Determine the nearest power of 2 for the measurement + power_of_2 = + if measurement <= 0 do + 0 + else + :math.pow(2, :math.ceil(:math.log2(measurement))) + end + + # put measurement into this bucket + Map.update(buckets, trunc(power_of_2), 1, &(&1 + 1)) + end + + defp bucket_counts(buckets) do + value_buckets = + buckets + |> Enum.sort_by(&elem(&1, 0)) + |> Enum.map(fn {_bucket, count} -> + count + end) + + # append an overflow bucket which will be empty + value_buckets ++ [0] + end + + defp flush(project_id, resource, labels, {buffer_size, buffer}) do + buffer + |> Enum.flat_map(fn {{schema, name, tags, unit}, measurements} -> + labels = Map.merge(labels, tags) + format_time_series(schema, name, labels, resource, measurements, unit) + end) + |> Enum.chunk_every(200) + |> Enum.each(fn time_series -> + case GoogleCloudPlatform.send_metrics(project_id, time_series) do + :ok -> + :ok + + {:error, reason} -> + Logger.warning("Failed to send metrics to Google Cloud Monitoring API", + reason: inspect(reason), + count: buffer_size + ) + end + end) + + {0, %{}} + end + + defp format_time_series(Metrics.Counter, name, labels, resource, measurements, unit) do + {started_at, ended_at, count} = measurements + + [ + %{ + metric: %{ + type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/count", + labels: labels + }, + resource: resource, + unit: to_string(unit), + metricKind: "CUMULATIVE", + valueType: "INT64", + points: [ + %{ + interval: %{"startTime" => started_at, "endTime" => ended_at}, + value: %{"int64Value" => count} + } + ] + } + ] + end + + # builds a histogram of selected measurement + defp format_time_series(Metrics.Distribution, name, labels, resource, measurements, unit) do + {started_at, ended_at, {count, sum, _min, _max, squared_deviation_sum, buckets}} = + measurements + + [ + %{ + metric: %{ + type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/distribution", + labels: labels + }, + resource: resource, + unit: to_string(unit), + metricKind: "CUMULATIVE", + valueType: "DISTRIBUTION", + points: [ + %{ + interval: %{"startTime" => started_at, "endTime" => ended_at}, + value: %{ + "distributionValue" => %{ + "count" => count, + "mean" => sum / count, + "sumOfSquaredDeviation" => squared_deviation_sum, + "bucketOptions" => %{ + "exponentialBuckets" => %{ + "numFiniteBuckets" => Enum.count(buckets), + "growthFactor" => 2, + "scale" => 1 + } + }, + "bucketCounts" => bucket_counts(buckets) + } + } + } + ] + } + ] + end + + # keeps track of the sum of selected measurement + defp format_time_series(Metrics.Sum, name, labels, resource, measurements, unit) do + {started_at, ended_at, sum} = measurements + + [ + %{ + metric: %{ + type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/sum", + labels: labels + }, + resource: resource, + unit: to_string(unit), + metricKind: "CUMULATIVE", + valueType: "DOUBLE", + points: [ + %{ + interval: %{"startTime" => started_at, "endTime" => ended_at}, + value: %{"doubleValue" => sum} + } + ] + } + ] + end + + # calculating statistics of the selected measurement, like maximum, mean, percentiles etc + defp format_time_series(Metrics.Summary, name, labels, resource, measurements, unit) do + {started_at, ended_at, {count, sum, min, max, squared_deviation_sum, buckets}} = measurements + + [ + %{ + metric: %{ + type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/summary", + labels: labels + }, + resource: resource, + unit: to_string(unit), + metricKind: "CUMULATIVE", + valueType: "DISTRIBUTION", + points: [ + %{ + interval: %{"startTime" => started_at, "endTime" => ended_at}, + value: %{ + "distributionValue" => %{ + "count" => count, + "mean" => sum / count, + "sumOfSquaredDeviation" => squared_deviation_sum, + "bucketOptions" => %{ + "exponentialBuckets" => %{ + "numFiniteBuckets" => Enum.count(buckets), + "growthFactor" => 2, + "scale" => 1 + } + }, + "bucketCounts" => bucket_counts(buckets) + } + } + } + ] + }, + %{ + metric: %{ + type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/min", + labels: labels + }, + resource: resource, + unit: to_string(unit), + metricKind: "CUMULATIVE", + valueType: "DOUBLE", + points: [ + %{ + interval: %{"startTime" => started_at, "endTime" => ended_at}, + value: %{"doubleValue" => min} + } + ] + }, + %{ + metric: %{ + type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/max", + labels: labels + }, + resource: resource, + unit: to_string(unit), + metricKind: "CUMULATIVE", + valueType: "DOUBLE", + points: [ + %{ + interval: %{"startTime" => started_at, "endTime" => ended_at}, + value: %{"doubleValue" => max} + } + ] + } + ] + end + + # holding the value of the selected measurement from the most recent event + defp format_time_series(Metrics.LastValue, name, labels, resource, measurements, unit) do + {started_at, ended_at, last_value} = measurements + + [ + %{ + metric: %{ + type: "custom.googleapis.com/elixir/#{Enum.join(name, "/")}/last_value", + labels: labels + }, + resource: resource, + unit: to_string(unit), + metricKind: "CUMULATIVE", + valueType: "DOUBLE", + points: [ + %{ + interval: %{"startTime" => started_at, "endTime" => ended_at}, + value: %{"doubleValue" => last_value} + } + ] + } + ] + end + + ## Telemetry handlers + + @doc false + def handle_event(_event_name, measurements, metadata, {metrics, aggregator_pid}) do + now = DateTime.utc_now() |> DateTime.to_iso8601() + + compressed_metrics = + for %schema{} = metric <- metrics, + keep?(metric, metadata), + measurement = extract_measurement(metric, measurements, metadata) do + tags = extract_tags(metric, metadata) + {schema, metric.name, tags, now, measurement, metric.unit} + end + + send(aggregator_pid, {:compressed_metrics, compressed_metrics}) + + :ok + end + + defp keep?(%{keep: nil}, _metadata), do: true + defp keep?(metric, metadata), do: metric.keep.(metadata) + + defp extract_measurement(metric, measurements, metadata) do + case metric.measurement do + fun when is_function(fun, 2) -> fun.(measurements, metadata) + fun when is_function(fun, 1) -> fun.(measurements) + key -> measurements[key] + end + end + + defp extract_tags(metric, metadata) do + tag_values = metric.tag_values.(metadata) + Map.take(tag_values, metric.tags) + end +end diff --git a/elixir/apps/domain/test/domain/google_cloud_platform_test.exs b/elixir/apps/domain/test/domain/google_cloud_platform_test.exs index d2f8956a7..68478e724 100644 --- a/elixir/apps/domain/test/domain/google_cloud_platform_test.exs +++ b/elixir/apps/domain/test/domain/google_cloud_platform_test.exs @@ -98,7 +98,7 @@ defmodule Domain.GoogleCloudPlatformTest do }) == {:error, %Mint.TransportError{reason: :econnrefused}} GoogleCloudPlatform.override_endpoint_url( - :token_endpoint_url, + :metadata_endpoint_url, "http://localhost:#{bypass.port}/" ) @@ -164,4 +164,71 @@ defmodule Domain.GoogleCloudPlatformTest do assert conn.method == "POST" end end + + describe "send_metrics/3" do + test "returns list of nodes in all regions when access token is not set", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + GoogleCloudPlatform.mock_metrics_submit_endpoint(bypass) + + time_series = [ + %{ + "metric" => %{ + "type" => "custom.googleapis.com/my_metric", + "labels" => %{ + "my_label" => "my_value" + } + }, + "resource" => %{ + "type" => "gce_instance", + "labels" => %{ + "project_id" => "firezone-staging", + "instance_id" => "1234567890123456789", + "zone" => "us-central1-f" + } + }, + "points" => [ + %{ + "interval" => %{ + "endTime" => "2024-04-05T10:00:00-04:00" + }, + "value" => %{ + "doubleValue" => 123.45 + } + } + ] + } + ] + + assert send_metrics("firezone-staging", time_series) == :ok + + assert_receive {:bypass_request, conn, body} + + assert conn.request_path == "/v3/projects/firezone-staging/timeSeries" + assert body == %{"timeSeries" => time_series} + end + + test "returns error when compute endpoint is down", %{bypass: bypass} do + GoogleCloudPlatform.mock_instance_metadata_token_endpoint(bypass) + + Bypass.down(bypass) + + GoogleCloudPlatform.override_endpoint_url( + :cloud_metrics_endpoint_url, + "http://localhost:#{bypass.port}/" + ) + + assert send_metrics("firezone-staging", %{ + "cluster_name" => "firezone" + }) == {:error, %Mint.TransportError{reason: :econnrefused}} + + GoogleCloudPlatform.override_endpoint_url( + :metadata_endpoint_url, + "http://localhost:#{bypass.port}/" + ) + + assert send_metrics("firezone-staging", %{ + "cluster_name" => "firezone" + }) == {:error, %Mint.TransportError{reason: :econnrefused}} + end + end end diff --git a/elixir/apps/domain/test/domain/telemetry/google_cloud_metrics_reporter_test.exs b/elixir/apps/domain/test/domain/telemetry/google_cloud_metrics_reporter_test.exs new file mode 100644 index 000000000..c11d11d5c --- /dev/null +++ b/elixir/apps/domain/test/domain/telemetry/google_cloud_metrics_reporter_test.exs @@ -0,0 +1,493 @@ +defmodule Domain.Telemetry.GoogleCloudMetricsReporterTest do + use ExUnit.Case, async: true + import Domain.Telemetry.GoogleCloudMetricsReporter + alias Domain.Mocks.GoogleCloudPlatform + + describe "handle_info/2 for :compressed_metrics" do + test "aggregates and delivers Metrics.Counter metrics" do + Bypass.open() + |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() + |> GoogleCloudPlatform.mock_metrics_submit_endpoint() + + now = DateTime.utc_now() + one_minute_ago = DateTime.add(now, -1, :minute) + two_minutes_ago = DateTime.add(now, -2, :minute) + + tags = {%{type: "test"}, %{app: "myapp"}} + + assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, two_minutes_ago, 1, + :request} + ]}, + {[], "proj", tags, {0, %{}}} + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, two_minutes_ago, 1} + } + + assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1.1, + :request} + ]}, + state + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, one_minute_ago, 2} + } + + assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) + + assert_receive {:bypass_request, _conn, body} + + assert body == %{ + "timeSeries" => [ + %{ + "metric" => %{ + "type" => "custom.googleapis.com/elixir/foo/count", + "labels" => %{"foo" => "bar", "app" => "myapp"} + }, + "resource" => %{"type" => "test"}, + "unit" => "request", + "metricKind" => "CUMULATIVE", + "valueType" => "INT64", + "points" => [ + %{ + "interval" => %{ + "endTime" => DateTime.to_iso8601(one_minute_ago), + "startTime" => DateTime.to_iso8601(two_minutes_ago) + }, + "value" => %{"int64Value" => 2} + } + ] + } + ] + } + end + + test "aggregates and delivers Metrics.Distribution metrics" do + Bypass.open() + |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() + |> GoogleCloudPlatform.mock_metrics_submit_endpoint() + + now = DateTime.utc_now() + one_minute_ago = DateTime.add(now, -1, :minute) + two_minutes_ago = DateTime.add(now, -2, :minute) + + tags = {%{type: "test"}, %{app: "myapp"}} + + assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, two_minutes_ago, + 5.5, :request} + ]}, + {[], "proj", tags, {0, %{}}} + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, two_minutes_ago, {1, 5.5, 5.5, 5.5, 0, %{0 => 0, 8 => 1}}} + } + + assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, one_minute_ago, + 11.3, :request} + ]}, + state + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, one_minute_ago, + {2, 16.8, 5.5, 11.3, 8.410000000000002, %{0 => 0, 8 => 1, 16 => 1}}} + } + + assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, one_minute_ago, + -1, :request} + ]}, + state + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Distribution, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, one_minute_ago, + {3, 15.8, -1, 11.3, 47.681111111111115, %{0 => 1, 8 => 1, 16 => 1}}} + } + + assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) + + assert_receive {:bypass_request, _conn, body} + + assert body == %{ + "timeSeries" => [ + %{ + "metric" => %{ + "type" => "custom.googleapis.com/elixir/foo/distribution", + "labels" => %{"foo" => "bar", "app" => "myapp"} + }, + "resource" => %{"type" => "test"}, + "unit" => "request", + "metricKind" => "CUMULATIVE", + "valueType" => "DISTRIBUTION", + "points" => [ + %{ + "interval" => %{ + "endTime" => DateTime.to_iso8601(one_minute_ago), + "startTime" => DateTime.to_iso8601(two_minutes_ago) + }, + "value" => %{ + "distributionValue" => %{ + "count" => 3, + "mean" => 5.266666666666667, + "sumOfSquaredDeviation" => 47.681111111111115, + "bucketCounts" => [1, 1, 1, 0], + "bucketOptions" => %{ + "exponentialBuckets" => %{ + "growthFactor" => 2, + "numFiniteBuckets" => 3, + "scale" => 1 + } + } + } + } + } + ] + } + ] + } + end + + test "aggregates and delivers Metrics.Sum metrics" do + Bypass.open() + |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() + |> GoogleCloudPlatform.mock_metrics_submit_endpoint() + + now = DateTime.utc_now() + one_minute_ago = DateTime.add(now, -1, :minute) + two_minutes_ago = DateTime.add(now, -2, :minute) + + tags = {%{type: "test"}, %{app: "myapp"}} + + assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, two_minutes_ago, 1, + :request} + ]}, + {[], "proj", tags, {0, %{}}} + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, two_minutes_ago, 1} + } + + assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, one_minute_ago, 2.19, + :request} + ]}, + state + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Sum, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, one_minute_ago, 3.19} + } + + assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) + + assert_receive {:bypass_request, _conn, body} + + assert body == %{ + "timeSeries" => [ + %{ + "metric" => %{ + "type" => "custom.googleapis.com/elixir/foo/sum", + "labels" => %{"foo" => "bar", "app" => "myapp"} + }, + "resource" => %{"type" => "test"}, + "unit" => "request", + "metricKind" => "CUMULATIVE", + "valueType" => "DOUBLE", + "points" => [ + %{ + "interval" => %{ + "endTime" => DateTime.to_iso8601(one_minute_ago), + "startTime" => DateTime.to_iso8601(two_minutes_ago) + }, + "value" => %{"doubleValue" => 3.19} + } + ] + } + ] + } + end + + test "aggregates and delivers Metrics.Summary metrics" do + Bypass.open() + |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() + |> GoogleCloudPlatform.mock_metrics_submit_endpoint() + + now = DateTime.utc_now() + one_minute_ago = DateTime.add(now, -1, :minute) + two_minutes_ago = DateTime.add(now, -2, :minute) + + tags = {%{type: "test"}, %{app: "myapp"}} + + assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, two_minutes_ago, 5.5, + :request} + ]}, + {[], "proj", tags, {0, %{}}} + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, two_minutes_ago, {1, 5.5, 5.5, 5.5, 0, %{0 => 0, 8 => 1}}} + } + + assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, one_minute_ago, 11.3, + :request} + ]}, + state + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Summary, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, one_minute_ago, + {2, 16.8, 5.5, 11.3, 8.410000000000002, %{0 => 0, 8 => 1, 16 => 1}}} + } + + assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) + + assert_receive {:bypass_request, _conn, body} + + assert body == %{ + "timeSeries" => [ + %{ + "metric" => %{ + "type" => "custom.googleapis.com/elixir/foo/summary", + "labels" => %{"foo" => "bar", "app" => "myapp"} + }, + "resource" => %{"type" => "test"}, + "unit" => "request", + "metricKind" => "CUMULATIVE", + "valueType" => "DISTRIBUTION", + "points" => [ + %{ + "interval" => %{ + "endTime" => DateTime.to_iso8601(one_minute_ago), + "startTime" => DateTime.to_iso8601(two_minutes_ago) + }, + "value" => %{ + "distributionValue" => %{ + "count" => 2, + "mean" => 8.4, + "sumOfSquaredDeviation" => 8.410000000000002, + "bucketCounts" => [0, 1, 1, 0], + "bucketOptions" => %{ + "exponentialBuckets" => %{ + "growthFactor" => 2, + "numFiniteBuckets" => 3, + "scale" => 1 + } + } + } + } + } + ] + }, + %{ + "metric" => %{ + "labels" => %{"app" => "myapp", "foo" => "bar"}, + "type" => "custom.googleapis.com/elixir/foo/min" + }, + "metricKind" => "CUMULATIVE", + "points" => [ + %{ + "interval" => %{ + "endTime" => DateTime.to_iso8601(one_minute_ago), + "startTime" => DateTime.to_iso8601(two_minutes_ago) + }, + "value" => %{"doubleValue" => 5.5} + } + ], + "resource" => %{"type" => "test"}, + "unit" => "request", + "valueType" => "DOUBLE" + }, + %{ + "metric" => %{ + "labels" => %{"app" => "myapp", "foo" => "bar"}, + "type" => "custom.googleapis.com/elixir/foo/max" + }, + "metricKind" => "CUMULATIVE", + "points" => [ + %{ + "interval" => %{ + "endTime" => DateTime.to_iso8601(one_minute_ago), + "startTime" => DateTime.to_iso8601(two_minutes_ago) + }, + "value" => %{"doubleValue" => 11.3} + } + ], + "resource" => %{"type" => "test"}, + "unit" => "request", + "valueType" => "DOUBLE" + } + ] + } + end + + test "aggregates and delivers Metrics.LastValue metrics" do + Bypass.open() + |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() + |> GoogleCloudPlatform.mock_metrics_submit_endpoint() + + now = DateTime.utc_now() + one_minute_ago = DateTime.add(now, -1, :minute) + two_minutes_ago = DateTime.add(now, -2, :minute) + + tags = {%{type: "test"}, %{app: "myapp"}} + + assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, two_minutes_ago, 1, + :request} + ]}, + {[], "proj", tags, {0, %{}}} + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, two_minutes_ago, 1} + } + + assert {:noreply, {_, _, _, {buffer_size, buffer}} = state} = + handle_info( + {:compressed_metrics, + [ + {Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, one_minute_ago, -1, + :request} + ]}, + state + ) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.LastValue, [:foo], %{"foo" => "bar"}, :request} => + {two_minutes_ago, one_minute_ago, -1} + } + + assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) + + assert_receive {:bypass_request, _conn, body} + + assert body == %{ + "timeSeries" => [ + %{ + "metric" => %{ + "type" => "custom.googleapis.com/elixir/foo/last_value", + "labels" => %{"foo" => "bar", "app" => "myapp"} + }, + "resource" => %{"type" => "test"}, + "unit" => "request", + "metricKind" => "CUMULATIVE", + "valueType" => "DOUBLE", + "points" => [ + %{ + "interval" => %{ + "endTime" => DateTime.to_iso8601(one_minute_ago), + "startTime" => DateTime.to_iso8601(two_minutes_ago) + }, + "value" => %{"doubleValue" => -1} + } + ] + } + ] + } + end + + test "submits the metrics to Google Cloud when buffer is filled" do + Bypass.open() + |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() + |> GoogleCloudPlatform.mock_metrics_submit_endpoint() + + now = DateTime.utc_now() + tags = {%{type: "test"}, %{app: "myapp"}} + + {_, _, _, {buffer_size, buffer}} = + Enum.reduce(1..101, {[], "proj", tags, {0, %{}}}, fn i, state -> + {:noreply, state} = + handle_info( + {:compressed_metrics, + [{Telemetry.Metrics.Counter, [:foo, i], %{}, now, i, :request}]}, + state + ) + + state + end) + + assert buffer_size == 1 + + assert buffer == %{ + {Telemetry.Metrics.Counter, [:foo, 101], %{}, :request} => {now, now, 1} + } + + assert_receive {:bypass_request, _conn, %{"timeSeries" => time_series}} + assert length(time_series) == 100 + end + end +end diff --git a/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex b/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex index 6012f11e8..35cd797d9 100644 --- a/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex +++ b/elixir/apps/domain/test/support/mocks/google_cloud_platform.ex @@ -5,8 +5,40 @@ defmodule Domain.Mocks.GoogleCloudPlatform do Domain.Config.put_env_override(:domain, Domain.GoogleCloudPlatform, config) end + def mock_instance_metadata_id_endpoint(bypass, id \\ Ecto.UUID.generate()) do + token_endpoint_path = "/instance/id" + + test_pid = self() + + Bypass.stub(bypass, "GET", token_endpoint_path, fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + send(test_pid, {:bypass_request, conn}) + Plug.Conn.send_resp(conn, 200, id) + end) + + override_endpoint_url(:metadata_endpoint_url, "http://localhost:#{bypass.port}/") + + bypass + end + + def mock_instance_metadata_zone_endpoint(bypass, zone \\ "projects/001001/zones/us-east-1") do + token_endpoint_path = "/instance/zone" + + test_pid = self() + + Bypass.stub(bypass, "GET", token_endpoint_path, fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + send(test_pid, {:bypass_request, conn}) + Plug.Conn.send_resp(conn, 200, zone) + end) + + override_endpoint_url(:metadata_endpoint_url, "http://localhost:#{bypass.port}/") + + bypass + end + def mock_instance_metadata_token_endpoint(bypass, resp \\ nil) do - token_endpoint_path = "computeMetadata/v1/instance/service-accounts/default/token" + token_endpoint_path = "/instance/service-accounts/default/token" resp = resp || @@ -24,10 +56,7 @@ defmodule Domain.Mocks.GoogleCloudPlatform do Plug.Conn.send_resp(conn, 200, Jason.encode!(resp)) end) - override_endpoint_url( - :token_endpoint_url, - "http://localhost:#{bypass.port}/#{token_endpoint_path}" - ) + override_endpoint_url(:metadata_endpoint_url, "http://localhost:#{bypass.port}/") bypass end @@ -180,4 +209,25 @@ defmodule Domain.Mocks.GoogleCloudPlatform do bypass end + + def mock_metrics_submit_endpoint(bypass) do + metrics_endpoint_path = "v3/projects/firezone-staging/timeSeries" + + test_pid = self() + + Bypass.expect(bypass, "POST", metrics_endpoint_path, fn conn -> + conn = Plug.Conn.fetch_query_params(conn) + {:ok, binary, conn} = Plug.Conn.read_body(conn) + body = Jason.decode!(binary) + send(test_pid, {:bypass_request, conn, body}) + Plug.Conn.send_resp(conn, 200, Jason.encode!(%{})) + end) + + override_endpoint_url( + :cloud_metrics_endpoint_url, + "http://localhost:#{bypass.port}/#{metrics_endpoint_path}" + ) + + bypass + end end diff --git a/elixir/apps/web/lib/web/application.ex b/elixir/apps/web/lib/web/application.ex index 22d7170bc..2411158a1 100644 --- a/elixir/apps/web/lib/web/application.ex +++ b/elixir/apps/web/lib/web/application.ex @@ -9,8 +9,7 @@ defmodule Web.Application do children = [ Web.Mailer, - Web.Endpoint, - Web.Telemetry + Web.Endpoint ] opts = [strategy: :one_for_one, name: Web.Supervisor] diff --git a/elixir/apps/web/lib/web/telemetry.ex b/elixir/apps/web/lib/web/telemetry.ex deleted file mode 100644 index c5cac76f4..000000000 --- a/elixir/apps/web/lib/web/telemetry.ex +++ /dev/null @@ -1,63 +0,0 @@ -defmodule Web.Telemetry do - use Supervisor - import Telemetry.Metrics - - def start_link(arg) do - Supervisor.start_link(__MODULE__, arg, name: __MODULE__) - end - - @impl true - def init(_arg) do - children = [ - # Telemetry poller will execute the given period measurements - # every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics - {:telemetry_poller, measurements: periodic_measurements(), period: 10_000} - # Add reporters as children of your supervision tree. - # {Telemetry.Metrics.ConsoleReporter, metrics: metrics()} - ] - - Supervisor.init(children, strategy: :one_for_one) - end - - def metrics do - [ - # Phoenix Metrics - summary("phoenix.endpoint.start.system_time", - unit: {:native, :millisecond} - ), - summary("phoenix.endpoint.stop.duration", - unit: {:native, :millisecond} - ), - summary("phoenix.router_dispatch.start.system_time", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("phoenix.router_dispatch.exception.duration", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("phoenix.router_dispatch.stop.duration", - tags: [:route], - unit: {:native, :millisecond} - ), - summary("phoenix.socket_connected.duration", - unit: {:native, :millisecond} - ), - summary("phoenix.channel_join.duration", - unit: {:native, :millisecond} - ), - summary("phoenix.channel_handled_in.duration", - tags: [:event], - unit: {:native, :millisecond} - ) - ] - end - - defp periodic_measurements do - [ - # A module, function and arguments to be invoked periodically. - # This function must call :telemetry.execute/3 and a metric must be added above. - # {Web, :count_users, []} - ] - end -end diff --git a/elixir/config/config.exs b/elixir/config/config.exs index 5ab66da8c..3befc24ff 100644 --- a/elixir/config/config.exs +++ b/elixir/config/config.exs @@ -37,9 +37,7 @@ config :domain, Domain.Gateways, gateway_ipv4_masquerade: true, gateway_ipv6_masquerade: true -config :domain, Domain.Telemetry, - enabled: true, - id: "firezone-dev" +config :domain, Domain.Telemetry, metrics_reporter: nil config :domain, Domain.Auth.Adapters.GoogleWorkspace.APIClient, endpoint: "https://admin.googleapis.com", @@ -64,10 +62,11 @@ config :domain, Domain.Billing, config :domain, platform_adapter: nil config :domain, Domain.GoogleCloudPlatform, - token_endpoint_url: - "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token", + metadata_endpoint_url: "http://metadata.google.internal/computeMetadata/v1", aggregated_list_endpoint_url: "https://compute.googleapis.com/compute/v1/projects/${project_id}/aggregated/instances", + cloud_metrics_endpoint_url: + "https://monitoring.googleapis.com/v3/projects/${project_id}/timeSeries", sign_endpoint_url: "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/", cloud_storage_url: "https://storage.googleapis.com" diff --git a/elixir/config/runtime.exs b/elixir/config/runtime.exs index c48142bc4..5bb002a1f 100644 --- a/elixir/config/runtime.exs +++ b/elixir/config/runtime.exs @@ -173,6 +173,12 @@ if config_env() == :prod do otlp_endpoint: System.get_env("OTLP_ENDPOINT") end + config :domain, Domain.Telemetry, metrics_reporter: compile_config!(:telemetry_metrics_reporter) + + if telemetry_metrics_reporter = compile_config!(:telemetry_metrics_reporter) do + config :domain, telemetry_metrics_reporter, compile_config!(:telemetry_metrics_reporter_opts) + end + config :domain, http_client_ssl_opts: compile_config!(:http_client_ssl_opts) diff --git a/elixir/config/test.exs b/elixir/config/test.exs index d1978175e..992c36f6e 100644 --- a/elixir/config/test.exs +++ b/elixir/config/test.exs @@ -24,9 +24,9 @@ config :domain, Domain.ConnectivityChecks, enabled: false config :domain, platform_adapter: Domain.GoogleCloudPlatform -config :domain, Domain.GoogleCloudPlatform, - project_id: "fz-test", - service_account_email: "foo@iam.example.com" +config :domain, Domain.GoogleCloudPlatform, service_account_email: "foo@iam.example.com" + +config :domain, Domain.Telemetry.GoogleCloudMetricsReporter, project_id: "fz-test" ############################### ##### Web ##################### diff --git a/elixir/rel/env.sh.eex b/elixir/rel/env.sh.eex index f989637c0..ee4fc44d6 100644 --- a/elixir/rel/env.sh.eex +++ b/elixir/rel/env.sh.eex @@ -26,9 +26,9 @@ export RELEASE_DISTRIBUTION=name # # Having a valid DNS record is important to remotely connect to a running Erlang node. if [[ "${RELEASE_HOST_DISCOVERY_METHOD}" == "gce_metadata" ]]; then - GCP_PROJECT_ID=$(curl "http://metadata.google.internal/computeMetadata/v1/project/project-id" -H "Metadata-Flavor: Google" -s) - GCP_INSTANCE_NAME=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google" -s) - GCP_INSTANCE_ZONE=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/zone" -H "Metadata-Flavor: Google" -s | sed 's:.*/::') + export GCP_PROJECT_ID=$(curl "http://metadata.google.internal/computeMetadata/v1/project/project-id" -H "Metadata-Flavor: Google" -s) + export GCP_INSTANCE_NAME=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/name" -H "Metadata-Flavor: Google" -s) + export GCP_INSTANCE_ZONE=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/zone" -H "Metadata-Flavor: Google" -s | sed 's:.*/::') RELEASE_HOSTNAME="$GCP_INSTANCE_NAME.$GCP_INSTANCE_ZONE.c.${GCP_PROJECT_ID}.internal" else RELEASE_HOSTNAME=${RELEASE_HOSTNAME:-127.0.0.1} diff --git a/terraform/modules/google-cloud/apps/elixir/main.tf b/terraform/modules/google-cloud/apps/elixir/main.tf index f468611a6..7c0caa4ae 100644 --- a/terraform/modules/google-cloud/apps/elixir/main.tf +++ b/terraform/modules/google-cloud/apps/elixir/main.tf @@ -37,6 +37,16 @@ locals { name = "OTEL_RESOURCE_ATTRIBUTES" value = "application.name=${local.application_name}" }, + { + name = "TELEMETRY_METRICS_REPORTER" + value = "Domain.Telemetry.GoogleCloudMetricsReporter" + }, + { + name = "TELEMETRY_METRICS_REPORTER_OPTS" + value = jsonencode({ + project_id = var.project_id + }) + }, { name = "PLATFORM_ADAPTER" value = "Elixir.Domain.GoogleCloudPlatform" diff --git a/terraform/modules/google-cloud/apps/elixir/templates/cloud-init.yaml b/terraform/modules/google-cloud/apps/elixir/templates/cloud-init.yaml index cff1714a2..b4e1e95ab 100644 --- a/terraform/modules/google-cloud/apps/elixir/templates/cloud-init.yaml +++ b/terraform/modules/google-cloud/apps/elixir/templates/cloud-init.yaml @@ -53,8 +53,8 @@ write_files: [Service] TimeoutStartSec=0 Restart=always - ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.90.1 - ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.90.1 + ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.97.0 + ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.97.0 ExecStop=/usr/bin/docker stop otel-collector ExecStopPost=/usr/bin/docker rm otel-collector diff --git a/terraform/modules/google-cloud/apps/gateway-region-instance-group/templates/cloud-init.yaml b/terraform/modules/google-cloud/apps/gateway-region-instance-group/templates/cloud-init.yaml index 7821525d1..0203fcb62 100644 --- a/terraform/modules/google-cloud/apps/gateway-region-instance-group/templates/cloud-init.yaml +++ b/terraform/modules/google-cloud/apps/gateway-region-instance-group/templates/cloud-init.yaml @@ -83,8 +83,8 @@ write_files: [Service] TimeoutStartSec=0 Restart=always - ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.90.1 - ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.90.1 + ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.97.0 + ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.97.0 ExecStop=/usr/bin/docker stop otel-collector ExecStopPost=/usr/bin/docker rm otel-collector diff --git a/terraform/modules/google-cloud/apps/relay/templates/cloud-init.yaml b/terraform/modules/google-cloud/apps/relay/templates/cloud-init.yaml index 658ab7bfa..1373de5c4 100644 --- a/terraform/modules/google-cloud/apps/relay/templates/cloud-init.yaml +++ b/terraform/modules/google-cloud/apps/relay/templates/cloud-init.yaml @@ -83,8 +83,8 @@ write_files: [Service] TimeoutStartSec=0 Restart=always - ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.90.1 - ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.90.1 + ExecStartPre=/usr/bin/docker pull otel/opentelemetry-collector-contrib:0.97.0 + ExecStart=/usr/bin/docker run --rm -u 2000 --name=otel-collector --network host --volume /etc/otelcol-contrib/:/etc/otelcol-contrib/ otel/opentelemetry-collector-contrib:0.97.0 ExecStop=/usr/bin/docker stop otel-collector ExecStopPost=/usr/bin/docker rm otel-collector