chore(telemetry): allow to dynamically change the log filter (#10065)

In addition to sending true/false for a feature-flag, PostHog also
allows us to send a payload with them. We can use this to carry the
log-filter we'd like to stream logs for. With this, we can dynamically
change which logs we are getting forwarded to Sentry.

Unfortunately, this cannot be done on a per-user basis, meaning we will
always have the same log filter for all users where the feature-flag is
enabled.
This commit is contained in:
Thomas Eizinger
2025-08-02 10:23:35 +00:00
committed by GitHub
parent cd177a6448
commit ea6f1ce145
4 changed files with 103 additions and 18 deletions

1
rust/Cargo.lock generated
View File

@@ -2619,6 +2619,7 @@ dependencies = [
"thiserror 2.0.12",
"tokio",
"tracing",
"tracing-subscriber",
"uuid",
]

View File

@@ -210,10 +210,10 @@ where
let mut event_filter = match *md.level() {
Level::ERROR | Level::WARN => EventFilter::Event | EventFilter::Breadcrumb,
Level::INFO | Level::DEBUG => EventFilter::Breadcrumb,
_ => return EventFilter::Ignore,
Level::TRACE => EventFilter::Ignore,
};
if feature_flags::stream_logs() {
if feature_flags::stream_logs(md) {
event_filter |= EventFilter::Log
}

View File

@@ -20,6 +20,7 @@ serde_json = { workspace = true }
sha2 = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
uuid = { workspace = true }
[dev-dependencies]

View File

@@ -1,4 +1,5 @@
use std::{
fmt,
str::FromStr as _,
sync::{
LazyLock,
@@ -8,8 +9,11 @@ use std::{
};
use anyhow::{Context as _, Result, bail};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use sha2::Digest as _;
use tracing::{Metadata, level_filters::LevelFilter};
use tracing_subscriber::filter::Targets;
use crate::{
Env,
@@ -31,8 +35,8 @@ pub fn drop_llmnr_nxdomain_responses() -> bool {
FEATURE_FLAGS.drop_llmnr_nxdomain_responses()
}
pub fn stream_logs() -> bool {
FEATURE_FLAGS.stream_logs()
pub fn stream_logs(metadata: &Metadata<'_>) -> bool {
FEATURE_FLAGS.stream_logs(metadata)
}
pub fn map_enobufs_to_would_block() -> bool {
@@ -54,12 +58,12 @@ pub(crate) async fn evaluate_now(user_id: String, env: Env) {
Env::OnPrem | Env::DockerCompose | Env::Localhost => return,
};
let flags = decide(user_id, api_key.to_owned())
let (flags, payloads) = decide(user_id, api_key.to_owned())
.await
.inspect_err(|e| tracing::debug!("Failed to evaluate feature flags: {e:#}"))
.unwrap_or_default();
FEATURE_FLAGS.update(flags);
FEATURE_FLAGS.update(flags, payloads);
sentry::Hub::main().configure_scope(|scope| {
scope.set_context("flags", sentry_flag_context(flags));
@@ -98,7 +102,10 @@ pub(crate) async fn reeval_timer() {
}
}
async fn decide(maybe_legacy_id: String, api_key: String) -> Result<FeatureFlagsResponse> {
async fn decide(
maybe_legacy_id: String,
api_key: String,
) -> Result<(FeatureFlagsResponse, FeatureFlagPayloadsResponse)> {
let distinct_id = if uuid::Uuid::from_str(&maybe_legacy_id).is_ok() {
hex::encode(sha2::Sha256::digest(&maybe_legacy_id))
} else {
@@ -118,19 +125,19 @@ async fn decide(maybe_legacy_id: String, api_key: String) -> Result<FeatureFlags
.context("Failed to send POST request")?;
let status = response.status();
let body = response.text().await.unwrap_or_default();
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
bail!("Failed to get feature flags; status={status}, body={body}")
}
let json = response
.json::<DecideResponse>()
.await
.context("Failed to deserialize response")?;
let decide_response =
serde_json::from_str::<DecideResponse>(&body).context("Failed to deserialize response")?;
Ok(json.feature_flags)
Ok((
decide_response.feature_flags,
decide_response.feature_flag_payloads,
))
}
#[derive(Debug, Serialize)]
@@ -143,6 +150,7 @@ struct DecideRequest {
#[serde(rename_all = "camelCase")]
struct DecideResponse {
feature_flags: FeatureFlagsResponse,
feature_flag_payloads: FeatureFlagPayloadsResponse,
}
#[derive(Debug, Deserialize, Default, Clone, Copy)]
@@ -158,11 +166,18 @@ struct FeatureFlagsResponse {
map_enobufs_to_wouldblock: bool,
}
#[derive(Debug, Deserialize, Default, Clone)]
#[serde(rename_all = "kebab-case")]
struct FeatureFlagPayloadsResponse {
#[serde(default)]
stream_logs: String,
}
#[derive(Debug, Default)]
struct FeatureFlags {
icmp_unreachable_instead_of_nat64: AtomicBool,
drop_llmnr_nxdomain_responses: AtomicBool,
stream_logs: AtomicBool,
stream_logs: RwLock<LogFilter>,
map_enobufs_to_wouldblock: AtomicBool,
}
@@ -181,14 +196,22 @@ impl FeatureFlags {
stream_logs,
map_enobufs_to_wouldblock,
}: FeatureFlagsResponse,
payloads: FeatureFlagPayloadsResponse,
) {
self.icmp_unreachable_instead_of_nat64
.store(icmp_unreachable_instead_of_nat64, Ordering::Relaxed);
self.drop_llmnr_nxdomain_responses
.store(drop_llmnr_nxdomain_responses, Ordering::Relaxed);
self.stream_logs.store(stream_logs, Ordering::Relaxed);
self.map_enobufs_to_wouldblock
.store(map_enobufs_to_wouldblock, Ordering::Relaxed);
let log_filter = if stream_logs {
LogFilter::parse(payloads.stream_logs)
} else {
LogFilter::default()
};
*self.stream_logs.write() = log_filter;
}
fn icmp_unreachable_instead_of_nat64(&self) -> bool {
@@ -200,8 +223,8 @@ impl FeatureFlags {
self.drop_llmnr_nxdomain_responses.load(Ordering::Relaxed)
}
fn stream_logs(&self) -> bool {
self.stream_logs.load(Ordering::Relaxed)
fn stream_logs(&self, metadata: &Metadata<'_>) -> bool {
self.stream_logs.read().enabled(metadata)
}
fn map_enobufs_to_wouldblock(&self) -> bool {
@@ -240,3 +263,63 @@ fn sentry_flag_context(flags: FeatureFlagsResponse) -> sentry::protocol::Context
sentry::protocol::Context::Other(serde_json::from_value(value).expect("to and from json works"))
}
struct LogFilter {
directives: String,
targets: Targets,
}
impl Default for LogFilter {
fn default() -> Self {
Self {
directives: String::default(),
targets: Targets::new(),
}
}
}
impl LogFilter {
fn parse(directives: String) -> Self {
let directives = match serde_json::from_str::<String>(&directives) {
Ok(directives) => directives,
Err(e) => {
tracing::debug!("Failed to parse directives from JSON: {e}");
String::from("debug")
}
};
let targets = Targets::from_str(&directives).unwrap_or_else(|e| {
tracing::debug!(%directives, "Failed to parse env-filter: {e}");
Targets::new().with_default(LevelFilter::DEBUG)
});
Self {
directives,
targets,
}
}
fn enabled(&self, md: &Metadata<'_>) -> bool {
self.targets.would_enable(md.target(), md.level())
}
}
impl fmt::Debug for LogFilter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.directives.fmt(f)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn filter_parses_from_nested_json() {
let filter = LogFilter::parse("\"debug,str0m::ice_::pair=trace\"".to_owned());
assert_eq!(filter.directives, "debug,str0m::ice_::pair=trace");
}
}