From 236c21111af26459af994f1ce031e1ff28e31e89 Mon Sep 17 00:00:00 2001 From: Jamil Date: Wed, 18 Jun 2025 11:40:33 -0700 Subject: [PATCH] refactor(portal): don't rely on db to gate metric reporting (#9565) This table was added to try and gate the request rate to Google's Metrics API. However, this was a flawed endeavor as we later discovered that the time series points need to be spaced apart at least 5s, not the API requests themselves. This PR gets rid of the table and therefore the problematic DB query which is timing out quite often due to the contention involved in 12 elixir nodes trying to grab a lock every 5s. Related: #9539 Related: https://firezone-inc.sentry.io/issues/6346235615/?project=4508756715569152&query=is%3Aunresolved&referrer=issue-stream&stream_index=1 --- .../reporter/google_cloud_metrics.ex | 87 +++-------------- .../lib/domain/telemetry/reporter/log.ex | 10 -- .../telemetry/reporter/log/changeset.ex | 15 --- .../domain/telemetry/reporter/log/query.ex | 11 --- ...617175017_drop_telemetry_reporter_logs.exs | 20 ++++ .../reporter/google_cloud_metrics_test.exs | 95 ------------------- 6 files changed, 33 insertions(+), 205 deletions(-) delete mode 100644 elixir/apps/domain/lib/domain/telemetry/reporter/log.ex delete mode 100644 elixir/apps/domain/lib/domain/telemetry/reporter/log/changeset.ex delete mode 100644 elixir/apps/domain/lib/domain/telemetry/reporter/log/query.ex create mode 100644 elixir/apps/domain/priv/repo/conditional_migrations/20250617175017_drop_telemetry_reporter_logs.exs diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex index 58c8d269c..fd5d2ab8a 100644 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex +++ b/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex @@ -7,12 +7,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do use GenServer alias Telemetry.Metrics - - alias Domain.{ - GoogleCloudPlatform, - Repo, - Telemetry.Reporter.Log - } + alias Domain.GoogleCloudPlatform require Logger @@ -25,10 +20,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do # Maximum time in seconds to wait before flushing the buffer # in case it did not reach the @buffer_size limit within the flush interval - # This is bounded by the maximum rate we can send metrics to Google Cloud. - # A locking mechanism is used to ensure multiple nodes flushing - # metrics independently don't exceed this in aggregate. - @flush_interval 5 + @flush_interval 60 @flush_timer :timer.seconds(@flush_interval) def start_link(opts) do @@ -111,10 +103,8 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do {buffer_size, buffer} = if buffer_size >= @buffer_size do - log = get_log() - # Flush immediately if the buffer is full - flush(project_id, resource, labels, {buffer_size, buffer}, log, false) + flush(project_id, resource, labels, {buffer_size, buffer}) else {buffer_size, buffer} end @@ -123,11 +113,9 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do end def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_size, buffer}}) do - log = get_log() - {buffer_size, buffer} = - if all_intervals_greater_than_5s?(buffer) && havent_flushed_in_over_5s?(log) do - flush(project_id, resource, labels, {buffer_size, buffer}, log, true) + if all_intervals_greater_than_5s?(buffer) do + flush(project_id, resource, labels, {buffer_size, buffer}) else {buffer_size, buffer} end @@ -139,12 +127,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}} end - defp get_log do - Log.Query.all() - |> Log.Query.by_reporter_module("#{__MODULE__}") - |> Repo.one() - end - # Google Cloud Monitoring API does not support sampling intervals shorter than 5 seconds defp all_intervals_greater_than_5s?(buffer) do Enum.all?(buffer, &interval_greater_than_5s?/1) @@ -177,16 +159,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do DateTime.diff(ended_at, started_at, :second) > 5 end - defp havent_flushed_in_over_5s?(nil), do: true - - defp havent_flushed_in_over_5s?(log) do - last_flushed_at = log.last_flushed_at - now = DateTime.utc_now() - diff = DateTime.diff(now, last_flushed_at, :second) - - diff >= @flush_interval - end - # counts the total number of emitted events defp buffer(Metrics.Counter, nil, at, _measurement) do {1, {at, at, 1}} @@ -298,9 +270,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do project_id, resource, labels, - {buffer_size, buffer}, - log, - lock + {buffer_size, buffer} ) do time_series = buffer @@ -309,52 +279,21 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do format_time_series(schema, name, labels, resource, measurements, unit) end) - case update_last_flushed_at(log, lock) do - {:ok, _} -> - case GoogleCloudPlatform.send_metrics(project_id, time_series) do - :ok -> - :ok - - {:error, reason} -> - Logger.warning("Failed to send metrics to Google Cloud Monitoring API", - reason: inspect(reason), - time_series: inspect(time_series), - count: buffer_size - ) - end - + case GoogleCloudPlatform.send_metrics(project_id, time_series) do + :ok -> {0, %{}} - {:error, changeset} -> - Logger.info( - "Failed to update last_flushed_at. Waiting for next flush.", - changeset: inspect(changeset) + {:error, reason} -> + Logger.warning("Failed to send metrics to Google Cloud Monitoring API", + reason: inspect(reason), + time_series: inspect(time_series), + count: buffer_size ) {buffer_size, buffer} end end - defp update_last_flushed_at(nil, _) do - %Log{last_flushed_at: DateTime.utc_now(), reporter_module: "#{__MODULE__}"} - |> Log.Changeset.changeset() - |> Repo.insert() - end - - defp update_last_flushed_at(log, false) do - log - |> Log.Changeset.changeset() - |> Repo.update(force: true) - end - - defp update_last_flushed_at(log, true) do - log - |> Log.Changeset.changeset() - |> Log.Changeset.update_last_flushed_at_with_lock(DateTime.utc_now()) - # No fields are updated, but we need to force the update for optimistic locking to work - |> Repo.update(force: true) - end - defp truncate_labels(labels) do for {k, v} <- labels, into: %{} do {k, v |> to_string() |> truncate_label()} diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/log.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/log.ex deleted file mode 100644 index 62a8780e1..000000000 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/log.ex +++ /dev/null @@ -1,10 +0,0 @@ -defmodule Domain.Telemetry.Reporter.Log do - use Domain, :schema - - schema "telemetry_reporter_logs" do - field :reporter_module, :string - field :last_flushed_at, :utc_datetime_usec - - timestamps() - end -end diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/log/changeset.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/log/changeset.ex deleted file mode 100644 index 406c709ae..000000000 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/log/changeset.ex +++ /dev/null @@ -1,15 +0,0 @@ -defmodule Domain.Telemetry.Reporter.Log.Changeset do - use Domain, :changeset - - def changeset(%Domain.Telemetry.Reporter.Log{} = log, attrs \\ %{}) do - log - |> cast(attrs, [:reporter_module, :last_flushed_at]) - |> validate_required([:reporter_module]) - |> unique_constraint(:reporter_module) - end - - def update_last_flushed_at_with_lock(%Ecto.Changeset{} = changeset, last_flushed_at) do - changeset - |> optimistic_lock(:last_flushed_at, fn _ -> last_flushed_at end) - end -end diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/log/query.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/log/query.ex deleted file mode 100644 index bb2bd8b57..000000000 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/log/query.ex +++ /dev/null @@ -1,11 +0,0 @@ -defmodule Domain.Telemetry.Reporter.Log.Query do - use Domain, :query - - def all do - from(log in Domain.Telemetry.Reporter.Log, as: :log) - end - - def by_reporter_module(queryable, reporter_module) do - where(queryable, [log: log], log.reporter_module == ^reporter_module) - end -end diff --git a/elixir/apps/domain/priv/repo/conditional_migrations/20250617175017_drop_telemetry_reporter_logs.exs b/elixir/apps/domain/priv/repo/conditional_migrations/20250617175017_drop_telemetry_reporter_logs.exs new file mode 100644 index 000000000..f3c08942a --- /dev/null +++ b/elixir/apps/domain/priv/repo/conditional_migrations/20250617175017_drop_telemetry_reporter_logs.exs @@ -0,0 +1,20 @@ +defmodule Domain.Repo.Migrations.DropTelemetryReporterLogs do + use Ecto.Migration + + def up do + drop(table(:telemetry_reporter_logs)) + end + + def down do + create table(:telemetry_reporter_logs, primary_key: false) do + add(:id, :binary_id, primary_key: true) + + add(:reporter_module, :string, null: false) + add(:last_flushed_at, :utc_datetime_usec) + + timestamps(type: :utc_datetime_usec) + end + + create(index(:telemetry_reporter_logs, [:reporter_module], unique: true)) + end +end diff --git a/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs b/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs index ffa49c521..d5caad34f 100644 --- a/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs +++ b/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs @@ -2,101 +2,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do use Domain.DataCase, async: true import Domain.Telemetry.Reporter.GoogleCloudMetrics alias Domain.Mocks.GoogleCloudPlatform - alias Domain.Telemetry.Reporter.Log - - describe "handle_info/2 for :flush" do - test "updates reporter_logs table with last_flushed_at" do - Bypass.open() - |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() - |> GoogleCloudPlatform.mock_metrics_submit_endpoint() - - reporter_module = "Elixir.Domain.Telemetry.Reporter.GoogleCloudMetrics" - - now = DateTime.utc_now() - one_minute_ago = DateTime.add(now, -1, :minute) - - tags = {%{type: "test"}, %{app: "myapp"}} - - assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = - handle_info( - {:compressed_metrics, - [ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1, - :request} - ]}, - {[], "proj", tags, {0, %{}}} - ) - - assert buffer_size == 1 - - assert buffer == %{ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => - {one_minute_ago, one_minute_ago, 1} - } - - # Flush - assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) - - assert %Log{last_flushed_at: last_flushed_at} = - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - - # Assert last_flushed_at is within 3 seconds of now - assert DateTime.diff(last_flushed_at, now, :millisecond) < 3000 - - assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = - handle_info( - {:compressed_metrics, - [ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1, - :request} - ]}, - {[], "proj", tags, {0, %{}}} - ) - - assert buffer_size == 1 - - assert buffer == %{ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => - {one_minute_ago, one_minute_ago, 1} - } - - # Flush again; assert buffer didn't change - assert {:noreply, - {_, _, _, - {1, - %{ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => - {one_minute_ago, one_minute_ago, 1} - }}}} = handle_info(:flush, state) - - # Assert last_flushed_at is not updated - assert %Log{last_flushed_at: last_flushed_at2} = - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - - assert last_flushed_at == last_flushed_at2 - - # Reset last_updated_at, flush, and assert last_flushed_at is updated - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - |> Log.Changeset.changeset(%{last_flushed_at: one_minute_ago}) - |> Repo.update() - - assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) - - assert %Log{last_flushed_at: last_flushed_at3} = - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - - # Assert last_flushed_at is within 3 seconds of now - assert DateTime.diff(last_flushed_at3, now, :millisecond) < 3000 - end - end describe "handle_info/2 for :compressed_metrics" do test "aggregates and delivers Metrics.Counter metrics" do