From e54a7c2d64371a76166bf93771313db5645bdb72 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 18 Mar 2025 10:50:54 +1100 Subject: [PATCH] feat(connlib): regularly evaluate feature flags (#8467) In order to be able to dynamically configure long-running applications such as the Gateway via feature-flags, we need to regularly re-evaluate them by sending another POST request to the `/decide` endpoint. To do this without impacting anything else, we create a separate runtime that is lazily initialised on first access and use that to run the async code for connecting to the PostHog service. In addition to that, we also spawn a task that re-evaluates the feature flags for the currently set user in the Sentry context every 5 minutes. Resolves: #8454 --------- Signed-off-by: Thomas Eizinger Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rust/telemetry/Cargo.toml | 4 +- rust/telemetry/src/feature_flags.rs | 144 ++++++++++++++++++++++++++++ rust/telemetry/src/lib.rs | 120 +---------------------- 3 files changed, 150 insertions(+), 118 deletions(-) create mode 100644 rust/telemetry/src/feature_flags.rs diff --git a/rust/telemetry/Cargo.toml b/rust/telemetry/Cargo.toml index 8788fd8b8..40c3da34c 100644 --- a/rust/telemetry/Cargo.toml +++ b/rust/telemetry/Cargo.toml @@ -7,11 +7,11 @@ license = { workspace = true } [dependencies] anyhow = { workspace = true } parking_lot = { workspace = true } -reqwest = { workspace = true, features = ["blocking"] } +reqwest = { workspace = true } sentry = { workspace = true, features = ["contexts", "backtrace", "debug-images", "panic", "reqwest", "rustls", "tracing"] } serde = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = ["rt"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tracing = { workspace = true } [dev-dependencies] diff --git a/rust/telemetry/src/feature_flags.rs b/rust/telemetry/src/feature_flags.rs new file mode 100644 index 000000000..126cb3da5 --- /dev/null +++ b/rust/telemetry/src/feature_flags.rs @@ -0,0 +1,144 @@ +use std::{sync::LazyLock, time::Duration}; + +use anyhow::{bail, Context as _, Result}; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use tokio::runtime::Runtime; + +const POSTHOG_API_KEY: &str = "phc_uXXl56plyvIBHj81WwXBLtdPElIRbm7keRTdUCmk8ll"; +const RE_EVAL_DURATION: Duration = Duration::from_secs(5 * 60); + +static RUNTIME: LazyLock = LazyLock::new(init_runtime); + +// Process-wide storage of enabled feature flags. +// +// Defaults to everything off. +static FEATURE_FLAGS: LazyLock> = LazyLock::new(RwLock::default); + +pub fn icmp_unreachable_instead_of_nat64() -> bool { + FEATURE_FLAGS.read().icmp_unreachable_instead_of_nat64 +} + +pub fn drop_llmnr_nxdomain_responses() -> bool { + FEATURE_FLAGS.read().drop_llmnr_nxdomain_responses +} + +pub(crate) fn reevaluate(user_id: String) { + RUNTIME.spawn(async move { + let flags = decide(user_id) + .await + .inspect_err(|e| tracing::debug!("Failed to evaluate feature flags: {e:#}")) + .unwrap_or_default(); + + tracing::debug!(?flags, "Evaluated feature-flags"); + + *FEATURE_FLAGS.write() = flags; + + sentry::Hub::main().configure_scope(|scope| { + scope.set_context("flags", sentry_flag_context(flags)); + }); + }); +} + +/// Initialize the runtime to use for evaluating feature flags. +fn init_runtime() -> Runtime { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) // We only need 1 worker thread. + .thread_name("feature-flag-worker") + .enable_io() + .enable_time() + .build() + .expect("to be able to build runtime"); + + runtime.spawn(async move { + loop { + tokio::time::sleep(RE_EVAL_DURATION).await; + + let Some(user_id) = sentry::Hub::main() + .configure_scope(|scope| scope.user().and_then(|u| u.id.clone())) + else { + continue; // Nothing to do if we don't have a user-id set. + }; + + reevaluate(user_id); + } + }); + + runtime +} + +async fn decide(distinct_id: String) -> Result { + let response = reqwest::ClientBuilder::new() + .connection_verbose(true) + .build()? + .post("https://us.i.posthog.com/decide?v=3") + .json(&DecideRequest { + api_key: POSTHOG_API_KEY.to_string(), + distinct_id, + }) + .send() + .await + .context("Failed to send POST request")?; + + let status = response.status(); + + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + + bail!("Failed to get feature flags; status={status}, body={body}") + } + + let json = response + .json::() + .await + .context("Failed to deserialize response")?; + + Ok(json.feature_flags) +} + +#[derive(Debug, Serialize)] +struct DecideRequest { + api_key: String, + distinct_id: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct DecideResponse { + feature_flags: FeatureFlags, +} + +#[derive(Debug, Deserialize, Default, Clone, Copy)] +#[serde(rename_all = "kebab-case")] +struct FeatureFlags { + #[serde(default)] + icmp_unreachable_instead_of_nat64: bool, + #[serde(default)] + drop_llmnr_nxdomain_responses: bool, +} + +fn sentry_flag_context(flags: FeatureFlags) -> sentry::protocol::Context { + #[derive(Debug, serde::Serialize)] + #[serde(tag = "flag", rename_all = "snake_case")] + enum SentryFlag { + IcmpUnreachableInsteadOfNat64 { result: bool }, + DropLlmnrNxdomainResponses { result: bool }, + } + + // Exhaustive destruction so we don't forget to update this when we add a flag. + let FeatureFlags { + icmp_unreachable_instead_of_nat64, + drop_llmnr_nxdomain_responses, + } = flags; + + let value = serde_json::json!({ + "values": [ + SentryFlag::IcmpUnreachableInsteadOfNat64 { + result: icmp_unreachable_instead_of_nat64, + }, + SentryFlag::DropLlmnrNxdomainResponses { result: drop_llmnr_nxdomain_responses } + ] + }); + + sentry::protocol::Context::Other(serde_json::from_value(value).expect("to and from json works")) +} diff --git a/rust/telemetry/src/lib.rs b/rust/telemetry/src/lib.rs index 3210e21e6..b12714672 100644 --- a/rust/telemetry/src/lib.rs +++ b/rust/telemetry/src/lib.rs @@ -1,15 +1,11 @@ #![cfg_attr(test, allow(clippy::unwrap_used))] -use std::{ - sync::{Arc, LazyLock}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use anyhow::{bail, Context, Result}; use env::ON_PREM; -use parking_lot::RwLock; use sentry::protocol::SessionStatus; -use serde::{Deserialize, Serialize}; + +pub mod feature_flags; pub struct Dsn(&'static str); @@ -26,28 +22,6 @@ pub const HEADLESS_DSN: Dsn = Dsn("https://bc27dca8bb37be0142c48c4f89647c13@o450 pub const RELAY_DSN: Dsn = Dsn("https://9d5f664d8f8f7f1716d4b63a58bcafd5@o4507971108339712.ingest.us.sentry.io/4508373298970624"); pub const TESTING: Dsn = Dsn("https://55ef451fca9054179a11f5d132c02f45@o4507971108339712.ingest.us.sentry.io/4508792604852224"); -const POSTHOG_API_KEY: &str = "phc_uXXl56plyvIBHj81WwXBLtdPElIRbm7keRTdUCmk8ll"; - -// Process-wide storage of enabled feature flags. -// -// Defaults to everything off. -static FEATURE_FLAGS: LazyLock> = LazyLock::new(RwLock::default); - -/// Exposes all feature flags as public, static functions. -/// -/// These only ever hit an in-memory location so can even be called from hot paths. -pub mod feature_flags { - use crate::*; - - pub fn icmp_unreachable_instead_of_nat64() -> bool { - FEATURE_FLAGS.read().icmp_unreachable_instead_of_nat64 - } - - pub fn drop_llmnr_nxdomain_responses() -> bool { - FEATURE_FLAGS.read().drop_llmnr_nxdomain_responses - } -} - mod env { use std::borrow::Cow; @@ -185,19 +159,7 @@ impl Telemetry { |user| user.id = Some(id) }); - std::thread::spawn(|| { - let flags = evaluate_feature_flags(id) - .inspect_err(|e| tracing::debug!("Failed to evaluate feature flags: {e:#}")) - .unwrap_or_default(); - - tracing::debug!(?flags, "Evaluated feature-flags"); - - *FEATURE_FLAGS.write() = flags; - - sentry::Hub::main().configure_scope(|scope| { - scope.set_context("flags", sentry_flag_context(flags)); - }); - }); + feature_flags::reevaluate(id); } } @@ -214,80 +176,6 @@ fn set_current_user(user: Option) { sentry::Hub::main().configure_scope(|scope| scope.set_user(user)); } -fn evaluate_feature_flags(distinct_id: String) -> Result { - let response = reqwest::blocking::ClientBuilder::new() - .connection_verbose(true) - .build()? - .post("https://us.i.posthog.com/decide?v=3") - .json(&DecideRequest { - api_key: POSTHOG_API_KEY.to_string(), - distinct_id, - }) - .send() - .context("Failed to send POST request")?; - - let status = response.status(); - - if !status.is_success() { - let body = response.text().unwrap_or_default(); - - bail!("Failed to get feature flags; status={status}, body={body}") - } - - let json = response - .json::() - .context("Failed to deserialize response")?; - - Ok(json.feature_flags) -} - -#[derive(Debug, Serialize)] -struct DecideRequest { - api_key: String, - distinct_id: String, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct DecideResponse { - feature_flags: FeatureFlags, -} - -#[derive(Debug, Deserialize, Default, Clone, Copy)] -#[serde(rename_all = "kebab-case")] -struct FeatureFlags { - #[serde(default)] - icmp_unreachable_instead_of_nat64: bool, - #[serde(default)] - drop_llmnr_nxdomain_responses: bool, -} - -fn sentry_flag_context(flags: FeatureFlags) -> sentry::protocol::Context { - #[derive(Debug, serde::Serialize)] - #[serde(tag = "flag", rename_all = "snake_case")] - enum SentryFlag { - IcmpUnreachableInsteadOfNat64 { result: bool }, - DropLlmnrNxdomainResponses { result: bool }, - } - - // Exhaustive destruction so we don't forget to update this when we add a flag. - let FeatureFlags { - icmp_unreachable_instead_of_nat64, - drop_llmnr_nxdomain_responses, - } = flags; - - let value = serde_json::json!({ - "values": [ - SentryFlag::IcmpUnreachableInsteadOfNat64 { - result: icmp_unreachable_instead_of_nat64, - }, - SentryFlag::DropLlmnrNxdomainResponses { result: drop_llmnr_nxdomain_responses } - ] - }); - - sentry::protocol::Context::Other(serde_json::from_value(value).expect("to and from json works")) -} - #[cfg(test)] mod tests { use super::*;