diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 6627b011d..9c8736def 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1087,6 +1087,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -1154,11 +1163,14 @@ dependencies = [ "firezone-tunnel", "libs-common", "rand", + "reqwest", "secrecy", "serde", "serde_json", + "time", "tokio", "tokio-tungstenite", + "tokio-util", "tracing", "tracing-android", "tracing-appender", @@ -1577,6 +1589,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls 0.21.7", + "tokio", + "tokio-rustls", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -2765,6 +2791,47 @@ dependencies = [ "webrtc", ] +[[package]] +name = "reqwest" +version = "0.11.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +dependencies = [ + "base64 0.21.4", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls 0.21.7", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots 0.25.2", + "winreg", +] + [[package]] name = "rfc6979" version = "0.3.1" @@ -3104,6 +3171,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.2.0" @@ -3469,9 +3548,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" dependencies = [ "deranged", "itoa", @@ -3482,15 +3561,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -3584,7 +3663,7 @@ dependencies = [ "tokio", "tokio-rustls", "tungstenite", - "webpki-roots", + "webpki-roots 0.23.1", ] [[package]] @@ -3956,6 +4035,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] @@ -4066,6 +4146,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if 1.0.0", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -4095,6 +4187,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -4134,6 +4239,12 @@ dependencies = [ "rustls-webpki 0.100.3", ] +[[package]] +name = "webpki-roots" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" + [[package]] name = "webrtc" version = "0.8.0" @@ -4533,6 +4644,16 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if 1.0.0", + "windows-sys 0.48.0", +] + [[package]] name = "wintun" version = "0.3.1" diff --git a/rust/connlib/clients/android/src/lib.rs b/rust/connlib/clients/android/src/lib.rs index 343d2c23b..c40329330 100644 --- a/rust/connlib/clients/android/src/lib.rs +++ b/rust/connlib/clients/android/src/lib.rs @@ -11,6 +11,7 @@ use jni::{ JNIEnv, JavaVM, }; use secrecy::SecretString; +use std::path::Path; use std::sync::OnceLock; use std::{ net::{Ipv4Addr, Ipv6Addr}, @@ -18,13 +19,13 @@ use std::{ path::PathBuf, }; use thiserror::Error; -use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::prelude::*; use tracing_subscriber::EnvFilter; pub struct CallbackHandler { vm: JavaVM, callback_handler: GlobalRef, + handle: file_logger::Handle, } impl Clone for CallbackHandler { @@ -37,6 +38,7 @@ impl Clone for CallbackHandler { Self { vm: unsafe { std::ptr::read(&self.vm) }, callback_handler: self.callback_handler.clone(), + handle: self.handle.clone(), } } } @@ -99,7 +101,7 @@ where tracing_subscriber::layer::Identity::new() } -fn init_logging(log_dir: PathBuf, log_filter: String) { +fn init_logging(log_dir: &Path, log_filter: String) -> file_logger::Handle { // On Android, logging state is persisted indefinitely after the System.loadLibrary // call, which means that a disconnect and tunnel process restart will not // reinitialize the guard. This is a problem because the guard remains tied to @@ -108,21 +110,23 @@ fn init_logging(log_dir: PathBuf, log_filter: String) { // // So we use a static variable to track whether the guard has been initialized and avoid // re-initialized it if so. - static LOGGING_GUARD: OnceLock = OnceLock::new(); - if LOGGING_GUARD.get().is_some() { - return; + static LOGGING_HANDLE: OnceLock = OnceLock::new(); + if let Some(handle) = LOGGING_HANDLE.get() { + return handle.clone(); } - let (file_layer, guard) = file_logger::layer(log_dir); + let (file_layer, handle) = file_logger::layer(log_dir); - LOGGING_GUARD - .set(guard) + LOGGING_HANDLE + .set(handle.clone()) .expect("Logging guard should never be initialized twice"); let _ = tracing_subscriber::registry() .with(file_layer.with_filter(EnvFilter::new(log_filter))) .with(android_layer()) .try_init(); + + handle } impl Callbacks for CallbackHandler { @@ -283,6 +287,15 @@ impl Callbacks for CallbackHandler { ) }) } + + fn roll_log_file(&self) -> Option { + self.handle.roll_to_new_file().unwrap_or_else(|e| { + tracing::debug!("Failed to roll over to new file: {e}"); + let _ = self.on_error(&Error::LogFileRollError(e)); + + None + }) + } } fn throw(env: &mut JNIEnv, class: &str, msg: impl Into) { @@ -348,13 +361,15 @@ fn connect( let device_id = string_from_jstring!(env, device_id); let log_dir = string_from_jstring!(env, log_dir); let log_filter = string_from_jstring!(env, log_filter); + + let handle = init_logging(&PathBuf::from(log_dir), log_filter); + let callback_handler = CallbackHandler { vm: env.get_java_vm().map_err(ConnectError::GetJavaVmFailed)?, callback_handler, + handle, }; - init_logging(log_dir.into(), log_filter); - let session = Session::connect(portal_url.as_str(), secret, device_id, callback_handler)?; Ok(session) diff --git a/rust/connlib/clients/apple/src/lib.rs b/rust/connlib/clients/apple/src/lib.rs index b40fa0220..10916a93c 100644 --- a/rust/connlib/clients/apple/src/lib.rs +++ b/rust/connlib/clients/apple/src/lib.rs @@ -10,7 +10,6 @@ use std::{ path::PathBuf, sync::Arc, }; -use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::prelude::*; use tracing_subscriber::EnvFilter; @@ -65,10 +64,7 @@ mod ffi { } /// This is used by the apple client to interact with our code. -pub struct WrappedSession { - session: Session, - _guard: WorkerGuard, -} +pub struct WrappedSession(Session); // SAFETY: `CallbackHandler.swift` promises to be thread-safe. // TODO: Uphold that promise! @@ -76,11 +72,13 @@ unsafe impl Send for ffi::CallbackHandler {} unsafe impl Sync for ffi::CallbackHandler {} #[derive(Clone)] -#[repr(transparent)] -// Generated Swift opaque type wrappers have a `Drop` impl that decrements the -// refcount, but there's no way to generate a `Clone` impl that increments the -// recount. Instead, we just wrap it in an `Arc`. -pub struct CallbackHandler(Arc); +pub struct CallbackHandler { + // Generated Swift opaque type wrappers have a `Drop` impl that decrements the + // refcount, but there's no way to generate a `Clone` impl that increments the + // recount. Instead, we just wrap it in an `Arc`. + inner: Arc, + handle: file_logger::Handle, +} impl Callbacks for CallbackHandler { type Error = std::convert::Infallible; @@ -92,7 +90,7 @@ impl Callbacks for CallbackHandler { dns_address: Ipv4Addr, dns_fallback_strategy: String, ) -> Result { - self.0.on_set_interface_config( + self.inner.on_set_interface_config( tunnel_address_v4.to_string(), tunnel_address_v6.to_string(), dns_address.to_string(), @@ -102,17 +100,17 @@ impl Callbacks for CallbackHandler { } fn on_tunnel_ready(&self) -> Result<(), Self::Error> { - self.0.on_tunnel_ready(); + self.inner.on_tunnel_ready(); Ok(()) } fn on_add_route(&self, route: IpNetwork) -> Result<(), Self::Error> { - self.0.on_add_route(route.to_string()); + self.inner.on_add_route(route.to_string()); Ok(()) } fn on_remove_route(&self, route: IpNetwork) -> Result<(), Self::Error> { - self.0.on_remove_route(route.to_string()); + self.inner.on_remove_route(route.to_string()); Ok(()) } @@ -120,7 +118,7 @@ impl Callbacks for CallbackHandler { &self, resource_list: Vec, ) -> Result<(), Self::Error> { - self.0.on_update_resources( + self.inner.on_update_resources( serde_json::to_string(&resource_list) .expect("developer error: failed to serialize resource list"), ); @@ -128,19 +126,28 @@ impl Callbacks for CallbackHandler { } fn on_disconnect(&self, error: Option<&Error>) -> Result<(), Self::Error> { - self.0 + self.inner .on_disconnect(error.map(ToString::to_string).unwrap_or_default()); Ok(()) } fn on_error(&self, error: &Error) -> Result<(), Self::Error> { - self.0.on_error(error.to_string()); + self.inner.on_error(error.to_string()); Ok(()) } + + fn roll_log_file(&self) -> Option { + self.handle.roll_to_new_file().unwrap_or_else(|e| { + tracing::debug!("Failed to roll over to new file: {e}"); + let _ = self.on_error(&Error::LogFileRollError(e)); + + None + }) + } } -fn init_logging(log_dir: PathBuf, log_filter: String) -> WorkerGuard { - let (file_layer, guard) = file_logger::layer(log_dir.clone()); +fn init_logging(log_dir: PathBuf, log_filter: String) -> file_logger::Handle { + let (file_layer, handle) = file_logger::layer(&log_dir); let _ = tracing_subscriber::registry() .with(tracing_oslog::OsLogger::new( @@ -150,7 +157,7 @@ fn init_logging(log_dir: PathBuf, log_filter: String) -> WorkerGuard { .with(file_layer.with_filter(EnvFilter::new(log_filter))) .try_init(); - guard + handle } impl WrappedSession { @@ -162,21 +169,23 @@ impl WrappedSession { log_filter: String, callback_handler: ffi::CallbackHandler, ) -> Result { - let _guard = init_logging(log_dir.into(), log_filter); let secret = SecretString::from(token); let session = Session::connect( portal_url.as_str(), secret, device_id, - CallbackHandler(callback_handler.into()), + CallbackHandler { + inner: Arc::new(callback_handler), + handle: init_logging(log_dir.into(), log_filter), + }, ) .map_err(|err| err.to_string())?; - Ok(Self { session, _guard }) + Ok(Self(session)) } fn disconnect(&mut self) { - self.session.disconnect(None) + self.0.disconnect(None) } } diff --git a/rust/connlib/clients/headless/src/main.rs b/rust/connlib/clients/headless/src/main.rs index 9e0cbf72e..2aff44aec 100644 --- a/rust/connlib/clients/headless/src/main.rs +++ b/rust/connlib/clients/headless/src/main.rs @@ -1,6 +1,6 @@ use anyhow::Result; use clap::Parser; -use firezone_client_connlib::{file_logger, get_device_id, Callbacks, Session}; +use firezone_client_connlib::{file_logger, get_device_id, Callbacks, Error, Session}; use headless_utils::{block_on_ctrl_c, setup_global_subscriber, CommonArgs}; use secrecy::SecretString; use std::path::PathBuf; @@ -8,8 +8,7 @@ use std::path::PathBuf; fn main() -> Result<()> { let cli = Cli::parse(); - let (layer, _guard) = cli.log_dir.map(file_logger::layer).unzip(); - + let (layer, handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip(); setup_global_subscriber(layer); let device_id = get_device_id(); @@ -18,7 +17,7 @@ fn main() -> Result<()> { cli.common.url, SecretString::from(cli.common.secret), device_id, - CallbackHandler, + CallbackHandler { handle }, ) .unwrap(); tracing::info!("new_session"); @@ -30,10 +29,24 @@ fn main() -> Result<()> { } #[derive(Clone)] -struct CallbackHandler; +struct CallbackHandler { + handle: Option, +} impl Callbacks for CallbackHandler { type Error = std::convert::Infallible; + + fn roll_log_file(&self) -> Option { + self.handle + .as_ref()? + .roll_to_new_file() + .unwrap_or_else(|e| { + tracing::debug!("Failed to roll over to new file: {e}"); + let _ = self.on_error(&Error::LogFileRollError(e)); + + None + }) + } } #[derive(Parser)] diff --git a/rust/connlib/libs/client/Cargo.toml b/rust/connlib/libs/client/Cargo.toml index 6e17d5368..a5ffc2a32 100644 --- a/rust/connlib/libs/client/Cargo.toml +++ b/rust/connlib/libs/client/Cargo.toml @@ -7,8 +7,9 @@ edition = "2021" mock = ["libs-common/mock"] [dependencies] +tokio = { version = "1.32", default-features = false, features = ["sync", "rt"] } +tokio-util = "0.7.9" secrecy = { workspace = true } -tokio = { version = "1.32", default-features = false, features = ["sync"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing-appender = { version = "0.2.2" } @@ -20,7 +21,9 @@ serde = { version = "1.0", default-features = false, features = ["std", "derive" boringtun = { workspace = true } backoff = { workspace = true } webrtc = "0.8" -url = { version = "2.4.1", default-features = false } +url = { version = "2.4.1", features = ["serde"] } +time = { version = "0.3.29", features = ["formatting"] } +reqwest = { version = "0.11.20", default-features = false, features = ["stream", "rustls-tls"] } rand = { version = "0.8", default-features = false, features = ["std"] } tokio-tungstenite = { version = "0.19", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots"] } diff --git a/rust/connlib/libs/client/examples/on_demand_rolling.rs b/rust/connlib/libs/client/examples/on_demand_rolling.rs new file mode 100644 index 000000000..b5f394adc --- /dev/null +++ b/rust/connlib/libs/client/examples/on_demand_rolling.rs @@ -0,0 +1,28 @@ +use firezone_client_connlib::file_logger; +use std::path::Path; +use std::time::Duration; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Layer}; + +fn main() { + let log_dir = Path::new("./target"); + + println!("Logging to {}", log_dir.canonicalize().unwrap().display()); + + let (file_layer, handle) = file_logger::layer(log_dir); + + tracing_subscriber::registry() + .with(file_layer.with_filter(EnvFilter::new("info"))) + .init(); + + tracing::info!("First log"); + tracing::info!("Second log"); + + std::thread::sleep(Duration::from_secs(1)); // Make sure we don't use the same filename. + + handle.roll_to_new_file().unwrap().unwrap(); + + tracing::info!("Third log"); + tracing::info!("Fourth log"); +} diff --git a/rust/connlib/libs/client/src/control.rs b/rust/connlib/libs/client/src/control.rs index 6c0b4d5d0..efe19c961 100644 --- a/rust/connlib/libs/client/src/control.rs +++ b/rust/connlib/libs/client/src/control.rs @@ -1,4 +1,5 @@ -use std::sync::Arc; +use std::path::PathBuf; +use std::{io, sync::Arc}; use crate::messages::{ BroadcastGatewayIceCandidates, Connect, ConnectionDetails, EgressMessages, @@ -16,6 +17,8 @@ use webrtc::ice_transport::ice_candidate::RTCIceCandidate; use async_trait::async_trait; use firezone_tunnel::{ConnId, ControlSignal, Request, Tunnel}; use tokio::sync::Mutex; +use tokio_util::codec::{BytesCodec, FramedRead}; +use url::Url; #[async_trait] impl ControlSignal for ControlSignaler { @@ -78,7 +81,7 @@ pub struct ControlSignaler { impl ControlPlane { #[tracing::instrument(level = "trace", skip(self))] - pub async fn init( + async fn init( &mut self, InitClient { interface, @@ -239,6 +242,17 @@ impl ControlPlane { Messages::ResourceRemoved(resource) => self.remove_resource(resource.id), Messages::ResourceUpdated(resource) => self.update_resource(resource), Messages::IceCandidates(ice_candidate) => self.add_ice_candidate(ice_candidate).await, + Messages::SignedLogUrl(url) => { + let Some(path) = self.tunnel.callbacks().roll_log_file() else { + return Ok(()) + }; + + tokio::spawn(async move { + if let Err(e) = upload(path, url).await { + tracing::warn!("Failed to upload log file: {e}") + } + }); + } } Ok(()) } @@ -277,4 +291,47 @@ impl ControlPlane { pub async fn stats_event(&mut self) { tracing::debug!(target: "tunnel_state", stats = ?self.tunnel.stats()); } + + pub async fn request_log_upload_url(&mut self) { + tracing::info!("Requesting log upload URL from portal"); + + let _ = self + .control_signaler + .control_signal + .send(EgressMessages::CreateLogSink {}) + .await; + } +} + +async fn upload(path: PathBuf, url: Url) -> io::Result<()> { + tracing::info!(path = %path.display(), %url, "Uploading log file"); + + let file = tokio::fs::File::open(&path).await?; + let response = reqwest::Client::new() + .put(url) + .body(reqwest::Body::wrap_stream(FramedRead::new( + file, + BytesCodec::default(), + ))) + .send() + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + let status_code = response.status(); + + if !status_code.is_success() { + let body = response + .text() + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + tracing::warn!(%body, %status_code, "Failed to upload logs"); + + return Err(io::Error::new( + io::ErrorKind::Other, + "portal returned non-successful exit code", + )); + } + + Ok(()) } diff --git a/rust/connlib/libs/client/src/file_logger.rs b/rust/connlib/libs/client/src/file_logger.rs index 4daa628d0..dab190093 100644 --- a/rust/connlib/libs/client/src/file_logger.rs +++ b/rust/connlib/libs/client/src/file_logger.rs @@ -13,31 +13,146 @@ //! - Device serials //! - MAC addresses -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::{fs, io}; + +use time::OffsetDateTime; use tracing::Subscriber; +use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; use tracing_subscriber::Layer; const LOG_FILE_BASE_NAME: &str = "connlib.log"; /// Create a new file logger layer. -pub fn layer( - log_dir: PathBuf, -) -> ( - Box + Send + Sync + 'static>, - tracing_appender::non_blocking::WorkerGuard, -) +pub fn layer(log_dir: &Path) -> (Box + Send + Sync + 'static>, Handle) where T: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, { - let (writer, guard) = tracing_appender::non_blocking(tracing_appender::rolling::hourly( - log_dir, - LOG_FILE_BASE_NAME, - )); - - let layer = tracing_stackdriver::layer().with_writer(writer).boxed(); + let (appender, handle) = new_appender(log_dir.to_path_buf()); + let layer = tracing_stackdriver::layer().with_writer(appender).boxed(); // Return the guard so that the caller maintains a handle to it. Otherwise, // we have to wait for tracing_appender to flush the logs before exiting. // See https://docs.rs/tracing-appender/latest/tracing_appender/non_blocking/struct.WorkerGuard.html - (layer, guard) + (layer, handle) +} + +fn new_appender(directory: PathBuf) -> (NonBlocking, Handle) { + let inner = Arc::new(Mutex::new(Inner { + directory, + current: None, + })); + let appender = Appender { + inner: inner.clone(), + }; + + let (non_blocking, guard) = tracing_appender::non_blocking(appender); + let handle = Handle { + inner, + _guard: Arc::new(guard), + }; + + (non_blocking, handle) +} + +/// A handle to our file-logger. +/// +/// This handle allows to roll over logging to a new file via [`Handle::roll_to_new_file`]. It also houses the [`WorkerGuard`] of the underlying non-blocking appender. +/// Thus, you MUST NOT drop this handle for as long as you want messages to arrive at the log file. +#[derive(Clone, Debug)] +pub struct Handle { + inner: Arc>, + _guard: Arc, +} + +impl Handle { + /// Rolls over to a new file. + /// + /// Returns the path to the now unused, previous log file. + /// If we don't have a log-file yet, `Ok(None)` is returned. + pub fn roll_to_new_file(&self) -> io::Result> { + let mut inner = try_unlock(&self.inner); + let new_writer = inner.create_new_writer()?; + let Some((_, name)) = inner.current.replace(new_writer) else { + return Ok(None) + }; + + Ok(Some(inner.directory.join(name))) + } +} + +#[derive(Debug)] +struct Appender { + inner: Arc>, +} + +#[derive(Debug)] +struct Inner { + directory: PathBuf, + current: Option<(fs::File, String)>, +} + +impl Inner { + fn with_current_file( + &mut self, + cb: impl Fn(&mut fs::File) -> io::Result, + ) -> io::Result { + match self.current.as_mut() { + None => { + let (mut file, name) = self.create_new_writer()?; + + let ret = cb(&mut file); + + self.current = Some((file, name)); + + ret + } + Some((file, _)) => cb(file), + } + } + + // Inspired from `tracing-appender/src/rolling.rs`. + fn create_new_writer(&self) -> io::Result<(fs::File, String)> { + let format = + time::format_description::parse("[year]-[month]-[day]-[hour]-[minute]-[second]") + .expect("static format description to be valid"); + let date = OffsetDateTime::now_utc() + .format(&format) + .expect("static format description to be valid"); + + let filename = format!("{LOG_FILE_BASE_NAME}.{date}"); + + let path = self.directory.join(&filename); + let mut open_options = fs::OpenOptions::new(); + open_options.append(true).create(true); + + let new_file = open_options.open(path.as_path()); + if new_file.is_err() { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + let file = open_options.open(path)?; + + return Ok((file, filename)); + } + } + + let file = new_file?; + + Ok((file, filename)) + } +} + +impl io::Write for Appender { + fn write(&mut self, buf: &[u8]) -> io::Result { + try_unlock(&self.inner).with_current_file(|f| f.write(buf)) + } + + fn flush(&mut self) -> io::Result<()> { + try_unlock(&self.inner).with_current_file(|f| f.flush()) + } +} + +fn try_unlock(inner: &Mutex) -> MutexGuard<'_, Inner> { + inner.lock().unwrap_or_else(|e| e.into_inner()) } diff --git a/rust/connlib/libs/client/src/lib.rs b/rust/connlib/libs/client/src/lib.rs index 826f7266c..d08b304e2 100644 --- a/rust/connlib/libs/client/src/lib.rs +++ b/rust/connlib/libs/client/src/lib.rs @@ -166,7 +166,8 @@ where }; tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(10)); + let mut log_stats_interval = tokio::time::interval(Duration::from_secs(10)); + let mut upload_logs_interval = tokio::time::interval(upload_interval_from_env_or_default()); loop { tokio::select! { Some((msg, reference)) = control_plane_receiver.recv() => { @@ -175,10 +176,12 @@ where Err(err) => control_plane.handle_error(err, reference).await, } }, - _ = interval.tick() => control_plane.stats_event().await, + _ = log_stats_interval.tick() => control_plane.stats_event().await, + _ = upload_logs_interval.tick() => control_plane.request_log_upload_url().await, else => break } } + Result::Ok(()) }); @@ -246,3 +249,31 @@ where Self::disconnect_inner(self.runtime_stopper.clone(), &self.callbacks, error) } } + +/// Parses an interval from the _compile-time_ env variable `CONNLIB_LOG_UPLOAD_INTERVAL_SECS`. +/// +/// If not present or parsing as u64 fails, we fall back to a default interval of 1 hour. +fn upload_interval_from_env_or_default() -> Duration { + const DEFAULT: Duration = Duration::from_secs(60 * 60); + + let Some(interval) = option_env!("CONNLIB_LOG_UPLOAD_INTERVAL_SECS") else { + tracing::warn!(interval = ?DEFAULT, "Env variable `CONNLIB_LOG_UPLOAD_INTERVAL_SECS` was not set during compile-time, falling back to default"); + + return DEFAULT + }; + + let interval = match interval.parse() { + Ok(i) => i, + Err(e) => { + tracing::warn!(interval = ?DEFAULT, "Failed to parse `CONNLIB_LOG_UPLOAD_INTERVAL_SECS` as u64: {e}"); + return DEFAULT; + } + }; + + tracing::info!( + ?interval, + "Using upload interval specified at compile-time via `CONNLIB_LOG_UPLOAD_INTERVAL_SECS`" + ); + + Duration::from_secs(interval) +} diff --git a/rust/connlib/libs/client/src/messages.rs b/rust/connlib/libs/client/src/messages.rs index f3600af7c..f6439a7a2 100644 --- a/rust/connlib/libs/client/src/messages.rs +++ b/rust/connlib/libs/client/src/messages.rs @@ -7,6 +7,7 @@ use libs_common::messages::{ GatewayId, Interface, Key, Relay, RequestConnection, ResourceDescription, ResourceId, ReuseConnection, }; +use url::Url; use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit; #[derive(Debug, PartialEq, Eq, Deserialize, Serialize, Clone)] @@ -88,6 +89,8 @@ pub struct GatewayIceCandidates { pub enum ReplyMessages { ConnectionDetails(ConnectionDetails), Connect(Connect), + /// Response for [`EgressMessages::CreateLogSink`]. + SignedLogUrl(Url), } /// The totality of all messages (might have a macro in the future to derive the other types) @@ -97,6 +100,7 @@ pub enum Messages { Init(InitClient), ConnectionDetails(ConnectionDetails), Connect(Connect), + SignedLogUrl(Url), // Resources: arrive in an orderly fashion ResourceAdded(ResourceDescription), @@ -123,6 +127,7 @@ impl From for Messages { match value { ReplyMessages::ConnectionDetails(m) => Self::ConnectionDetails(m), ReplyMessages::Connect(m) => Self::Connect(m), + ReplyMessages::SignedLogUrl(url) => Self::SignedLogUrl(url), } } } @@ -138,6 +143,7 @@ pub enum EgressMessages { resource_id: ResourceId, connected_gateway_ids: Vec, }, + CreateLogSink {}, RequestConnection(RequestConnection), ReuseConnection(ReuseConnection), BroadcastIceCandidates(BroadcastGatewayIceCandidates), @@ -154,6 +160,7 @@ mod test { }; use chrono::NaiveDateTime; + use libs_common::control::ErrorInfo; use crate::messages::{ConnectionDetails, EgressMessages, ReplyMessages}; @@ -270,7 +277,7 @@ mod test { #[test] fn connection_details_reply() { - let m = PhoenixMessage::::new_reply( + let m = PhoenixMessage::::new_ok_reply( "client", ReplyMessages::ConnectionDetails(ConnectionDetails { gateway_id: "73037362-715d-4a83-a749-f18eadd970e6".parse().unwrap(), @@ -301,6 +308,7 @@ mod test { }), ], }), + None, ); let message = r#" { @@ -342,4 +350,34 @@ mod test { let reply_message = serde_json::from_str(message).unwrap(); assert_eq!(m, reply_message); } + + #[test] + fn create_log_sink_error_response() { + let json = r#"{"event":"phx_reply","ref":"unique_log_sink_ref","topic":"client","payload":{"status":"error","response":"disabled"}}"#; + + let actual = + serde_json::from_str::>(json).unwrap(); + let expected = PhoenixMessage::new_err_reply( + "client", + ErrorInfo::Disabled, + "unique_log_sink_ref".to_owned(), + ); + + assert_eq!(actual, expected) + } + + #[test] + fn create_log_sink_ok_response() { + let json = r#"{"event":"phx_reply","ref":"unique_log_sink_ref","topic":"client","payload":{"status":"ok","response":"https://storage.googleapis.com/foo/bar"}}"#; + + let actual = + serde_json::from_str::>(json).unwrap(); + let expected = PhoenixMessage::new_ok_reply( + "client", + ReplyMessages::SignedLogUrl("https://storage.googleapis.com/foo/bar".parse().unwrap()), + "unique_log_sink_ref".to_owned(), + ); + + assert_eq!(actual, expected) + } } diff --git a/rust/connlib/libs/common/src/callbacks.rs b/rust/connlib/libs/common/src/callbacks.rs index 67f402005..5a8d681cc 100644 --- a/rust/connlib/libs/common/src/callbacks.rs +++ b/rust/connlib/libs/common/src/callbacks.rs @@ -3,6 +3,7 @@ use ip_network::IpNetwork; use std::error::Error; use std::fmt::{Debug, Display}; use std::net::{Ipv4Addr, Ipv6Addr}; +use std::path::PathBuf; // Avoids having to map types for Windows type RawFd = i32; @@ -63,4 +64,8 @@ pub trait Callbacks: Clone + Send + Sync { tracing::warn!(error = ?error); Ok(()) } + + fn roll_log_file(&self) -> Option { + None + } } diff --git a/rust/connlib/libs/common/src/control.rs b/rust/connlib/libs/common/src/control.rs index ef0113ce7..cba5e3989 100644 --- a/rust/connlib/libs/common/src/control.rs +++ b/rust/connlib/libs/common/src/control.rs @@ -320,14 +320,31 @@ impl PhoenixMessage { } } - pub fn new_reply(topic: impl Into, payload: R) -> Self { + pub fn new_ok_reply( + topic: impl Into, + payload: R, + reference: impl Into>, + ) -> Self { Self { topic: topic.into(), // There has to be a better way :\ payload: Payload::Reply(ReplyMessage::PhxReply(PhxReply::Ok(OkReply::Message( payload, )))), - reference: None, + reference: reference.into(), + } + } + + pub fn new_err_reply( + topic: impl Into, + error: ErrorInfo, + reference: impl Into>, + ) -> Self { + Self { + topic: topic.into(), + // There has to be a better way :\ + payload: Payload::Reply(ReplyMessage::PhxReply(PhxReply::Error(error))), + reference: reference.into(), } } } @@ -364,6 +381,7 @@ enum OkReply { pub enum ErrorInfo { Reason(String), Offline, + Disabled, } #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] diff --git a/rust/connlib/libs/common/src/error.rs b/rust/connlib/libs/common/src/error.rs index a272b6c5f..2215c3c66 100644 --- a/rust/connlib/libs/common/src/error.rs +++ b/rust/connlib/libs/common/src/error.rs @@ -12,6 +12,9 @@ pub enum ConnlibError { /// Standard IO error. #[error(transparent)] Io(#[from] std::io::Error), + /// Standard IO error. + #[error("Failed to roll over log file: {0}")] + LogFileRollError(std::io::Error), /// Error while decoding a base64 value. #[error("There was an error while decoding a base64 value: {0}")] Base64DecodeError(#[from] DecodeError),