diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ecb7571b9..b4182b0b9 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2403,6 +2403,7 @@ name = "firezone-relay" version = "0.1.0" dependencies = [ "anyhow", + "axum", "aya", "aya-build", "aya-log", diff --git a/rust/relay/server/Cargo.toml b/rust/relay/server/Cargo.toml index 1c8de4090..4486b36f1 100644 --- a/rust/relay/server/Cargo.toml +++ b/rust/relay/server/Cargo.toml @@ -9,6 +9,7 @@ development = ["difference", "env_logger"] [dependencies] anyhow = { workspace = true } +axum = { workspace = true, features = ["http1", "tokio", "query"] } backoff = { workspace = true } base64 = { workspace = true } bytecodec = { workspace = true } diff --git a/rust/relay/server/src/control_endpoint.rs b/rust/relay/server/src/control_endpoint.rs new file mode 100644 index 000000000..1a9f2451b --- /dev/null +++ b/rust/relay/server/src/control_endpoint.rs @@ -0,0 +1,53 @@ +use axum::Router; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use axum::routing::post; +use firezone_logging::FilterReloadHandle; +use std::net::SocketAddr; +use std::sync::Arc; + +/// Runs an HTTP server that responds to `POST /log_filter?directives=` and sets the given directives as the new log-filter. +pub async fn serve( + addr: impl Into, + filter_reload_handle: FilterReloadHandle, +) -> std::io::Result<()> { + let addr = addr.into(); + + let service = Router::new() + .route("/log_filter", post(set_log_filter)) + .with_state(AppState { + handle: Arc::new(filter_reload_handle), + }) + .into_make_service(); + + axum::serve(tokio::net::TcpListener::bind(addr).await?, service).await?; + + Ok(()) +} + +async fn set_log_filter(Query(params): Query, state: State) -> StatusCode { + let directives = params.directives; + + match state.handle.reload(&directives) { + Ok(()) => { + tracing::info!(%directives, "Applied new logging directives"); + + StatusCode::OK + } + Err(e) => { + tracing::info!(%directives, "Failed to set log filter to new directives: {e}"); + + StatusCode::BAD_REQUEST + } + } +} + +#[derive(Clone)] +struct AppState { + handle: Arc, +} + +#[derive(serde::Deserialize)] +struct QueryParams { + directives: String, +} diff --git a/rust/relay/server/src/lib.rs b/rust/relay/server/src/lib.rs index 3b9edb884..43703808a 100644 --- a/rust/relay/server/src/lib.rs +++ b/rust/relay/server/src/lib.rs @@ -5,6 +5,7 @@ mod server; mod sleep; pub mod auth; +pub mod control_endpoint; pub mod ebpf; #[cfg(feature = "proptest")] #[allow(clippy::unwrap_used)] diff --git a/rust/relay/server/src/main.rs b/rust/relay/server/src/main.rs index 823a8b7f2..360af0d5b 100644 --- a/rust/relay/server/src/main.rs +++ b/rust/relay/server/src/main.rs @@ -5,11 +5,11 @@ use backoff::ExponentialBackoffBuilder; use clap::Parser; use ebpf_shared::Config; use firezone_bin_shared::http_health_check; -use firezone_logging::{err_with_src, sentry_layer}; +use firezone_logging::{FilterReloadHandle, err_with_src, sentry_layer}; use firezone_relay::sockets::Sockets; use firezone_relay::{ AddressFamily, AllocationPort, ChannelData, ClientSocket, Command, IpStack, PeerSocket, Server, - Sleep, VERSION, ebpf, sockets, + Sleep, VERSION, control_endpoint, ebpf, sockets, }; use firezone_telemetry::{RELAY_DSN, Telemetry}; use futures::{FutureExt, future}; @@ -18,16 +18,16 @@ use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use secrecy::{ExposeSecret, Secret, SecretString}; use std::borrow::Cow; -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Poll, ready}; use std::time::{Duration, Instant}; use stun_codec::rfc5766::attributes::ChannelNumber; -use tracing::{Subscriber, level_filters::LevelFilter}; +use tracing::Subscriber; use tracing_core::Dispatch; use tracing_stackdriver::CloudTraceConfiguration; -use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{Layer, layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; const STATS_LOG_INTERVAL: Duration = Duration::from_secs(10); @@ -91,6 +91,10 @@ struct Args { #[command(flatten)] health_check: http_health_check::HealthCheckArgs, + /// The address of the local interface where we should serve our control endpoint. + #[arg(long, env, hide = true, default_value = "127.0.0.1:9999")] + control_endpoint: SocketAddr, + /// Enable sentry.io crash-reporting agent. #[arg(long, env = "FIREZONE_TELEMETRY", default_value_t = false)] telemetry: bool, @@ -136,7 +140,7 @@ fn main() { } async fn try_main(args: Args) -> Result<()> { - setup_tracing(&args)?; + let filter_reload_handle = setup_tracing(&args)?; let mut ebpf = args .ebpf_offloading @@ -177,6 +181,11 @@ async fn try_main(args: Args) -> Result<()> { make_is_healthy(last_heartbeat_sent.clone()), )); + tokio::spawn(control_endpoint::serve( + args.control_endpoint, + filter_reload_handle, + )); + let login = LoginUrl::relay( args.api_url.clone(), &args.token, @@ -222,7 +231,7 @@ async fn try_main(args: Args) -> Result<()> { /// ## Integration with OTLP /// /// If the user has specified [`TraceCollector::Otlp`], we will set up an OTLP-exporter that connects to an OTLP collector specified at `Args.otlp_grpc_endpoint`. -fn setup_tracing(args: &Args) -> Result<()> { +fn setup_tracing(args: &Args) -> Result { use opentelemetry::{global, trace::TracerProvider as _}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{runtime::Tokio, trace::Config}; @@ -233,12 +242,20 @@ fn setup_tracing(args: &Args) -> Result<()> { &tracing_subscriber::registry().with(log_layer(args)).into(), ); - let dispatch: Dispatch = match args.otlp_grpc_endpoint.clone() { - None => tracing_subscriber::registry() - .with(log_layer(args)) - .with(env_filter()) - .with(sentry_layer()) - .into(), + let directives = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()); + + let (dispatch, reload_handle) = match args.otlp_grpc_endpoint.clone() { + None => { + let (filter, reload_handle) = firezone_logging::try_filter(&directives)?; + + let dispatch: Dispatch = tracing_subscriber::registry() + .with(log_layer(args)) + .with(filter) + .with(sentry_layer()) + .into(); + + (dispatch, reload_handle) + } Some(endpoint) => { let metadata = make_otel_metadata(); let grpc_endpoint = format!("http://{endpoint}"); @@ -273,12 +290,16 @@ fn setup_tracing(args: &Args) -> Result<()> { tracing::trace!(target: "relay", "Successfully initialized metric provider on tokio runtime"); - tracing_subscriber::registry() + let (filter, reload_handle) = firezone_logging::try_filter(&directives)?; + + let dispatch: Dispatch = tracing_subscriber::registry() .with(log_layer(args)) .with(tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("relay"))) - .with(env_filter()) + .with(filter) .with(sentry_layer()) - .into() + .into(); + + (dispatch, reload_handle) } }; @@ -288,7 +309,7 @@ fn setup_tracing(args: &Args) -> Result<()> { .try_init() .context("Failed to initialize tracing")?; - Ok(()) + Ok(reload_handle) } /// Constructs the base log layer. @@ -316,12 +337,6 @@ where } } -fn env_filter() -> EnvFilter { - EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy() -} - #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "snake_case", tag = "event", content = "payload")] enum IngressMessage {