diff --git a/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex b/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex index fd5d2ab8a..f90b69df0 100644 --- a/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex +++ b/elixir/apps/domain/lib/domain/telemetry/reporter/google_cloud_metrics.ex @@ -16,10 +16,10 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do # This is bounded by the maximum number of time series points we can # send in a single Google Cloud Metrics API call. # See https://cloud.google.com/monitoring/quotas#custom_metrics_quotas - @buffer_size 200 + @buffer_capacity 200 # Maximum time in seconds to wait before flushing the buffer - # in case it did not reach the @buffer_size limit within the flush interval + # in case it did not reach the @buffer_capacity limit within the flush interval @flush_interval 60 @flush_timer :timer.seconds(@flush_interval) @@ -72,7 +72,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do end @impl true - def terminate(_, {events, _project_id, _resource_and_labels, {_buffer_size, _buffer}}) do + def terminate(_, {events, _project_id, _resource_and_labels, {_buffer_length, _buffer}}) do for event <- events do :telemetry.detach({__MODULE__, event, self()}) end @@ -85,46 +85,49 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do @impl true def handle_info( {:compressed_metrics, compressed}, - {events, project_id, {resource, labels}, {buffer_size, buffer}} + {events, project_id, {resource, labels}, {buffer_length, buffer}} ) do - {buffer_size, buffer} = + {buffer_length, buffer} = + if length(compressed) + buffer_length >= @buffer_capacity do + # Flush existing buffer if the new metrics push us over the buffer capacity + flush(project_id, resource, labels, {buffer_length, buffer}) + + {0, %{}} + else + # Otherwise, just append the new metrics to the existing buffer + {buffer_length, buffer} + end + + {buffer_length, buffer} = Enum.reduce( compressed, - {buffer_size, buffer}, - fn {schema, name, tags, at, measurement, unit}, {buffer_size, buffer} -> + {buffer_length, buffer}, + fn {schema, name, tags, at, measurement, unit}, {buffer_length, buffer} -> {increment, buffer} = Map.get_and_update(buffer, {schema, name, tags, unit}, fn prev_value -> buffer(schema, prev_value, at, measurement) end) - {buffer_size + increment, buffer} + {buffer_length + increment, buffer} end ) - {buffer_size, buffer} = - if buffer_size >= @buffer_size do - # Flush immediately if the buffer is full - flush(project_id, resource, labels, {buffer_size, buffer}) - else - {buffer_size, buffer} - end - - {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}} + {:noreply, {events, project_id, {resource, labels}, {buffer_length, buffer}}} end - def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_size, buffer}}) do - {buffer_size, buffer} = + def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_length, buffer}}) do + {buffer_length, buffer} = if all_intervals_greater_than_5s?(buffer) do - flush(project_id, resource, labels, {buffer_size, buffer}) + flush(project_id, resource, labels, {buffer_length, buffer}) else - {buffer_size, buffer} + {buffer_length, buffer} end - # Prevent starvation by introducing a randomized 2s jitter + # Stagger API calls jitter = :rand.uniform(2000) Process.send_after(self(), :flush, @flush_timer + jitter) - {:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}} + {:noreply, {events, project_id, {resource, labels}, {buffer_length, buffer}}} end # Google Cloud Monitoring API does not support sampling intervals shorter than 5 seconds @@ -270,7 +273,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do project_id, resource, labels, - {buffer_size, buffer} + {buffer_length, buffer} ) do time_series = buffer @@ -287,10 +290,10 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do Logger.warning("Failed to send metrics to Google Cloud Monitoring API", reason: inspect(reason), time_series: inspect(time_series), - count: buffer_size + count: buffer_length ) - {buffer_size, buffer} + {buffer_length, buffer} end end diff --git a/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs b/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs index d5caad34f..468b8e367 100644 --- a/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs +++ b/elixir/apps/domain/test/domain/telemetry/reporter/google_cloud_metrics_test.exs @@ -457,7 +457,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do } end - test "submits the metrics to Google Cloud when buffer is filled" do + test "submits the metrics to Google Cloud when incoming metrics surpass buffer length" do Bypass.open() |> GoogleCloudPlatform.mock_instance_metadata_token_endpoint() |> GoogleCloudPlatform.mock_metrics_submit_endpoint() @@ -465,8 +465,9 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do now = DateTime.utc_now() tags = {%{type: "test"}, %{app: "myapp"}} + # Send 199 metrics {_, _, _, {buffer_size, buffer}} = - Enum.reduce(1..201, {[], "proj", tags, {0, %{}}}, fn i, state -> + Enum.reduce(1..199, {[], "proj", tags, {0, %{}}}, fn i, state -> {:noreply, state} = handle_info( {:compressed_metrics, @@ -477,14 +478,22 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do state end) + assert buffer_size == 199 + + refute_receive {:bypass_request, _conn, _body} + + # Send the 200th metric, which should trigger the flush + {:noreply, {_, _, _, {buffer_size, buffer}}} = + handle_info( + {:compressed_metrics, + [{Telemetry.Metrics.Counter, [:foo, 200], %{}, now, 200, :request}]}, + {[], "proj", tags, {buffer_size, buffer}} + ) + + assert buffer == %{{Telemetry.Metrics.Counter, [:foo, 200], %{}, :request} => {now, now, 1}} assert buffer_size == 1 - - assert buffer == %{ - {Telemetry.Metrics.Counter, [:foo, 201], %{}, :request} => {now, now, 1} - } - assert_receive {:bypass_request, _conn, %{"timeSeries" => time_series}} - assert length(time_series) == 200 + assert length(time_series) == 199 end end end