diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex index 58c8d269c..fd5d2ab8a 100644 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex +++ b/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex @@ -7,12 +7,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do use GenServer alias Telemetry.Metrics - - alias Domain.{ - GoogleCloudPlatform, - Repo, - Telemetry.Reporter.Log - } + alias Domain.GoogleCloudPlatform require Logger @@ -25,10 +20,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do # Maximum time in seconds to wait before flushing the buffer # in case it did not reach the @buffer_size limit within the flush interval - # This is bounded by the maximum rate we can send metrics to Google Cloud. - # A locking mechanism is used to ensure multiple nodes flushing - # metrics independently don't exceed this in aggregate. - @flush_interval 5 + @flush_interval 60 @flush_timer :timer.seconds(@flush_interval) def start_link(opts) do @@ -111,10 +103,8 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do {buffer_size, buffer} = if buffer_size >= @buffer_size do - log = get_log() - # Flush immediately if the buffer is full - flush(project_id, resource, labels, {buffer_size, buffer}, log, false) + flush(project_id, resource, labels, {buffer_size, buffer}) else {buffer_size, buffer} end @@ -123,11 +113,9 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do end def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_size, buffer}}) do - log = get_log() - {buffer_size, buffer} = - if all_intervals_greater_than_5s?(buffer) && havent_flushed_in_over_5s?(log) do - flush(project_id, resource, labels, {buffer_size, buffer}, log, true) + if all_intervals_greater_than_5s?(buffer) do + flush(project_id, resource, labels, {buffer_size, buffer}) else {buffer_size, buffer} end @@ -139,12 +127,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}} end - defp get_log do - Log.Query.all() - |> Log.Query.by_reporter_module("#{__MODULE__}") - |> Repo.one() - end - # Google Cloud Monitoring API does not support sampling intervals shorter than 5 seconds defp all_intervals_greater_than_5s?(buffer) do Enum.all?(buffer, &interval_greater_than_5s?/1) @@ -177,16 +159,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do DateTime.diff(ended_at, started_at, :second) > 5 end - defp havent_flushed_in_over_5s?(nil), do: true - - defp havent_flushed_in_over_5s?(log) do - last_flushed_at = log.last_flushed_at - now = DateTime.utc_now() - diff = DateTime.diff(now, last_flushed_at, :second) - - diff >= @flush_interval - end - # counts the total number of emitted events defp buffer(Metrics.Counter, nil, at, _measurement) do {1, {at, at, 1}} @@ -298,9 +270,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do project_id, resource, labels, - {buffer_size, buffer}, - log, - lock + {buffer_size, buffer} ) do time_series = buffer @@ -309,52 +279,21 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do format_time_series(schema, name, labels, resource, measurements, unit) end) - case update_last_flushed_at(log, lock) do - {:ok, _} -> - case GoogleCloudPlatform.send_metrics(project_id, time_series) do - :ok -> - :ok - - {:error, reason} -> - Logger.warning("Failed to send metrics to Google Cloud Monitoring API", - reason: inspect(reason), - time_series: inspect(time_series), - count: buffer_size - ) - end - + case GoogleCloudPlatform.send_metrics(project_id, time_series) do + :ok -> {0, %{}} - {:error, changeset} -> - Logger.info( - "Failed to update last_flushed_at. Waiting for next flush.", - changeset: inspect(changeset) + {:error, reason} -> + Logger.warning("Failed to send metrics to Google Cloud Monitoring API", + reason: inspect(reason), + time_series: inspect(time_series), + count: buffer_size ) {buffer_size, buffer} end end - defp update_last_flushed_at(nil, _) do - %Log{last_flushed_at: DateTime.utc_now(), reporter_module: "#{__MODULE__}"} - |> Log.Changeset.changeset() - |> Repo.insert() - end - - defp update_last_flushed_at(log, false) do - log - |> Log.Changeset.changeset() - |> Repo.update(force: true) - end - - defp update_last_flushed_at(log, true) do - log - |> Log.Changeset.changeset() - |> Log.Changeset.update_last_flushed_at_with_lock(DateTime.utc_now()) - # No fields are updated, but we need to force the update for optimistic locking to work - |> Repo.update(force: true) - end - defp truncate_labels(labels) do for {k, v} <- labels, into: %{} do {k, v |> to_string() |> truncate_label()} diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/log.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/log.ex deleted file mode 100644 index 62a8780e1..000000000 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/log.ex +++ /dev/null @@ -1,10 +0,0 @@ -defmodule Domain.Telemetry.Reporter.Log do - use Domain, :schema - - schema "telemetry_reporter_logs" do - field :reporter_module, :string - field :last_flushed_at, :utc_datetime_usec - - timestamps() - end -end diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/log/changeset.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/log/changeset.ex deleted file mode 100644 index 406c709ae..000000000 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/log/changeset.ex +++ /dev/null @@ -1,15 +0,0 @@ -defmodule Domain.Telemetry.Reporter.Log.Changeset do - use Domain, :changeset - - def changeset(%Domain.Telemetry.Reporter.Log{} = log, attrs \\ %{}) do - log - |> cast(attrs, [:reporter_module, :last_flushed_at]) - |> validate_required([:reporter_module]) - |> unique_constraint(:reporter_module) - end - - def update_last_flushed_at_with_lock(%Ecto.Changeset{} = changeset, last_flushed_at) do - changeset - |> optimistic_lock(:last_flushed_at, fn _ -> last_flushed_at end) - end -end diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/log/query.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/log/query.ex deleted file mode 100644 index bb2bd8b57..000000000 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/log/query.ex +++ /dev/null @@ -1,11 +0,0 @@ -defmodule Domain.Telemetry.Reporter.Log.Query do - use Domain, :query - - def all do - from(log in Domain.Telemetry.Reporter.Log, as: :log) - end - - def by_reporter_module(queryable, reporter_module) do - where(queryable, [log: log], log.reporter_module == ^reporter_module) - end -end diff --git a/elixir/apps/domain/priv/repo/conditional_migrations/20250617175017_drop_telemetry_reporter_logs.exs b/elixir/apps/domain/priv/repo/conditional_migrations/20250617175017_drop_telemetry_reporter_logs.exs new file mode 100644 index 000000000..f3c08942a --- /dev/null +++ b/elixir/apps/domain/priv/repo/conditional_migrations/20250617175017_drop_telemetry_reporter_logs.exs @@ -0,0 +1,20 @@ +defmodule Domain.Repo.Migrations.DropTelemetryReporterLogs do + use Ecto.Migration + + def up do + drop(table(:telemetry_reporter_logs)) + end + + def down do + create table(:telemetry_reporter_logs, primary_key: false) do + add(:id, :binary_id, primary_key: true) + + add(:reporter_module, :string, null: false) + add(:last_flushed_at, :utc_datetime_usec) + + timestamps(type: :utc_datetime_usec) + end + + create(index(:telemetry_reporter_logs, [:reporter_module], unique: true)) + end +end diff --git a/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs b/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs index ffa49c521..d5caad34f 100644 --- a/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs +++ b/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs @@ -2,101 +2,6 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do use Domain.DataCase, async: true import Domain.Telemetry.Reporter.GoogleCloudMetrics alias Domain.Mocks.GoogleCloudPlatform - alias Domain.Telemetry.Reporter.Log - - describe "handle_info/2 for :flush" do - test "updates reporter_logs table with last_flushed_at" do - Bypass.open() - |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() - |> GoogleCloudPlatform.mock_metrics_submit_endpoint() - - reporter_module = "Elixir.Domain.Telemetry.Reporter.GoogleCloudMetrics" - - now = DateTime.utc_now() - one_minute_ago = DateTime.add(now, -1, :minute) - - tags = {%{type: "test"}, %{app: "myapp"}} - - assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = - handle_info( - {:compressed_metrics, - [ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1, - :request} - ]}, - {[], "proj", tags, {0, %{}}} - ) - - assert buffer_size == 1 - - assert buffer == %{ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => - {one_minute_ago, one_minute_ago, 1} - } - - # Flush - assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) - - assert %Log{last_flushed_at: last_flushed_at} = - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - - # Assert last_flushed_at is within 3 seconds of now - assert DateTime.diff(last_flushed_at, now, :millisecond) < 3000 - - assert {:noreply, {[], "proj", ^tags, {buffer_size, buffer}} = state} = - handle_info( - {:compressed_metrics, - [ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, one_minute_ago, 1, - :request} - ]}, - {[], "proj", tags, {0, %{}}} - ) - - assert buffer_size == 1 - - assert buffer == %{ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => - {one_minute_ago, one_minute_ago, 1} - } - - # Flush again; assert buffer didn't change - assert {:noreply, - {_, _, _, - {1, - %{ - {Telemetry.Metrics.Counter, [:foo], %{"foo" => "bar"}, :request} => - {one_minute_ago, one_minute_ago, 1} - }}}} = handle_info(:flush, state) - - # Assert last_flushed_at is not updated - assert %Log{last_flushed_at: last_flushed_at2} = - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - - assert last_flushed_at == last_flushed_at2 - - # Reset last_updated_at, flush, and assert last_flushed_at is updated - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - |> Log.Changeset.changeset(%{last_flushed_at: one_minute_ago}) - |> Repo.update() - - assert {:noreply, {_, _, _, {0, %{}}}} = handle_info(:flush, state) - - assert %Log{last_flushed_at: last_flushed_at3} = - Log.Query.all() - |> Log.Query.by_reporter_module(reporter_module) - |> Repo.one() - - # Assert last_flushed_at is within 3 seconds of now - assert DateTime.diff(last_flushed_at3, now, :millisecond) < 3000 - end - end describe "handle_info/2 for :compressed_metrics" do test "aggregates and delivers Metrics.Counter metrics" do