fix(portal): flush metrics buffer before exceeding limit (#9608)

Instead of checking whether the buffer capacity has been surpassed _after_
adding new time series to it, we should check before.

Variables were renamed to be a little clearer about what they represent.
This commit is contained in:
Jamil
2025-06-20 14:44:52 -07:00
committed by GitHub
parent a1677494b5
commit e113def903
2 changed files with 46 additions and 34 deletions

View File

@@ -16,10 +16,10 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
# This is bounded by the maximum number of time series points we can
# send in a single Google Cloud Metrics API call.
# See https://cloud.google.com/monitoring/quotas#custom_metrics_quotas
@buffer_size 200
@buffer_capacity 200
# Maximum time in seconds to wait before flushing the buffer
# in case it did not reach the @buffer_size limit within the flush interval
# in case it did not reach the @buffer_capacity limit within the flush interval
@flush_interval 60
@flush_timer :timer.seconds(@flush_interval)
@@ -72,7 +72,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
end
@impl true
def terminate(_, {events, _project_id, _resource_and_labels, {_buffer_size, _buffer}}) do
def terminate(_, {events, _project_id, _resource_and_labels, {_buffer_length, _buffer}}) do
for event <- events do
:telemetry.detach({__MODULE__, event, self()})
end
@@ -85,46 +85,49 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
@impl true
def handle_info(
{:compressed_metrics, compressed},
{events, project_id, {resource, labels}, {buffer_size, buffer}}
{events, project_id, {resource, labels}, {buffer_length, buffer}}
) do
{buffer_size, buffer} =
{buffer_length, buffer} =
if length(compressed) + buffer_length >= @buffer_capacity do
# Flush existing buffer if the new metrics push us over the buffer capacity
flush(project_id, resource, labels, {buffer_length, buffer})
{0, %{}}
else
# Otherwise, just append the new metrics to the existing buffer
{buffer_length, buffer}
end
{buffer_length, buffer} =
Enum.reduce(
compressed,
{buffer_size, buffer},
fn {schema, name, tags, at, measurement, unit}, {buffer_size, buffer} ->
{buffer_length, buffer},
fn {schema, name, tags, at, measurement, unit}, {buffer_length, buffer} ->
{increment, buffer} =
Map.get_and_update(buffer, {schema, name, tags, unit}, fn prev_value ->
buffer(schema, prev_value, at, measurement)
end)
{buffer_size + increment, buffer}
{buffer_length + increment, buffer}
end
)
{buffer_size, buffer} =
if buffer_size >= @buffer_size do
# Flush immediately if the buffer is full
flush(project_id, resource, labels, {buffer_size, buffer})
else
{buffer_size, buffer}
end
{:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}}
{:noreply, {events, project_id, {resource, labels}, {buffer_length, buffer}}}
end
def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_size, buffer}}) do
{buffer_size, buffer} =
def handle_info(:flush, {events, project_id, {resource, labels}, {buffer_length, buffer}}) do
{buffer_length, buffer} =
if all_intervals_greater_than_5s?(buffer) do
flush(project_id, resource, labels, {buffer_size, buffer})
flush(project_id, resource, labels, {buffer_length, buffer})
else
{buffer_size, buffer}
{buffer_length, buffer}
end
# Prevent starvation by introducing a randomized 2s jitter
# Stagger API calls
jitter = :rand.uniform(2000)
Process.send_after(self(), :flush, @flush_timer + jitter)
{:noreply, {events, project_id, {resource, labels}, {buffer_size, buffer}}}
{:noreply, {events, project_id, {resource, labels}, {buffer_length, buffer}}}
end
# Google Cloud Monitoring API does not support sampling intervals shorter than 5 seconds
@@ -270,7 +273,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
project_id,
resource,
labels,
{buffer_size, buffer}
{buffer_length, buffer}
) do
time_series =
buffer
@@ -287,10 +290,10 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetrics do
Logger.warning("Failed to send metrics to Google Cloud Monitoring API",
reason: inspect(reason),
time_series: inspect(time_series),
count: buffer_size
count: buffer_length
)
{buffer_size, buffer}
{buffer_length, buffer}
end
end

View File

@@ -457,7 +457,7 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do
}
end
test "submits the metrics to Google Cloud when buffer is filled" do
test "submits the metrics to Google Cloud when incoming metrics surpass buffer length" do
Bypass.open()
|> GoogleCloudPlatform.mock_instance_metadata_token_endpoint()
|> GoogleCloudPlatform.mock_metrics_submit_endpoint()
@@ -465,8 +465,9 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do
now = DateTime.utc_now()
tags = {%{type: "test"}, %{app: "myapp"}}
# Send 199 metrics
{_, _, _, {buffer_size, buffer}} =
Enum.reduce(1..201, {[], "proj", tags, {0, %{}}}, fn i, state ->
Enum.reduce(1..199, {[], "proj", tags, {0, %{}}}, fn i, state ->
{:noreply, state} =
handle_info(
{:compressed_metrics,
@@ -477,14 +478,22 @@ defmodule Domain.Telemetry.Reporter.GoogleCloudMetricsTest do
state
end)
assert buffer_size == 199
refute_receive {:bypass_request, _conn, _body}
# Send the 200th metric, which should trigger the flush
{:noreply, {_, _, _, {buffer_size, buffer}}} =
handle_info(
{:compressed_metrics,
[{Telemetry.Metrics.Counter, [:foo, 200], %{}, now, 200, :request}]},
{[], "proj", tags, {buffer_size, buffer}}
)
assert buffer == %{{Telemetry.Metrics.Counter, [:foo, 200], %{}, :request} => {now, now, 1}}
assert buffer_size == 1
assert buffer == %{
{Telemetry.Metrics.Counter, [:foo, 201], %{}, :request} => {now, now, 1}
}
assert_receive {:bypass_request, _conn, %{"timeSeries" => time_series}}
assert length(time_series) == 200
assert length(time_series) == 199
end
end
end