Files
firezone/rust/telemetry/src/lib.rs
Thomas Eizinger 43db1e63e2 chore(telemetry): rate limit identical events to 1 per 5min (#9551)
It is in the nature of our application that errors may occur in rapid
succession if anything in the packet processing path fails. Most of the
time, these repeated errors don't add any additional information so
reporting one of them to Sentry is more than enough.

To achieve this, we add a `before_send` callback that utilizes a
concurrent cache with an upper bound of 10000 items and a TTL of 5
minutes. In other words, if we have submitted an event to Sentry that
had the exact same message in the last 5 minutes, we will not send it.

Internally, `moka` uses a concurrent hash map and therefore, the key is
hashed and not actually stored. Hash codes are u64, meaning the memory
footprint of this cache is only ~ 80 kB (10,000 × 8 bytes, not accounting
for constant overhead of the cache internals).
2025-06-17 16:48:48 +00:00

303 lines
9.2 KiB
Rust

#![cfg_attr(test, allow(clippy::unwrap_used))]
use std::{borrow::Cow, fmt, str::FromStr, sync::Arc, time::Duration};
use anyhow::{Ok, Result, bail};
use sentry::{
BeforeCallback,
protocol::{Event, SessionStatus},
};
pub mod analytics;
pub mod feature_flags;
pub mod otel;
mod posthog;
/// A Sentry DSN, wrapping a `&'static str`.
///
/// The inner field is private, so code outside this module cannot construct a `Dsn` itself
/// and has to use the constants declared in this module.
pub struct Dsn(&'static str);
// TODO: Dynamic DSN
// Sentry docs say this does not need to be protected:
// > DSNs are safe to keep public because they only allow submission of new events and related event data; they do not allow read access to any information.
// <https://docs.sentry.io/concepts/key-terms/dsn-explainer/#dsn-utilization>
//
// Each constant below is a value for the `dsn` argument of `Telemetry::start`.
// Which component passes which constant is not visible in this file; the descriptions go by the constant names.

/// DSN named for the Android client.
pub const ANDROID_DSN: Dsn = Dsn(
"https://928a6ee1f6af9734100b8bc89b2dc87d@o4507971108339712.ingest.us.sentry.io/4508175126233088",
);
/// DSN named for the Apple client.
pub const APPLE_DSN: Dsn = Dsn(
"https://66c71f83675f01abfffa8eb977bcbbf7@o4507971108339712.ingest.us.sentry.io/4508175177023488",
);
/// DSN named for the Gateway.
pub const GATEWAY_DSN: Dsn = Dsn(
"https://f763102cc3937199ec483fbdae63dfdc@o4507971108339712.ingest.us.sentry.io/4508162914451456",
);
/// DSN named for the GUI Client.
pub const GUI_DSN: Dsn = Dsn(
"https://2e17bf5ed24a78c0ac9e84a5de2bd6fc@o4507971108339712.ingest.us.sentry.io/4508008945549312",
);
/// DSN named for the Headless Client.
pub const HEADLESS_DSN: Dsn = Dsn(
"https://bc27dca8bb37be0142c48c4f89647c13@o4507971108339712.ingest.us.sentry.io/4508010028728320",
);
/// DSN named for the Relay.
pub const RELAY_DSN: Dsn = Dsn(
"https://9d5f664d8f8f7f1716d4b63a58bcafd5@o4507971108339712.ingest.us.sentry.io/4508373298970624",
);
/// DSN used by the unit tests in this file.
pub const TESTING: Dsn = Dsn(
"https://55ef451fca9054179a11f5d132c02f45@o4507971108339712.ingest.us.sentry.io/4508792604852224",
);
/// The deployment environment telemetry is reported for.
///
/// Derived from the API URL via [`Env::from_api_url`]; its string form ([`Env::as_str`]) is
/// what gets handed to Sentry as the `environment`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub(crate) enum Env {
    Production,
    Staging,
    DockerCompose,
    Localhost,
    /// Any API URL that is not one of the known ones.
    OnPrem,
}

impl Env {
    /// Maps an API URL to its environment.
    ///
    /// Trailing slashes are ignored. Every URL that is not explicitly listed is treated as
    /// [`Env::OnPrem`].
    pub(crate) fn from_api_url(api_url: &str) -> Self {
        match api_url.trim_end_matches('/') {
            "wss://api.firezone.dev" => Self::Production,
            "wss://api.firez.one" => Self::Staging,
            "ws://api:8081" => Self::DockerCompose,
            // Must map to `Localhost`; otherwise that variant can never be produced from a URL.
            "ws://localhost:8081" => Self::Localhost,
            _ => Self::OnPrem,
        }
    }

    /// Returns the slash-free, stable name of the environment.
    pub(crate) fn as_str(&self) -> &'static str {
        match self {
            Env::Production => "production",
            Env::Staging => "staging",
            Env::DockerCompose => "docker-compose",
            Env::Localhost => "localhost",
            Env::OnPrem => "on-prem",
        }
    }
}
/// Parses the names produced by `Env::as_str` back into an `Env`.
impl FromStr for Env {
    type Err = anyhow::Error;

    /// Returns an error for any string that is not one of the known environment names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let env = match s {
            "production" => Self::Production,
            "staging" => Self::Staging,
            "docker-compose" => Self::DockerCompose,
            "localhost" => Self::Localhost,
            "on-prem" => Self::OnPrem,
            other => bail!("Unknown env `{other}`"),
        };

        Ok(env)
    }
}
/// Displays the environment using the same name that `Env::as_str` returns.
impl fmt::Display for Env {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the `str` implementation so that formatter flags (width, fill, ..) are honoured.
        let name = self.as_str();

        fmt::Display::fmt(name, f)
    }
}
/// Handle for the Sentry-based telemetry of this process.
///
/// The default value has no client; one is created by `Telemetry::start`.
#[derive(Default)]
pub struct Telemetry {
/// Guard returned by `sentry::init` while a client is running, `None` otherwise.
inner: Option<sentry::ClientInitGuard>,
}
impl Drop for Telemetry {
    /// Marks the session as "abnormal" if the handle is dropped while a client is still active.
    ///
    /// `stop` / `stop_on_crash` take the guard out of `inner`, so this only triggers if neither was called.
    fn drop(&mut self) {
        if self.inner.is_some() {
            sentry::end_session_with_status(SessionStatus::Abnormal);
        }
    }
}
impl Telemetry {
    /// Starts (or restarts) telemetry for the environment derived from `api_url`.
    ///
    /// - If a client is already running for the same environment, this does nothing.
    /// - A client running for a different environment is shut down first and the current user is cleared.
    /// - If `api_url` maps to `Env::OnPrem`, no new client is started.
    ///
    /// `release` is reported as the Sentry release and `dsn` is the DSN the client is initialised with.
    pub fn start(&mut self, api_url: &str, release: &str, dsn: Dsn) {
        // Can't use URLs as `environment` directly, because Sentry doesn't allow slashes in environments.
        // <https://docs.sentry.io/platforms/rust/configuration/environments/>
        let environment = Env::from_api_url(api_url);

        if self
            .inner
            .as_ref()
            .and_then(|i| i.options().environment.as_ref())
            .is_some_and(|env| env == environment.as_str())
        {
            tracing::debug!(%environment, "Telemetry already initialised");

            return;
        }

        // Stop any previous telemetry session.
        if let Some(inner) = self.inner.take() {
            tracing::debug!("Stopping previous telemetry session");

            sentry::end_session();
            drop(inner);

            set_current_user(None);
        }

        if environment == Env::OnPrem {
            tracing::debug!(%api_url, "Telemetry won't start in unofficial environment");
            return;
        }

        tracing::info!(%environment, "Starting telemetry");

        let inner = sentry::init((
            dsn.0,
            sentry::ClientOptions {
                environment: Some(Cow::Borrowed(environment.as_str())),
                // We can't get the release number ourselves because we don't know if we're embedded in a GUI Client or a Headless Client.
                release: Some(release.to_owned().into()),
                traces_sampler: Some(Arc::new(|tx| {
                    // Only submit `telemetry` spans to Sentry.
                    // Those get sampled at creation time (to save CPU power) so we want to submit all of them.
                    if tx.name() == "telemetry" { 1.0 } else { 0.0 }
                })),
                max_breadcrumbs: 500,
                before_send: Some(event_rate_limiter(Duration::from_secs(60 * 5))),
                ..Default::default()
            },
        ));

        // Configure scope on the main hub so that all threads will get the tags
        sentry::Hub::main().configure_scope(|scope| {
            scope.set_tag("api_url", api_url);

            let ctx = sentry::integrations::contexts::utils::device_context();
            scope.set_context("device", ctx);

            let ctx = sentry::integrations::contexts::utils::rust_context();
            scope.set_context("rust", ctx);

            if let Some(ctx) = sentry::integrations::contexts::utils::os_context() {
                scope.set_context("os", ctx);
            }
        });

        self.inner.replace(inner);

        sentry::start_session();
    }

    /// Ends the session as "exited", flushes events to sentry.io and drops the guard.
    pub async fn stop(&mut self) {
        self.end_session(SessionStatus::Exited).await;
    }

    /// Ends the session as "crashed", flushes events to sentry.io and drops the guard.
    pub async fn stop_on_crash(&mut self) {
        self.end_session(SessionStatus::Crashed).await;
    }

    /// Ends the current session with `status`, then flushes pending events and drops the client guard.
    ///
    /// Does nothing if telemetry is not running.
    async fn end_session(&mut self, status: SessionStatus) {
        let Some(inner) = self.inner.take() else {
            return;
        };

        tracing::info!("Stopping telemetry");

        // The session must be concluded _before_ flushing: the guard is moved into the blocking task below
        // and dropped there, so a status recorded afterwards would no longer be part of what gets flushed.
        sentry::end_session_with_status(status);

        // Sentry uses blocking IO for flushing ..
        let _ = tokio::task::spawn_blocking(move || {
            if !inner.flush(Some(Duration::from_secs(5))) {
                tracing::error!("Failed to flush telemetry events to sentry.io");
                return;
            };

            tracing::debug!("Flushed telemetry");
        })
        .await;
    }

    /// Records `slug` under the `account_slug` key of the current Sentry user.
    pub fn set_account_slug(slug: String) {
        update_user(|user| {
            user.other.insert("account_slug".to_owned(), slug.into());
        });
    }

    /// Sets the id of the current Sentry user to `id`.
    ///
    /// If a client with a configured environment is bound to the main hub,
    /// `feature_flags::reevaluate` is called with `id` and that environment.
    pub fn set_firezone_id(id: String) {
        update_user({
            // `id` is needed again below, so the closure gets its own copy.
            let id = id.clone();

            |user| user.id = Some(id)
        });

        let Some(client) = sentry::Hub::main().client() else {
            return;
        };

        let Some(env) = client.options().environment.as_ref() else {
            return; // Nothing to do if we don't have an environment set.
        };

        feature_flags::reevaluate(id, env);
    }
}
/// Creates a `before_send` callback that drops events whose message was already seen within `timeout`.
///
/// Events without a message are always passed through.
/// Seen messages are tracked in a cache holding at most 10,000 entries, each living for `timeout`.
fn event_rate_limiter(timeout: Duration) -> BeforeCallback<Event<'static>> {
    let seen_messages = moka::sync::CacheBuilder::<String, (), _>::default()
        .max_capacity(10_000)
        .time_to_live(timeout)
        .build();

    Arc::new(move |event: Event<'static>| match &event.message {
        // Nothing to de-duplicate on.
        None => Some(event),
        // Same message was submitted recently: discard.
        Some(message) if seen_messages.contains_key(message) => None,
        // First occurrence within the time window: remember it and let it through.
        Some(message) => {
            seen_messages.insert(message.clone(), ());

            Some(event)
        }
    })
}
/// Applies `update` to the user of the main hub's scope.
///
/// If no user is set yet, `update` is applied to a default user. The result is stored back on the scope.
fn update_user(update: impl FnOnce(&mut sentry::User)) {
    let hub = sentry::Hub::main();

    hub.configure_scope(|scope| {
        let mut user = match scope.user() {
            Some(existing) => existing.clone(),
            None => sentry::User::default(),
        };

        update(&mut user);

        scope.set_user(Some(user));
    });
}
/// Replaces the user on the main hub's scope with `user`; `None` clears it.
fn set_current_user(user: Option<sentry::User>) {
    let hub = sentry::Hub::main();

    hub.configure_scope(|scope| {
        scope.set_user(user);
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an otherwise-default event carrying `msg` as its message.
    fn event(msg: &str) -> Event<'static> {
        Event {
            message: Some(String::from(msg)),
            ..Default::default()
        }
    }

    #[test]
    fn starting_session_for_unsupported_env_disables_current_one() {
        let mut telemetry = Telemetry::default();

        // First a supported environment, then an unsupported one.
        for api_url in ["wss://api.firez.one", "wss://example.com"] {
            telemetry.start(api_url, "1.0.0", TESTING);
        }

        assert!(telemetry.inner.is_none());
    }

    #[test]
    fn rate_limits_events_with_same_message() {
        let ttl = Duration::from_secs(1);
        let before_send = event_rate_limiter(ttl);

        let foo = event("foo");
        let bar = event("bar");

        // Distinct messages pass, a repeated one does not.
        assert!(before_send(foo.clone()).is_some());
        assert!(before_send(bar).is_some());
        assert!(before_send(foo.clone()).is_none());

        // Once the entry has expired, the same message passes again.
        std::thread::sleep(ttl);

        assert!(before_send(foo).is_some());
    }
}