From 43db1e63e2baec4e030cb613d8ba9089fdcecad9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 17 Jun 2025 18:48:48 +0200 Subject: [PATCH] chore(telemetry): rate limit identical events to 1 per 5min (#9551) It is in the nature of our application that errors may occur in rapid succession if anything in the packet processing path fails. Most of the time, these repeated errors don't add any additional information so reporting one of them to Sentry is more than enough. To achieve this, we add a `before_send` callback that utilizes a concurrent cache with an upper bound of 10000 items and a TTL of 5 minutes. In other words, if we have submitted an event to Sentry that had the exact same message in the last 5 minutes, we will not send it. Internally, `moka` uses a concurrent hash map and therefore, the key is hashed and not actually stored. Hash codes are u64, meaning the memory footprint of this cache is only ~ 64kb (not accounting for constant overhead of the cache internals). --- rust/Cargo.lock | 1 + rust/telemetry/Cargo.toml | 1 + rust/telemetry/src/lib.rs | 52 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 1e522349d..18a650ef4 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2520,6 +2520,7 @@ version = "0.1.0" dependencies = [ "anyhow", "ip-packet", + "moka", "opentelemetry", "opentelemetry_sdk", "parking_lot", diff --git a/rust/telemetry/Cargo.toml b/rust/telemetry/Cargo.toml index 5cd711156..9f61cdb90 100644 --- a/rust/telemetry/Cargo.toml +++ b/rust/telemetry/Cargo.toml @@ -7,6 +7,7 @@ license = { workspace = true } [dependencies] anyhow = { workspace = true } ip-packet = { workspace = true } +moka = { workspace = true, features = ["sync"] } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } parking_lot = { workspace = true } diff --git a/rust/telemetry/src/lib.rs b/rust/telemetry/src/lib.rs index 5ca65f94f..d74f58435 100644 
--- a/rust/telemetry/src/lib.rs +++ b/rust/telemetry/src/lib.rs @@ -2,8 +2,11 @@ use std::{borrow::Cow, fmt, str::FromStr, sync::Arc, time::Duration}; -use anyhow::bail; -use sentry::protocol::SessionStatus; +use anyhow::{Ok, Result, bail}; +use sentry::{ + BeforeCallback, + protocol::{Event, SessionStatus}, +}; pub mod analytics; pub mod feature_flags; @@ -154,6 +157,7 @@ impl Telemetry { if tx.name() == "telemetry" { 1.0 } else { 0.0 } })), max_breadcrumbs: 500, + before_send: Some(event_rate_limiter(Duration::from_secs(60 * 5))), ..Default::default() }, )); @@ -226,6 +230,27 @@ impl Telemetry { } } +fn event_rate_limiter(timeout: Duration) -> BeforeCallback<Event<'static>> { + let cache = moka::sync::CacheBuilder::<String, (), _>::default() + .max_capacity(10_000) + .time_to_live(timeout) + .build(); + + Arc::new(move |event: Event<'static>| { + let Some(message) = &event.message else { + return Some(event); + }; + + if cache.contains_key(message) { + return None; + } + + cache.insert(message.clone(), ()); + + Some(event) + }) +} + fn update_user(update: impl FnOnce(&mut sentry::User)) { sentry::Hub::main().configure_scope(|scope| { let mut user = scope.user().cloned().unwrap_or_default(); @@ -251,4 +276,27 @@ mod tests { assert!(telemetry.inner.is_none()); } + + #[test] + fn rate_limits_events_with_same_message() { + let before_send = event_rate_limiter(Duration::from_secs(1)); + + let event1 = event("foo"); + let event2 = event("bar"); + + assert!(before_send(event1.clone()).is_some()); + assert!(before_send(event2.clone()).is_some()); + assert!(before_send(event1.clone()).is_none()); + + std::thread::sleep(Duration::from_secs(1)); + + assert!(before_send(event1.clone()).is_some()); + } + + fn event(msg: &str) -> Event<'static> { + Event { + message: Some(msg.to_owned()), + ..Default::default() + } + } }