diff --git a/rust/relay/src/main.rs b/rust/relay/src/main.rs index b5591cad2..6592e544e 100644 --- a/rust/relay/src/main.rs +++ b/rust/relay/src/main.rs @@ -19,13 +19,15 @@ use std::convert::Infallible; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::pin::Pin; use std::task::{ready, Poll}; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use tracing::{level_filters::LevelFilter, Instrument, Subscriber}; use tracing_core::Dispatch; use tracing_stackdriver::CloudTraceConfiguration; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; use url::Url; +const STATS_LOG_INTERVAL: Duration = Duration::from_secs(10); + #[derive(Parser, Debug)] struct Args { /// The public (i.e. internet-reachable) IPv4 address of the relay server. @@ -121,7 +123,7 @@ async fn main() -> Result<()> { .instrument(span) .await? } else { - tracing::warn!("No portal token supplied, starting standalone mode"); + tracing::warn!(target: "relay", "No portal token supplied, starting standalone mode"); None }; @@ -130,7 +132,7 @@ async fn main() -> Result<()> { tokio::spawn(firezone_relay::health_check::serve(args.health_check_addr)); - tracing::info!("Listening for incoming traffic on UDP port 3478"); + tracing::info!(target: "relay", "Listening for incoming traffic on UDP port 3478"); future::poll_fn(|cx| eventloop.poll(cx)) .await @@ -158,7 +160,7 @@ async fn setup_tracing(args: &Args) -> Result<()> { Some(endpoint) => { let grpc_endpoint = format!("http://{endpoint}"); - tracing::trace!(%grpc_endpoint, "Setting up OTLP exporter for collector"); + tracing::trace!(target: "relay", %grpc_endpoint, "Setting up OTLP exporter for collector"); let exporter = opentelemetry_otlp::new_exporter() .tonic() @@ -174,7 +176,7 @@ async fn setup_tracing(args: &Args) -> Result<()> { .install_batch(opentelemetry::runtime::Tokio) .context("Failed to create OTLP trace pipeline")?; - tracing::trace!("Successfully initialized trace provider on tokio 
runtime"); + tracing::trace!(target: "relay", "Successfully initialized trace provider on tokio runtime"); let exporter = opentelemetry_otlp::new_exporter() .tonic() @@ -186,7 +188,7 @@ async fn setup_tracing(args: &Args) -> Result<()> { .build() .context("Failed to create OTLP metrics pipeline")?; - tracing::trace!("Successfully initialized metric controller on tokio runtime"); + tracing::trace!(target: "relay", "Successfully initialized metric controller on tokio runtime"); tracing_subscriber::registry() .with(log_layer(args)) @@ -223,7 +225,7 @@ where (LogFormat::Human, _) => tracing_subscriber::fmt::layer().boxed(), (LogFormat::Json, _) => tracing_subscriber::fmt::layer().json().boxed(), (LogFormat::GoogleCloud, None) => { - tracing::warn!("Emitting logs in Google Cloud format but without the project ID set. Spans will be emitted without IDs!"); + tracing::warn!(target: "relay", "Emitting logs in Google Cloud format but without the project ID set. Spans will be emitted without IDs!"); tracing_stackdriver::layer().boxed() } @@ -250,7 +252,7 @@ async fn connect_to_portal( use secrecy::ExposeSecret; if !url.path().is_empty() { - tracing::warn!("Overwriting path component of portal URL with '/relay/websocket'"); + tracing::warn!(target: "relay", "Overwriting path component of portal URL with '/relay/websocket'"); } url.set_path("relay/websocket"); @@ -296,7 +298,7 @@ fn make_rng(seed: Option) -> StdRng { return StdRng::from_entropy(); }; - tracing::info!("Seeding RNG from '{seed}'"); + tracing::info!(target: "relay", "Seeding RNG from '{seed}'"); StdRng::seed_from_u64(seed) } @@ -304,7 +306,7 @@ fn make_rng(seed: Option) -> StdRng { #[cfg(not(debug_assertions))] fn make_rng(seed: Option) -> StdRng { if seed.is_some() { - tracing::debug!("Ignoring rng-seed because we are running in release mode"); + tracing::debug!(target: "relay", "Ignoring rng-seed because we are running in release mode"); } StdRng::from_entropy() @@ -320,6 +322,9 @@ struct Eventloop { 
relay_data_sender: mpsc::Sender<(Vec, SocketAddr, AllocationId)>, relay_data_receiver: mpsc::Receiver<(Vec, SocketAddr, AllocationId)>, sleep: Sleep, + + stats_log_interval: tokio::time::Interval, + last_num_bytes_relayed: u64, } impl Eventloop @@ -363,6 +368,8 @@ where relay_data_sender, relay_data_receiver, sleep: Sleep::default(), + stats_log_interval: tokio::time::interval(STATS_LOG_INTERVAL), + last_num_bytes_relayed: 0, }) } @@ -398,7 +405,7 @@ where // Should never happen because we poll for readiness above. if e.is_full() { - tracing::warn!(%recipient, "Dropping message because channel to primary UDP socket task is full"); + tracing::warn!(target: "relay", %recipient, "Dropping message because channel to primary UDP socket task is full"); } } } @@ -417,11 +424,11 @@ where let _guard = span.enter(); if self.allocations.remove(&(id, family)).is_none() { - tracing::debug!("Unknown allocation {id}"); + tracing::debug!(target: "relay", "Unknown allocation {id}"); continue; }; - tracing::info!("Freeing addresses of allocation {id}"); + tracing::info!(target: "relay", "Freeing addresses of allocation {id}"); } Command::Wake { deadline } => { let span = tracing::error_span!("Command::Wake", ?deadline); @@ -429,12 +436,12 @@ where match deadline.duration_since(now) { Ok(duration) => { - tracing::trace!(?duration, "Suspending event loop") + tracing::trace!(target: "relay", ?duration, "Suspending event loop") } Err(e) => { let difference = e.duration(); - tracing::warn!( + tracing::warn!(target: "relay", ?difference, "Wake time is already in the past, waking now" ) @@ -450,7 +457,7 @@ where let mut allocation = match self.allocations.entry((id, receiver.family())) { Entry::Occupied(entry) => entry, Entry::Vacant(_) => { - tracing::debug!(allocation = %id, family = %receiver.family(), "Unknown allocation"); + tracing::debug!(target: "relay", allocation = %id, family = %receiver.family(), "Unknown allocation"); continue; } }; @@ -490,7 +497,7 @@ where // Priority 5: 
/// Formats a throughput value, given in bytes per second, as a human-readable string.
///
/// The value is repeatedly divided by 1000 until it drops below 1000 and is then printed
/// with two decimal places and the matching unit (`B/s`, `kB/s`, `MB/s`, `GB/s`).
/// `TB/s` is the largest unit: anything of 1000 GB/s and above is reported in `TB/s`,
/// however large the number gets.
fn fmt_human_throughput(mut throughput: f64) -> String {
    // Units we are allowed to scale *past*. `TB/s` is deliberately not in this list:
    // dividing once more after reaching it would yield a petabyte-scaled number that is
    // still labelled `TB/s`.
    let scalable_units = ["B/s", "kB/s", "MB/s", "GB/s"];

    for unit in scalable_units {
        if throughput < 1000.0 {
            return format!("{throughput:.2} {unit}");
        }

        throughput /= 1000.0;
    }

    // After four divisions the value is expressed in terabytes per second.
    format!("{throughput:.2} TB/s")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks one sample value per unit from `B/s` up to `GB/s`.
    #[test]
    fn prints_humanfriendly_throughput() {
        // (input passed to the formatter, expected rendering)
        let samples = [
            (42.0, "42.00 B/s"),
            (1_234.0, "1.23 kB/s"),
            (955_333_999.0, "955.33 MB/s"),
            (100_000_000_000.0, "100.00 GB/s"),
        ];

        for (input, expected) in samples {
            assert_eq!(fmt_human_throughput(input), expected);
        }
    }
}
@@ -351,6 +365,7 @@ where tracing::debug!(target: "relay", "Relaying {} bytes", bytes.len()); self.data_relayed_counter.add(bytes.len() as u64, &[]); + self.data_relayed += bytes.len() as u64; let data = ChannelData::new(*channel_number, bytes).to_bytes(); @@ -750,6 +765,7 @@ where tracing::debug!(target: "relay", "Relaying {} bytes", data.len()); self.data_relayed_counter.add(data.len() as u64, &[]); + self.data_relayed += data.len() as u64; if tracing::enabled!(target: "wire", tracing::Level::TRACE) { let hex_bytes = hex::encode(data);