From c9834ee8ee0068ad145d8b66eb45520e9d3d390c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 29 Jan 2024 10:52:15 -0800 Subject: [PATCH] feat(relay): print stats every 10s (#3408) In #3400, a discussion started on what the correct log level would be for the production relay. Currently, the relay logs some stats about each packet on debug, i.e. where it came from, where it is going to and how big it is. This isn't very useful in production though and will fill up our log disk quickly. This PR introduces a stats timer like the one we already have in other components. We print the number of allocations, how many channels we have and how much data we relayed over all these channels since we last printed. The interval is currently set to 10 seconds. Here is what this output could look like (captured locally using `relay/run_smoke_test.sh`, although slightly tweaked, printing every 2s, using release mode and larger packets on the clients): ``` 2024-01-26T05:01:02.445555Z INFO relay: Seeding RNG from '0' 2024-01-26T05:01:02.445580Z WARN relay: No portal token supplied, starting standalone mode 2024-01-26T05:01:02.445827Z INFO relay: Listening for incoming traffic on UDP port 3478 2024-01-26T05:01:02.447035Z INFO Eventloop::poll: relay: num_allocations=0 num_channels=0 throughput=0.00 B/s 2024-01-26T05:01:02.649194Z INFO Eventloop::poll:handle_client_input{sender=127.0.0.1:39092 transaction_id="8f20177512495fcb563c60de" allocation=AID-1}: relay: Created new allocation first_relay_address=127.0.0.1 lifetime=600s 2024-01-26T05:01:02.650744Z INFO Eventloop::poll:handle_client_input{sender=127.0.0.1:39092 transaction_id="6445943a353d5e8c262a821f" allocation=AID-1 peer=127.0.0.1:41094 channel=16384}: relay: Successfully bound channel 2024-01-26T05:01:04.446317Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=631.54 MB/s 2024-01-26T05:01:06.446319Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=698.73 MB/s 
2024-01-26T05:01:08.446325Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=708.98 MB/s 2024-01-26T05:01:10.446324Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=690.79 MB/s 2024-01-26T05:01:12.446316Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=715.53 MB/s 2024-01-26T05:01:14.446315Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=706.90 MB/s 2024-01-26T05:01:16.446313Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=712.03 MB/s 2024-01-26T05:01:18.446319Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=717.54 MB/s 2024-01-26T05:01:20.446316Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=690.74 MB/s 2024-01-26T05:01:22.446313Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=705.08 MB/s 2024-01-26T05:01:24.446311Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=700.41 MB/s 2024-01-26T05:01:26.446319Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=717.57 MB/s 2024-01-26T05:01:28.446320Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=688.82 MB/s 2024-01-26T05:01:30.446329Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=696.35 MB/s 2024-01-26T05:01:32.446317Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=724.03 MB/s 2024-01-26T05:01:34.446320Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=713.46 MB/s 2024-01-26T05:01:36.446314Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=716.13 MB/s 2024-01-26T05:01:38.446327Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=687.16 MB/s 2024-01-26T05:01:40.446315Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=708.20 MB/s 2024-01-26T05:01:42.446314Z INFO Eventloop::poll: relay: num_allocations=1 
num_channels=1 throughput=689.36 MB/s 2024-01-26T05:01:44.446314Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=698.62 MB/s 2024-01-26T05:01:46.446315Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=696.21 MB/s 2024-01-26T05:01:48.446378Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=696.36 MB/s 2024-01-26T05:01:50.446314Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=709.47 MB/s 2024-01-26T05:01:52.446319Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=714.48 MB/s 2024-01-26T05:01:54.446323Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=690.71 MB/s 2024-01-26T05:01:56.446313Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=692.70 MB/s 2024-01-26T05:01:58.446321Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=687.87 MB/s 2024-01-26T05:02:00.446316Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=682.11 MB/s 2024-01-26T05:02:02.446312Z INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=700.07 MB/s ``` --- rust/relay/src/main.rs | 87 +++++++++++++++++++++++++++++++--------- rust/relay/src/server.rs | 18 ++++++++- 2 files changed, 84 insertions(+), 21 deletions(-) diff --git a/rust/relay/src/main.rs b/rust/relay/src/main.rs index b5591cad2..6592e544e 100644 --- a/rust/relay/src/main.rs +++ b/rust/relay/src/main.rs @@ -19,13 +19,15 @@ use std::convert::Infallible; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::pin::Pin; use std::task::{ready, Poll}; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use tracing::{level_filters::LevelFilter, Instrument, Subscriber}; use tracing_core::Dispatch; use tracing_stackdriver::CloudTraceConfiguration; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; use url::Url; +const STATS_LOG_INTERVAL: Duration = 
Duration::from_secs(10); + #[derive(Parser, Debug)] struct Args { /// The public (i.e. internet-reachable) IPv4 address of the relay server. @@ -121,7 +123,7 @@ async fn main() -> Result<()> { .instrument(span) .await? } else { - tracing::warn!("No portal token supplied, starting standalone mode"); + tracing::warn!(target: "relay", "No portal token supplied, starting standalone mode"); None }; @@ -130,7 +132,7 @@ async fn main() -> Result<()> { tokio::spawn(firezone_relay::health_check::serve(args.health_check_addr)); - tracing::info!("Listening for incoming traffic on UDP port 3478"); + tracing::info!(target: "relay", "Listening for incoming traffic on UDP port 3478"); future::poll_fn(|cx| eventloop.poll(cx)) .await @@ -158,7 +160,7 @@ async fn setup_tracing(args: &Args) -> Result<()> { Some(endpoint) => { let grpc_endpoint = format!("http://{endpoint}"); - tracing::trace!(%grpc_endpoint, "Setting up OTLP exporter for collector"); + tracing::trace!(target: "relay", %grpc_endpoint, "Setting up OTLP exporter for collector"); let exporter = opentelemetry_otlp::new_exporter() .tonic() @@ -174,7 +176,7 @@ async fn setup_tracing(args: &Args) -> Result<()> { .install_batch(opentelemetry::runtime::Tokio) .context("Failed to create OTLP trace pipeline")?; - tracing::trace!("Successfully initialized trace provider on tokio runtime"); + tracing::trace!(target: "relay", "Successfully initialized trace provider on tokio runtime"); let exporter = opentelemetry_otlp::new_exporter() .tonic() @@ -186,7 +188,7 @@ async fn setup_tracing(args: &Args) -> Result<()> { .build() .context("Failed to create OTLP metrics pipeline")?; - tracing::trace!("Successfully initialized metric controller on tokio runtime"); + tracing::trace!(target: "relay", "Successfully initialized metric controller on tokio runtime"); tracing_subscriber::registry() .with(log_layer(args)) @@ -223,7 +225,7 @@ where (LogFormat::Human, _) => tracing_subscriber::fmt::layer().boxed(), (LogFormat::Json, _) => 
tracing_subscriber::fmt::layer().json().boxed(), (LogFormat::GoogleCloud, None) => { - tracing::warn!("Emitting logs in Google Cloud format but without the project ID set. Spans will be emitted without IDs!"); + tracing::warn!(target: "relay", "Emitting logs in Google Cloud format but without the project ID set. Spans will be emitted without IDs!"); tracing_stackdriver::layer().boxed() } @@ -250,7 +252,7 @@ async fn connect_to_portal( use secrecy::ExposeSecret; if !url.path().is_empty() { - tracing::warn!("Overwriting path component of portal URL with '/relay/websocket'"); + tracing::warn!(target: "relay", "Overwriting path component of portal URL with '/relay/websocket'"); } url.set_path("relay/websocket"); @@ -296,7 +298,7 @@ fn make_rng(seed: Option) -> StdRng { return StdRng::from_entropy(); }; - tracing::info!("Seeding RNG from '{seed}'"); + tracing::info!(target: "relay", "Seeding RNG from '{seed}'"); StdRng::seed_from_u64(seed) } @@ -304,7 +306,7 @@ fn make_rng(seed: Option) -> StdRng { #[cfg(not(debug_assertions))] fn make_rng(seed: Option) -> StdRng { if seed.is_some() { - tracing::debug!("Ignoring rng-seed because we are running in release mode"); + tracing::debug!(target: "relay", "Ignoring rng-seed because we are running in release mode"); } StdRng::from_entropy() @@ -320,6 +322,9 @@ struct Eventloop { relay_data_sender: mpsc::Sender<(Vec, SocketAddr, AllocationId)>, relay_data_receiver: mpsc::Receiver<(Vec, SocketAddr, AllocationId)>, sleep: Sleep, + + stats_log_interval: tokio::time::Interval, + last_num_bytes_relayed: u64, } impl Eventloop @@ -363,6 +368,8 @@ where relay_data_sender, relay_data_receiver, sleep: Sleep::default(), + stats_log_interval: tokio::time::interval(STATS_LOG_INTERVAL), + last_num_bytes_relayed: 0, }) } @@ -398,7 +405,7 @@ where // Should never happen because we poll for readiness above. 
if e.is_full() { - tracing::warn!(%recipient, "Dropping message because channel to primary UDP socket task is full"); + tracing::warn!(target: "relay", %recipient, "Dropping message because channel to primary UDP socket task is full"); } } } @@ -417,11 +424,11 @@ where let _guard = span.enter(); if self.allocations.remove(&(id, family)).is_none() { - tracing::debug!("Unknown allocation {id}"); + tracing::debug!(target: "relay", "Unknown allocation {id}"); continue; }; - tracing::info!("Freeing addresses of allocation {id}"); + tracing::info!(target: "relay", "Freeing addresses of allocation {id}"); } Command::Wake { deadline } => { let span = tracing::error_span!("Command::Wake", ?deadline); @@ -429,12 +436,12 @@ where match deadline.duration_since(now) { Ok(duration) => { - tracing::trace!(?duration, "Suspending event loop") + tracing::trace!(target: "relay", ?duration, "Suspending event loop") } Err(e) => { let difference = e.duration(); - tracing::warn!( + tracing::warn!(target: "relay", ?difference, "Wake time is already in the past, waking now" ) @@ -450,7 +457,7 @@ where let mut allocation = match self.allocations.entry((id, receiver.family())) { Entry::Occupied(entry) => entry, Entry::Vacant(_) => { - tracing::debug!(allocation = %id, family = %receiver.family(), "Unknown allocation"); + tracing::debug!(target: "relay", allocation = %id, family = %receiver.family(), "Unknown allocation"); continue; } }; @@ -490,7 +497,7 @@ where // Priority 5: Handle portal messages match self.channel.as_mut().map(|c| c.poll(cx)) { Some(Poll::Ready(Err(Error::Serde(e)))) => { - tracing::warn!("Failed to deserialize portal message: {e}"); + tracing::warn!(target: "relay", "Failed to deserialize portal message: {e}"); continue; // This is not a hard-error, we can continue. 
} Some(Poll::Ready(Err(e))) => { @@ -500,7 +507,7 @@ where continue; } Some(Poll::Ready(Ok(Event::JoinedRoom { topic }))) => { - tracing::info!("Successfully joined room '{topic}'"); + tracing::info!(target: "relay", "Successfully joined room '{topic}'"); continue; } Some(Poll::Ready(Ok(Event::ErrorResponse { @@ -508,11 +515,11 @@ where req_id, reason, }))) => { - tracing::warn!("Request with ID {req_id} on topic {topic} failed: {reason}"); + tracing::warn!(target: "relay", "Request with ID {req_id} on topic {topic} failed: {reason}"); continue; } Some(Poll::Ready(Ok(Event::HeartbeatSent))) => { - tracing::debug!("Heartbeat sent to portal"); + tracing::debug!(target: "relay", "Heartbeat sent to portal"); continue; } Some(Poll::Ready(Ok( @@ -522,11 +529,38 @@ where | None => {} } + if self.stats_log_interval.poll_tick(cx).is_ready() { + let num_allocations = self.server.num_allocations(); + let num_channels = self.server.num_channels(); + + let bytes_relayed_since_last_tick = + self.server.num_relayed_bytes() - self.last_num_bytes_relayed; + self.last_num_bytes_relayed = self.server.num_relayed_bytes(); + + let avg_throughput = bytes_relayed_since_last_tick / STATS_LOG_INTERVAL.as_secs(); + + tracing::info!(target: "relay", "Allocations = {num_allocations} Channels = {num_channels} Throughput = {}", fmt_human_throughput(avg_throughput as f64)); + } + return Poll::Pending; } } } +fn fmt_human_throughput(mut throughput: f64) -> String { + let units = ["B/s", "kB/s", "MB/s", "GB/s", "TB/s"]; + + for unit in units { + if throughput < 1000.0 { + return format!("{throughput:.2} {unit}"); + } + + throughput /= 1000.0; + } + + format!("{throughput:.2} TB/s") +} + async fn main_udp_socket_task( family: AddressFamily, mut inbound_data_sender: mpsc::Sender<(Vec, SocketAddr)>, @@ -547,3 +581,16 @@ async fn main_udp_socket_task( } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn prints_humanfriendly_throughput() { + assert_eq!(fmt_human_throughput(42.0), "42.00 
B/s"); + assert_eq!(fmt_human_throughput(1_234.0), "1.23 kB/s"); + assert_eq!(fmt_human_throughput(955_333_999.0), "955.33 MB/s"); + assert_eq!(fmt_human_throughput(100_000_000_000.0), "100.00 GB/s"); + } +} diff --git a/rust/relay/src/server.rs b/rust/relay/src/server.rs index 24e1d0fac..b7203ccd9 100644 --- a/rust/relay/src/server.rs +++ b/rust/relay/src/server.rs @@ -76,6 +76,7 @@ pub struct Server { allocations_up_down_counter: UpDownCounter, data_relayed_counter: Counter, + data_relayed: u64, // Keep a separate counter because `Counter` doesn't expose the current value :( responses_counter: Counter, } @@ -173,7 +174,7 @@ where let data_relayed_counter = meter .u64_counter("data_relayed_bytes") .with_description("The number of bytes relayed") - .with_unit(Unit::new("kB")) + .with_unit(Unit::new("b")) .init(); Self { @@ -196,6 +197,7 @@ where allocations_up_down_counter, responses_counter, data_relayed_counter, + data_relayed: 0, } } @@ -210,6 +212,18 @@ where self.nonces.add_new(nonce); } + pub fn num_relayed_bytes(&self) -> u64 { + self.data_relayed + } + + pub fn num_allocations(&self) -> usize { + self.allocations.len() + } + + pub fn num_channels(&self) -> usize { + self.channels_by_client_and_number.len() + } + /// Process the bytes received from a client. /// /// After calling this method, you should call [`Server::next_command`] until it returns `None`. @@ -351,6 +365,7 @@ where tracing::debug!(target: "relay", "Relaying {} bytes", bytes.len()); self.data_relayed_counter.add(bytes.len() as u64, &[]); + self.data_relayed += bytes.len() as u64; let data = ChannelData::new(*channel_number, bytes).to_bytes(); @@ -750,6 +765,7 @@ where tracing::debug!(target: "relay", "Relaying {} bytes", data.len()); self.data_relayed_counter.add(data.len() as u64, &[]); + self.data_relayed += data.len() as u64; if tracing::enabled!(target: "wire", tracing::Level::TRACE) { let hex_bytes = hex::encode(data);