feat: upload logs to GCP on compile-time configurable interval (#2103)

Resolves: #2020.
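The interval is baked into the binary at build time via `option_env!`, so it cannot be changed at runtime. A condensed, illustrative sketch of the lookup this PR adds (the names `upload_interval` and `DEFAULT` here are ours; the full `upload_interval_from_env_or_default` in the diff below also logs which branch was taken):

```rust
use std::time::Duration;

fn upload_interval() -> Duration {
    // `option_env!` captures the variable when connlib is *compiled*, not when it runs.
    const DEFAULT: Duration = Duration::from_secs(60 * 60); // 1-hour fallback

    option_env!("CONNLIB_LOG_UPLOAD_INTERVAL_SECS")
        .and_then(|secs| secs.parse::<u64>().ok())
        .map(Duration::from_secs)
        .unwrap_or(DEFAULT)
}

fn main() {
    // Built with e.g. `CONNLIB_LOG_UPLOAD_INTERVAL_SECS=300 cargo build`,
    // this reports 300s; without the variable set, the 1-hour default.
    println!("{:?}", upload_interval());
}
```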

---------

Co-authored-by: Jamil Bou Kheir <jamilbk@users.noreply.github.com>
Authored by: Thomas Eizinger, 2023-09-29 10:03:03 +10:00
Committed by: GitHub
Parent: e446138150
Commit: bb9dc1aeac
13 changed files with 525 additions and 69 deletions

rust/Cargo.lock (generated; 135 lines changed)

@@ -1087,6 +1087,15 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "encoding_rs"
+version = "0.8.33"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1"
+dependencies = [
+"cfg-if 1.0.0",
+]
[[package]]
name = "env_logger"
version = "0.10.0"
@@ -1154,11 +1163,14 @@ dependencies = [
"firezone-tunnel",
"libs-common",
"rand",
"reqwest",
"secrecy",
"serde",
"serde_json",
"time",
"tokio",
"tokio-tungstenite",
"tokio-util",
"tracing",
"tracing-android",
"tracing-appender",
@@ -1577,6 +1589,20 @@ dependencies = [
"want",
]
+[[package]]
+name = "hyper-rustls"
+version = "0.24.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
+dependencies = [
+"futures-util",
+"http",
+"hyper",
+"rustls 0.21.7",
+"tokio",
+"tokio-rustls",
+]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
@@ -2765,6 +2791,47 @@ dependencies = [
"webrtc",
]
+[[package]]
+name = "reqwest"
+version = "0.11.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1"
+dependencies = [
+"base64 0.21.4",
+"bytes",
+"encoding_rs",
+"futures-core",
+"futures-util",
+"h2",
+"http",
+"http-body",
+"hyper",
+"hyper-rustls",
+"ipnet",
+"js-sys",
+"log",
+"mime",
+"once_cell",
+"percent-encoding",
+"pin-project-lite",
+"rustls 0.21.7",
+"rustls-pemfile",
+"serde",
+"serde_json",
+"serde_urlencoded",
+"tokio",
+"tokio-rustls",
+"tokio-util",
+"tower-service",
+"url",
+"wasm-bindgen",
+"wasm-bindgen-futures",
+"wasm-streams",
+"web-sys",
+"webpki-roots 0.25.2",
+"winreg",
+]
[[package]]
name = "rfc6979"
version = "0.3.1"
@@ -3104,6 +3171,18 @@ dependencies = [
"serde",
]
+[[package]]
+name = "serde_urlencoded"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+dependencies = [
+"form_urlencoded",
+"itoa",
+"ryu",
+"serde",
+]
[[package]]
name = "sha1"
version = "0.2.0"
@@ -3469,9 +3548,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48"
checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe"
dependencies = [
"deranged",
"itoa",
@@ -3482,15 +3561,15 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3"
[[package]]
name = "time-macros"
version = "0.2.14"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572"
checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20"
dependencies = [
"time-core",
]
@@ -3584,7 +3663,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tungstenite",
"webpki-roots",
"webpki-roots 0.23.1",
]
[[package]]
@@ -3956,6 +4035,7 @@ dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
"serde",
]
[[package]]
@@ -4066,6 +4146,18 @@ dependencies = [
"wasm-bindgen-shared",
]
+[[package]]
+name = "wasm-bindgen-futures"
+version = "0.4.37"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
+dependencies = [
+"cfg-if 1.0.0",
+"js-sys",
+"wasm-bindgen",
+"web-sys",
+]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.87"
@@ -4095,6 +4187,19 @@ version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
+[[package]]
+name = "wasm-streams"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
+dependencies = [
+"futures-util",
+"js-sys",
+"wasm-bindgen",
+"wasm-bindgen-futures",
+"web-sys",
+]
[[package]]
name = "web-sys"
version = "0.3.64"
@@ -4134,6 +4239,12 @@ dependencies = [
"rustls-webpki 0.100.3",
]
+[[package]]
+name = "webpki-roots"
+version = "0.25.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "webrtc"
version = "0.8.0"
@@ -4533,6 +4644,16 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
+[[package]]
+name = "winreg"
+version = "0.50.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
+dependencies = [
+"cfg-if 1.0.0",
+"windows-sys 0.48.0",
+]
[[package]]
name = "wintun"
version = "0.3.1"


@@ -11,6 +11,7 @@ use jni::{
JNIEnv, JavaVM,
};
use secrecy::SecretString;
+use std::path::Path;
use std::sync::OnceLock;
use std::{
net::{Ipv4Addr, Ipv6Addr},
@@ -18,13 +19,13 @@ use std::{
path::PathBuf,
};
use thiserror::Error;
-use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
pub struct CallbackHandler {
vm: JavaVM,
callback_handler: GlobalRef,
+handle: file_logger::Handle,
}
impl Clone for CallbackHandler {
@@ -37,6 +38,7 @@ impl Clone for CallbackHandler {
Self {
vm: unsafe { std::ptr::read(&self.vm) },
callback_handler: self.callback_handler.clone(),
+handle: self.handle.clone(),
}
}
}
@@ -99,7 +101,7 @@ where
tracing_subscriber::layer::Identity::new()
}
-fn init_logging(log_dir: PathBuf, log_filter: String) {
+fn init_logging(log_dir: &Path, log_filter: String) -> file_logger::Handle {
// On Android, logging state is persisted indefinitely after the System.loadLibrary
// call, which means that a disconnect and tunnel process restart will not
// reinitialize the guard. This is a problem because the guard remains tied to
@@ -108,21 +110,23 @@ fn init_logging(log_dir: PathBuf, log_filter: String) {
//
// So we use a static variable to track whether the guard has been initialized and avoid
// re-initializing it if so.
-static LOGGING_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
-if LOGGING_GUARD.get().is_some() {
-return;
+static LOGGING_HANDLE: OnceLock<file_logger::Handle> = OnceLock::new();
+if let Some(handle) = LOGGING_HANDLE.get() {
+return handle.clone();
}
-let (file_layer, guard) = file_logger::layer(log_dir);
+let (file_layer, handle) = file_logger::layer(log_dir);
-LOGGING_GUARD
-.set(guard)
+LOGGING_HANDLE
+.set(handle.clone())
.expect("Logging guard should never be initialized twice");
let _ = tracing_subscriber::registry()
.with(file_layer.with_filter(EnvFilter::new(log_filter)))
.with(android_layer())
.try_init();
+handle
}
impl Callbacks for CallbackHandler {
@@ -283,6 +287,15 @@ impl Callbacks for CallbackHandler {
)
})
}
+fn roll_log_file(&self) -> Option<PathBuf> {
+self.handle.roll_to_new_file().unwrap_or_else(|e| {
+tracing::debug!("Failed to roll over to new file: {e}");
+let _ = self.on_error(&Error::LogFileRollError(e));
+None
+})
+}
}
fn throw(env: &mut JNIEnv, class: &str, msg: impl Into<JNIString>) {
@@ -348,13 +361,15 @@ fn connect(
let device_id = string_from_jstring!(env, device_id);
let log_dir = string_from_jstring!(env, log_dir);
let log_filter = string_from_jstring!(env, log_filter);
+let handle = init_logging(&PathBuf::from(log_dir), log_filter);
let callback_handler = CallbackHandler {
vm: env.get_java_vm().map_err(ConnectError::GetJavaVmFailed)?,
callback_handler,
+handle,
};
-init_logging(log_dir.into(), log_filter);
let session = Session::connect(portal_url.as_str(), secret, device_id, callback_handler)?;
Ok(session)


@@ -10,7 +10,6 @@ use std::{
path::PathBuf,
sync::Arc,
};
-use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
@@ -65,10 +64,7 @@ mod ffi {
}
/// This is used by the apple client to interact with our code.
-pub struct WrappedSession {
-session: Session<CallbackHandler>,
-_guard: WorkerGuard,
-}
+pub struct WrappedSession(Session<CallbackHandler>);
// SAFETY: `CallbackHandler.swift` promises to be thread-safe.
// TODO: Uphold that promise!
@@ -76,11 +72,13 @@ unsafe impl Send for ffi::CallbackHandler {}
unsafe impl Sync for ffi::CallbackHandler {}
#[derive(Clone)]
-#[repr(transparent)]
-// Generated Swift opaque type wrappers have a `Drop` impl that decrements the
-// refcount, but there's no way to generate a `Clone` impl that increments the
-// refcount. Instead, we just wrap it in an `Arc`.
-pub struct CallbackHandler(Arc<ffi::CallbackHandler>);
+pub struct CallbackHandler {
+// Generated Swift opaque type wrappers have a `Drop` impl that decrements the
+// refcount, but there's no way to generate a `Clone` impl that increments the
+// refcount. Instead, we just wrap it in an `Arc`.
+inner: Arc<ffi::CallbackHandler>,
+handle: file_logger::Handle,
+}
impl Callbacks for CallbackHandler {
type Error = std::convert::Infallible;
@@ -92,7 +90,7 @@ impl Callbacks for CallbackHandler {
dns_address: Ipv4Addr,
dns_fallback_strategy: String,
) -> Result<RawFd, Self::Error> {
-self.0.on_set_interface_config(
+self.inner.on_set_interface_config(
tunnel_address_v4.to_string(),
tunnel_address_v6.to_string(),
dns_address.to_string(),
@@ -102,17 +100,17 @@ impl Callbacks for CallbackHandler {
}
fn on_tunnel_ready(&self) -> Result<(), Self::Error> {
-self.0.on_tunnel_ready();
+self.inner.on_tunnel_ready();
Ok(())
}
fn on_add_route(&self, route: IpNetwork) -> Result<(), Self::Error> {
-self.0.on_add_route(route.to_string());
+self.inner.on_add_route(route.to_string());
Ok(())
}
fn on_remove_route(&self, route: IpNetwork) -> Result<(), Self::Error> {
-self.0.on_remove_route(route.to_string());
+self.inner.on_remove_route(route.to_string());
Ok(())
}
@@ -120,7 +118,7 @@ impl Callbacks for CallbackHandler {
&self,
resource_list: Vec<ResourceDescription>,
) -> Result<(), Self::Error> {
-self.0.on_update_resources(
+self.inner.on_update_resources(
serde_json::to_string(&resource_list)
.expect("developer error: failed to serialize resource list"),
);
@@ -128,19 +126,28 @@ impl Callbacks for CallbackHandler {
}
fn on_disconnect(&self, error: Option<&Error>) -> Result<(), Self::Error> {
-self.0
+self.inner
.on_disconnect(error.map(ToString::to_string).unwrap_or_default());
Ok(())
}
fn on_error(&self, error: &Error) -> Result<(), Self::Error> {
-self.0.on_error(error.to_string());
+self.inner.on_error(error.to_string());
Ok(())
}
+fn roll_log_file(&self) -> Option<PathBuf> {
+self.handle.roll_to_new_file().unwrap_or_else(|e| {
+tracing::debug!("Failed to roll over to new file: {e}");
+let _ = self.on_error(&Error::LogFileRollError(e));
+None
+})
+}
}
-fn init_logging(log_dir: PathBuf, log_filter: String) -> WorkerGuard {
-let (file_layer, guard) = file_logger::layer(log_dir.clone());
+fn init_logging(log_dir: PathBuf, log_filter: String) -> file_logger::Handle {
+let (file_layer, handle) = file_logger::layer(&log_dir);
let _ = tracing_subscriber::registry()
.with(tracing_oslog::OsLogger::new(
@@ -150,7 +157,7 @@ fn init_logging(log_dir: PathBuf, log_filter: String) -> WorkerGuard {
.with(file_layer.with_filter(EnvFilter::new(log_filter)))
.try_init();
-guard
+handle
}
impl WrappedSession {
@@ -162,21 +169,23 @@ impl WrappedSession {
log_filter: String,
callback_handler: ffi::CallbackHandler,
) -> Result<Self, String> {
-let _guard = init_logging(log_dir.into(), log_filter);
let secret = SecretString::from(token);
let session = Session::connect(
portal_url.as_str(),
secret,
device_id,
-CallbackHandler(callback_handler.into()),
+CallbackHandler {
+inner: Arc::new(callback_handler),
+handle: init_logging(log_dir.into(), log_filter),
+},
)
.map_err(|err| err.to_string())?;
-Ok(Self { session, _guard })
+Ok(Self(session))
}
fn disconnect(&mut self) {
-self.session.disconnect(None)
+self.0.disconnect(None)
}
}


@@ -1,6 +1,6 @@
use anyhow::Result;
use clap::Parser;
-use firezone_client_connlib::{file_logger, get_device_id, Callbacks, Session};
+use firezone_client_connlib::{file_logger, get_device_id, Callbacks, Error, Session};
use headless_utils::{block_on_ctrl_c, setup_global_subscriber, CommonArgs};
use secrecy::SecretString;
use std::path::PathBuf;
@@ -8,8 +8,7 @@ use std::path::PathBuf;
fn main() -> Result<()> {
let cli = Cli::parse();
-let (layer, _guard) = cli.log_dir.map(file_logger::layer).unzip();
+let (layer, handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip();
setup_global_subscriber(layer);
let device_id = get_device_id();
@@ -18,7 +17,7 @@ fn main() -> Result<()> {
cli.common.url,
SecretString::from(cli.common.secret),
device_id,
-CallbackHandler,
+CallbackHandler { handle },
)
.unwrap();
tracing::info!("new_session");
@@ -30,10 +29,24 @@ fn main() -> Result<()> {
}
#[derive(Clone)]
-struct CallbackHandler;
+struct CallbackHandler {
+handle: Option<file_logger::Handle>,
+}
impl Callbacks for CallbackHandler {
type Error = std::convert::Infallible;
+fn roll_log_file(&self) -> Option<PathBuf> {
+self.handle
+.as_ref()?
+.roll_to_new_file()
+.unwrap_or_else(|e| {
+tracing::debug!("Failed to roll over to new file: {e}");
+let _ = self.on_error(&Error::LogFileRollError(e));
+None
+})
+}
}
#[derive(Parser)]


@@ -7,8 +7,9 @@ edition = "2021"
mock = ["libs-common/mock"]
[dependencies]
-tokio = { version = "1.32", default-features = false, features = ["sync", "rt"] }
+tokio-util = "0.7.9"
secrecy = { workspace = true }
+tokio = { version = "1.32", default-features = false, features = ["sync"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing-appender = { version = "0.2.2" }
@@ -20,7 +21,9 @@ serde = { version = "1.0", default-features = false, features = ["std", "derive"
boringtun = { workspace = true }
backoff = { workspace = true }
webrtc = "0.8"
url = { version = "2.4.1", default-features = false }
url = { version = "2.4.1", features = ["serde"] }
time = { version = "0.3.29", features = ["formatting"] }
reqwest = { version = "0.11.20", default-features = false, features = ["stream", "rustls-tls"] }
rand = { version = "0.8", default-features = false, features = ["std"] }
tokio-tungstenite = { version = "0.19", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots"] }


@@ -0,0 +1,28 @@
+use firezone_client_connlib::file_logger;
+use std::path::Path;
+use std::time::Duration;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+use tracing_subscriber::{EnvFilter, Layer};
+fn main() {
+let log_dir = Path::new("./target");
+println!("Logging to {}", log_dir.canonicalize().unwrap().display());
+let (file_layer, handle) = file_logger::layer(log_dir);
+tracing_subscriber::registry()
+.with(file_layer.with_filter(EnvFilter::new("info")))
+.init();
+tracing::info!("First log");
+tracing::info!("Second log");
+std::thread::sleep(Duration::from_secs(1)); // Make sure we don't use the same filename.
+handle.roll_to_new_file().unwrap().unwrap();
+tracing::info!("Third log");
+tracing::info!("Fourth log");
+}
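Running this example leaves two `connlib.log.<timestamp>` files under `./target`: the first two events go to the file created on startup, the last two to the file created by `roll_to_new_file` (the one-second sleep guarantees the per-second timestamps in the two filenames differ).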


@@ -1,4 +1,5 @@
-use std::sync::Arc;
+use std::path::PathBuf;
+use std::{io, sync::Arc};
use crate::messages::{
BroadcastGatewayIceCandidates, Connect, ConnectionDetails, EgressMessages,
@@ -16,6 +17,8 @@ use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
use async_trait::async_trait;
use firezone_tunnel::{ConnId, ControlSignal, Request, Tunnel};
use tokio::sync::Mutex;
+use tokio_util::codec::{BytesCodec, FramedRead};
+use url::Url;
#[async_trait]
impl ControlSignal for ControlSignaler {
@@ -78,7 +81,7 @@ pub struct ControlSignaler {
impl<CB: Callbacks + 'static> ControlPlane<CB> {
#[tracing::instrument(level = "trace", skip(self))]
-pub async fn init(
+async fn init(
&mut self,
InitClient {
interface,
@@ -239,6 +242,17 @@ impl<CB: Callbacks + 'static> ControlPlane<CB> {
Messages::ResourceRemoved(resource) => self.remove_resource(resource.id),
Messages::ResourceUpdated(resource) => self.update_resource(resource),
Messages::IceCandidates(ice_candidate) => self.add_ice_candidate(ice_candidate).await,
+Messages::SignedLogUrl(url) => {
+let Some(path) = self.tunnel.callbacks().roll_log_file() else {
+return Ok(())
+};
+tokio::spawn(async move {
+if let Err(e) = upload(path, url).await {
+tracing::warn!("Failed to upload log file: {e}")
+}
+});
+}
}
Ok(())
}
@@ -277,4 +291,47 @@ impl<CB: Callbacks + 'static> ControlPlane<CB> {
pub async fn stats_event(&mut self) {
tracing::debug!(target: "tunnel_state", stats = ?self.tunnel.stats());
}
+pub async fn request_log_upload_url(&mut self) {
+tracing::info!("Requesting log upload URL from portal");
+let _ = self
+.control_signaler
+.control_signal
+.send(EgressMessages::CreateLogSink {})
+.await;
+}
}
+async fn upload(path: PathBuf, url: Url) -> io::Result<()> {
+tracing::info!(path = %path.display(), %url, "Uploading log file");
+let file = tokio::fs::File::open(&path).await?;
+let response = reqwest::Client::new()
+.put(url)
+.body(reqwest::Body::wrap_stream(FramedRead::new(
+file,
+BytesCodec::default(),
+)))
+.send()
+.await
+.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+let status_code = response.status();
+if !status_code.is_success() {
+let body = response
+.text()
+.await
+.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+tracing::warn!(%body, %status_code, "Failed to upload logs");
+return Err(io::Error::new(
+io::ErrorKind::Other,
+"portal returned non-successful status code",
+));
+}
+Ok(())
+}
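Note on the upload path above: `reqwest::Body::wrap_stream(FramedRead::new(file, BytesCodec::default()))` streams the log file to the signed URL chunk by chunk instead of buffering it in memory, and a non-2xx response from the bucket is logged together with the response body before being surfaced as an `io::Error`.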


@@ -13,31 +13,146 @@
//! - Device serials
//! - MAC addresses
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex, MutexGuard};
+use std::{fs, io};
+use time::OffsetDateTime;
use tracing::Subscriber;
+use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::Layer;
const LOG_FILE_BASE_NAME: &str = "connlib.log";
/// Create a new file logger layer.
-pub fn layer<T>(
-log_dir: PathBuf,
-) -> (
-Box<dyn Layer<T> + Send + Sync + 'static>,
-tracing_appender::non_blocking::WorkerGuard,
-)
+pub fn layer<T>(log_dir: &Path) -> (Box<dyn Layer<T> + Send + Sync + 'static>, Handle)
where
T: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
-let (writer, guard) = tracing_appender::non_blocking(tracing_appender::rolling::hourly(
-log_dir,
-LOG_FILE_BASE_NAME,
-));
-let layer = tracing_stackdriver::layer().with_writer(writer).boxed();
+let (appender, handle) = new_appender(log_dir.to_path_buf());
+let layer = tracing_stackdriver::layer().with_writer(appender).boxed();
// Return the guard so that the caller maintains a handle to it. Otherwise,
// we have to wait for tracing_appender to flush the logs before exiting.
// See https://docs.rs/tracing-appender/latest/tracing_appender/non_blocking/struct.WorkerGuard.html
-(layer, guard)
+(layer, handle)
}
+fn new_appender(directory: PathBuf) -> (NonBlocking, Handle) {
+let inner = Arc::new(Mutex::new(Inner {
+directory,
+current: None,
+}));
+let appender = Appender {
+inner: inner.clone(),
+};
+let (non_blocking, guard) = tracing_appender::non_blocking(appender);
+let handle = Handle {
+inner,
+_guard: Arc::new(guard),
+};
+(non_blocking, handle)
+}
+/// A handle to our file-logger.
+///
+/// This handle allows rolling logging over to a new file via [`Handle::roll_to_new_file`]. It also houses the [`WorkerGuard`] of the underlying non-blocking appender.
+/// Thus, you MUST NOT drop this handle for as long as you want messages to arrive at the log file.
+#[derive(Clone, Debug)]
+pub struct Handle {
+inner: Arc<Mutex<Inner>>,
+_guard: Arc<WorkerGuard>,
+}
+impl Handle {
+/// Rolls over to a new file.
+///
+/// Returns the path to the now unused, previous log file.
+/// If we don't have a log file yet, `Ok(None)` is returned.
+pub fn roll_to_new_file(&self) -> io::Result<Option<PathBuf>> {
+let mut inner = try_unlock(&self.inner);
+let new_writer = inner.create_new_writer()?;
+let Some((_, name)) = inner.current.replace(new_writer) else {
+return Ok(None)
+};
+Ok(Some(inner.directory.join(name)))
+}
+}
+#[derive(Debug)]
+struct Appender {
+inner: Arc<Mutex<Inner>>,
+}
+#[derive(Debug)]
+struct Inner {
+directory: PathBuf,
+current: Option<(fs::File, String)>,
+}
+impl Inner {
+fn with_current_file<R>(
+&mut self,
+cb: impl Fn(&mut fs::File) -> io::Result<R>,
+) -> io::Result<R> {
+match self.current.as_mut() {
+None => {
+let (mut file, name) = self.create_new_writer()?;
+let ret = cb(&mut file);
+self.current = Some((file, name));
+ret
+}
+Some((file, _)) => cb(file),
+}
+}
+// Inspired by `tracing-appender/src/rolling.rs`.
+fn create_new_writer(&self) -> io::Result<(fs::File, String)> {
+let format =
+time::format_description::parse("[year]-[month]-[day]-[hour]-[minute]-[second]")
+.expect("static format description to be valid");
+let date = OffsetDateTime::now_utc()
+.format(&format)
+.expect("static format description to be valid");
+let filename = format!("{LOG_FILE_BASE_NAME}.{date}");
+let path = self.directory.join(&filename);
+let mut open_options = fs::OpenOptions::new();
+open_options.append(true).create(true);
+let new_file = open_options.open(path.as_path());
+if new_file.is_err() {
+if let Some(parent) = path.parent() {
+fs::create_dir_all(parent)?;
+let file = open_options.open(path)?;
+return Ok((file, filename));
+}
+}
+let file = new_file?;
+Ok((file, filename))
+}
+}
+impl io::Write for Appender {
+fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+try_unlock(&self.inner).with_current_file(|f| f.write(buf))
+}
+fn flush(&mut self) -> io::Result<()> {
+try_unlock(&self.inner).with_current_file(|f| f.flush())
+}
+}
+fn try_unlock(inner: &Mutex<Inner>) -> MutexGuard<'_, Inner> {
+inner.lock().unwrap_or_else(|e| e.into_inner())
+}
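How the rollover works: `tracing_appender::non_blocking` still provides the background writer thread, but it now writes through `Appender`, whose current `fs::File` sits behind a shared `Mutex`. `Handle::roll_to_new_file` swaps in a freshly created file under that lock, so subsequent log lines land in the new file while the path of the old one is returned for upload; `try_unlock` recovers a poisoned lock via `into_inner`, so a panicked writer cannot wedge logging.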


@@ -166,7 +166,8 @@ where
};
tokio::spawn(async move {
-let mut interval = tokio::time::interval(Duration::from_secs(10));
+let mut log_stats_interval = tokio::time::interval(Duration::from_secs(10));
+let mut upload_logs_interval = tokio::time::interval(upload_interval_from_env_or_default());
loop {
tokio::select! {
Some((msg, reference)) = control_plane_receiver.recv() => {
@@ -175,10 +176,12 @@ where
Err(err) => control_plane.handle_error(err, reference).await,
}
},
-_ = interval.tick() => control_plane.stats_event().await,
+_ = log_stats_interval.tick() => control_plane.stats_event().await,
+_ = upload_logs_interval.tick() => control_plane.request_log_upload_url().await,
else => break
}
}
Result::Ok(())
});
@@ -246,3 +249,31 @@ where
Self::disconnect_inner(self.runtime_stopper.clone(), &self.callbacks, error)
}
}
+/// Parses an interval from the _compile-time_ env variable `CONNLIB_LOG_UPLOAD_INTERVAL_SECS`.
+///
+/// If not present or parsing as u64 fails, we fall back to a default interval of 1 hour.
+fn upload_interval_from_env_or_default() -> Duration {
+const DEFAULT: Duration = Duration::from_secs(60 * 60);
+let Some(interval) = option_env!("CONNLIB_LOG_UPLOAD_INTERVAL_SECS") else {
+tracing::warn!(interval = ?DEFAULT, "Env variable `CONNLIB_LOG_UPLOAD_INTERVAL_SECS` was not set during compile-time, falling back to default");
+return DEFAULT
+};
+let interval = match interval.parse() {
+Ok(i) => i,
+Err(e) => {
+tracing::warn!(interval = ?DEFAULT, "Failed to parse `CONNLIB_LOG_UPLOAD_INTERVAL_SECS` as u64: {e}");
+return DEFAULT;
+}
+};
+tracing::info!(
+?interval,
+"Using upload interval specified at compile-time via `CONNLIB_LOG_UPLOAD_INTERVAL_SECS`"
+);
+Duration::from_secs(interval)
+}


@@ -7,6 +7,7 @@ use libs_common::messages::{
GatewayId, Interface, Key, Relay, RequestConnection, ResourceDescription, ResourceId,
ReuseConnection,
};
+use url::Url;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize, Clone)]
@@ -88,6 +89,8 @@ pub struct GatewayIceCandidates {
pub enum ReplyMessages {
ConnectionDetails(ConnectionDetails),
Connect(Connect),
+/// Response for [`EgressMessages::CreateLogSink`].
+SignedLogUrl(Url),
}
/// The totality of all messages (might have a macro in the future to derive the other types)
@@ -97,6 +100,7 @@ pub enum Messages {
Init(InitClient),
ConnectionDetails(ConnectionDetails),
Connect(Connect),
+SignedLogUrl(Url),
// Resources: arrive in an orderly fashion
ResourceAdded(ResourceDescription),
@@ -123,6 +127,7 @@ impl From<ReplyMessages> for Messages {
match value {
ReplyMessages::ConnectionDetails(m) => Self::ConnectionDetails(m),
ReplyMessages::Connect(m) => Self::Connect(m),
+ReplyMessages::SignedLogUrl(url) => Self::SignedLogUrl(url),
}
}
}
@@ -138,6 +143,7 @@ pub enum EgressMessages {
resource_id: ResourceId,
connected_gateway_ids: Vec<GatewayId>,
},
+CreateLogSink {},
RequestConnection(RequestConnection),
ReuseConnection(ReuseConnection),
BroadcastIceCandidates(BroadcastGatewayIceCandidates),
@@ -154,6 +160,7 @@ mod test {
};
use chrono::NaiveDateTime;
+use libs_common::control::ErrorInfo;
use crate::messages::{ConnectionDetails, EgressMessages, ReplyMessages};
@@ -270,7 +277,7 @@ mod test {
#[test]
fn connection_details_reply() {
-let m = PhoenixMessage::<IngressMessages, ReplyMessages>::new_reply(
+let m = PhoenixMessage::<IngressMessages, ReplyMessages>::new_ok_reply(
"client",
ReplyMessages::ConnectionDetails(ConnectionDetails {
gateway_id: "73037362-715d-4a83-a749-f18eadd970e6".parse().unwrap(),
@@ -301,6 +308,7 @@ mod test {
}),
],
}),
+None,
);
let message = r#"
{
@@ -342,4 +350,34 @@ mod test {
let reply_message = serde_json::from_str(message).unwrap();
assert_eq!(m, reply_message);
}
+#[test]
+fn create_log_sink_error_response() {
+let json = r#"{"event":"phx_reply","ref":"unique_log_sink_ref","topic":"client","payload":{"status":"error","response":"disabled"}}"#;
+let actual =
+serde_json::from_str::<PhoenixMessage<EgressMessages, ReplyMessages>>(json).unwrap();
+let expected = PhoenixMessage::new_err_reply(
+"client",
+ErrorInfo::Disabled,
+"unique_log_sink_ref".to_owned(),
+);
+assert_eq!(actual, expected)
+}
+#[test]
+fn create_log_sink_ok_response() {
+let json = r#"{"event":"phx_reply","ref":"unique_log_sink_ref","topic":"client","payload":{"status":"ok","response":"https://storage.googleapis.com/foo/bar"}}"#;
+let actual =
+serde_json::from_str::<PhoenixMessage<EgressMessages, ReplyMessages>>(json).unwrap();
+let expected = PhoenixMessage::new_ok_reply(
+"client",
+ReplyMessages::SignedLogUrl("https://storage.googleapis.com/foo/bar".parse().unwrap()),
+"unique_log_sink_ref".to_owned(),
+);
+assert_eq!(actual, expected)
+}
}


@@ -3,6 +3,7 @@ use ip_network::IpNetwork;
use std::error::Error;
use std::fmt::{Debug, Display};
use std::net::{Ipv4Addr, Ipv6Addr};
+use std::path::PathBuf;
// Avoids having to map types for Windows
type RawFd = i32;
@@ -63,4 +64,8 @@ pub trait Callbacks: Clone + Send + Sync {
tracing::warn!(error = ?error);
Ok(())
}
+fn roll_log_file(&self) -> Option<PathBuf> {
+None
+}
}


@@ -320,14 +320,31 @@ impl<T, R> PhoenixMessage<T, R> {
}
}
-pub fn new_reply(topic: impl Into<String>, payload: R) -> Self {
+pub fn new_ok_reply(
+topic: impl Into<String>,
+payload: R,
+reference: impl Into<Option<String>>,
+) -> Self {
Self {
topic: topic.into(),
// There has to be a better way :\
payload: Payload::Reply(ReplyMessage::PhxReply(PhxReply::Ok(OkReply::Message(
payload,
)))),
-reference: None,
+reference: reference.into(),
}
}
+pub fn new_err_reply(
+topic: impl Into<String>,
+error: ErrorInfo,
+reference: impl Into<Option<String>>,
+) -> Self {
+Self {
+topic: topic.into(),
+// There has to be a better way :\
+payload: Payload::Reply(ReplyMessage::PhxReply(PhxReply::Error(error))),
+reference: reference.into(),
+}
+}
}
@@ -364,6 +381,7 @@ enum OkReply<T> {
pub enum ErrorInfo {
Reason(String),
Offline,
+Disabled,
}
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]


@@ -12,6 +12,9 @@ pub enum ConnlibError {
/// Standard IO error.
#[error(transparent)]
Io(#[from] std::io::Error),
+/// Failed to roll over the log file.
+#[error("Failed to roll over log file: {0}")]
+LogFileRollError(std::io::Error),
/// Error while decoding a base64 value.
#[error("There was an error while decoding a base64 value: {0}")]
Base64DecodeError(#[from] DecodeError),