refactor(connlib): periodically record queue depths (#10242)

Instead of recording the queue depths on every event-loop tick, we now
record them once a second by setting a Gauge. Not only is that a simpler
instrument to work with, but it is also significantly more performant. The
current version - when metrics are enabled - takes up quite a bit of CPU
time.

Resolves: #10237
This commit is contained in:
Thomas Eizinger
2025-09-02 12:57:36 +10:00
committed by GitHub
parent 2dd61d7c5c
commit e84bdc5566
15 changed files with 169 additions and 157 deletions

2
rust/Cargo.lock generated
View File

@@ -2344,6 +2344,7 @@ dependencies = [
"dirs",
"dns-types",
"firezone-logging",
"firezone-telemetry",
"futures",
"gat-lending-iterator",
"hex",
@@ -2608,6 +2609,7 @@ name = "firezone-telemetry"
version = "0.1.0"
dependencies = [
"anyhow",
"flume",
"futures",
"hex",
"ip-packet",

View File

@@ -1,3 +1,4 @@
use firezone_telemetry::otel;
use futures::SinkExt as _;
use ip_packet::{IpPacket, IpPacketBuf, IpVersion};
use libc::{AF_INET, AF_INET6, F_GETFL, F_SETFL, O_NONBLOCK, fcntl, iovec, msghdr, recvmsg};
@@ -27,6 +28,21 @@ impl Tun {
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE);
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
tokio::spawn(otel::metrics::periodic_system_queue_length(
outbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_transmit(),
],
));
tokio::spawn(otel::metrics::periodic_system_queue_length(
inbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_receive(),
],
));
std::thread::Builder::new()
.name("TUN send".to_owned())
.spawn(move || {
@@ -81,16 +97,6 @@ impl tun::Tun for Tun {
) -> Poll<usize> {
self.inbound_rx.poll_recv_many(cx, buf, max)
}
/// Reports the number of packets currently buffered as `(inbound, outbound)`.
///
/// The inbound figure is the length of `inbound_rx`. The outbound figure is
/// derived as `QUEUE_SIZE` minus the sender's remaining capacity and falls
/// back to `0` when `outbound_tx.get_ref()` yields `None`.
fn queue_lengths(&self) -> (usize, usize) {
    let inbound = self.inbound_rx.len();
    let outbound = match self.outbound_tx.get_ref() {
        Some(sender) => QUEUE_SIZE - sender.capacity(),
        None => 0,
    };

    (inbound, outbound)
}
}
fn get_last_error() -> io::Error {

View File

@@ -13,6 +13,7 @@ axum = { workspace = true, features = ["http1", "tokio"] }
clap = { workspace = true, features = ["derive", "env"] }
dns-types = { workspace = true }
firezone-logging = { workspace = true }
firezone-telemetry = { workspace = true }
futures = { workspace = true, features = ["std", "async-await"] }
gat-lending-iterator = { workspace = true }
hex = { workspace = true }

View File

@@ -3,6 +3,7 @@
use crate::FIREZONE_MARK;
use anyhow::{Context as _, Result, anyhow};
use firezone_logging::err_with_src;
use firezone_telemetry::otel;
use futures::{SinkExt, TryStreamExt};
use ip_network::{IpNetwork, Ipv4Network, Ipv6Network};
use ip_packet::{IpPacket, IpPacketBuf};
@@ -347,6 +348,21 @@ impl Tun {
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE);
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
tokio::spawn(otel::metrics::periodic_system_queue_length(
outbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_transmit(),
],
));
tokio::spawn(otel::metrics::periodic_system_queue_length(
inbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_receive(),
],
));
let fd = Arc::new(open_tun()?);
std::thread::Builder::new()
@@ -434,16 +450,6 @@ impl tun::Tun for Tun {
fn name(&self) -> &str {
TunDeviceManager::IFACE_NAME
}
/// Returns `(inbound, outbound)`: how many packets sit in each TUN queue.
///
/// Inbound is read directly from `inbound_rx`; outbound is the used portion
/// of the channel (`QUEUE_SIZE` minus free capacity), or `0` if
/// `outbound_tx.get_ref()` returns `None`.
fn queue_lengths(&self) -> (usize, usize) {
    let queued_inbound = self.inbound_rx.len();
    let queued_outbound = self
        .outbound_tx
        .get_ref()
        .map_or(0, |sender| QUEUE_SIZE - sender.capacity());

    (queued_inbound, queued_outbound)
}
}
fn get_last_error() -> io::Error {

View File

@@ -3,6 +3,7 @@ use crate::windows::TUNNEL_UUID;
use crate::windows::error::{NOT_FOUND, NOT_SUPPORTED, OBJECT_EXISTS};
use anyhow::{Context as _, Result};
use firezone_logging::err_with_src;
use firezone_telemetry::otel;
use ip_network::{IpNetwork, Ipv4Network, Ipv6Network};
use ip_packet::{IpPacket, IpPacketBuf};
use ring::digest;
@@ -298,6 +299,22 @@ impl Tun {
);
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); // We want to be able to batch-receive from this.
tokio::spawn(otel::metrics::periodic_system_queue_length(
outbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_transmit(),
],
));
tokio::spawn(otel::metrics::periodic_system_queue_length(
inbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_receive(),
],
));
let send_thread = start_send_thread(outbound_rx, Arc::downgrade(&session))
.context("Failed to start send thread")?;
let recv_thread = start_recv_thread(inbound_tx, Arc::downgrade(&session))
@@ -365,21 +382,6 @@ impl tun::Tun for Tun {
Ok(())
}
/// Returns the `(inbound, outbound)` queue lengths of the TUN device.
///
/// Yields `(0, 0)` when `self.state` is `None`. Otherwise inbound is the
/// length of `inbound_rx` and outbound is `QUEUE_SIZE` minus the sender's
/// free capacity (`0` if `outbound_tx.get_ref()` returns `None`).
fn queue_lengths(&self) -> (usize, usize) {
    let Some(state) = self.state.as_ref() else {
        return (0, 0);
    };

    let inbound = state.inbound_rx.len();
    let outbound = match state.outbound_tx.get_ref() {
        Some(sender) => QUEUE_SIZE - sender.capacity(),
        None => 0,
    };

    (inbound, outbound)
}
}
// Moves packets from Internet towards the user

View File

@@ -6,26 +6,20 @@ use std::{collections::BTreeSet, net::IpAddr};
// Passes in CI but not locally. Maybe ReactorScram's dev system has IPv6 misconfigured. There it fails to pick up the IPv6 DNS servers.
#[ignore = "Needs admin, changes system state"]
#[test]
fn dns_control() {
#[tokio::test]
async fn dns_control() {
let _guard = firezone_logging::test("debug");
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280).unwrap();
let _tun = tun_dev_manager.make_tun().unwrap();
rt.block_on(async {
tun_dev_manager
.set_ips(
[100, 92, 193, 137].into(),
[0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0xa, 0x9db5].into(),
)
.await
})
.unwrap();
tun_dev_manager
.set_ips(
[100, 92, 193, 137].into(),
[0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0xa, 0x9db5].into(),
)
.await
.unwrap();
let mut dns_controller = DnsController {
dns_control_method: DnsControlMethod::Nrpt,
@@ -41,12 +35,10 @@ fn dns_control() {
0xfd00, 0x2021, 0x1111, 0x8000, 0x0100, 0x0100, 0x0111, 0x0004,
]),
];
rt.block_on(async {
dns_controller
.set_dns(fz_dns_servers.clone(), None)
.await
.unwrap();
});
dns_controller
.set_dns(fz_dns_servers.clone(), None)
.await
.unwrap();
let adapter = ipconfig::get_adapters()
.unwrap()

View File

@@ -1,3 +1,4 @@
use firezone_telemetry::otel;
use futures::SinkExt as _;
use ip_packet::{IpPacket, IpPacketBuf};
use std::os::fd::{FromRawFd, OwnedFd};
@@ -44,16 +45,6 @@ impl tun::Tun for Tun {
) -> Poll<usize> {
self.inbound_rx.poll_recv_many(cx, buf, max)
}
/// Number of packets waiting in the inbound and outbound queues, in that order.
///
/// The outbound count is computed from the channel's remaining capacity
/// (`QUEUE_SIZE - capacity()`); it is `0` when `outbound_tx.get_ref()`
/// returns `None`.
fn queue_lengths(&self) -> (usize, usize) {
    let pending_inbound = self.inbound_rx.len();

    let pending_outbound = if let Some(sender) = self.outbound_tx.get_ref() {
        QUEUE_SIZE - sender.capacity()
    } else {
        0
    };

    (pending_inbound, pending_outbound)
}
}
impl Tun {
@@ -69,6 +60,21 @@ impl Tun {
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE);
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
tokio::spawn(otel::metrics::periodic_system_queue_length(
outbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_transmit(),
],
));
tokio::spawn(otel::metrics::periodic_system_queue_length(
inbound_tx.downgrade(),
[
otel::attr::queue_item_ip_packet(),
otel::attr::network_io_direction_receive(),
],
));
std::thread::Builder::new()
.name("TUN send".to_owned())
.spawn(move || {

View File

@@ -65,8 +65,4 @@ impl tun::Tun for Tun {
fn name(&self) -> &str {
todo!()
}
/// Not implemented for this `Tun`: the body is `todo!()`, so calling it panics.
fn queue_lengths(&self) -> (usize, usize) {
todo!()
}
}

View File

@@ -26,7 +26,4 @@ pub trait Tun: Send + Sync + 'static {
/// The name of the TUN device.
fn name(&self) -> &str;
/// The number of inbound and outbound packets sitting in queues.
///
/// The implementations visible in this file return the pair as `(inbound, outbound)`.
fn queue_lengths(&self) -> (usize, usize);
}

View File

@@ -85,13 +85,6 @@ impl Device {
Ok(())
}
/// Forwards to the underlying TUN device's `queue_lengths`.
///
/// Returns `(0, 0)` while no TUN device is set (`self.tun` is `None`).
pub fn queue_lengths(&self) -> (usize, usize) {
    match self.tun.as_ref() {
        Some(tun) => tun.queue_lengths(),
        None => (0, 0),
    }
}
fn tun(&mut self) -> io::Result<&mut dyn Tun> {
Ok(self
.tun

View File

@@ -61,7 +61,6 @@ pub struct Io {
tun: Device,
outbound_packet_buffer: VecDeque<IpPacket>,
packet_counter: opentelemetry::metrics::Counter<u64>,
queue_lengths: opentelemetry::metrics::Histogram<u64>,
}
#[derive(Debug)]
@@ -128,13 +127,6 @@ impl Io {
.u64_counter("system.network.packets")
.with_description("The number of packets processed.")
.build(),
queue_lengths: opentelemetry::global::meter("connlib")
.u64_histogram("system.queue.length")
.with_description("The length of a queue.")
.with_boundaries(vec![
1.0, 2.0, 5.0, 10.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0,
])
.build(),
}
}
@@ -174,8 +166,6 @@ impl Io {
> {
ready!(self.flush(cx)?);
self.record_queue_lengths();
if self.reval_nameserver_interval.poll_tick(cx).is_ready() {
self.nameservers.evaluate();
}
@@ -431,33 +421,6 @@ impl Io {
) -> io::Result<()> {
self.tcp_dns_server.send_response(to, message)
}
/// Records the lengths of our RX and TX queues in a histogram.
///
/// Four data points are recorded per call: the TUN inbound and outbound
/// queues (tagged with direction and the IP-packet item kind) followed by
/// the UDP inbound and outbound queues (tagged as GRO / GSO batches).
fn record_queue_lengths(&self) {
    let histogram = &self.queue_lengths;

    let (tun_inbound, tun_outbound) = self.tun.queue_lengths();

    let tun_inbound_attrs = [
        otel::attr::network_io_direction_receive(),
        otel::attr::queue_item_ip_packet(),
    ];
    histogram.record(tun_inbound as u64, &tun_inbound_attrs);

    let tun_outbound_attrs = [
        otel::attr::network_io_direction_transmit(),
        otel::attr::queue_item_ip_packet(),
    ];
    histogram.record(tun_outbound as u64, &tun_outbound_attrs);

    let (udp_inbound, udp_outbound) = self.sockets.queue_lengths();

    let udp_inbound_attrs = [otel::attr::queue_item_gro_batch()];
    histogram.record(udp_inbound as u64, &udp_inbound_attrs);

    let udp_outbound_attrs = [otel::attr::queue_item_gso_batch()];
    histogram.record(udp_outbound as u64, &udp_outbound_attrs);
}
}
fn is_max_wg_packet_size(d: &DatagramIn) -> bool {
@@ -582,9 +545,5 @@ mod tests {
fn name(&self) -> &str {
"dummy"
}
/// Always reports empty queues: returns the constant `(0, 0)`.
fn queue_lengths(&self) -> (usize, usize) {
(0, 0)
}
}
}

View File

@@ -120,22 +120,6 @@ impl Sockets {
Poll::Ready(Ok(iter))
}
/// Sums the queue lengths of the IPv4 and IPv6 sockets.
///
/// Returns `(inbound, outbound)`. A socket that is not present
/// (`None`) contributes `(0, 0)` to the totals.
pub fn queue_lengths(&self) -> (usize, usize) {
    let (inbound_v4, outbound_v4) = match self.socket_v4.as_ref() {
        Some(socket) => socket.queue_lengths(),
        None => (0, 0),
    };
    let (inbound_v6, outbound_v6) = match self.socket_v6.as_ref() {
        Some(socket) => socket.queue_lengths(),
        None => (0, 0),
    };

    let inbound_total = inbound_v4 + inbound_v6;
    let outbound_total = outbound_v4 + outbound_v6;

    (inbound_total, outbound_total)
}
}
struct PacketIter<T4, T6> {
@@ -204,6 +188,21 @@ impl ThreadedUdpSocket {
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE);
let (error_tx, error_rx) = flume::bounded(0);
tokio::spawn(otel::metrics::periodic_system_queue_length(
outbound_tx.downgrade(),
[
otel::attr::queue_item_gso_batch(),
otel::attr::network_type_for_addr(preferred_addr),
],
));
tokio::spawn(otel::metrics::periodic_system_queue_length(
inbound_tx.downgrade(),
[
otel::attr::queue_item_gro_batch(),
otel::attr::network_type_for_addr(preferred_addr),
],
));
let thread_name = match preferred_addr {
SocketAddr::V4(_) => "UDP IPv4".to_owned(),
SocketAddr::V6(_) => "UDP IPv6".to_owned(),
@@ -373,21 +372,6 @@ impl ThreadedUdpSocket {
fn channels_mut(&mut self) -> Result<&mut Channels> {
self.channels.as_mut().context("Missing channels")
}
/// Returns the `(inbound, outbound)` queue lengths of this socket's channels.
///
/// Yields `(0, 0)` when `self.channels` is `None`. The outbound length is
/// `QUEUE_SIZE` minus the sender's free capacity, or `0` if
/// `outbound_tx.get_ref()` returns `None`.
fn queue_lengths(&self) -> (usize, usize) {
    let Some(channels) = self.channels.as_ref() else {
        return (0, 0);
    };

    let inbound = channels.inbound_rx.len();
    let outbound = channels
        .outbound_tx
        .get_ref()
        .map_or(0, |sender| QUEUE_SIZE - sender.capacity());

    (inbound, outbound)
}
}
impl Drop for ThreadedUdpSocket {

View File

@@ -364,10 +364,6 @@ impl Tun for ValidateChecksumAdapter {
fn name(&self) -> &str {
self.inner.name()
}
/// Delegates to the wrapped `inner` device and returns its queue lengths unchanged.
fn queue_lengths(&self) -> (usize, usize) {
self.inner.queue_lengths()
}
}
impl ValidateChecksumAdapter {

View File

@@ -6,6 +6,7 @@ license = { workspace = true }
[dependencies]
anyhow = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
ip-packet = { workspace = true }

View File

@@ -114,7 +114,14 @@ pub mod attr {
}
pub mod metrics {
use opentelemetry::metrics::Counter;
use std::{ops::ControlFlow, time::Duration};
use opentelemetry::{
KeyValue,
metrics::{Counter, Gauge},
};
use crate::otel::QueueLength;
pub fn network_packet_dropped() -> Counter<u64> {
opentelemetry::global::meter("connlib")
@@ -123,6 +130,70 @@ pub mod metrics {
.with_unit("{packet}")
.build()
}
/// Samples the length of `queue` once per second and records it in the
/// `system.queue.length` gauge of the `connlib` meter.
///
/// Every sample is tagged with `attributes`. The future completes as soon as
/// `queue.queue_length()` returns `None`; until then it keeps sampling.
pub async fn periodic_system_queue_length<const N: usize>(
    queue: impl QueueLength,
    attributes: [KeyValue; N],
) {
    const SAMPLE_INTERVAL: Duration = Duration::from_secs(1);

    let meter = opentelemetry::global::meter("connlib");
    let gauge = meter
        .u64_gauge("system.queue.length")
        .with_description("The length of a queue.")
        .build();

    periodic_gauge(
        gauge,
        |gauge| {
            // `None` means there is nothing left to measure: stop the loop.
            let Some(len) = queue.queue_length() else {
                return ControlFlow::Break(());
            };

            gauge.record(len, &attributes);

            ControlFlow::Continue(())
        },
        SAMPLE_INTERVAL,
    )
    .await;
}
/// Repeatedly invokes `callback` with `gauge`, sleeping `interval` between calls.
///
/// The callback runs once immediately. The loop ends, and the future
/// resolves, the first time the callback returns `ControlFlow::Break`.
pub async fn periodic_gauge<T>(
    gauge: Gauge<T>,
    callback: impl Fn(&Gauge<T>) -> ControlFlow<(), ()>,
    interval: Duration,
) {
    loop {
        if callback(&gauge).is_break() {
            return;
        }

        tokio::time::sleep(interval).await;
    }
}
}
/// A handle from which the current length of a queue can be sampled.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait QueueLength: Send + Sync + 'static {
/// Returns the current number of queued items, or `None` if the length can
/// no longer be determined.
///
/// `metrics::periodic_system_queue_length` stops sampling on `None`.
fn queue_length(&self) -> Option<u64>;
}
impl<T> QueueLength for tokio::sync::mpsc::WeakSender<T>
where
    T: Send + Sync + 'static,
{
    /// Computes the queue length as the channel's maximum capacity minus its
    /// currently available capacity.
    ///
    /// Returns `None` when the weak sender can no longer be upgraded.
    fn queue_length(&self) -> Option<u64> {
        self.upgrade().map(|sender| {
            let used = sender.max_capacity() - sender.capacity();

            used as u64
        })
    }
}
impl<T> QueueLength for flume::WeakSender<T>
where
    T: Send + Sync + 'static,
{
    /// Reports the channel's `len()` as the queue length.
    ///
    /// Returns `None` when the weak sender can no longer be upgraded.
    fn queue_length(&self) -> Option<u64> {
        self.upgrade().map(|sender| sender.len() as u64)
    }
}
pub fn default_resource_with<const N: usize>(attributes: [KeyValue; N]) -> Resource {