feat(relay): print stats every 10s (#3408)

In #3400, a discussion started on what the correct log level would be
for the production relay. Currently, the relay logs some stats about
each packet on debug, i.e. where it came from, where it is going to and
how big it is. This isn't very useful in production though and will fill
up our log disk quickly.

This PR introduces a stats timer like we already have it in other
components. We print the number of allocations, how many channels we
have and how much data we relayed over all these channels since we last
printed. The interval is currently set to 10 seconds.

Here is what this output could look like (captured locally using
`relay/run_smoke_test.sh`, although slightly tweaked, printing ever 2s,
using release mode and larger packets on the clients):

```
2024-01-26T05:01:02.445555Z  INFO relay: Seeding RNG from '0'
2024-01-26T05:01:02.445580Z  WARN relay: No portal token supplied, starting standalone mode
2024-01-26T05:01:02.445827Z  INFO relay: Listening for incoming traffic on UDP port 3478
2024-01-26T05:01:02.447035Z  INFO Eventloop::poll: relay: num_allocations=0 num_channels=0 throughput=0.00 B/s
2024-01-26T05:01:02.649194Z  INFO Eventloop::poll:handle_client_input{sender=127.0.0.1:39092 transaction_id="8f20177512495fcb563c60de" allocation=AID-1}: relay: Created new allocation first_relay_address=127.0.0.1 lifetime=600s
2024-01-26T05:01:02.650744Z  INFO Eventloop::poll:handle_client_input{sender=127.0.0.1:39092 transaction_id="6445943a353d5e8c262a821f" allocation=AID-1 peer=127.0.0.1:41094 channel=16384}: relay: Successfully bound channel
2024-01-26T05:01:04.446317Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=631.54 MB/s
2024-01-26T05:01:06.446319Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=698.73 MB/s
2024-01-26T05:01:08.446325Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=708.98 MB/s
2024-01-26T05:01:10.446324Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=690.79 MB/s
2024-01-26T05:01:12.446316Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=715.53 MB/s
2024-01-26T05:01:14.446315Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=706.90 MB/s
2024-01-26T05:01:16.446313Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=712.03 MB/s
2024-01-26T05:01:18.446319Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=717.54 MB/s
2024-01-26T05:01:20.446316Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=690.74 MB/s
2024-01-26T05:01:22.446313Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=705.08 MB/s
2024-01-26T05:01:24.446311Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=700.41 MB/s
2024-01-26T05:01:26.446319Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=717.57 MB/s
2024-01-26T05:01:28.446320Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=688.82 MB/s
2024-01-26T05:01:30.446329Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=696.35 MB/s
2024-01-26T05:01:32.446317Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=724.03 MB/s
2024-01-26T05:01:34.446320Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=713.46 MB/s
2024-01-26T05:01:36.446314Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=716.13 MB/s
2024-01-26T05:01:38.446327Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=687.16 MB/s
2024-01-26T05:01:40.446315Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=708.20 MB/s
2024-01-26T05:01:42.446314Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=689.36 MB/s
2024-01-26T05:01:44.446314Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=698.62 MB/s
2024-01-26T05:01:46.446315Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=696.21 MB/s
2024-01-26T05:01:48.446378Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=696.36 MB/s
2024-01-26T05:01:50.446314Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=709.47 MB/s
2024-01-26T05:01:52.446319Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=714.48 MB/s
2024-01-26T05:01:54.446323Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=690.71 MB/s
2024-01-26T05:01:56.446313Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=692.70 MB/s
2024-01-26T05:01:58.446321Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=687.87 MB/s
2024-01-26T05:02:00.446316Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=682.11 MB/s
2024-01-26T05:02:02.446312Z  INFO Eventloop::poll: relay: num_allocations=1 num_channels=1 throughput=700.07 MB/s
```
This commit is contained in:
Thomas Eizinger
2024-01-29 10:52:15 -08:00
committed by GitHub
parent d043f81e95
commit c9834ee8ee
2 changed files with 84 additions and 21 deletions

View File

@@ -19,13 +19,15 @@ use std::convert::Infallible;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::task::{ready, Poll};
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use tracing::{level_filters::LevelFilter, Instrument, Subscriber};
use tracing_core::Dispatch;
use tracing_stackdriver::CloudTraceConfiguration;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
use url::Url;
const STATS_LOG_INTERVAL: Duration = Duration::from_secs(10);
#[derive(Parser, Debug)]
struct Args {
/// The public (i.e. internet-reachable) IPv4 address of the relay server.
@@ -121,7 +123,7 @@ async fn main() -> Result<()> {
.instrument(span)
.await?
} else {
tracing::warn!("No portal token supplied, starting standalone mode");
tracing::warn!(target: "relay", "No portal token supplied, starting standalone mode");
None
};
@@ -130,7 +132,7 @@ async fn main() -> Result<()> {
tokio::spawn(firezone_relay::health_check::serve(args.health_check_addr));
tracing::info!("Listening for incoming traffic on UDP port 3478");
tracing::info!(target: "relay", "Listening for incoming traffic on UDP port 3478");
future::poll_fn(|cx| eventloop.poll(cx))
.await
@@ -158,7 +160,7 @@ async fn setup_tracing(args: &Args) -> Result<()> {
Some(endpoint) => {
let grpc_endpoint = format!("http://{endpoint}");
tracing::trace!(%grpc_endpoint, "Setting up OTLP exporter for collector");
tracing::trace!(target: "relay", %grpc_endpoint, "Setting up OTLP exporter for collector");
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
@@ -174,7 +176,7 @@ async fn setup_tracing(args: &Args) -> Result<()> {
.install_batch(opentelemetry::runtime::Tokio)
.context("Failed to create OTLP trace pipeline")?;
tracing::trace!("Successfully initialized trace provider on tokio runtime");
tracing::trace!(target: "relay", "Successfully initialized trace provider on tokio runtime");
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
@@ -186,7 +188,7 @@ async fn setup_tracing(args: &Args) -> Result<()> {
.build()
.context("Failed to create OTLP metrics pipeline")?;
tracing::trace!("Successfully initialized metric controller on tokio runtime");
tracing::trace!(target: "relay", "Successfully initialized metric controller on tokio runtime");
tracing_subscriber::registry()
.with(log_layer(args))
@@ -223,7 +225,7 @@ where
(LogFormat::Human, _) => tracing_subscriber::fmt::layer().boxed(),
(LogFormat::Json, _) => tracing_subscriber::fmt::layer().json().boxed(),
(LogFormat::GoogleCloud, None) => {
tracing::warn!("Emitting logs in Google Cloud format but without the project ID set. Spans will be emitted without IDs!");
tracing::warn!(target: "relay", "Emitting logs in Google Cloud format but without the project ID set. Spans will be emitted without IDs!");
tracing_stackdriver::layer().boxed()
}
@@ -250,7 +252,7 @@ async fn connect_to_portal(
use secrecy::ExposeSecret;
if !url.path().is_empty() {
tracing::warn!("Overwriting path component of portal URL with '/relay/websocket'");
tracing::warn!(target: "relay", "Overwriting path component of portal URL with '/relay/websocket'");
}
url.set_path("relay/websocket");
@@ -296,7 +298,7 @@ fn make_rng(seed: Option<u64>) -> StdRng {
return StdRng::from_entropy();
};
tracing::info!("Seeding RNG from '{seed}'");
tracing::info!(target: "relay", "Seeding RNG from '{seed}'");
StdRng::seed_from_u64(seed)
}
@@ -304,7 +306,7 @@ fn make_rng(seed: Option<u64>) -> StdRng {
#[cfg(not(debug_assertions))]
fn make_rng(seed: Option<u64>) -> StdRng {
if seed.is_some() {
tracing::debug!("Ignoring rng-seed because we are running in release mode");
tracing::debug!(target: "relay", "Ignoring rng-seed because we are running in release mode");
}
StdRng::from_entropy()
@@ -320,6 +322,9 @@ struct Eventloop<R> {
relay_data_sender: mpsc::Sender<(Vec<u8>, SocketAddr, AllocationId)>,
relay_data_receiver: mpsc::Receiver<(Vec<u8>, SocketAddr, AllocationId)>,
sleep: Sleep,
stats_log_interval: tokio::time::Interval,
last_num_bytes_relayed: u64,
}
impl<R> Eventloop<R>
@@ -363,6 +368,8 @@ where
relay_data_sender,
relay_data_receiver,
sleep: Sleep::default(),
stats_log_interval: tokio::time::interval(STATS_LOG_INTERVAL),
last_num_bytes_relayed: 0,
})
}
@@ -398,7 +405,7 @@ where
// Should never happen because we poll for readiness above.
if e.is_full() {
tracing::warn!(%recipient, "Dropping message because channel to primary UDP socket task is full");
tracing::warn!(target: "relay", %recipient, "Dropping message because channel to primary UDP socket task is full");
}
}
}
@@ -417,11 +424,11 @@ where
let _guard = span.enter();
if self.allocations.remove(&(id, family)).is_none() {
tracing::debug!("Unknown allocation {id}");
tracing::debug!(target: "relay", "Unknown allocation {id}");
continue;
};
tracing::info!("Freeing addresses of allocation {id}");
tracing::info!(target: "relay", "Freeing addresses of allocation {id}");
}
Command::Wake { deadline } => {
let span = tracing::error_span!("Command::Wake", ?deadline);
@@ -429,12 +436,12 @@ where
match deadline.duration_since(now) {
Ok(duration) => {
tracing::trace!(?duration, "Suspending event loop")
tracing::trace!(target: "relay", ?duration, "Suspending event loop")
}
Err(e) => {
let difference = e.duration();
tracing::warn!(
tracing::warn!(target: "relay",
?difference,
"Wake time is already in the past, waking now"
)
@@ -450,7 +457,7 @@ where
let mut allocation = match self.allocations.entry((id, receiver.family())) {
Entry::Occupied(entry) => entry,
Entry::Vacant(_) => {
tracing::debug!(allocation = %id, family = %receiver.family(), "Unknown allocation");
tracing::debug!(target: "relay", allocation = %id, family = %receiver.family(), "Unknown allocation");
continue;
}
};
@@ -490,7 +497,7 @@ where
// Priority 5: Handle portal messages
match self.channel.as_mut().map(|c| c.poll(cx)) {
Some(Poll::Ready(Err(Error::Serde(e)))) => {
tracing::warn!("Failed to deserialize portal message: {e}");
tracing::warn!(target: "relay", "Failed to deserialize portal message: {e}");
continue; // This is not a hard-error, we can continue.
}
Some(Poll::Ready(Err(e))) => {
@@ -500,7 +507,7 @@ where
continue;
}
Some(Poll::Ready(Ok(Event::JoinedRoom { topic }))) => {
tracing::info!("Successfully joined room '{topic}'");
tracing::info!(target: "relay", "Successfully joined room '{topic}'");
continue;
}
Some(Poll::Ready(Ok(Event::ErrorResponse {
@@ -508,11 +515,11 @@ where
req_id,
reason,
}))) => {
tracing::warn!("Request with ID {req_id} on topic {topic} failed: {reason}");
tracing::warn!(target: "relay", "Request with ID {req_id} on topic {topic} failed: {reason}");
continue;
}
Some(Poll::Ready(Ok(Event::HeartbeatSent))) => {
tracing::debug!("Heartbeat sent to portal");
tracing::debug!(target: "relay", "Heartbeat sent to portal");
continue;
}
Some(Poll::Ready(Ok(
@@ -522,11 +529,38 @@ where
| None => {}
}
if self.stats_log_interval.poll_tick(cx).is_ready() {
let num_allocations = self.server.num_allocations();
let num_channels = self.server.num_channels();
let bytes_relayed_since_last_tick =
self.server.num_relayed_bytes() - self.last_num_bytes_relayed;
self.last_num_bytes_relayed = self.server.num_relayed_bytes();
let avg_throughput = bytes_relayed_since_last_tick / STATS_LOG_INTERVAL.as_secs();
tracing::info!(target: "relay", "Allocations = {num_allocations} Channels = {num_channels} Throughput = {}", fmt_human_throughput(avg_throughput as f64));
}
return Poll::Pending;
}
}
}
fn fmt_human_throughput(mut throughput: f64) -> String {
let units = ["B/s", "kB/s", "MB/s", "GB/s", "TB/s"];
for unit in units {
if throughput < 1000.0 {
return format!("{throughput:.2} {unit}");
}
throughput /= 1000.0;
}
format!("{throughput:.2} TB/s")
}
async fn main_udp_socket_task(
family: AddressFamily,
mut inbound_data_sender: mpsc::Sender<(Vec<u8>, SocketAddr)>,
@@ -547,3 +581,16 @@ async fn main_udp_socket_task(
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn prints_humanfriendly_throughput() {
assert_eq!(fmt_human_throughput(42.0), "42.00 B/s");
assert_eq!(fmt_human_throughput(1_234.0), "1.23 kB/s");
assert_eq!(fmt_human_throughput(955_333_999.0), "955.33 MB/s");
assert_eq!(fmt_human_throughput(100_000_000_000.0), "100.00 GB/s");
}
}

View File

@@ -76,6 +76,7 @@ pub struct Server<R> {
allocations_up_down_counter: UpDownCounter<i64>,
data_relayed_counter: Counter<u64>,
data_relayed: u64, // Keep a separate counter because `Counter` doesn't expose the current value :(
responses_counter: Counter<u64>,
}
@@ -173,7 +174,7 @@ where
let data_relayed_counter = meter
.u64_counter("data_relayed_bytes")
.with_description("The number of bytes relayed")
.with_unit(Unit::new("kB"))
.with_unit(Unit::new("b"))
.init();
Self {
@@ -196,6 +197,7 @@ where
allocations_up_down_counter,
responses_counter,
data_relayed_counter,
data_relayed: 0,
}
}
@@ -210,6 +212,18 @@ where
self.nonces.add_new(nonce);
}
pub fn num_relayed_bytes(&self) -> u64 {
self.data_relayed
}
pub fn num_allocations(&self) -> usize {
self.allocations.len()
}
pub fn num_channels(&self) -> usize {
self.channels_by_client_and_number.len()
}
/// Process the bytes received from a client.
///
/// After calling this method, you should call [`Server::next_command`] until it returns `None`.
@@ -351,6 +365,7 @@ where
tracing::debug!(target: "relay", "Relaying {} bytes", bytes.len());
self.data_relayed_counter.add(bytes.len() as u64, &[]);
self.data_relayed += bytes.len() as u64;
let data = ChannelData::new(*channel_number, bytes).to_bytes();
@@ -750,6 +765,7 @@ where
tracing::debug!(target: "relay", "Relaying {} bytes", data.len());
self.data_relayed_counter.add(data.len() as u64, &[]);
self.data_relayed += data.len() as u64;
if tracing::enabled!(target: "wire", tracing::Level::TRACE) {
let hex_bytes = hex::encode(data);