From 4e11112d9b4389a4b6498827af6a002f2e7ec5a4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 21 Aug 2025 09:08:56 +1000 Subject: [PATCH] feat(connlib): improve throughput on higher latencies (#10231) Turns out the multi-threaded access of the TUN device on the Gateway causes packet reordering which makes the TCP congestion controller throttle the connection. Additionally, the default TX queue length of a TUN device on Linux is only 500 packets. With just a single thread and an increased TX queue length, we get a throughput performance of just over 1 GBit/s for a 20ms link between Client and Gateway with basically no packet drops: ``` Connecting to host 172.20.0.110, port 5201 [ 5] local 100.79.130.70 port 49546 connected to 172.20.0.110 port 5201 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 116 MBytes 977 Mbits/sec 0 6.40 MBytes [ 5] 1.00-2.00 sec 137 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 2.00-3.00 sec 134 MBytes 1.13 Gbits/sec 0 6.40 MBytes [ 5] 3.00-4.00 sec 136 MBytes 1.14 Gbits/sec 47 6.40 MBytes [ 5] 4.00-5.00 sec 137 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 5.00-6.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 6.00-7.00 sec 138 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 7.00-8.00 sec 138 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 8.00-9.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 9.00-10.00 sec 138 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 10.00-11.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 11.00-12.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 12.00-13.00 sec 136 MBytes 1.14 Gbits/sec 0 6.40 MBytes [ 5] 13.00-14.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 14.00-15.00 sec 140 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 15.00-16.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 16.00-17.00 sec 137 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 17.00-18.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 18.00-19.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 19.00-20.00 sec 136 MBytes 1.14 Gbits/sec 0 
6.40 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-20.00 sec 2.67 GBytes 1.15 Gbits/sec 47 sender [ 5] 0.00-20.02 sec 2.67 GBytes 1.15 Gbits/sec receiver iperf Done. ``` For further debugging in the future, we are now recording the send and receive queue depths of both the TUN device and the UDP sockets. Neither of those showed to be full in my testing which leads me to conclude that it isn't any buffer inside Firezone that is too small here. Related: #7452 --------- Signed-off-by: Thomas Eizinger --- rust/Cargo.lock | 25 +- rust/apple-client-ffi/Cargo.toml | 2 +- rust/apple-client-ffi/src/tun.rs | 23 +- rust/bin-shared/Cargo.toml | 2 +- .../src/tun_device_manager/linux.rs | 122 ++++++---- .../src/tun_device_manager/macos.rs | 2 +- .../src/tun_device_manager/windows.rs | 23 +- rust/bin-shared/tests/dns_control_windows.rs | 2 +- rust/bin-shared/tests/no_packet_loops_tcp.rs | 2 +- rust/bin-shared/tests/no_packet_loops_udp.rs | 2 +- rust/bin-shared/tests/tunnel_drop.rs | 2 +- rust/client-ffi/Cargo.toml | 1 + rust/client-ffi/src/platform/android/tun.rs | 25 +- rust/client-ffi/src/platform/fallback.rs | 4 + .../dns-over-tcp/tests/smoke_server.rs | 2 +- rust/connlib/socket-factory/src/lib.rs | 16 +- rust/connlib/tun/Cargo.toml | 2 - rust/connlib/tun/src/lib.rs | 3 + rust/connlib/tun/src/unix.rs | 5 +- rust/connlib/tunnel/src/device_channel.rs | 7 + rust/connlib/tunnel/src/io.rs | 41 ++++ rust/connlib/tunnel/src/sockets.rs | 23 ++ rust/deny.toml | 221 +++++++++--------- rust/gateway/Cargo.toml | 1 - rust/gateway/src/main.rs | 41 +--- rust/gui-client/src-tauri/src/service.rs | 2 +- rust/headless-client/src/main.rs | 2 +- rust/telemetry/src/otel.rs | 12 + website/src/components/Changelog/Gateway.tsx | 8 +- 29 files changed, 382 insertions(+), 241 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b78be7834..aea92aab6 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -328,7 +328,6 @@ dependencies = 
[ "dns-types", "firezone-logging", "firezone-telemetry", - "flume", "futures", "ip-packet", "ip_network", @@ -342,6 +341,7 @@ dependencies = [ "swift-bridge", "swift-bridge-build", "tokio", + "tokio-util", "tracing", "tracing-appender", "tracing-subscriber", @@ -1327,6 +1327,7 @@ dependencies = [ "socket-factory", "thiserror 2.0.15", "tokio", + "tokio-util", "tracing", "tracing-appender", "tracing-subscriber", @@ -2340,7 +2341,6 @@ dependencies = [ "dirs", "dns-types", "firezone-logging", - "flume", "futures", "gat-lending-iterator", "hex", @@ -2403,7 +2403,6 @@ dependencies = [ "libc", "moka", "nix 0.30.1", - "num_cpus", "opentelemetry", "opentelemetry-otlp", "opentelemetry-stdout", @@ -3336,12 +3335,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" -[[package]] -name = "hermit-abi" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f154ce46856750ed433c8649605bf7ed2de3bc35fd9d2a9f30cddd873c80cb08" - [[package]] name = "hex" version = "0.4.3" @@ -4722,16 +4715,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi 0.5.1", - "libc", -] - [[package]] name = "num_enum" version = "0.7.3" @@ -5473,7 +5456,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi 0.4.0", + "hermit-abi", "pin-project-lite", "rustix 0.38.44", "tracing", @@ -8392,8 +8375,6 @@ name = "tun" version = "0.1.0" dependencies = [ "anyhow", - "flume", - "futures", "ip-packet", "libc", "tokio", diff --git a/rust/apple-client-ffi/Cargo.toml b/rust/apple-client-ffi/Cargo.toml index c31f97d41..b34ec8d4d 100644 --- a/rust/apple-client-ffi/Cargo.toml +++ 
b/rust/apple-client-ffi/Cargo.toml @@ -18,7 +18,6 @@ connlib-model = { workspace = true } dns-types = { workspace = true } firezone-logging = { workspace = true } firezone-telemetry = { workspace = true } -flume = { workspace = true } futures = { workspace = true } ip-packet = { workspace = true } ip_network = { workspace = true } @@ -30,6 +29,7 @@ serde_json = { workspace = true } socket-factory = { workspace = true } swift-bridge = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } +tokio-util = { workspace = true } tracing = { workspace = true } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/rust/apple-client-ffi/src/tun.rs b/rust/apple-client-ffi/src/tun.rs index 2969fe40c..0f18fe647 100644 --- a/rust/apple-client-ffi/src/tun.rs +++ b/rust/apple-client-ffi/src/tun.rs @@ -7,11 +7,14 @@ use std::{ os::fd::{AsRawFd as _, RawFd}, }; use tokio::sync::mpsc; +use tokio_util::sync::PollSender; + +const QUEUE_SIZE: usize = 10_000; #[derive(Debug)] pub struct Tun { name: String, - outbound_tx: flume::r#async::SendSink<'static, IpPacket>, + outbound_tx: PollSender, inbound_rx: mpsc::Receiver, } @@ -21,14 +24,14 @@ impl Tun { set_non_blocking(fd)?; let name = name(fd)?; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); - let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. 
+ let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); std::thread::Builder::new() .name("TUN send".to_owned()) .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + tun::unix::tun_send(fd, outbound_rx, write), "Failed to send to TUN device: {}" ) }) @@ -45,7 +48,7 @@ impl Tun { Ok(Tun { name, - outbound_tx: outbound_tx.into_sink(), + outbound_tx: PollSender::new(outbound_tx), inbound_rx, }) } @@ -78,6 +81,16 @@ impl tun::Tun for Tun { ) -> Poll { self.inbound_rx.poll_recv_many(cx, buf, max) } + + fn queue_lengths(&self) -> (usize, usize) { + ( + self.inbound_rx.len(), + self.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + } } fn get_last_error() -> io::Error { diff --git a/rust/bin-shared/Cargo.toml b/rust/bin-shared/Cargo.toml index 8f4227ca3..9d7b2b979 100644 --- a/rust/bin-shared/Cargo.toml +++ b/rust/bin-shared/Cargo.toml @@ -33,13 +33,13 @@ uuid = { workspace = true, features = ["v4"] } [target.'cfg(target_os = "linux")'.dependencies] atomicwrites = { workspace = true } dirs = { workspace = true } -flume = { workspace = true } libc = { workspace = true } netlink-packet-core = { workspace = true } netlink-packet-route = { workspace = true } nix = { workspace = true, features = ["socket"] } resolv-conf = { workspace = true } rtnetlink = { workspace = true } +tokio-util = { workspace = true } zbus = { workspace = true } # Can't use `zbus`'s `tokio` feature here, or it will break toast popups all the way over in `gui-client`. 
[target.'cfg(windows)'.dependencies] diff --git a/rust/bin-shared/src/tun_device_manager/linux.rs b/rust/bin-shared/src/tun_device_manager/linux.rs index f35ff0a86..5a430a35a 100644 --- a/rust/bin-shared/src/tun_device_manager/linux.rs +++ b/rust/bin-shared/src/tun_device_manager/linux.rs @@ -10,6 +10,7 @@ use libc::{ EEXIST, ENOENT, ESRCH, F_GETFL, F_SETFL, O_NONBLOCK, O_RDWR, S_IFCHR, fcntl, makedev, mknod, open, }; +use netlink_packet_route::link::LinkAttribute; use netlink_packet_route::route::{RouteMessage, RouteProtocol, RouteScope}; use netlink_packet_route::rule::RuleAction; use rtnetlink::{Error::NetlinkError, Handle, RuleAddRequest, new_connection}; @@ -28,6 +29,7 @@ use std::{ os::{fd::RawFd, unix::fs::PermissionsExt}, }; use tokio::sync::mpsc; +use tokio_util::sync::PollSender; use tun::ioctl; const TUNSETIFF: libc::c_ulong = 0x4004_54ca; @@ -41,7 +43,6 @@ const FIREZONE_TABLE: u32 = 0x2021_fd00; /// For lack of a better name pub struct TunDeviceManager { mtu: u32, - num_threads: usize, connection: Connection, routes: HashSet, } @@ -63,7 +64,7 @@ impl TunDeviceManager { /// Creates a new managed tunnel device. /// /// Panics if called without a Tokio runtime. - pub fn new(mtu: usize, num_threads: usize) -> Result { + pub fn new(mtu: usize) -> Result { let (cxn, handle, _) = new_connection().context("Failed to create netlink connection")?; let task = tokio::spawn(cxn); let connection = Connection { handle, task }; @@ -72,12 +73,26 @@ impl TunDeviceManager { connection, routes: Default::default(), mtu: mtu as u32, - num_threads, }) } pub fn make_tun(&mut self) -> Result> { - Ok(Box::new(Tun::new(self.num_threads)?)) + let tun = Box::new(Tun::new()?); + + // Do this in a separate task because: + // a) We want it to be infallible. + // b) We don't want `async` to creep into the API. 
+ tokio::spawn({ + let handle = self.connection.handle.clone(); + + async move { + if let Err(e) = set_txqueue_length(handle, 10_000).await { + tracing::warn!("Failed to set TX queue length: {e}") + } + } + }); + + Ok(tun) } #[tracing::instrument(level = "trace", skip(self))] @@ -197,6 +212,31 @@ impl TunDeviceManager { } } +async fn set_txqueue_length(handle: Handle, queue_len: u32) -> Result<()> { + let index = handle + .link() + .get() + .match_name(TunDeviceManager::IFACE_NAME.to_string()) + .execute() + .try_next() + .await? + .context("No interface")? + .header + .index; + + handle + .link() + .set( + LinkUnspec::new_with_index(index) + .append_extra_attribute(LinkAttribute::TxQueueLen(queue_len)) + .build(), + ) + .execute() + .await?; + + Ok(()) +} + fn make_rule(handle: &Handle) -> RuleAddRequest { let mut rule = handle .rule() @@ -292,54 +332,48 @@ async fn remove_route(route: &IpNetwork, idx: u32, handle: &Handle) { tracing::warn!(%route, "Failed to remove route: {}", err_with_src(&err)); } +const QUEUE_SIZE: usize = 10_000; + #[derive(Debug)] pub struct Tun { - outbound_tx: flume::r#async::SendSink<'static, IpPacket>, + outbound_tx: PollSender, inbound_rx: mpsc::Receiver, } impl Tun { - pub fn new(num_threads: usize) -> Result { + pub fn new() -> Result { create_tun_device()?; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); - let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. 
+ let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); - for n in 0..num_threads { - let fd = Arc::new(open_tun()?); - let outbound_rx = outbound_rx.clone().into_stream(); - let inbound_tx = inbound_tx.clone(); + let fd = Arc::new(open_tun()?); - std::thread::Builder::new() - .name(format!("TUN send {n}/{num_threads}")) - .spawn({ - let fd = fd.clone(); + std::thread::Builder::new() + .name("TUN send".to_owned()) + .spawn({ + let fd = fd.clone(); - move || { - firezone_logging::unwrap_or_warn!( - tun::unix::tun_send(fd, outbound_rx, write), - "Failed to send to TUN device: {}" - ) - } - }) - .map_err(io::Error::other)?; - std::thread::Builder::new() - .name(format!("TUN recv {n}/{num_threads}")) - .spawn({ - let fd = fd.clone(); - - move || { - firezone_logging::unwrap_or_warn!( - tun::unix::tun_recv(fd, inbound_tx, read), - "Failed to recv from TUN device: {}" - ) - } - }) - .map_err(io::Error::other)?; - } + move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_send(fd, outbound_rx, write), + "Failed to send to TUN device: {}" + ) + } + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name("TUN recv".to_owned()) + .spawn(move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" + ) + }) + .map_err(io::Error::other)?; Ok(Self { - outbound_tx: outbound_tx.into_sink(), + outbound_tx: PollSender::new(outbound_tx), inbound_rx, }) } @@ -400,6 +434,16 @@ impl tun::Tun for Tun { fn name(&self) -> &str { TunDeviceManager::IFACE_NAME } + + fn queue_lengths(&self) -> (usize, usize) { + ( + self.inbound_rx.len(), + self.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + } } fn get_last_error() -> io::Error { diff --git a/rust/bin-shared/src/tun_device_manager/macos.rs b/rust/bin-shared/src/tun_device_manager/macos.rs index 86f45dc05..3e6541b6f 100644 --- 
a/rust/bin-shared/src/tun_device_manager/macos.rs +++ b/rust/bin-shared/src/tun_device_manager/macos.rs @@ -7,7 +7,7 @@ use tun::Tun; pub struct TunDeviceManager {} impl TunDeviceManager { - pub fn new(_mtu: usize, _num_threads: usize) -> Result { + pub fn new(_mtu: usize) -> Result { bail!("Not implemented") } diff --git a/rust/bin-shared/src/tun_device_manager/windows.rs b/rust/bin-shared/src/tun_device_manager/windows.rs index 57267af77..67df51867 100644 --- a/rust/bin-shared/src/tun_device_manager/windows.rs +++ b/rust/bin-shared/src/tun_device_manager/windows.rs @@ -46,6 +46,8 @@ use wintun::Adapter; /// where that is configured. const RING_BUFFER_SIZE: u32 = 0x10_0000; +const QUEUE_SIZE: usize = 1000; + pub struct TunDeviceManager { mtu: u32, @@ -59,7 +61,7 @@ pub struct TunDeviceManager { impl TunDeviceManager { #[expect(clippy::unnecessary_wraps, reason = "Fallible on Linux")] - pub fn new(mtu: usize, _num_threads: usize) -> Result { + pub fn new(mtu: usize) -> Result { Ok(Self { iface_idx: None, luid: None, @@ -290,8 +292,8 @@ impl Tun { .start_session(RING_BUFFER_SIZE) .context("Failed to start session")?, ); - let (outbound_tx, outbound_rx) = mpsc::channel(1000); - let (inbound_tx, inbound_rx) = mpsc::channel(1000); // We want to be able to batch-receive from this. + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); + let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); // We want to be able to batch-receive from this. 
let send_thread = start_send_thread(outbound_rx, Arc::downgrade(&session)) .context("Failed to start send thread")?; let recv_thread = start_recv_thread(inbound_tx, Arc::downgrade(&session)) @@ -359,6 +361,21 @@ impl tun::Tun for Tun { Ok(()) } + + fn queue_lengths(&self) -> (usize, usize) { + self.state + .as_ref() + .map(|s| { + ( + s.inbound_rx.len(), + s.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + }) + .unwrap_or_default() + } } // Moves packets from Internet towards the user diff --git a/rust/bin-shared/tests/dns_control_windows.rs b/rust/bin-shared/tests/dns_control_windows.rs index dfed79147..de967ef2c 100644 --- a/rust/bin-shared/tests/dns_control_windows.rs +++ b/rust/bin-shared/tests/dns_control_windows.rs @@ -14,7 +14,7 @@ fn dns_control() { .build() .unwrap(); - let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280, 1).unwrap(); // Note: num_threads (`1`) is unused on windows. + let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280).unwrap(); let _tun = tun_dev_manager.make_tun().unwrap(); rt.block_on(async { diff --git a/rust/bin-shared/tests/no_packet_loops_tcp.rs b/rust/bin-shared/tests/no_packet_loops_tcp.rs index 3c3ea6531..d48331ff4 100644 --- a/rust/bin-shared/tests/no_packet_loops_tcp.rs +++ b/rust/bin-shared/tests/no_packet_loops_tcp.rs @@ -15,7 +15,7 @@ async fn no_packet_loops_tcp() { let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut device_manager = TunDeviceManager::new(1280).unwrap(); let _tun = device_manager.make_tun().unwrap(); device_manager.set_ips(ipv4, ipv6).await.unwrap(); diff --git a/rust/bin-shared/tests/no_packet_loops_udp.rs b/rust/bin-shared/tests/no_packet_loops_udp.rs index cc5c8e5aa..55148d206 100644 --- a/rust/bin-shared/tests/no_packet_loops_udp.rs +++ 
b/rust/bin-shared/tests/no_packet_loops_udp.rs @@ -24,7 +24,7 @@ async fn no_packet_loops_udp() { let bufferpool = BufferPool::::new(0, "test"); - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut device_manager = TunDeviceManager::new(1280).unwrap(); let _tun = device_manager.make_tun().unwrap(); device_manager.set_ips(ipv4, ipv6).await.unwrap(); diff --git a/rust/bin-shared/tests/tunnel_drop.rs b/rust/bin-shared/tests/tunnel_drop.rs index 3cc779ea6..4e1c8b397 100644 --- a/rust/bin-shared/tests/tunnel_drop.rs +++ b/rust/bin-shared/tests/tunnel_drop.rs @@ -9,7 +9,7 @@ use firezone_bin_shared::TunDeviceManager; async fn tunnel_drop() { firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads. - let mut tun_device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut tun_device_manager = TunDeviceManager::new(1280).unwrap(); // Each cycle takes about half a second, so this will take a fair bit to run. for _ in 0..50 { diff --git a/rust/client-ffi/Cargo.toml b/rust/client-ffi/Cargo.toml index 77a134ddc..c45fa840c 100644 --- a/rust/client-ffi/Cargo.toml +++ b/rust/client-ffi/Cargo.toml @@ -30,6 +30,7 @@ serde_json = { workspace = true } socket-factory = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } +tokio-util = { workspace = true } tracing = { workspace = true, features = ["std", "attributes"] } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/rust/client-ffi/src/platform/android/tun.rs b/rust/client-ffi/src/platform/android/tun.rs index a17c409db..f72c17bb9 100644 --- a/rust/client-ffi/src/platform/android/tun.rs +++ b/rust/client-ffi/src/platform/android/tun.rs @@ -4,12 +4,15 @@ use std::os::fd::{FromRawFd, OwnedFd}; use std::task::{Context, Poll}; use std::{io, os::fd::RawFd}; use tokio::sync::mpsc; +use tokio_util::sync::PollSender; use tun::ioctl; +const 
QUEUE_SIZE: usize = 1000; + #[derive(Debug)] pub struct Tun { name: String, - outbound_tx: flume::r#async::SendSink<'static, IpPacket>, + outbound_tx: PollSender, inbound_rx: mpsc::Receiver, _fd: OwnedFd, } @@ -41,6 +44,16 @@ impl tun::Tun for Tun { ) -> Poll { self.inbound_rx.poll_recv_many(cx, buf, max) } + + fn queue_lengths(&self) -> (usize, usize) { + ( + self.inbound_rx.len(), + self.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + } } impl Tun { @@ -53,16 +66,14 @@ impl Tun { pub unsafe fn from_fd(fd: RawFd) -> io::Result { let name = unsafe { interface_name(fd)? }; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); - let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. - - // TODO: Test whether we can set `IFF_MULTI_QUEUE` on Android devices. + let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); std::thread::Builder::new() .name("TUN send".to_owned()) .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + tun::unix::tun_send(fd, outbound_rx, write), "Failed to send to TUN device: {}" ) }) @@ -79,7 +90,7 @@ impl Tun { Ok(Tun { name, - outbound_tx: outbound_tx.into_sink(), + outbound_tx: PollSender::new(outbound_tx), inbound_rx, _fd: unsafe { OwnedFd::from_raw_fd(fd) }, // `OwnedFd` will close the fd on drop. 
}) diff --git a/rust/client-ffi/src/platform/fallback.rs b/rust/client-ffi/src/platform/fallback.rs index cfd89287b..df558db4c 100644 --- a/rust/client-ffi/src/platform/fallback.rs +++ b/rust/client-ffi/src/platform/fallback.rs @@ -65,4 +65,8 @@ impl tun::Tun for Tun { fn name(&self) -> &str { todo!() } + + fn queue_lengths(&self) -> (usize, usize) { + todo!() + } } diff --git a/rust/connlib/dns-over-tcp/tests/smoke_server.rs b/rust/connlib/dns-over-tcp/tests/smoke_server.rs index a9f424712..bc5514a7d 100644 --- a/rust/connlib/dns-over-tcp/tests/smoke_server.rs +++ b/rust/connlib/dns-over-tcp/tests/smoke_server.rs @@ -23,7 +23,7 @@ async fn smoke() { let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut device_manager = TunDeviceManager::new(1280).unwrap(); let tun = device_manager.make_tun().unwrap(); device_manager.set_ips(ipv4, ipv6).await.unwrap(); device_manager diff --git a/rust/connlib/socket-factory/src/lib.rs b/rust/connlib/socket-factory/src/lib.rs index 3281e7b04..07a71695e 100644 --- a/rust/connlib/socket-factory/src/lib.rs +++ b/rust/connlib/socket-factory/src/lib.rs @@ -163,7 +163,7 @@ pub struct UdpSocket { /// A buffer pool for batches of incoming UDP packets. 
buffer_pool: BufferPool>, - gro_batch_histogram: opentelemetry::metrics::Histogram, + batch_histogram: opentelemetry::metrics::Histogram, port: u16, } @@ -184,7 +184,7 @@ impl UdpSocket { IpAddr::V6(_) => "udp-socket-v6", }, ), - gro_batch_histogram: opentelemetry::global::meter("connlib") + batch_histogram: opentelemetry::global::meter("connlib") .u64_histogram("system.network.packets.batch_count") .with_description( "How many batches of packets we have processed in a single syscall.", @@ -286,7 +286,7 @@ impl UdpSocket { .await .context("Failed to read from socket")?; - self.gro_batch_histogram.record( + self.batch_histogram.record( len as u64, &[ KeyValue::new("network.transport", "udp"), @@ -350,6 +350,16 @@ impl UdpSocket { } async fn send_inner(&self, chunk: Transmit<'_>) -> io::Result<()> { + let batch_size = chunk.contents.len() / chunk.segment_size.unwrap_or(chunk.contents.len()); + + self.batch_histogram.record( + batch_size as u64, + &[ + KeyValue::new("network.transport", "udp"), + KeyValue::new("network.io.direction", "transmit"), + ], + ); + self.inner .async_io(Interest::WRITABLE, || { match self.state.try_send((&self.inner).into(), &chunk) { diff --git a/rust/connlib/tun/Cargo.toml b/rust/connlib/tun/Cargo.toml index b5cd6e922..3f4d35495 100644 --- a/rust/connlib/tun/Cargo.toml +++ b/rust/connlib/tun/Cargo.toml @@ -10,8 +10,6 @@ anyhow = { workspace = true } ip-packet = { workspace = true } [target.'cfg(target_family = "unix")'.dependencies] -flume = { workspace = true } -futures = { workspace = true } libc = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/rust/connlib/tun/src/lib.rs b/rust/connlib/tun/src/lib.rs index d069f7c01..2c6424751 100644 --- a/rust/connlib/tun/src/lib.rs +++ b/rust/connlib/tun/src/lib.rs @@ -26,4 +26,7 @@ pub trait Tun: Send + Sync + 'static { /// The name of the TUN device. fn name(&self) -> &str; + + /// The number of inbound and outbound packets sitting in queues. 
+ fn queue_lengths(&self) -> (usize, usize); } diff --git a/rust/connlib/tun/src/unix.rs b/rust/connlib/tun/src/unix.rs index 065dfcc9d..6f3e09ea1 100644 --- a/rust/connlib/tun/src/unix.rs +++ b/rust/connlib/tun/src/unix.rs @@ -1,5 +1,4 @@ use anyhow::{Context as _, Result, bail}; -use futures::StreamExt as _; use ip_packet::{IpPacket, IpPacketBuf}; use std::io; use std::os::fd::AsRawFd; @@ -8,7 +7,7 @@ use tokio::sync::mpsc; pub fn tun_send( fd: T, - mut outbound_rx: flume::r#async::RecvStream<'_, IpPacket>, + mut outbound_rx: mpsc::Receiver, write: impl Fn(i32, &IpPacket) -> std::result::Result, ) -> Result<()> where @@ -21,7 +20,7 @@ where .block_on(async move { let fd = AsyncFd::with_interest(fd, tokio::io::Interest::WRITABLE)?; - while let Some(packet) = outbound_rx.next().await { + while let Some(packet) = outbound_rx.recv().await { if let Err(e) = fd .async_io(tokio::io::Interest::WRITABLE, |fd| { write(fd.as_raw_fd(), &packet) diff --git a/rust/connlib/tunnel/src/device_channel.rs b/rust/connlib/tunnel/src/device_channel.rs index c1de01889..71f6ed20f 100644 --- a/rust/connlib/tunnel/src/device_channel.rs +++ b/rust/connlib/tunnel/src/device_channel.rs @@ -85,6 +85,13 @@ impl Device { Ok(()) } + pub fn queue_lengths(&self) -> (usize, usize) { + self.tun + .as_ref() + .map(|t| t.queue_lengths()) + .unwrap_or_default() + } + fn tun(&mut self) -> io::Result<&mut dyn Tun> { Ok(self .tun diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index 75d86ec68..a0afc68ec 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -61,6 +61,7 @@ pub struct Io { tun: Device, outbound_packet_buffer: VecDeque, packet_counter: opentelemetry::metrics::Counter, + queue_lengths: opentelemetry::metrics::Histogram, } #[derive(Debug)] @@ -127,6 +128,13 @@ impl Io { .u64_counter("system.network.packets") .with_description("The number of packets processed.") .build(), + queue_lengths: opentelemetry::global::meter("connlib") + 
.u64_histogram("system.queue.length") + .with_description("The length of a queue.") + .with_boundaries(vec![ + 1.0, 2.0, 5.0, 10.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, + ]) + .build(), } } @@ -166,6 +174,8 @@ impl Io { > { ready!(self.flush(cx)?); + self.record_queue_lengths(); + if self.reval_nameserver_interval.poll_tick(cx).is_ready() { self.nameservers.evaluate(); } @@ -421,6 +431,33 @@ impl Io { ) -> io::Result<()> { self.tcp_dns_server.send_response(to, message) } + + /// Records the lengths of our RX and TX queues in a histogram. + fn record_queue_lengths(&self) { + let (tun_rx, tun_tx) = self.tun.queue_lengths(); + + self.queue_lengths.record( + tun_rx as u64, + &[ + otel::attr::network_io_direction_receive(), + otel::attr::queue_item_ip_packet(), + ], + ); + self.queue_lengths.record( + tun_tx as u64, + &[ + otel::attr::network_io_direction_transmit(), + otel::attr::queue_item_ip_packet(), + ], + ); + + let (udp_rx, udp_tx) = self.sockets.queue_lengths(); + + self.queue_lengths + .record(udp_rx as u64, &[otel::attr::queue_item_gro_batch()]); + self.queue_lengths + .record(udp_tx as u64, &[otel::attr::queue_item_gso_batch()]); + } } fn is_max_wg_packet_size(d: &DatagramIn) -> bool { @@ -545,5 +582,9 @@ mod tests { fn name(&self) -> &str { "dummy" } + + fn queue_lengths(&self) -> (usize, usize) { + (0, 0) + } } } diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index 73e901d82..73422b6a3 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -117,6 +117,22 @@ impl Sockets { Poll::Ready(Ok(iter)) } + + pub fn queue_lengths(&self) -> (usize, usize) { + let (v4_inbound, v4_outbound) = self + .socket_v4 + .as_ref() + .map(|s| s.queue_lengths()) + .unwrap_or_default(); + + let (v6_inbound, v6_outbound) = self + .socket_v6 + .as_ref() + .map(|s| s.queue_lengths()) + .unwrap_or_default(); + + (v4_inbound + v6_inbound, v4_outbound + v6_outbound) + } } struct PacketIter { @@ 
-333,6 +349,13 @@ impl ThreadedUdpSocket { fn channels_mut(&mut self) -> Result<&mut Channels> { self.channels.as_mut().context("Missing channels") } + + fn queue_lengths(&self) -> (usize, usize) { + self.channels + .as_ref() + .map(|c| (c.inbound_rx.len(), c.outbound_tx.len())) + .unwrap_or_default() + } } impl Drop for ThreadedUdpSocket { diff --git a/rust/deny.toml b/rust/deny.toml index 9993c7aa5..29aafa062 100644 --- a/rust/deny.toml +++ b/rust/deny.toml @@ -23,13 +23,13 @@ # dependencies not shared by any other crates, would be ignored, as the target # list here is effectively saying which targets you are building for. targets = [ - # The triple can be any string, but only the target triples built in to - # rustc (as of 1.40) can be checked against actual config expressions - #"x86_64-unknown-linux-musl", - # You can also specify which target_features you promise are enabled for a - # particular target. target_features are currently not validated against - # the actual valid features supported by the target architecture. - #{ triple = "wasm32-unknown-unknown", features = ["atomics"] }, + # The triple can be any string, but only the target triples built in to + # rustc (as of 1.40) can be checked against actual config expressions + #"x86_64-unknown-linux-musl", + # You can also specify which target_features you promise are enabled for a + # particular target. target_features are currently not validated against + # the actual valid features supported by the target architecture. + #{ triple = "wasm32-unknown-unknown", features = ["atomics"] }, ] # When creating the dependency graph used as the source of truth when checks are # executed, this field can be used to prune crates from the graph, removing them @@ -70,31 +70,31 @@ feature-depth = 1 # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. 
ignore = [ - "RUSTSEC-2020-0095", # `difference` is unmaintained - "RUSTSEC-2024-0384", # `instant` is unmaintained - "RUSTSEC-2024-0370", # `proc-macro-error` is unmaintained + "RUSTSEC-2020-0095", # `difference` is unmaintained + "RUSTSEC-2024-0384", # `instant` is unmaintained + "RUSTSEC-2024-0370", # `proc-macro-error` is unmaintained - # `gtk-rs` crates are unmaintained - "RUSTSEC-2024-0411", - "RUSTSEC-2024-0412", - "RUSTSEC-2024-0413", - "RUSTSEC-2024-0414", - "RUSTSEC-2024-0415", - "RUSTSEC-2024-0416", - "RUSTSEC-2024-0417", - "RUSTSEC-2024-0418", - "RUSTSEC-2024-0419", - "RUSTSEC-2024-0420", + # `gtk-rs` crates are unmaintained + "RUSTSEC-2024-0411", + "RUSTSEC-2024-0412", + "RUSTSEC-2024-0413", + "RUSTSEC-2024-0414", + "RUSTSEC-2024-0415", + "RUSTSEC-2024-0416", + "RUSTSEC-2024-0417", + "RUSTSEC-2024-0418", + "RUSTSEC-2024-0419", + "RUSTSEC-2024-0420", - "RUSTSEC-2025-0012", # backoff, See #8386 - "RUSTSEC-2024-0436", # paste, See #8387 + "RUSTSEC-2025-0012", # backoff, See #8386 + "RUSTSEC-2024-0436", # paste, See #8387 - "RUSTSEC-2024-0429", # `glib`, need to wait for tauri to upgrade + "RUSTSEC-2024-0429", # `glib`, need to wait for tauri to upgrade - #"RUSTSEC-0000-0000", - #{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" }, - #"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish - #{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" }, + #"RUSTSEC-0000-0000", + #{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" }, + #"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish + #{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" }, ] # If this is true, then cargo deny will use the git executable to fetch advisory database. # If this is false, then it uses a built-in git library. 
@@ -110,19 +110,19 @@ ignore = [ # See https://spdx.org/licenses/ for list of possible licenses # [possible values: any SPDX 3.11 short identifier (+ optional exception)]. allow = [ - "MIT", - "Apache-2.0", - "Apache-2.0 WITH LLVM-exception", - "BSD-2-Clause", - "BSD-3-Clause", - "MPL-2.0", - "ISC", - "0BSD", - "Unicode-3.0", - "BSL-1.0", - "Zlib", - "CDLA-Permissive-2.0", - "NCSA", + "MIT", + "Apache-2.0", + "Apache-2.0 WITH LLVM-exception", + "BSD-2-Clause", + "BSD-3-Clause", + "MPL-2.0", + "ISC", + "0BSD", + "Unicode-3.0", + "BSL-1.0", + "Zlib", + "CDLA-Permissive-2.0", + "NCSA", ] # The confidence threshold for detecting a license from license text. # The higher the value, the more closely the license text must be to the @@ -132,9 +132,9 @@ confidence-threshold = 0.8 # Allow 1 or more licenses on a per-crate basis, so that particular licenses # aren't accepted for every possible crate as with the normal allow list exceptions = [ - # Each entry is the crate and version constraint, and its specific allow - # list - #{ allow = ["Zlib"], crate = "adler32" }, + # Each entry is the crate and version constraint, and its specific allow + # list + #{ allow = ["Zlib"], crate = "adler32" }, ] # Some crates don't have (easily) machine readable licensing information, @@ -146,8 +146,8 @@ crate = "ring" # The SPDX expression for the license requirements of the crate expression = "MIT AND ISC AND OpenSSL" license-files = [ - # Each entry is a crate relative path, and the (opaque) hash of its contents - { path = "LICENSE", hash = 0xbd0eed23 }, + # Each entry is a crate relative path, and the (opaque) hash of its contents + { path = "LICENSE", hash = 0xbd0eed23 }, ] [licenses.private] @@ -165,7 +165,7 @@ ignore = true # is only published to private registries, and ignore is true, the crate will # not have its license(s) checked registries = [ - #"https://sekretz.com/registry + #"https://sekretz.com/registry ] # This section is considered when running `cargo deny check bans`. 
@@ -192,16 +192,16 @@ workspace-default-features = "allow" external-default-features = "allow" # List of crates that are allowed. Use with care! allow = [ - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" }, + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" }, ] # List of crates to deny deny = [ - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" }, - # Wrapper crates can optionally be specified to allow the crate when it - # is a direct dependency of the otherwise banned crate - #{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] }, + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" }, + # Wrapper crates can optionally be specified to allow the crate when it + # is a direct dependency of the otherwise banned crate + #{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] }, ] # List of features to allow/deny @@ -228,69 +228,68 @@ deny = [ #exact = true skip = [ - "base64", - "bitflags", - "core-foundation", - "core-graphics", - "core-graphics-types", - "derive_more", - "getrandom", - "hashbrown", - "heck", - "hermit-abi", - "indexmap", - "itertools", - "libloading", - "linux-raw-sys", - "nix", - "nu-ansi-term", - "phf", - "phf_codegen", - "phf_generator", - "phf_macros", - "phf_shared", - "proc-macro-crate", - "quick-xml", - "rand", - "rand_chacha", - "rand_core", - "raw-window-handle", - "redox_users", - "regex-automata", - "regex-syntax", - "rustix", - "siphasher", - "socket2", - "syn", - "thiserror", - "thiserror-impl", - "toml", - "toml_edit", - "tower", - "wasi", - "webpki-roots", - "windows-sys", - "windows-targets", - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - 
"windows_x86_64_gnullvm", - "windows_x86_64_msvc", - "winnow", - "winreg", - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" }, + "base64", + "bitflags", + "core-foundation", + "core-graphics", + "core-graphics-types", + "derive_more", + "getrandom", + "hashbrown", + "heck", + "indexmap", + "itertools", + "libloading", + "linux-raw-sys", + "nix", + "nu-ansi-term", + "phf", + "phf_codegen", + "phf_generator", + "phf_macros", + "phf_shared", + "proc-macro-crate", + "quick-xml", + "rand", + "rand_chacha", + "rand_core", + "raw-window-handle", + "redox_users", + "regex-automata", + "regex-syntax", + "rustix", + "siphasher", + "socket2", + "syn", + "thiserror", + "thiserror-impl", + "toml", + "toml_edit", + "tower", + "wasi", + "webpki-roots", + "windows-sys", + "windows-targets", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", + "winnow", + "winreg", + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" }, ] # Certain crates/versions that will be skipped when doing duplicate detection. # Similarly to `skip` allows you to skip certain crates during duplicate # detection. Unlike skip, it also includes the entire tree of transitive # dependencies starting at the specified crate, up to a certain depth, which is # by default infinite. skip-tree = [ - #"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies - #{ crate = "ansi_term@0.11.0", depth = 20 }, + #"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies + #{ crate = "ansi_term@0.11.0", depth = 20 }, ] # This section is considered when running `cargo deny check sources`. 
diff --git a/rust/gateway/Cargo.toml b/rust/gateway/Cargo.toml index 3bebd082a..d0da8e57a 100644 --- a/rust/gateway/Cargo.toml +++ b/rust/gateway/Cargo.toml @@ -25,7 +25,6 @@ ip-packet = { workspace = true } ip_network = { workspace = true } libc = { workspace = true, features = ["std", "const-extern-fn", "extra_traits"] } moka = { workspace = true, features = ["future"] } -num_cpus = { workspace = true } opentelemetry = { workspace = true, features = ["metrics"] } opentelemetry-otlp = { workspace = true, features = ["metrics", "grpc-tonic"] } opentelemetry-stdout = { workspace = true, features = ["metrics"] } diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index 3e0fe6efe..9550f5683 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -20,9 +20,9 @@ use phoenix_channel::get_user_agent; use futures::{TryFutureExt, future}; use phoenix_channel::PhoenixChannel; use secrecy::Secret; +use std::pin::pin; +use std::process::ExitCode; use std::{collections::BTreeSet, path::Path}; -use std::{fmt, pin::pin}; -use std::{process::ExitCode, str::FromStr}; use std::{sync::Arc, time::Duration}; use tokio::signal::ctrl_c; use tracing_subscriber::layer; @@ -182,7 +182,7 @@ async fn try_main(cli: Cli, telemetry: &mut Telemetry) -> Result<()> { ) .context("Failed to resolve portal URL")?; - let mut tun_device_manager = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, cli.tun_threads.0) + let mut tun_device_manager = TunDeviceManager::new(ip_packet::MAX_IP_SIZE) .context("Failed to create TUN device manager")?; let tun = tun_device_manager .make_tun() @@ -274,10 +274,6 @@ struct Cli { #[arg(short = 'i', long, env = "FIREZONE_ID")] firezone_id: Option, - /// How many threads to use for reading and writing to the TUN device. - #[arg(long, env = "FIREZONE_NUM_TUN_THREADS", default_value_t)] - tun_threads: NumThreads, - /// Where to export metrics to. /// /// This configuration option is private API and has no stability guarantees. 
@@ -316,33 +312,6 @@ impl Cli { } } -#[derive(Debug, Clone, Copy)] -struct NumThreads(pub usize); - -impl Default for NumThreads { - fn default() -> Self { - if num_cpus::get() < 4 { - return Self(1); - } - - Self(2) - } -} - -impl fmt::Display for NumThreads { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl FromStr for NumThreads { - type Err = ::Err; - - fn from_str(s: &str) -> std::result::Result { - Ok(Self(s.parse()?)) - } -} - /// An adapter struct around [`Tun`] that validates IPv4, UDP and TCP checksums. struct ValidateChecksumAdapter { inner: Box, @@ -403,6 +372,10 @@ impl Tun for ValidateChecksumAdapter { fn name(&self) -> &str { self.inner.name() } + + fn queue_lengths(&self) -> (usize, usize) { + self.inner.queue_lengths() + } } impl ValidateChecksumAdapter { diff --git a/rust/gui-client/src-tauri/src/service.rs b/rust/gui-client/src-tauri/src/service.rs index dbac551b5..286b0e9da 100644 --- a/rust/gui-client/src-tauri/src/service.rs +++ b/rust/gui-client/src-tauri/src/service.rs @@ -290,7 +290,7 @@ impl<'a> Handler<'a> { .next_client_split() .await .context("Failed to wait for incoming IPC connection from a GUI")?; - let tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, 1)?; + let tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE)?; let dns_notifier = new_dns_notifier().await?.boxed(); let network_notifier = new_network_notifier().await?.boxed(); diff --git a/rust/headless-client/src/main.rs b/rust/headless-client/src/main.rs index a8dc75cd0..eeb5e7853 100644 --- a/rust/headless-client/src/main.rs +++ b/rust/headless-client/src/main.rs @@ -276,7 +276,7 @@ fn main() -> Result<()> { let mut terminate = signals::Terminate::new()?; let mut hangup = signals::Hangup::new()?; - let mut tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, 1)?; + let mut tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE)?; let tokio_handle = tokio::runtime::Handle::current(); diff --git 
a/rust/telemetry/src/otel.rs b/rust/telemetry/src/otel.rs index 953c8b77f..a25c5bdcd 100644 --- a/rust/telemetry/src/otel.rs +++ b/rust/telemetry/src/otel.rs @@ -85,6 +85,18 @@ pub mod attr { KeyValue::new("error.type", value) } + pub fn queue_item_ip_packet() -> KeyValue { + KeyValue::new("queue.item", "ip-packet") + } + + pub fn queue_item_gro_batch() -> KeyValue { + KeyValue::new("queue.item", "udp-gro-batch") + } + + pub fn queue_item_gso_batch() -> KeyValue { + KeyValue::new("queue.item", "udp-gso-batch") + } + #[cfg(test)] mod tests { use super::*; diff --git a/website/src/components/Changelog/Gateway.tsx b/website/src/components/Changelog/Gateway.tsx index 4e0c6d7d7..a06ce1e6c 100644 --- a/website/src/components/Changelog/Gateway.tsx +++ b/website/src/components/Changelog/Gateway.tsx @@ -22,7 +22,13 @@ export default function Gateway() { return ( - + + + Remove the FIREZONE_NUM_TUN_THREADS env variable. The Gateway will now + always default to a single TUN thread. Using multiple threads can + cause packet reordering which hurts TCP throughput performance. + + Fixes an issue where connections would fail to establish in