refactor(portal): don't rely on db to gate metric reporting (#9565)

The `telemetry_reporter_logs` table was added to try to gate the request rate to Google's
Metrics API.

However, this was a flawed endeavor: we later discovered that it's the individual time
series points that must be spaced at least 5s apart, not the API requests
themselves.
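
Concretely, that constraint can be enforced entirely in-process by inspecting the buffered points before flushing. A minimal sketch using the helper names the reporter keeps (see the diff below); the `{started_at, ended_at, value}` shape of each buffer entry is an assumption based on the tests, since the helper's full head is not shown in this diff:

    # Google Cloud Monitoring rejects time series whose sampling interval is
    # shorter than 5 seconds, so only flush once every buffered point spans
    # more than 5s. No database round-trip is needed for this check.
    defp all_intervals_greater_than_5s?(buffer) do
      Enum.all?(buffer, &interval_greater_than_5s?/1)
    end

    # Assumed entry shape: {metric_key, {started_at, ended_at, value}}
    defp interval_greater_than_5s?({_key, {started_at, ended_at, _value}}) do
      DateTime.diff(ended_at, started_at, :second) > 5
    end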

This PR gets rid of the table, and with it the problematic DB query, which
was timing out quite often due to the contention of 12 Elixir nodes trying
to grab a lock every 5s.

Related: #9539 
Related: https://firezone-inc.sentry.io/issues/6346235615/?project=4508756715569152&query=is%3Aunresolved&referrer=issue-stream&stream_index=1
Jamil, committed by GitHub
2025-06-18 11:40:33 -07:00
parent a20989a819, commit 236c21111a
6 changed files with 33 additions and 205 deletions

View File

@@ -7,12 +7,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
   use GenServer
 
   alias Telemetry.Metrics
 
-  alias Domain.{
-    GoogleCloudPlatform,
-    Repo,
-    Telemetry.Reporter.Log
-  }
+  alias Domain.GoogleCloudPlatform
 
   require Logger
@@ -25,10 +20,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
   # Maximum time in seconds to wait before flushing the buffer
   # in case it did not reach the @buffer_size limit within the flush interval
-  # This is bounded by the maximum rate we can send metrics to Google Cloud.
-  # A locking mechanism is used to ensure multiple nodes flushing
-  # metrics independently don't exceed this in aggregate.
-  @flush_interval 5
+  @flush_interval 60
   @flush_timer :timer.seconds(@flush_interval)
 
   def start_link(opts) do
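
With the DB gate gone, each node flushes on its own fixed 60s timer, and the 5s-spacing requirement is enforced per buffered point instead. A minimal sketch of the self-scheduling pattern, assuming the usual GenServer timer idiom (the init/reschedule code is not part of this diff):

    # Hypothetical sketch: a GenServer that re-arms its own flush timer,
    # with no cross-node coordination required.
    defmodule FlushTimerSketch do
      use GenServer

      @flush_interval 60
      @flush_timer :timer.seconds(@flush_interval)

      def init(state) do
        # Arm the first flush.
        Process.send_after(self(), :flush, @flush_timer)
        {:ok, state}
      end

      def handle_info(:flush, state) do
        # ... flush the buffer here, then schedule the next attempt.
        Process.send_after(self(), :flush, @flush_timer)
        {:noreply, state}
      end
    end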
@@ -111,10 +103,8 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
     {buffer_size, buffer} =
       if buffer_size >= @buffer_size do
-        log = get_log()
-
         # Flush immediately if the buffer is full
-        flush(project_id, resource, labels, {buffer_size, buffer}, log, false)
+        flush(project_id, resource, labels, {buffer_size, buffer})
       else
         {buffer_size, buffer}
       end
@@ -123,11 +113,9 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
   end
 
   def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_size, buffer}}) do
-    log = get_log()
-
     {buffer_size, buffer} =
-      if all_intervals_greater_than_5s?(buffer) && havent_flushed_in_over_5s?(log) do
-        flush(project_id, resource, labels, {buffer_size, buffer}, log, true)
+      if all_intervals_greater_than_5s?(buffer) do
+        flush(project_id, resource, labels, {buffer_size, buffer})
       else
         {buffer_size, buffer}
       end
@@ -139,12 +127,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
     {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}}
   end
 
-  defp get_log do
-    Log.Query.all()
-    |> Log.Query.by_reporter_module("#{__MODULE__}")
-    |> Repo.one()
-  end
-
   # Google Cloud Monitoring API does not support sampling intervals shorter than 5 seconds
   defp all_intervals_greater_than_5s?(buffer) do
     Enum.all?(buffer, &interval_greater_than_5s?/1)
@@ -177,16 +159,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
     DateTime.diff(ended_at, started_at, :second) > 5
   end
 
-  defp havent_flushed_in_over_5s?(nil), do: true
-
-  defp havent_flushed_in_over_5s?(log) do
-    last_flushed_at = log.last_flushed_at
-    now = DateTime.utc_now()
-    diff = DateTime.diff(now, last_flushed_at, :second)
-
-    diff >= @flush_interval
-  end
-
   # counts the total number of emitted events
   defp buffer(Metrics.Counter, nil, at, _measurement) do
     {1, {at, at, 1}}
@@ -298,9 +270,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
          project_id,
          resource,
          labels,
-         {buffer_size, buffer},
-         log,
-         lock
+         {buffer_size, buffer}
        ) do
     time_series =
       buffer
@@ -309,52 +279,21 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
         format_time_series(schema, name, labels, resource, measurements, unit)
       end)
 
-    case update_last_flushed_at(log, lock) do
-      {:ok, _} ->
-        case GoogleCloudPlatform.send_metrics(project_id, time_series) do
-          :ok ->
-            :ok
-
-          {:error, reason} ->
-            Logger.warning("Failed to send metrics to Google Cloud Monitoring API",
-              reason: inspect(reason),
-              time_series: inspect(time_series),
-              count: buffer_size
-            )
-        end
-
+    case GoogleCloudPlatform.send_metrics(project_id, time_series) do
+      :ok ->
         {0, %{}}
 
-      {:error, changeset} ->
-        Logger.info(
-          "Failed to update last_flushed_at. Waiting for next flush.",
-          changeset: inspect(changeset)
+      {:error, reason} ->
+        Logger.warning("Failed to send metrics to Google Cloud Monitoring API",
+          reason: inspect(reason),
+          time_series: inspect(time_series),
+          count: buffer_size
         )
 
         {buffer_size, buffer}
     end
   end
 
-  defp update_last_flushed_at(nil, _) do
-    %Log{last_flushed_at: DateTime.utc_now(), reporter_module: "#{__MODULE__}"}
-    |> Log.Changeset.changeset()
-    |> Repo.insert()
-  end
-
-  defp update_last_flushed_at(log, false) do
-    log
-    |> Log.Changeset.changeset()
-    |> Repo.update(force: true)
-  end
-
-  defp update_last_flushed_at(log, true) do
-    log
-    |> Log.Changeset.changeset()
-    |> Log.Changeset.update_last_flushed_at_with_lock(DateTime.utc_now())
-    # No fields are updated, but we need to force the update for optimistic locking to work
-    |> Repo.update(force: true)
-  end
 
   defp truncate_labels(labels) do
     for {k, v} <- labels, into: %{} do
       {k, v |> to_string() |> truncate_label()}

View File

@@ -1,10 +0,0 @@
-defmodule Domain.Telemetry.Reporter.Log do
-  use Domain, :schema
-
-  schema "telemetry_reporter_logs" do
-    field :reporter_module, :string
-    field :last_flushed_at, :utc_datetime_usec
-
-    timestamps()
-  end
-end

View File

@@ -1,15 +0,0 @@
-defmodule Domain.Telemetry.Reporter.Log.Changeset do
-  use Domain, :changeset
-
-  def changeset(%Domain.Telemetry.Reporter.Log{} = log, attrs \\ %{}) do
-    log
-    |> cast(attrs, [:reporter_module, :last_flushed_at])
-    |> validate_required([:reporter_module])
-    |> unique_constraint(:reporter_module)
-  end
-
-  def update_last_flushed_at_with_lock(%Ecto.Changeset{} = changeset, last_flushed_at) do
-    changeset
-    |> optimistic_lock(:last_flushed_at, fn _ -> last_flushed_at end)
-  end
-end
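
The deleted `update_last_flushed_at_with_lock/2` above is where the contention came from: every node raced to bump the same singleton row under `Ecto.Changeset.optimistic_lock/3`. A sketch of the racing call, condensed from the deleted code (the `log`, `Log.Changeset`, and `Repo` names come from that code; the exact error path is an assumption):

    # Illustrative only: each of N nodes attempted roughly this every 5s
    # against the same row. optimistic_lock/3 adds the lock field to the
    # UPDATE's filters (WHERE ... AND last_flushed_at = <old value>), so only
    # one concurrent writer can win per round; the losers hit a stale row,
    # which Ecto raises as Ecto.StaleEntryError unless the changeset maps it
    # to an {:error, changeset} via :stale_error_field.
    log
    |> Log.Changeset.changeset()
    |> Ecto.Changeset.optimistic_lock(:last_flushed_at, fn _ -> DateTime.utc_now() end)
    # force: true because no other fields change in this update
    |> Repo.update(force: true)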

View File

@@ -1,11 +0,0 @@
-defmodule Domain.Telemetry.Reporter.Log.Query do
-  use Domain, :query
-
-  def all do
-    from(log in Domain.Telemetry.Reporter.Log, as: :log)
-  end
-
-  def by_reporter_module(queryable, reporter_module) do
-    where(queryable, [log: log], log.reporter_module == ^reporter_module)
-  end
-end

View File

@@ -0,0 +1,20 @@
+defmodule Domain.Repo.Migrations.DropTelemetryReporterLogs do
+  use Ecto.Migration
+
+  def up do
+    drop(table(:telemetry_reporter_logs))
+  end
+
+  def down do
+    create table(:telemetry_reporter_logs, primary_key: false) do
+      add(:id, :binary_id, primary_key: true)
+      add(:reporter_module, :string, null: false)
+      add(:last_flushed_at, :utc_datetime_usec)
+
+      timestamps(type: :utc_datetime_usec)
+    end
+
+    create(index(:telemetry_reporter_logs, [:reporter_module], unique: true))
+  end
+end

View File

@@ -2,101 +2,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do
   use Domain.DataCase, async: true
   import Domain.Telemetry.Reporter.GoogleCloudMetrics
 
   alias Domain.Mocks.GoogleCloudPlatform
-  alias Domain.Telemetry.Reporter.Log
-
-  describe "handle_info/2 for :flush" do
-    test "updates reporter_logs table with last_flushed_at" do
-      Bypass.open()
-      |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
-      |> GoogleCloudPlatform.mock_metrics_submit_endpoint()
-
-      reporter_module = "Elixir.Domain.Telemetry.Reporter.GoogleCloudMetrics"
-      now = DateTime.utc_now()
-      one_minute_ago = DateTime.add(now, -1, :minute)
-      tags = {%{type: "test"}, %{app: "myapp"}}
-
-      assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} =
-               handle_info(
-                 {:compressed_metrics,
-                  [
-                    {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1,
-                     :request}
-                  ]},
-                 {[], "proj", tags, {0, %{}}}
-               )
-
-      assert buffer_size == 1
-
-      assert buffer == %{
-               {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} =>
-                 {one_minute_ago, one_minute_ago, 1}
-             }
-
-      # Flush
-      assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state)
-
-      assert %Log{last_flushed_at: last_flushed_at} =
-               Log.Query.all()
-               |> Log.Query.by_reporter_module(reporter_module)
-               |> Repo.one()
-
-      # Assert last_flushed_at is within 3 seconds of now
-      assert DateTime.diff(last_flushed_at, now, :millisecond) < 3000
-
-      assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} =
-               handle_info(
-                 {:compressed_metrics,
-                  [
-                    {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1,
-                     :request}
-                  ]},
-                 {[], "proj", tags, {0, %{}}}
-               )
-
-      assert buffer_size == 1
-
-      assert buffer == %{
-               {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} =>
-                 {one_minute_ago, one_minute_ago, 1}
-             }
-
-      # Flush again; assert buffer didn't change
-      assert {:noreply,
-              {_, _, _,
-               {1,
-                %{
-                  {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} =>
-                    {one_minute_ago, one_minute_ago, 1}
-                }}}} = handle_info(:flush, state)
-
-      # Assert last_flushed_at is not updated
-      assert %Log{last_flushed_at: last_flushed_at2} =
-               Log.Query.all()
-               |> Log.Query.by_reporter_module(reporter_module)
-               |> Repo.one()
-
-      assert last_flushed_at == last_flushed_at2
-
-      # Reset last_updated_at, flush, and assert last_flushed_at is updated
-      Log.Query.all()
-      |> Log.Query.by_reporter_module(reporter_module)
-      |> Repo.one()
-      |> Log.Changeset.changeset(%{last_flushed_at: one_minute_ago})
-      |> Repo.update()
-
-      assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state)
-
-      assert %Log{last_flushed_at: last_flushed_at3} =
-               Log.Query.all()
-               |> Log.Query.by_reporter_module(reporter_module)
-               |> Repo.one()
-
-      # Assert last_flushed_at is within 3 seconds of now
-      assert DateTime.diff(last_flushed_at3, now, :millisecond) < 3000
-    end
-  end
-
   describe "handle_info/2 for :compressed_metrics" do
     test "aggregates and delivers Metrics.Counter metrics" do