diff --git a/rust/connlib/snownet/src/info.rs b/rust/connlib/snownet/src/info.rs deleted file mode 100644 index 94655cdb9..000000000 --- a/rust/connlib/snownet/src/info.rs +++ /dev/null @@ -1,7 +0,0 @@ -use std::time::Instant; - -#[derive(Debug)] -pub struct ConnectionInfo { - /// When this instance of [`ConnectionInfo`] was created. - pub generated_at: Instant, -} diff --git a/rust/connlib/snownet/src/lib.rs b/rust/connlib/snownet/src/lib.rs index 8eb98f0ac..6c0886cae 100644 --- a/rust/connlib/snownet/src/lib.rs +++ b/rust/connlib/snownet/src/lib.rs @@ -4,16 +4,16 @@ mod allocation; mod backoff; mod channel_data; mod index; -mod info; mod ip_packet; mod node; mod ringbuffer; +mod stats; mod stun_binding; mod utils; -pub use info::ConnectionInfo; pub use ip_packet::{IpPacket, MutableIpPacket}; pub use node::{ Answer, Client, ClientNode, Credentials, Error, Event, Node, Offer, Server, ServerNode, Transmit, }; +pub use stats::{ConnectionStats, NodeStats}; diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 2c75f61e4..c0e9db5be 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -21,7 +21,7 @@ use str0m::{Candidate, CandidateKind, IceConnectionState}; use crate::allocation::{Allocation, Socket}; use crate::index::IndexLfsr; -use crate::info::ConnectionInfo; +use crate::stats::{ConnectionStats, NodeStats}; use crate::stun_binding::StunBinding; use crate::utils::earliest; use crate::{IpPacket, MutableIpPacket}; @@ -65,6 +65,8 @@ pub struct Node { buffer: Box<[u8; MAX_UDP_SIZE]>, + stats: NodeStats, + marker: PhantomData, } @@ -106,6 +108,7 @@ where allocations: HashMap::default(), last_now: now, connections: Default::default(), + stats: Default::default(), } } @@ -113,9 +116,8 @@ where (&self.private_key).into() } - /// Lazily retrieve stats of all connections. - pub fn stats(&self) -> impl Iterator + '_ { - self.connections.stats(self.last_now) + pub fn stats(&self) -> (NodeStats, impl Iterator + '_) { + (self.stats, self.connections.stats()) } /// Add an address as a `host` candidate. @@ -535,12 +537,16 @@ where for binding in self.bindings.values_mut() { if let Some(transmit) = binding.poll_transmit() { + self.stats.stun_bytes_to_relays += transmit.payload.len(); + return Some(transmit); } } for allocation in self.allocations.values_mut() { if let Some(transmit) = allocation.poll_transmit() { + self.stats.stun_bytes_to_relays += transmit.payload.len(); + return Some(transmit); } } @@ -574,6 +580,7 @@ where next_timer_update: self.last_now, peer_socket: None, possible_sockets: HashSet::default(), + stats: Default::default(), } } @@ -1050,10 +1057,8 @@ impl Connections where TId: Eq + Hash + Copy, { - fn stats(&self, now: Instant) -> impl Iterator + '_ { - self.established - .keys() - .map(move |id| (*id, ConnectionInfo { generated_at: now })) + fn stats(&self) -> impl Iterator + '_ { + self.established.iter().map(move |(id, c)| (*id, c.stats)) } fn agent_mut(&mut self, id: TId) -> Option<&mut IceAgent> { @@ -1277,6 +1282,8 @@ struct Connection { stun_servers: HashSet, turn_servers: HashSet, + + stats: ConnectionStats, } /// The socket of the peer we are connected to. @@ -1366,6 +1373,8 @@ impl Connection { .find(|(_, allocation)| allocation.has_socket(source)); let Some((relay, allocation)) = allocation else { + self.stats.stun_bytes_to_peer_direct += packet.len(); + // `source` did not match any of our allocated sockets, must be a local one then! return Some(Transmit { src: Some(source), @@ -1381,6 +1390,8 @@ impl Connection { continue; }; + self.stats.stun_bytes_to_peer_relayed += channel_data.len(); + return Some(Transmit { src: None, dst: *relay, diff --git a/rust/connlib/snownet/src/stats.rs b/rust/connlib/snownet/src/stats.rs new file mode 100644 index 000000000..179d93993 --- /dev/null +++ b/rust/connlib/snownet/src/stats.rs @@ -0,0 +1,56 @@ +use std::ops::AddAssign; + +#[derive(Default, Debug, Clone, Copy)] +pub struct NodeStats { + /// How many bytes we sent as part of exchanging STUN messages with relays (control messages only). + pub stun_bytes_to_relays: HumanBytes, +} + +#[derive(Default, Debug, Clone, Copy)] +pub struct ConnectionStats { + /// How many bytes we sent as part of exchanging STUN messages to other peers directly. + pub stun_bytes_to_peer_direct: HumanBytes, + /// How many bytes we sent as part of exchanging STUN messages to other peers via relays. + pub stun_bytes_to_peer_relayed: HumanBytes, +} + +#[derive(Default, Clone, Copy)] +pub struct HumanBytes(pub usize); + +impl std::fmt::Debug for HumanBytes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", fmt_human_bytes(self.0 as f64)) + } +} + +impl AddAssign for HumanBytes { + fn add_assign(&mut self, rhs: usize) { + self.0 += rhs; + } +} + +fn fmt_human_bytes(mut throughput: f64) -> String { + let units = ["B", "kB", "MB", "GB", "TB"]; + + for unit in units { + if throughput < 1000.0 { + return format!("{throughput:.2} {unit}"); + } + + throughput /= 1000.0; + } + + format!("{throughput:.2} TB") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn fmt_human_bytes() { + assert_eq!(format!("{:?}", HumanBytes(0)), "0.00 B"); + assert_eq!(format!("{:?}", HumanBytes(1_000)), "1.00 kB"); + assert_eq!(format!("{:?}", HumanBytes(12_500_000)), "12.50 MB"); + } +} diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index d0d9917a7..8347b6166 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -21,7 +21,7 @@ use std::{ hash::Hash, io, task::{ready, Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; pub use client::ClientState; @@ -242,6 +242,7 @@ struct ConnectionState { pub node: Node, write_buf: Box<[u8; MAX_UDP_SIZE]>, connection_pool_timeout: BoxFuture<'static, std::time::Instant>, + stats_timer: tokio::time::Interval, sockets: Sockets, } @@ -255,6 +256,7 @@ where write_buf: Box::new([0; MAX_UDP_SIZE]), connection_pool_timeout: sleep_until(std::time::Instant::now()).boxed(), sockets: Sockets::new()?, + stats_timer: tokio::time::interval(Duration::from_secs(10)), }) } @@ -347,6 +349,18 @@ where } fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.stats_timer.poll_tick(cx).is_ready() { + let (node_stats, conn_stats) = self.node.stats(); + + tracing::info!(target: "connlib::stats", "{node_stats:?}"); + + for (id, stats) in conn_stats { + tracing::info!(target: "connlib::stats", %id, "{stats:?}"); + } + + cx.waker().wake_by_ref(); + } + if let Err(e) = ready!(self.sockets.poll_send_ready(cx)) { tracing::warn!("Failed to poll sockets for readiness: {e}"); };