From e84bdc5566f3334bf034d10cfdd1b6641853e2b7 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 2 Sep 2025 12:57:36 +1000 Subject: [PATCH] refactor(connlib): periodically record queue depths (#10242) Instead of recording the queue depths on every event-loop tick, we now record them once a second by setting a Gauge. Not only is that a simpler instrument to work with, but it is also significantly more performant. The current version - when metrics are enabled - takes up quite a bit of CPU time. Resolves: #10237 --- rust/Cargo.lock | 2 + rust/apple-client-ffi/src/tun.rs | 26 ++++--- rust/bin-shared/Cargo.toml | 1 + .../src/tun_device_manager/linux.rs | 26 ++++--- .../src/tun_device_manager/windows.rs | 32 ++++---- rust/bin-shared/tests/dns_control_windows.rs | 34 ++++----- rust/client-ffi/src/platform/android/tun.rs | 26 ++++--- rust/client-ffi/src/platform/fallback.rs | 4 - rust/connlib/tun/src/lib.rs | 3 - rust/connlib/tunnel/src/device_channel.rs | 7 -- rust/connlib/tunnel/src/io.rs | 41 ----------- rust/connlib/tunnel/src/sockets.rs | 46 ++++-------- rust/gateway/src/main.rs | 4 - rust/telemetry/Cargo.toml | 1 + rust/telemetry/src/otel.rs | 73 ++++++++++++++++++- 15 files changed, 169 insertions(+), 157 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ece532f0e..1ef7b8428 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2344,6 +2344,7 @@ dependencies = [ "dirs", "dns-types", "firezone-logging", + "firezone-telemetry", "futures", "gat-lending-iterator", "hex", @@ -2608,6 +2609,7 @@ name = "firezone-telemetry" version = "0.1.0" dependencies = [ "anyhow", + "flume", "futures", "hex", "ip-packet", diff --git a/rust/apple-client-ffi/src/tun.rs b/rust/apple-client-ffi/src/tun.rs index 0f18fe647..50432c584 100644 --- a/rust/apple-client-ffi/src/tun.rs +++ b/rust/apple-client-ffi/src/tun.rs @@ -1,3 +1,4 @@ +use firezone_telemetry::otel; use futures::SinkExt as _; use ip_packet::{IpPacket, IpPacketBuf, IpVersion}; use libc::{AF_INET, AF_INET6, 
F_GETFL, F_SETFL, O_NONBLOCK, fcntl, iovec, msghdr, recvmsg}; @@ -27,6 +28,21 @@ impl Tun { let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); + tokio::spawn(otel::metrics::periodic_system_queue_length( + outbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_transmit(), + ], + )); + tokio::spawn(otel::metrics::periodic_system_queue_length( + inbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_receive(), + ], + )); + std::thread::Builder::new() .name("TUN send".to_owned()) .spawn(move || { @@ -81,16 +97,6 @@ impl tun::Tun for Tun { ) -> Poll { self.inbound_rx.poll_recv_many(cx, buf, max) } - - fn queue_lengths(&self) -> (usize, usize) { - ( - self.inbound_rx.len(), - self.outbound_tx - .get_ref() - .map(|s| QUEUE_SIZE - s.capacity()) - .unwrap_or_default(), - ) - } } fn get_last_error() -> io::Error { diff --git a/rust/bin-shared/Cargo.toml b/rust/bin-shared/Cargo.toml index 9d7b2b979..821c82afc 100644 --- a/rust/bin-shared/Cargo.toml +++ b/rust/bin-shared/Cargo.toml @@ -13,6 +13,7 @@ axum = { workspace = true, features = ["http1", "tokio"] } clap = { workspace = true, features = ["derive", "env"] } dns-types = { workspace = true } firezone-logging = { workspace = true } +firezone-telemetry = { workspace = true } futures = { workspace = true, features = ["std", "async-await"] } gat-lending-iterator = { workspace = true } hex = { workspace = true } diff --git a/rust/bin-shared/src/tun_device_manager/linux.rs b/rust/bin-shared/src/tun_device_manager/linux.rs index 45f0cc41a..64c5e221f 100644 --- a/rust/bin-shared/src/tun_device_manager/linux.rs +++ b/rust/bin-shared/src/tun_device_manager/linux.rs @@ -3,6 +3,7 @@ use crate::FIREZONE_MARK; use anyhow::{Context as _, Result, anyhow}; use firezone_logging::err_with_src; +use firezone_telemetry::otel; use futures::{SinkExt, TryStreamExt}; use 
ip_network::{IpNetwork, Ipv4Network, Ipv6Network}; use ip_packet::{IpPacket, IpPacketBuf}; @@ -347,6 +348,21 @@ impl Tun { let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); + tokio::spawn(otel::metrics::periodic_system_queue_length( + outbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_transmit(), + ], + )); + tokio::spawn(otel::metrics::periodic_system_queue_length( + inbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_receive(), + ], + )); + let fd = Arc::new(open_tun()?); std::thread::Builder::new() @@ -434,16 +450,6 @@ impl tun::Tun for Tun { fn name(&self) -> &str { TunDeviceManager::IFACE_NAME } - - fn queue_lengths(&self) -> (usize, usize) { - ( - self.inbound_rx.len(), - self.outbound_tx - .get_ref() - .map(|s| QUEUE_SIZE - s.capacity()) - .unwrap_or_default(), - ) - } } fn get_last_error() -> io::Error { diff --git a/rust/bin-shared/src/tun_device_manager/windows.rs b/rust/bin-shared/src/tun_device_manager/windows.rs index 089f15200..ec04640b2 100644 --- a/rust/bin-shared/src/tun_device_manager/windows.rs +++ b/rust/bin-shared/src/tun_device_manager/windows.rs @@ -3,6 +3,7 @@ use crate::windows::TUNNEL_UUID; use crate::windows::error::{NOT_FOUND, NOT_SUPPORTED, OBJECT_EXISTS}; use anyhow::{Context as _, Result}; use firezone_logging::err_with_src; +use firezone_telemetry::otel; use ip_network::{IpNetwork, Ipv4Network, Ipv6Network}; use ip_packet::{IpPacket, IpPacketBuf}; use ring::digest; @@ -298,6 +299,22 @@ impl Tun { ); let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); // We want to be able to batch-receive from this. 
+ + tokio::spawn(otel::metrics::periodic_system_queue_length( + outbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_transmit(), + ], + )); + tokio::spawn(otel::metrics::periodic_system_queue_length( + inbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_receive(), + ], + )); + let send_thread = start_send_thread(outbound_rx, Arc::downgrade(&session)) .context("Failed to start send thread")?; let recv_thread = start_recv_thread(inbound_tx, Arc::downgrade(&session)) @@ -365,21 +382,6 @@ impl tun::Tun for Tun { Ok(()) } - - fn queue_lengths(&self) -> (usize, usize) { - self.state - .as_ref() - .map(|s| { - ( - s.inbound_rx.len(), - s.outbound_tx - .get_ref() - .map(|s| QUEUE_SIZE - s.capacity()) - .unwrap_or_default(), - ) - }) - .unwrap_or_default() - } } // Moves packets from Internet towards the user diff --git a/rust/bin-shared/tests/dns_control_windows.rs b/rust/bin-shared/tests/dns_control_windows.rs index de967ef2c..f38d3c4b0 100644 --- a/rust/bin-shared/tests/dns_control_windows.rs +++ b/rust/bin-shared/tests/dns_control_windows.rs @@ -6,26 +6,20 @@ use std::{collections::BTreeSet, net::IpAddr}; // Passes in CI but not locally. Maybe ReactorScram's dev system has IPv6 misconfigured. There it fails to pick up the IPv6 DNS servers. 
#[ignore = "Needs admin, changes system state"] -#[test] -fn dns_control() { +#[tokio::test] +async fn dns_control() { let _guard = firezone_logging::test("debug"); - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); - let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280).unwrap(); let _tun = tun_dev_manager.make_tun().unwrap(); - rt.block_on(async { - tun_dev_manager - .set_ips( - [100, 92, 193, 137].into(), - [0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0xa, 0x9db5].into(), - ) - .await - }) - .unwrap(); + tun_dev_manager + .set_ips( + [100, 92, 193, 137].into(), + [0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0xa, 0x9db5].into(), + ) + .await + .unwrap(); let mut dns_controller = DnsController { dns_control_method: DnsControlMethod::Nrpt, @@ -41,12 +35,10 @@ fn dns_control() { 0xfd00, 0x2021, 0x1111, 0x8000, 0x0100, 0x0100, 0x0111, 0x0004, ]), ]; - rt.block_on(async { - dns_controller - .set_dns(fz_dns_servers.clone(), None) - .await - .unwrap(); - }); + dns_controller + .set_dns(fz_dns_servers.clone(), None) + .await + .unwrap(); let adapter = ipconfig::get_adapters() .unwrap() diff --git a/rust/client-ffi/src/platform/android/tun.rs b/rust/client-ffi/src/platform/android/tun.rs index f72c17bb9..f9670dd53 100644 --- a/rust/client-ffi/src/platform/android/tun.rs +++ b/rust/client-ffi/src/platform/android/tun.rs @@ -1,3 +1,4 @@ +use firezone_telemetry::otel; use futures::SinkExt as _; use ip_packet::{IpPacket, IpPacketBuf}; use std::os::fd::{FromRawFd, OwnedFd}; @@ -44,16 +45,6 @@ impl tun::Tun for Tun { ) -> Poll { self.inbound_rx.poll_recv_many(cx, buf, max) } - - fn queue_lengths(&self) -> (usize, usize) { - ( - self.inbound_rx.len(), - self.outbound_tx - .get_ref() - .map(|s| QUEUE_SIZE - s.capacity()) - .unwrap_or_default(), - ) - } } impl Tun { @@ -69,6 +60,21 @@ impl Tun { let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); + 
tokio::spawn(otel::metrics::periodic_system_queue_length( + outbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_transmit(), + ], + )); + tokio::spawn(otel::metrics::periodic_system_queue_length( + inbound_tx.downgrade(), + [ + otel::attr::queue_item_ip_packet(), + otel::attr::network_io_direction_receive(), + ], + )); + std::thread::Builder::new() .name("TUN send".to_owned()) .spawn(move || { diff --git a/rust/client-ffi/src/platform/fallback.rs b/rust/client-ffi/src/platform/fallback.rs index df558db4c..cfd89287b 100644 --- a/rust/client-ffi/src/platform/fallback.rs +++ b/rust/client-ffi/src/platform/fallback.rs @@ -65,8 +65,4 @@ impl tun::Tun for Tun { fn name(&self) -> &str { todo!() } - - fn queue_lengths(&self) -> (usize, usize) { - todo!() - } } diff --git a/rust/connlib/tun/src/lib.rs b/rust/connlib/tun/src/lib.rs index 2c6424751..d069f7c01 100644 --- a/rust/connlib/tun/src/lib.rs +++ b/rust/connlib/tun/src/lib.rs @@ -26,7 +26,4 @@ pub trait Tun: Send + Sync + 'static { /// The name of the TUN device. fn name(&self) -> &str; - - /// The number of inbounds and outbound packets sitting in queues. 
- fn queue_lengths(&self) -> (usize, usize); } diff --git a/rust/connlib/tunnel/src/device_channel.rs b/rust/connlib/tunnel/src/device_channel.rs index 71f6ed20f..c1de01889 100644 --- a/rust/connlib/tunnel/src/device_channel.rs +++ b/rust/connlib/tunnel/src/device_channel.rs @@ -85,13 +85,6 @@ impl Device { Ok(()) } - pub fn queue_lengths(&self) -> (usize, usize) { - self.tun - .as_ref() - .map(|t| t.queue_lengths()) - .unwrap_or_default() - } - fn tun(&mut self) -> io::Result<&mut dyn Tun> { Ok(self .tun diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index 7411e97fe..df9cb66ee 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -61,7 +61,6 @@ pub struct Io { tun: Device, outbound_packet_buffer: VecDeque, packet_counter: opentelemetry::metrics::Counter, - queue_lengths: opentelemetry::metrics::Histogram, } #[derive(Debug)] @@ -128,13 +127,6 @@ impl Io { .u64_counter("system.network.packets") .with_description("The number of packets processed.") .build(), - queue_lengths: opentelemetry::global::meter("connlib") - .u64_histogram("system.queue.length") - .with_description("The length of a queue.") - .with_boundaries(vec![ - 1.0, 2.0, 5.0, 10.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, - ]) - .build(), } } @@ -174,8 +166,6 @@ impl Io { > { ready!(self.flush(cx)?); - self.record_queue_lengths(); - if self.reval_nameserver_interval.poll_tick(cx).is_ready() { self.nameservers.evaluate(); } @@ -431,33 +421,6 @@ impl Io { ) -> io::Result<()> { self.tcp_dns_server.send_response(to, message) } - - /// Records the lengths of our RX und TX queues in a histogram. 
- fn record_queue_lengths(&self) { - let (tun_rx, tun_tx) = self.tun.queue_lengths(); - - self.queue_lengths.record( - tun_rx as u64, - &[ - otel::attr::network_io_direction_receive(), - otel::attr::queue_item_ip_packet(), - ], - ); - self.queue_lengths.record( - tun_tx as u64, - &[ - otel::attr::network_io_direction_transmit(), - otel::attr::queue_item_ip_packet(), - ], - ); - - let (udp_rx, udp_tx) = self.sockets.queue_lengths(); - - self.queue_lengths - .record(udp_rx as u64, &[otel::attr::queue_item_gro_batch()]); - self.queue_lengths - .record(udp_tx as u64, &[otel::attr::queue_item_gso_batch()]); - } } fn is_max_wg_packet_size(d: &DatagramIn) -> bool { @@ -582,9 +545,5 @@ mod tests { fn name(&self) -> &str { "dummy" } - - fn queue_lengths(&self) -> (usize, usize) { - (0, 0) - } } } diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index 2e1575a83..91dc0a467 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -120,22 +120,6 @@ impl Sockets { Poll::Ready(Ok(iter)) } - - pub fn queue_lengths(&self) -> (usize, usize) { - let (v4_inbound, v4_outbound) = self - .socket_v4 - .as_ref() - .map(|s| s.queue_lengths()) - .unwrap_or_default(); - - let (v6_inbound, v6_outbound) = self - .socket_v6 - .as_ref() - .map(|s| s.queue_lengths()) - .unwrap_or_default(); - - (v4_inbound + v6_inbound, v4_outbound + v6_outbound) - } } struct PacketIter { @@ -204,6 +188,21 @@ impl ThreadedUdpSocket { let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); let (error_tx, error_rx) = flume::bounded(0); + tokio::spawn(otel::metrics::periodic_system_queue_length( + outbound_tx.downgrade(), + [ + otel::attr::queue_item_gso_batch(), + otel::attr::network_type_for_addr(preferred_addr), + ], + )); + tokio::spawn(otel::metrics::periodic_system_queue_length( + inbound_tx.downgrade(), + [ + otel::attr::queue_item_gro_batch(), + otel::attr::network_type_for_addr(preferred_addr), + ], + )); + let thread_name = match 
preferred_addr { SocketAddr::V4(_) => "UDP IPv4".to_owned(), SocketAddr::V6(_) => "UDP IPv6".to_owned(), @@ -373,21 +372,6 @@ impl ThreadedUdpSocket { fn channels_mut(&mut self) -> Result<&mut Channels> { self.channels.as_mut().context("Missing channels") } - - fn queue_lengths(&self) -> (usize, usize) { - self.channels - .as_ref() - .map(|c| { - ( - c.inbound_rx.len(), - c.outbound_tx - .get_ref() - .map(|c| QUEUE_SIZE - c.capacity()) - .unwrap_or_default(), - ) - }) - .unwrap_or_default() - } } impl Drop for ThreadedUdpSocket { diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index 0c1281fb5..ad4d249d9 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -364,10 +364,6 @@ impl Tun for ValidateChecksumAdapter { fn name(&self) -> &str { self.inner.name() } - - fn queue_lengths(&self) -> (usize, usize) { - self.inner.queue_lengths() - } } impl ValidateChecksumAdapter { diff --git a/rust/telemetry/Cargo.toml b/rust/telemetry/Cargo.toml index 7cf3fae33..56db18f01 100644 --- a/rust/telemetry/Cargo.toml +++ b/rust/telemetry/Cargo.toml @@ -6,6 +6,7 @@ license = { workspace = true } [dependencies] anyhow = { workspace = true } +flume = { workspace = true } futures = { workspace = true } hex = { workspace = true } ip-packet = { workspace = true } diff --git a/rust/telemetry/src/otel.rs b/rust/telemetry/src/otel.rs index a25c5bdcd..491d6ce76 100644 --- a/rust/telemetry/src/otel.rs +++ b/rust/telemetry/src/otel.rs @@ -114,7 +114,14 @@ pub mod attr { } pub mod metrics { - use opentelemetry::metrics::Counter; + use std::{ops::ControlFlow, time::Duration}; + + use opentelemetry::{ + KeyValue, + metrics::{Counter, Gauge}, + }; + + use crate::otel::QueueLength; pub fn network_packet_dropped() -> Counter { opentelemetry::global::meter("connlib") @@ -123,6 +130,70 @@ pub mod metrics { .with_unit("{packet}") .build() } + + pub async fn periodic_system_queue_length( + queue: impl QueueLength, + attributes: [KeyValue; N], + ) { + let gauge = 
opentelemetry::global::meter("connlib") + .u64_gauge("system.queue.length") + .with_description("The length of a queue.") + .build(); + + periodic_gauge( + gauge, + |gauge| { + let len = match queue.queue_length() { + Some(len) => len, + None => return ControlFlow::Break(()), + }; + + gauge.record(len, &attributes); + + ControlFlow::Continue(()) + }, + Duration::from_secs(1), + ) + .await; + } + + pub async fn periodic_gauge( + gauge: Gauge, + callback: impl Fn(&Gauge) -> ControlFlow<(), ()>, + interval: Duration, + ) { + while callback(&gauge).is_continue() { + tokio::time::sleep(interval).await; + } + } +} + +pub trait QueueLength: Send + Sync + 'static { + fn queue_length(&self) -> Option; +} + +impl QueueLength for tokio::sync::mpsc::WeakSender +where + T: Send + Sync + 'static, +{ + fn queue_length(&self) -> Option { + let sender = self.upgrade()?; + let len = sender.max_capacity() - sender.capacity(); + + Some(len as u64) + } +} + +impl QueueLength for flume::WeakSender +where + T: Send + Sync + 'static, +{ + fn queue_length(&self) -> Option { + let sender = self.upgrade()?; + let len = sender.len(); + + Some(len as u64) + } } pub fn default_resource_with(attributes: [KeyValue; N]) -> Resource {