From ea6f1ce1454f1ce94932653f04c8274c6a3bf027 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sat, 2 Aug 2025 10:23:35 +0000 Subject: [PATCH] chore(telemetry): allow to dynamically change the log filter (#10065) In addition to sending true/false for a feature-flag, PostHog also allows us to send a payload with them. We can use this to carry the log-filter we'd like to stream logs for. With this, we can dynamically change which logs we are getting forwarded to Sentry. Unfortunately, this cannot be done on a per-user basis, meaning we will always have the same log filter for all users where the feature-flag is enabled. --- rust/Cargo.lock | 1 + rust/logging/src/lib.rs | 4 +- rust/telemetry/Cargo.toml | 1 + rust/telemetry/src/feature_flags.rs | 115 ++++++++++++++++++++++++---- 4 files changed, 103 insertions(+), 18 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 2af2e97ce..c5a2aaca7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2619,6 +2619,7 @@ dependencies = [ "thiserror 2.0.12", "tokio", "tracing", + "tracing-subscriber", "uuid", ] diff --git a/rust/logging/src/lib.rs b/rust/logging/src/lib.rs index dff679e50..3be7620d3 100644 --- a/rust/logging/src/lib.rs +++ b/rust/logging/src/lib.rs @@ -210,10 +210,10 @@ where let mut event_filter = match *md.level() { Level::ERROR | Level::WARN => EventFilter::Event | EventFilter::Breadcrumb, Level::INFO | Level::DEBUG => EventFilter::Breadcrumb, - _ => return EventFilter::Ignore, + Level::TRACE => EventFilter::Ignore, }; - if feature_flags::stream_logs() { + if feature_flags::stream_logs(md) { event_filter |= EventFilter::Log } diff --git a/rust/telemetry/Cargo.toml b/rust/telemetry/Cargo.toml index 9902b2db0..47b432de5 100644 --- a/rust/telemetry/Cargo.toml +++ b/rust/telemetry/Cargo.toml @@ -20,6 +20,7 @@ serde_json = { workspace = true } sha2 = { workspace = true } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } uuid = { workspace = true } [dev-dependencies] diff --git a/rust/telemetry/src/feature_flags.rs b/rust/telemetry/src/feature_flags.rs index e3b9be0a4..e75907fc0 100644 --- a/rust/telemetry/src/feature_flags.rs +++ b/rust/telemetry/src/feature_flags.rs @@ -1,4 +1,5 @@ use std::{ + fmt, str::FromStr as _, sync::{ LazyLock, @@ -8,8 +9,11 @@ use std::{ }; use anyhow::{Context as _, Result, bail}; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use sha2::Digest as _; +use tracing::{Metadata, level_filters::LevelFilter}; +use tracing_subscriber::filter::Targets; use crate::{ Env, @@ -31,8 +35,8 @@ pub fn drop_llmnr_nxdomain_responses() -> bool { FEATURE_FLAGS.drop_llmnr_nxdomain_responses() } -pub fn stream_logs() -> bool { - FEATURE_FLAGS.stream_logs() +pub fn stream_logs(metadata: &Metadata<'_>) -> bool { + FEATURE_FLAGS.stream_logs(metadata) } pub fn map_enobufs_to_would_block() -> bool { @@ -54,12 +58,12 @@ pub(crate) async fn evaluate_now(user_id: String, env: Env) { Env::OnPrem | Env::DockerCompose | Env::Localhost => return, }; - let flags = decide(user_id, api_key.to_owned()) + let (flags, payloads) = decide(user_id, api_key.to_owned()) .await .inspect_err(|e| tracing::debug!("Failed to evaluate feature flags: {e:#}")) .unwrap_or_default(); - FEATURE_FLAGS.update(flags); + FEATURE_FLAGS.update(flags, payloads); sentry::Hub::main().configure_scope(|scope| { scope.set_context("flags", sentry_flag_context(flags)); @@ -98,7 +102,10 @@ pub(crate) async fn reeval_timer() { } } -async fn decide(maybe_legacy_id: String, api_key: String) -> Result { +async fn decide( + maybe_legacy_id: String, + api_key: String, +) -> Result<(FeatureFlagsResponse, FeatureFlagPayloadsResponse)> { let distinct_id = if uuid::Uuid::from_str(&maybe_legacy_id).is_ok() { hex::encode(sha2::Sha256::digest(&maybe_legacy_id)) } else { @@ -118,19 +125,19 @@ async fn decide(maybe_legacy_id: String, api_key: String) -> Result() - .await - .context("Failed to deserialize response")?; + let decide_response = + serde_json::from_str::(&body).context("Failed to deserialize response")?; - Ok(json.feature_flags) + Ok(( + decide_response.feature_flags, + decide_response.feature_flag_payloads, + )) } #[derive(Debug, Serialize)] @@ -143,6 +150,7 @@ struct DecideRequest { #[serde(rename_all = "camelCase")] struct DecideResponse { feature_flags: FeatureFlagsResponse, + feature_flag_payloads: FeatureFlagPayloadsResponse, } #[derive(Debug, Deserialize, Default, Clone, Copy)] @@ -158,11 +166,18 @@ struct FeatureFlagsResponse { map_enobufs_to_wouldblock: bool, } +#[derive(Debug, Deserialize, Default, Clone)] +#[serde(rename_all = "kebab-case")] +struct FeatureFlagPayloadsResponse { + #[serde(default)] + stream_logs: String, +} + #[derive(Debug, Default)] struct FeatureFlags { icmp_unreachable_instead_of_nat64: AtomicBool, drop_llmnr_nxdomain_responses: AtomicBool, - stream_logs: AtomicBool, + stream_logs: RwLock, map_enobufs_to_wouldblock: AtomicBool, } @@ -181,14 +196,22 @@ impl FeatureFlags { stream_logs, map_enobufs_to_wouldblock, }: FeatureFlagsResponse, + payloads: FeatureFlagPayloadsResponse, ) { self.icmp_unreachable_instead_of_nat64 .store(icmp_unreachable_instead_of_nat64, Ordering::Relaxed); self.drop_llmnr_nxdomain_responses .store(drop_llmnr_nxdomain_responses, Ordering::Relaxed); - self.stream_logs.store(stream_logs, Ordering::Relaxed); self.map_enobufs_to_wouldblock .store(map_enobufs_to_wouldblock, Ordering::Relaxed); + + let log_filter = if stream_logs { + LogFilter::parse(payloads.stream_logs) + } else { + LogFilter::default() + }; + + *self.stream_logs.write() = log_filter; } fn icmp_unreachable_instead_of_nat64(&self) -> bool { @@ -200,8 +223,8 @@ impl FeatureFlags { self.drop_llmnr_nxdomain_responses.load(Ordering::Relaxed) } - fn stream_logs(&self) -> bool { - self.stream_logs.load(Ordering::Relaxed) + fn stream_logs(&self, metadata: &Metadata<'_>) -> bool { + self.stream_logs.read().enabled(metadata) } fn map_enobufs_to_wouldblock(&self) -> bool { @@ -240,3 +263,63 @@ fn sentry_flag_context(flags: FeatureFlagsResponse) -> sentry::protocol::Context sentry::protocol::Context::Other(serde_json::from_value(value).expect("to and from json works")) } + +struct LogFilter { + directives: String, + targets: Targets, +} + +impl Default for LogFilter { + fn default() -> Self { + Self { + directives: String::default(), + targets: Targets::new(), + } + } +} + +impl LogFilter { + fn parse(directives: String) -> Self { + let directives = match serde_json::from_str::(&directives) { + Ok(directives) => directives, + Err(e) => { + tracing::debug!("Failed to parse directives from JSON: {e}"); + + String::from("debug") + } + }; + + let targets = Targets::from_str(&directives).unwrap_or_else(|e| { + tracing::debug!(%directives, "Failed to parse env-filter: {e}"); + + Targets::new().with_default(LevelFilter::DEBUG) + }); + + Self { + directives, + targets, + } + } + + fn enabled(&self, md: &Metadata<'_>) -> bool { + self.targets.would_enable(md.target(), md.level()) + } +} + +impl fmt::Debug for LogFilter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.directives.fmt(f) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn filter_parses_from_nested_json() { + let filter = LogFilter::parse("\"debug,str0m::ice_::pair=trace\"".to_owned()); + + assert_eq!(filter.directives, "debug,str0m::ice_::pair=trace"); + } +}