diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 1e522349d..18a650ef4 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2520,6 +2520,7 @@ version = "0.1.0" dependencies = [ "anyhow", "ip-packet", + "moka", "opentelemetry", "opentelemetry_sdk", "parking_lot", diff --git a/rust/telemetry/Cargo.toml b/rust/telemetry/Cargo.toml index 5cd711156..9f61cdb90 100644 --- a/rust/telemetry/Cargo.toml +++ b/rust/telemetry/Cargo.toml @@ -7,6 +7,7 @@ license = { workspace = true } [dependencies] anyhow = { workspace = true } ip-packet = { workspace = true } +moka = { workspace = true, features = ["sync"] } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } parking_lot = { workspace = true } diff --git a/rust/telemetry/src/lib.rs b/rust/telemetry/src/lib.rs index 5ca65f94f..d74f58435 100644 --- a/rust/telemetry/src/lib.rs +++ b/rust/telemetry/src/lib.rs @@ -2,8 +2,11 @@ use std::{borrow::Cow, fmt, str::FromStr, sync::Arc, time::Duration}; -use anyhow::bail; -use sentry::protocol::SessionStatus; +use anyhow::{Ok, Result, bail}; +use sentry::{ + BeforeCallback, + protocol::{Event, SessionStatus}, +}; pub mod analytics; pub mod feature_flags; @@ -154,6 +157,7 @@ impl Telemetry { if tx.name() == "telemetry" { 1.0 } else { 0.0 } })), max_breadcrumbs: 500, + before_send: Some(event_rate_limiter(Duration::from_secs(60 * 5))), ..Default::default() }, )); @@ -226,6 +230,27 @@ impl Telemetry { } } +fn event_rate_limiter(timeout: Duration) -> BeforeCallback> { + let cache = moka::sync::CacheBuilder::::default() + .max_capacity(10_000) + .time_to_live(timeout) + .build(); + + Arc::new(move |event: Event<'static>| { + let Some(message) = &event.message else { + return Some(event); + }; + + if cache.contains_key(message) { + return None; + } + + cache.insert(message.clone(), ()); + + Some(event) + }) +} + fn update_user(update: impl FnOnce(&mut sentry::User)) { sentry::Hub::main().configure_scope(|scope| { let mut user = scope.user().cloned().unwrap_or_default(); @@ -251,4 +276,27 @@ mod tests { assert!(telemetry.inner.is_none()); } + + #[test] + fn rate_limits_events_with_same_message() { + let before_send = event_rate_limiter(Duration::from_secs(1)); + + let event1 = event("foo"); + let event2 = event("bar"); + + assert!(before_send(event1.clone()).is_some()); + assert!(before_send(event2.clone()).is_some()); + assert!(before_send(event1.clone()).is_none()); + + std::thread::sleep(Duration::from_secs(1)); + + assert!(before_send(event1.clone()).is_some()); + } + + fn event(msg: &str) -> Event<'static> { + Event { + message: Some(msg.to_owned()), + ..Default::default() + } + } }