feat(connlib): regularly evaluate feature flags (#8467)

In order to be able to dynamically configure long-running applications
such as the Gateway via feature-flags, we need to regularly re-evaluate
them by sending another POST request to the `/decide` endpoint.

To do this without impacting anything else, we create a separate runtime
that is lazily initialised on first access and use that to run the async
code for connecting to the PostHog service. In addition to that, we also
spawn a task that re-evaluates the feature flags for the currently set
user in the Sentry context every 5 minutes.

Resolves: #8454

---------

Signed-off-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Thomas Eizinger
2025-03-18 10:50:54 +11:00
committed by GitHub
parent 4ce2f160e3
commit e54a7c2d64
3 changed files with 150 additions and 118 deletions

View File

@@ -7,11 +7,11 @@ license = { workspace = true }
[dependencies]
anyhow = { workspace = true }
parking_lot = { workspace = true }
reqwest = { workspace = true, features = ["blocking"] }
reqwest = { workspace = true }
sentry = { workspace = true, features = ["contexts", "backtrace", "debug-images", "panic", "reqwest", "rustls", "tracing"] }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing = { workspace = true }
[dev-dependencies]

View File

@@ -0,0 +1,144 @@
use std::{sync::LazyLock, time::Duration};
use anyhow::{bail, Context as _, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::runtime::Runtime;
const POSTHOG_API_KEY: &str = "phc_uXXl56plyvIBHj81WwXBLtdPElIRbm7keRTdUCmk8ll";
const RE_EVAL_DURATION: Duration = Duration::from_secs(5 * 60);
static RUNTIME: LazyLock<Runtime> = LazyLock::new(init_runtime);
// Process-wide storage of enabled feature flags.
//
// Defaults to everything off.
static FEATURE_FLAGS: LazyLock<RwLock<FeatureFlags>> = LazyLock::new(RwLock::default);
pub fn icmp_unreachable_instead_of_nat64() -> bool {
FEATURE_FLAGS.read().icmp_unreachable_instead_of_nat64
}
pub fn drop_llmnr_nxdomain_responses() -> bool {
FEATURE_FLAGS.read().drop_llmnr_nxdomain_responses
}
pub(crate) fn reevaluate(user_id: String) {
RUNTIME.spawn(async move {
let flags = decide(user_id)
.await
.inspect_err(|e| tracing::debug!("Failed to evaluate feature flags: {e:#}"))
.unwrap_or_default();
tracing::debug!(?flags, "Evaluated feature-flags");
*FEATURE_FLAGS.write() = flags;
sentry::Hub::main().configure_scope(|scope| {
scope.set_context("flags", sentry_flag_context(flags));
});
});
}
/// Initialize the runtime to use for evaluating feature flags.
fn init_runtime() -> Runtime {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1) // We only need 1 worker thread.
.thread_name("feature-flag-worker")
.enable_io()
.enable_time()
.build()
.expect("to be able to build runtime");
runtime.spawn(async move {
loop {
tokio::time::sleep(RE_EVAL_DURATION).await;
let Some(user_id) = sentry::Hub::main()
.configure_scope(|scope| scope.user().and_then(|u| u.id.clone()))
else {
continue; // Nothing to do if we don't have a user-id set.
};
reevaluate(user_id);
}
});
runtime
}
async fn decide(distinct_id: String) -> Result<FeatureFlags> {
let response = reqwest::ClientBuilder::new()
.connection_verbose(true)
.build()?
.post("https://us.i.posthog.com/decide?v=3")
.json(&DecideRequest {
api_key: POSTHOG_API_KEY.to_string(),
distinct_id,
})
.send()
.await
.context("Failed to send POST request")?;
let status = response.status();
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
bail!("Failed to get feature flags; status={status}, body={body}")
}
let json = response
.json::<DecideResponse>()
.await
.context("Failed to deserialize response")?;
Ok(json.feature_flags)
}
#[derive(Debug, Serialize)]
struct DecideRequest {
api_key: String,
distinct_id: String,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct DecideResponse {
feature_flags: FeatureFlags,
}
#[derive(Debug, Deserialize, Default, Clone, Copy)]
#[serde(rename_all = "kebab-case")]
struct FeatureFlags {
#[serde(default)]
icmp_unreachable_instead_of_nat64: bool,
#[serde(default)]
drop_llmnr_nxdomain_responses: bool,
}
fn sentry_flag_context(flags: FeatureFlags) -> sentry::protocol::Context {
#[derive(Debug, serde::Serialize)]
#[serde(tag = "flag", rename_all = "snake_case")]
enum SentryFlag {
IcmpUnreachableInsteadOfNat64 { result: bool },
DropLlmnrNxdomainResponses { result: bool },
}
// Exhaustive destruction so we don't forget to update this when we add a flag.
let FeatureFlags {
icmp_unreachable_instead_of_nat64,
drop_llmnr_nxdomain_responses,
} = flags;
let value = serde_json::json!({
"values": [
SentryFlag::IcmpUnreachableInsteadOfNat64 {
result: icmp_unreachable_instead_of_nat64,
},
SentryFlag::DropLlmnrNxdomainResponses { result: drop_llmnr_nxdomain_responses }
]
});
sentry::protocol::Context::Other(serde_json::from_value(value).expect("to and from json works"))
}

View File

@@ -1,15 +1,11 @@
#![cfg_attr(test, allow(clippy::unwrap_used))]
use std::{
sync::{Arc, LazyLock},
time::Duration,
};
use std::{sync::Arc, time::Duration};
use anyhow::{bail, Context, Result};
use env::ON_PREM;
use parking_lot::RwLock;
use sentry::protocol::SessionStatus;
use serde::{Deserialize, Serialize};
pub mod feature_flags;
pub struct Dsn(&'static str);
@@ -26,28 +22,6 @@ pub const HEADLESS_DSN: Dsn = Dsn("https://bc27dca8bb37be0142c48c4f89647c13@o450
pub const RELAY_DSN: Dsn = Dsn("https://9d5f664d8f8f7f1716d4b63a58bcafd5@o4507971108339712.ingest.us.sentry.io/4508373298970624");
pub const TESTING: Dsn = Dsn("https://55ef451fca9054179a11f5d132c02f45@o4507971108339712.ingest.us.sentry.io/4508792604852224");
const POSTHOG_API_KEY: &str = "phc_uXXl56plyvIBHj81WwXBLtdPElIRbm7keRTdUCmk8ll";
// Process-wide storage of enabled feature flags.
//
// Defaults to everything off.
static FEATURE_FLAGS: LazyLock<RwLock<FeatureFlags>> = LazyLock::new(RwLock::default);
/// Exposes all feature flags as public, static functions.
///
/// These only ever hit an in-memory location so can even be called from hot paths.
pub mod feature_flags {
use crate::*;
pub fn icmp_unreachable_instead_of_nat64() -> bool {
FEATURE_FLAGS.read().icmp_unreachable_instead_of_nat64
}
pub fn drop_llmnr_nxdomain_responses() -> bool {
FEATURE_FLAGS.read().drop_llmnr_nxdomain_responses
}
}
mod env {
use std::borrow::Cow;
@@ -185,19 +159,7 @@ impl Telemetry {
|user| user.id = Some(id)
});
std::thread::spawn(|| {
let flags = evaluate_feature_flags(id)
.inspect_err(|e| tracing::debug!("Failed to evaluate feature flags: {e:#}"))
.unwrap_or_default();
tracing::debug!(?flags, "Evaluated feature-flags");
*FEATURE_FLAGS.write() = flags;
sentry::Hub::main().configure_scope(|scope| {
scope.set_context("flags", sentry_flag_context(flags));
});
});
feature_flags::reevaluate(id);
}
}
@@ -214,80 +176,6 @@ fn set_current_user(user: Option<sentry::User>) {
sentry::Hub::main().configure_scope(|scope| scope.set_user(user));
}
fn evaluate_feature_flags(distinct_id: String) -> Result<FeatureFlags> {
let response = reqwest::blocking::ClientBuilder::new()
.connection_verbose(true)
.build()?
.post("https://us.i.posthog.com/decide?v=3")
.json(&DecideRequest {
api_key: POSTHOG_API_KEY.to_string(),
distinct_id,
})
.send()
.context("Failed to send POST request")?;
let status = response.status();
if !status.is_success() {
let body = response.text().unwrap_or_default();
bail!("Failed to get feature flags; status={status}, body={body}")
}
let json = response
.json::<DecideResponse>()
.context("Failed to deserialize response")?;
Ok(json.feature_flags)
}
#[derive(Debug, Serialize)]
struct DecideRequest {
api_key: String,
distinct_id: String,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct DecideResponse {
feature_flags: FeatureFlags,
}
#[derive(Debug, Deserialize, Default, Clone, Copy)]
#[serde(rename_all = "kebab-case")]
struct FeatureFlags {
#[serde(default)]
icmp_unreachable_instead_of_nat64: bool,
#[serde(default)]
drop_llmnr_nxdomain_responses: bool,
}
fn sentry_flag_context(flags: FeatureFlags) -> sentry::protocol::Context {
#[derive(Debug, serde::Serialize)]
#[serde(tag = "flag", rename_all = "snake_case")]
enum SentryFlag {
IcmpUnreachableInsteadOfNat64 { result: bool },
DropLlmnrNxdomainResponses { result: bool },
}
// Exhaustive destruction so we don't forget to update this when we add a flag.
let FeatureFlags {
icmp_unreachable_instead_of_nat64,
drop_llmnr_nxdomain_responses,
} = flags;
let value = serde_json::json!({
"values": [
SentryFlag::IcmpUnreachableInsteadOfNat64 {
result: icmp_unreachable_instead_of_nat64,
},
SentryFlag::DropLlmnrNxdomainResponses { result: drop_llmnr_nxdomain_responses }
]
});
sentry::protocol::Context::Other(serde_json::from_value(value).expect("to and from json works"))
}
#[cfg(test)]
mod tests {
use super::*;