diff --git a/rust/bin-shared/src/tun_device_manager/linux.rs b/rust/bin-shared/src/tun_device_manager/linux.rs index 0acd12d21..6f9395e4a 100644 --- a/rust/bin-shared/src/tun_device_manager/linux.rs +++ b/rust/bin-shared/src/tun_device_manager/linux.rs @@ -13,7 +13,9 @@ use libc::{ use netlink_packet_route::route::{RouteProtocol, RouteScope}; use netlink_packet_route::rule::RuleAction; use rtnetlink::{new_connection, Error::NetlinkError, Handle, RouteAddRequest, RuleAddRequest}; +use std::os::fd::{FromRawFd, OwnedFd}; use std::path::Path; +use std::sync::Arc; use std::task::{Context, Poll}; use std::{ collections::HashSet, @@ -26,7 +28,6 @@ use std::{ }; use tokio::sync::mpsc; use tun::ioctl; -use tun::unix::TunFd; const TUNSETIFF: libc::c_ulong = 0x4004_54ca; const TUN_DEV_MAJOR: u32 = 10; @@ -308,17 +309,34 @@ impl Tun { let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. for n in 0..num_threads { - let fd = open_tun()?; + let fd = Arc::new(open_tun()?); let outbound_rx = outbound_rx.clone().into_stream(); let inbound_tx = inbound_tx.clone(); std::thread::Builder::new() - .name(format!("TUN send/recv {n}/{num_threads}")) - .spawn(move || { - firezone_logging::unwrap_or_warn!( - tun::unix::send_recv_tun(fd, inbound_tx, outbound_rx, read, write), - "Failed to send / recv from TUN device: {}" - ) + .name(format!("TUN send {n}/{num_threads}")) + .spawn({ + let fd = fd.clone(); + + move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_send(fd, outbound_rx, write), + "Failed to send to TUN device: {}" + ) + } + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name(format!("TUN recv {n}/{num_threads}")) + .spawn({ + let fd = fd.clone(); + + move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" + ) + } }) .map_err(io::Error::other)?; } @@ -330,7 +348,7 @@ impl Tun { } } -fn open_tun() -> Result { +fn open_tun() -> Result { let fd = match unsafe { open(TUN_FILE.as_ptr() as _, O_RDWR) } { -1 => return Err(get_last_error()), fd => fd, @@ -347,7 +365,7 @@ fn open_tun() -> Result { set_non_blocking(fd)?; // Safety: We are not closing the FD. - let fd = unsafe { TunFd::new(fd) }; + let fd = unsafe { OwnedFd::from_raw_fd(fd) }; Ok(fd) } diff --git a/rust/connlib/clients/android/src/tun.rs b/rust/connlib/clients/android/src/tun.rs index bde8ba14d..b64dd1e60 100644 --- a/rust/connlib/clients/android/src/tun.rs +++ b/rust/connlib/clients/android/src/tun.rs @@ -1,16 +1,17 @@ use futures::SinkExt as _; use ip_packet::{IpPacket, IpPacketBuf}; +use std::os::fd::{FromRawFd, OwnedFd}; use std::task::{Context, Poll}; use std::{io, os::fd::RawFd}; use tokio::sync::mpsc; use tun::ioctl; -use tun::unix::TunFd; #[derive(Debug)] pub struct Tun { name: String, outbound_tx: flume::r#async::SendSink<'static, IpPacket>, inbound_rx: mpsc::Receiver, + _fd: OwnedFd, } impl tun::Tun for Tun { @@ -52,26 +53,26 @@ impl Tun { pub unsafe fn from_fd(fd: RawFd) -> io::Result { let name = interface_name(fd)?; - // Safety: We are forwarding the safety requirements to the caller. - let fd = unsafe { TunFd::new(fd) }; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. // TODO: Test whether we can set `IFF_MULTI_QUEUE` on Android devices. std::thread::Builder::new() - .name("TUN send/recv".to_owned()) - .spawn(|| { + .name("TUN send".to_owned()) + .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::send_recv_tun( - fd, - inbound_tx, - outbound_rx.into_stream(), - read, - write, - ), - "Failed to send / recv from TUN device: {}" + tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + "Failed to send to TUN device: {}" + ) + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name("TUN recv".to_owned()) + .spawn(move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" ) }) .map_err(io::Error::other)?; @@ -80,6 +81,7 @@ impl Tun { name, outbound_tx: outbound_tx.into_sink(), inbound_rx, + _fd: unsafe { OwnedFd::from_raw_fd(fd) }, // `OwnedFd` will close the fd on drop. }) } } diff --git a/rust/connlib/clients/apple/src/tun.rs b/rust/connlib/clients/apple/src/tun.rs index 225d0ef30..72c3c0e9f 100644 --- a/rust/connlib/clients/apple/src/tun.rs +++ b/rust/connlib/clients/apple/src/tun.rs @@ -25,17 +25,20 @@ impl Tun { let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. std::thread::Builder::new() - .name("TUN send/recv".to_owned()) + .name("TUN send".to_owned()) .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::send_recv_tun( - fd, - inbound_tx, - outbound_rx.into_stream(), - read, - write, - ), - "Failed to send / recv from TUN device: {}" + tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + "Failed to send to TUN device: {}" + ) + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name("TUN recv".to_owned()) + .spawn(move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" ) }) .map_err(io::Error::other)?; diff --git a/rust/tun/src/unix.rs b/rust/tun/src/unix.rs index ef589404e..a93ae9ebb 100644 --- a/rust/tun/src/unix.rs +++ b/rust/tun/src/unix.rs @@ -1,120 +1,94 @@ -use anyhow::{Context as _, Result}; -use futures::future::Either; +use anyhow::{bail, Context as _, Result}; use futures::StreamExt as _; use ip_packet::{IpPacket, IpPacketBuf}; use std::io; -use std::os::fd::{AsRawFd, RawFd}; -use std::pin::pin; +use std::os::fd::AsRawFd; use tokio::io::unix::AsyncFd; use tokio::sync::mpsc; -pub struct TunFd { - inner: RawFd, -} - -impl TunFd { - /// # Safety - /// - /// You must not close this FD yourself. - /// [`TunFd`] will close it for you. - pub unsafe fn new(fd: RawFd) -> Self { - Self { inner: fd } - } -} - -impl AsRawFd for TunFd { - fn as_raw_fd(&self) -> RawFd { - self.inner - } -} - -impl Drop for TunFd { - fn drop(&mut self) { - // Safety: We are the only ones closing the FD. - unsafe { libc::close(self.inner) }; - } -} - -/// Creates a new current-thread [`tokio`] runtime and concurrently reads and writes packets to the given TUN file-descriptor using the provided function pointers for the actual syscall. -/// -/// This function will block until failure and is therefore intended to be called from a new thread. -/// -/// - Every packet received on `outbound_rx` channel will be written to the file descriptor using the `write` syscall. -/// - Every packet read using the `read` syscall will be sent into the `inbound_tx` channel. -/// - Every time we read a packet from `outbound_rx`, we notify `outbound_capacity_waker` about the newly gained capacity. -/// - In case any of the channels close, we exit the task. -/// - IO errors are not fallible. -pub fn send_recv_tun( +pub fn tun_send( fd: T, - inbound_tx: mpsc::Sender, - mut outbound_rx: flume::r#async::RecvStream<'static, IpPacket>, - read: impl Fn(RawFd, &mut IpPacketBuf) -> io::Result, - write: impl Fn(RawFd, &IpPacket) -> io::Result, + mut outbound_rx: flume::r#async::RecvStream<'_, IpPacket>, + write: impl Fn(i32, &IpPacket) -> std::result::Result, ) -> Result<()> where - T: AsRawFd, + T: AsRawFd + Clone, { tokio::runtime::Builder::new_current_thread() .enable_all() .build() .context("Failed to create runtime")? .block_on(async move { - let fd = AsyncFd::new(fd)?; + let fd = AsyncFd::with_interest(fd, tokio::io::Interest::WRITABLE)?; + + while let Some(packet) = outbound_rx.next().await { + if let Err(e) = fd + .async_io(tokio::io::Interest::WRITABLE, |fd| { + write(fd.as_raw_fd(), &packet) + }) + .await + { + tracing::warn!("Failed to write to TUN FD: {e}"); + } + } + + anyhow::Ok(()) + })?; + + anyhow::Ok(()) +} + +pub fn tun_recv( + fd: T, + inbound_tx: mpsc::Sender, + read: impl Fn(i32, &mut IpPacketBuf) -> std::result::Result, +) -> Result<()> +where + T: AsRawFd + Clone, +{ + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("Failed to create runtime")? + .block_on(async move { + let fd = AsyncFd::with_interest(fd, tokio::io::Interest::READABLE)?; loop { - let next_inbound_packet = pin!(fd.async_io(tokio::io::Interest::READABLE, |fd| { - let mut ip_packet_buf = IpPacketBuf::new(); + let next_inbound_packet = fd + .async_io(tokio::io::Interest::READABLE, |fd| { + let mut ip_packet_buf = IpPacketBuf::new(); - let len = read(fd.as_raw_fd(), &mut ip_packet_buf)?; + let len = read(fd.as_raw_fd(), &mut ip_packet_buf)?; - if len == 0 { - return Ok(None); - } + if len == 0 { + return Ok(None); + } - let packet = IpPacket::new(ip_packet_buf, len) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let packet = IpPacket::new(ip_packet_buf, len) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - Ok(Some(packet)) - })); - let next_outbound_packet = pin!(outbound_rx.next()); + Ok(Some(packet)) + }) + .await; - match futures::future::select(next_outbound_packet, next_inbound_packet).await { - Either::Right((Ok(None), _)) => { - return Err(io::Error::new( - io::ErrorKind::NotConnected, - "TUN file descriptor is closed", - )); - } - Either::Right((Ok(Some(packet)), _)) => { + match next_inbound_packet { + Ok(None) => bail!("TUN file descriptor is closed"), + Ok(Some(packet)) => { if inbound_tx.send(packet).await.is_err() { tracing::debug!("Inbound packet receiver gone, shutting down task"); break; }; } - Either::Right((Err(e), _)) => { + Err(e) => { tracing::warn!("Failed to read from TUN FD: {e}"); continue; } - Either::Left((Some(packet), _)) => { - if let Err(e) = fd - .async_io(tokio::io::Interest::WRITABLE, |fd| { - write(fd.as_raw_fd(), &packet) - }) - .await - { - tracing::warn!("Failed to write to TUN FD: {e}"); - }; - } - Either::Left((None, _)) => { - tracing::debug!("Outbound packet sender gone, shutting down task"); - break; - } } } - Ok(()) + anyhow::Ok(()) })?; - Ok(()) + anyhow::Ok(()) } diff --git a/website/src/components/Changelog/Android.tsx b/website/src/components/Changelog/Android.tsx index 22bc3bd20..c37de52de 100644 --- a/website/src/components/Changelog/Android.tsx +++ b/website/src/components/Changelog/Android.tsx @@ -24,6 +24,9 @@ export default function Android() { Fixes a minor memory leak that could occur after being disconnected unexpectedly. + + Fixes an upload speed performance regression. + diff --git a/website/src/components/Changelog/GUI.tsx b/website/src/components/Changelog/GUI.tsx index 53667e235..bd64ece45 100644 --- a/website/src/components/Changelog/GUI.tsx +++ b/website/src/components/Changelog/GUI.tsx @@ -8,7 +8,13 @@ export default function GUI({ os }: { os: OS }) { return ( {/* When you cut a release, remove any solved issues from the "known issues" lists over in `client-apps`. This must not be done when the issue's PR merges. */} - + + {os === OS.Linux && ( + + Fixes an upload speed performance regression. + + )} + Fixes a visual regression where the Settings and About window lost diff --git a/website/src/components/Changelog/Headless.tsx b/website/src/components/Changelog/Headless.tsx index da12a91ab..22f528a85 100644 --- a/website/src/components/Changelog/Headless.tsx +++ b/website/src/components/Changelog/Headless.tsx @@ -9,7 +9,13 @@ export default function Headless({ os }: { os: OS }) { return ( {/* When you cut a release, remove any solved issues from the "known issues" lists over in `client-apps`. This must not be done when the issue's PR merges. */} - + + {os === OS.Linux && ( + + Fixes an upload speed performance regression. + + )} + Hides the --check and --exit CLI options