From 10ba02e341acd7ee3260334c3a37ab6e4f954736 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 14 Feb 2025 16:32:51 +1100 Subject: [PATCH] fix(connlib): split TUN send & recv into separate threads (#8117) We appear to have caused a pretty big performance regression (~40%) in 037a2e64b6b49f335a32c6fd30a12afe45d967e9 (identified through `git-bisect`). Specifically, the regression appears to have been caused by [`aef411a` (#7605)](https://github.com/firezone/firezone/pull/7605/commits/aef411abf561ba5e6a3d6ee133e6f709b3bd7043). Weirdly enough, undoing just that on top of `main` doesn't fix the regression. My hypothesis is that using the same file descriptor for read AND write interests on the same runtime causes issues because those interests are occasionally cleared (i.e. on false-positive wake-ups). In this PR, we spawn two dedicated threads for the TUN device: one for sending and one for receiving. On Unix-based systems, a TUN device is just a file descriptor and can therefore simply be copied, then read from and written to on different threads. Most importantly, we only construct the `AsyncFd` _within_ the newly spawned thread and runtime because constructing an `AsyncFd` implicitly registers with the runtime active on the current thread. As a nice benefit, this allows us to get rid of a `future::select`. Those are always kind of nasty because they cancel the future that wasn't ready. My original intuition was that we drop packets due to cancelled futures there, but that could not be confirmed in experiments.
--- .../src/tun_device_manager/linux.rs | 38 +++-- rust/connlib/clients/android/src/tun.rs | 30 ++-- rust/connlib/clients/apple/src/tun.rs | 21 +-- rust/tun/src/unix.rs | 140 +++++++----------- website/src/components/Changelog/Android.tsx | 3 + website/src/components/Changelog/GUI.tsx | 8 +- website/src/components/Changelog/Headless.tsx | 8 +- 7 files changed, 130 insertions(+), 118 deletions(-) diff --git a/rust/bin-shared/src/tun_device_manager/linux.rs b/rust/bin-shared/src/tun_device_manager/linux.rs index 0acd12d21..6f9395e4a 100644 --- a/rust/bin-shared/src/tun_device_manager/linux.rs +++ b/rust/bin-shared/src/tun_device_manager/linux.rs @@ -13,7 +13,9 @@ use libc::{ use netlink_packet_route::route::{RouteProtocol, RouteScope}; use netlink_packet_route::rule::RuleAction; use rtnetlink::{new_connection, Error::NetlinkError, Handle, RouteAddRequest, RuleAddRequest}; +use std::os::fd::{FromRawFd, OwnedFd}; use std::path::Path; +use std::sync::Arc; use std::task::{Context, Poll}; use std::{ collections::HashSet, @@ -26,7 +28,6 @@ use std::{ }; use tokio::sync::mpsc; use tun::ioctl; -use tun::unix::TunFd; const TUNSETIFF: libc::c_ulong = 0x4004_54ca; const TUN_DEV_MAJOR: u32 = 10; @@ -308,17 +309,34 @@ impl Tun { let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. 
for n in 0..num_threads { - let fd = open_tun()?; + let fd = Arc::new(open_tun()?); let outbound_rx = outbound_rx.clone().into_stream(); let inbound_tx = inbound_tx.clone(); std::thread::Builder::new() - .name(format!("TUN send/recv {n}/{num_threads}")) - .spawn(move || { - firezone_logging::unwrap_or_warn!( - tun::unix::send_recv_tun(fd, inbound_tx, outbound_rx, read, write), - "Failed to send / recv from TUN device: {}" - ) + .name(format!("TUN send {n}/{num_threads}")) + .spawn({ + let fd = fd.clone(); + + move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_send(fd, outbound_rx, write), + "Failed to send to TUN device: {}" + ) + } + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name(format!("TUN recv {n}/{num_threads}")) + .spawn({ + let fd = fd.clone(); + + move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" + ) + } }) .map_err(io::Error::other)?; } @@ -330,7 +348,7 @@ impl Tun { } } -fn open_tun() -> Result { +fn open_tun() -> Result { let fd = match unsafe { open(TUN_FILE.as_ptr() as _, O_RDWR) } { -1 => return Err(get_last_error()), fd => fd, @@ -347,7 +365,7 @@ fn open_tun() -> Result { set_non_blocking(fd)?; // Safety: We are not closing the FD. 
- let fd = unsafe { TunFd::new(fd) }; + let fd = unsafe { OwnedFd::from_raw_fd(fd) }; Ok(fd) } diff --git a/rust/connlib/clients/android/src/tun.rs b/rust/connlib/clients/android/src/tun.rs index bde8ba14d..b64dd1e60 100644 --- a/rust/connlib/clients/android/src/tun.rs +++ b/rust/connlib/clients/android/src/tun.rs @@ -1,16 +1,17 @@ use futures::SinkExt as _; use ip_packet::{IpPacket, IpPacketBuf}; +use std::os::fd::{FromRawFd, OwnedFd}; use std::task::{Context, Poll}; use std::{io, os::fd::RawFd}; use tokio::sync::mpsc; use tun::ioctl; -use tun::unix::TunFd; #[derive(Debug)] pub struct Tun { name: String, outbound_tx: flume::r#async::SendSink<'static, IpPacket>, inbound_rx: mpsc::Receiver, + _fd: OwnedFd, } impl tun::Tun for Tun { @@ -52,26 +53,26 @@ impl Tun { pub unsafe fn from_fd(fd: RawFd) -> io::Result { let name = interface_name(fd)?; - // Safety: We are forwarding the safety requirements to the caller. - let fd = unsafe { TunFd::new(fd) }; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. // TODO: Test whether we can set `IFF_MULTI_QUEUE` on Android devices. 
std::thread::Builder::new() - .name("TUN send/recv".to_owned()) - .spawn(|| { + .name("TUN send".to_owned()) + .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::send_recv_tun( - fd, - inbound_tx, - outbound_rx.into_stream(), - read, - write, - ), - "Failed to send / recv from TUN device: {}" + tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + "Failed to send to TUN device: {}" + ) + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name("TUN recv".to_owned()) + .spawn(move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" ) }) .map_err(io::Error::other)?; @@ -80,6 +81,7 @@ impl Tun { name, outbound_tx: outbound_tx.into_sink(), inbound_rx, + _fd: unsafe { OwnedFd::from_raw_fd(fd) }, // `OwnedFd` will close the fd on drop. }) } } diff --git a/rust/connlib/clients/apple/src/tun.rs b/rust/connlib/clients/apple/src/tun.rs index 225d0ef30..72c3c0e9f 100644 --- a/rust/connlib/clients/apple/src/tun.rs +++ b/rust/connlib/clients/apple/src/tun.rs @@ -25,17 +25,20 @@ impl Tun { let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. 
std::thread::Builder::new() - .name("TUN send/recv".to_owned()) + .name("TUN send".to_owned()) .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::send_recv_tun( - fd, - inbound_tx, - outbound_rx.into_stream(), - read, - write, - ), - "Failed to send / recv from TUN device: {}" + tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + "Failed to send to TUN device: {}" + ) + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name("TUN recv".to_owned()) + .spawn(move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" ) }) .map_err(io::Error::other)?; diff --git a/rust/tun/src/unix.rs b/rust/tun/src/unix.rs index ef589404e..a93ae9ebb 100644 --- a/rust/tun/src/unix.rs +++ b/rust/tun/src/unix.rs @@ -1,120 +1,94 @@ -use anyhow::{Context as _, Result}; -use futures::future::Either; +use anyhow::{bail, Context as _, Result}; use futures::StreamExt as _; use ip_packet::{IpPacket, IpPacketBuf}; use std::io; -use std::os::fd::{AsRawFd, RawFd}; -use std::pin::pin; +use std::os::fd::AsRawFd; use tokio::io::unix::AsyncFd; use tokio::sync::mpsc; -pub struct TunFd { - inner: RawFd, -} - -impl TunFd { - /// # Safety - /// - /// You must not close this FD yourself. - /// [`TunFd`] will close it for you. - pub unsafe fn new(fd: RawFd) -> Self { - Self { inner: fd } - } -} - -impl AsRawFd for TunFd { - fn as_raw_fd(&self) -> RawFd { - self.inner - } -} - -impl Drop for TunFd { - fn drop(&mut self) { - // Safety: We are the only ones closing the FD. - unsafe { libc::close(self.inner) }; - } -} - -/// Creates a new current-thread [`tokio`] runtime and concurrently reads and writes packets to the given TUN file-descriptor using the provided function pointers for the actual syscall. -/// -/// This function will block until failure and is therefore intended to be called from a new thread. 
-/// -/// - Every packet received on `outbound_rx` channel will be written to the file descriptor using the `write` syscall. -/// - Every packet read using the `read` syscall will be sent into the `inbound_tx` channel. -/// - Every time we read a packet from `outbound_rx`, we notify `outbound_capacity_waker` about the newly gained capacity. -/// - In case any of the channels close, we exit the task. -/// - IO errors are not fallible. -pub fn send_recv_tun( +pub fn tun_send( fd: T, - inbound_tx: mpsc::Sender, - mut outbound_rx: flume::r#async::RecvStream<'static, IpPacket>, - read: impl Fn(RawFd, &mut IpPacketBuf) -> io::Result, - write: impl Fn(RawFd, &IpPacket) -> io::Result, + mut outbound_rx: flume::r#async::RecvStream<'_, IpPacket>, + write: impl Fn(i32, &IpPacket) -> std::result::Result, ) -> Result<()> where - T: AsRawFd, + T: AsRawFd + Clone, { tokio::runtime::Builder::new_current_thread() .enable_all() .build() .context("Failed to create runtime")? .block_on(async move { - let fd = AsyncFd::new(fd)?; + let fd = AsyncFd::with_interest(fd, tokio::io::Interest::WRITABLE)?; + + while let Some(packet) = outbound_rx.next().await { + if let Err(e) = fd + .async_io(tokio::io::Interest::WRITABLE, |fd| { + write(fd.as_raw_fd(), &packet) + }) + .await + { + tracing::warn!("Failed to write to TUN FD: {e}"); + } + } + + anyhow::Ok(()) + })?; + + anyhow::Ok(()) +} + +pub fn tun_recv( + fd: T, + inbound_tx: mpsc::Sender, + read: impl Fn(i32, &mut IpPacketBuf) -> std::result::Result, +) -> Result<()> +where + T: AsRawFd + Clone, +{ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("Failed to create runtime")? 
+ .block_on(async move { + let fd = AsyncFd::with_interest(fd, tokio::io::Interest::READABLE)?; loop { - let next_inbound_packet = pin!(fd.async_io(tokio::io::Interest::READABLE, |fd| { - let mut ip_packet_buf = IpPacketBuf::new(); + let next_inbound_packet = fd + .async_io(tokio::io::Interest::READABLE, |fd| { + let mut ip_packet_buf = IpPacketBuf::new(); - let len = read(fd.as_raw_fd(), &mut ip_packet_buf)?; + let len = read(fd.as_raw_fd(), &mut ip_packet_buf)?; - if len == 0 { - return Ok(None); - } + if len == 0 { + return Ok(None); + } - let packet = IpPacket::new(ip_packet_buf, len) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let packet = IpPacket::new(ip_packet_buf, len) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - Ok(Some(packet)) - })); - let next_outbound_packet = pin!(outbound_rx.next()); + Ok(Some(packet)) + }) + .await; - match futures::future::select(next_outbound_packet, next_inbound_packet).await { - Either::Right((Ok(None), _)) => { - return Err(io::Error::new( - io::ErrorKind::NotConnected, - "TUN file descriptor is closed", - )); - } - Either::Right((Ok(Some(packet)), _)) => { + match next_inbound_packet { + Ok(None) => bail!("TUN file descriptor is closed"), + Ok(Some(packet)) => { if inbound_tx.send(packet).await.is_err() { tracing::debug!("Inbound packet receiver gone, shutting down task"); break; }; } - Either::Right((Err(e), _)) => { + Err(e) => { tracing::warn!("Failed to read from TUN FD: {e}"); continue; } - Either::Left((Some(packet), _)) => { - if let Err(e) = fd - .async_io(tokio::io::Interest::WRITABLE, |fd| { - write(fd.as_raw_fd(), &packet) - }) - .await - { - tracing::warn!("Failed to write to TUN FD: {e}"); - }; - } - Either::Left((None, _)) => { - tracing::debug!("Outbound packet sender gone, shutting down task"); - break; - } } } - Ok(()) + anyhow::Ok(()) })?; - Ok(()) + anyhow::Ok(()) } diff --git a/website/src/components/Changelog/Android.tsx 
b/website/src/components/Changelog/Android.tsx index 22bc3bd20..c37de52de 100644 --- a/website/src/components/Changelog/Android.tsx +++ b/website/src/components/Changelog/Android.tsx @@ -24,6 +24,9 @@ export default function Android() { Fixes a minor memory leak that could occur after being disconnected unexpectedly. + + Fixes an upload speed performance regression. + diff --git a/website/src/components/Changelog/GUI.tsx b/website/src/components/Changelog/GUI.tsx index 53667e235..bd64ece45 100644 --- a/website/src/components/Changelog/GUI.tsx +++ b/website/src/components/Changelog/GUI.tsx @@ -8,7 +8,13 @@ export default function GUI({ os }: { os: OS }) { return ( {/* When you cut a release, remove any solved issues from the "known issues" lists over in `client-apps`. This must not be done when the issue's PR merges. */} - + + {os === OS.Linux && ( + + Fixes an upload speed performance regression. + + )} + Fixes a visual regression where the Settings and About window lost diff --git a/website/src/components/Changelog/Headless.tsx b/website/src/components/Changelog/Headless.tsx index da12a91ab..22f528a85 100644 --- a/website/src/components/Changelog/Headless.tsx +++ b/website/src/components/Changelog/Headless.tsx @@ -9,7 +9,13 @@ export default function Headless({ os }: { os: OS }) { return ( {/* When you cut a release, remove any solved issues from the "known issues" lists over in `client-apps`. This must not be done when the issue's PR merges. */} - + + {os === OS.Linux && ( + + Fixes an upload speed performance regression. + + )} + Hides the --check and --exit CLI options