mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
fix(connlib): split TUN send & recv into separate threads (#8117)
We appear to have caused a pretty big performance regression (~40%) in037a2e64b6(identified through `git-bisect`). Specifically, the regression appears to have been caused by [`aef411a` (#7605)](aef411abf5). Weirdly enough, undoing just that on top of `main` doesn't fix the regression. My hypothesis is that using the same file descriptor for read AND write interests on the same runtime causes issues because those interests are occasionally cleared (i.e. on false-positive wake-ups). In this PR, we spawn a dedicated thread each for the sending and receiving operations of the TUN device. On unix-based systems, a TUN device is just a file descriptor and can therefore simply be copied and read & written to from different threads. Most importantly, we only construct the `AsyncFd` _within_ the newly spawned thread and runtime because constructing an `AsyncFd` implicitly registers with the runtime active on the current thread. As a nice benefit, this allows us to get rid of a `future::select`. Those are always kind of nasty because they cancel the future that wasn't ready. My original intuition was that we drop packets due to cancelled futures there but that could not be confirmed in experiments.
This commit is contained in:
@@ -13,7 +13,9 @@ use libc::{
|
||||
use netlink_packet_route::route::{RouteProtocol, RouteScope};
|
||||
use netlink_packet_route::rule::RuleAction;
|
||||
use rtnetlink::{new_connection, Error::NetlinkError, Handle, RouteAddRequest, RuleAddRequest};
|
||||
use std::os::fd::{FromRawFd, OwnedFd};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
@@ -26,7 +28,6 @@ use std::{
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tun::ioctl;
|
||||
use tun::unix::TunFd;
|
||||
|
||||
const TUNSETIFF: libc::c_ulong = 0x4004_54ca;
|
||||
const TUN_DEV_MAJOR: u32 = 10;
|
||||
@@ -308,17 +309,34 @@ impl Tun {
|
||||
let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets.
|
||||
|
||||
for n in 0..num_threads {
|
||||
let fd = open_tun()?;
|
||||
let fd = Arc::new(open_tun()?);
|
||||
let outbound_rx = outbound_rx.clone().into_stream();
|
||||
let inbound_tx = inbound_tx.clone();
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name(format!("TUN send/recv {n}/{num_threads}"))
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::send_recv_tun(fd, inbound_tx, outbound_rx, read, write),
|
||||
"Failed to send / recv from TUN device: {}"
|
||||
)
|
||||
.name(format!("TUN send {n}/{num_threads}"))
|
||||
.spawn({
|
||||
let fd = fd.clone();
|
||||
|
||||
move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_send(fd, outbound_rx, write),
|
||||
"Failed to send to TUN device: {}"
|
||||
)
|
||||
}
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
std::thread::Builder::new()
|
||||
.name(format!("TUN recv {n}/{num_threads}"))
|
||||
.spawn({
|
||||
let fd = fd.clone();
|
||||
|
||||
move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_recv(fd, inbound_tx, read),
|
||||
"Failed to recv from TUN device: {}"
|
||||
)
|
||||
}
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
}
|
||||
@@ -330,7 +348,7 @@ impl Tun {
|
||||
}
|
||||
}
|
||||
|
||||
fn open_tun() -> Result<TunFd, io::Error> {
|
||||
fn open_tun() -> Result<OwnedFd, io::Error> {
|
||||
let fd = match unsafe { open(TUN_FILE.as_ptr() as _, O_RDWR) } {
|
||||
-1 => return Err(get_last_error()),
|
||||
fd => fd,
|
||||
@@ -347,7 +365,7 @@ fn open_tun() -> Result<TunFd, io::Error> {
|
||||
set_non_blocking(fd)?;
|
||||
|
||||
// Safety: We are not closing the FD.
|
||||
let fd = unsafe { TunFd::new(fd) };
|
||||
let fd = unsafe { OwnedFd::from_raw_fd(fd) };
|
||||
|
||||
Ok(fd)
|
||||
}
|
||||
|
||||
@@ -1,16 +1,17 @@
|
||||
use futures::SinkExt as _;
|
||||
use ip_packet::{IpPacket, IpPacketBuf};
|
||||
use std::os::fd::{FromRawFd, OwnedFd};
|
||||
use std::task::{Context, Poll};
|
||||
use std::{io, os::fd::RawFd};
|
||||
use tokio::sync::mpsc;
|
||||
use tun::ioctl;
|
||||
use tun::unix::TunFd;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Tun {
|
||||
name: String,
|
||||
outbound_tx: flume::r#async::SendSink<'static, IpPacket>,
|
||||
inbound_rx: mpsc::Receiver<IpPacket>,
|
||||
_fd: OwnedFd,
|
||||
}
|
||||
|
||||
impl tun::Tun for Tun {
|
||||
@@ -52,26 +53,26 @@ impl Tun {
|
||||
pub unsafe fn from_fd(fd: RawFd) -> io::Result<Self> {
|
||||
let name = interface_name(fd)?;
|
||||
|
||||
// Safety: We are forwarding the safety requirements to the caller.
|
||||
let fd = unsafe { TunFd::new(fd) };
|
||||
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(1000);
|
||||
let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets.
|
||||
|
||||
// TODO: Test whether we can set `IFF_MULTI_QUEUE` on Android devices.
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("TUN send/recv".to_owned())
|
||||
.spawn(|| {
|
||||
.name("TUN send".to_owned())
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::send_recv_tun(
|
||||
fd,
|
||||
inbound_tx,
|
||||
outbound_rx.into_stream(),
|
||||
read,
|
||||
write,
|
||||
),
|
||||
"Failed to send / recv from TUN device: {}"
|
||||
tun::unix::tun_send(fd, outbound_rx.into_stream(), write),
|
||||
"Failed to send to TUN device: {}"
|
||||
)
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
std::thread::Builder::new()
|
||||
.name("TUN recv".to_owned())
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_recv(fd, inbound_tx, read),
|
||||
"Failed to recv from TUN device: {}"
|
||||
)
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
@@ -80,6 +81,7 @@ impl Tun {
|
||||
name,
|
||||
outbound_tx: outbound_tx.into_sink(),
|
||||
inbound_rx,
|
||||
_fd: unsafe { OwnedFd::from_raw_fd(fd) }, // `OwnedFd` will close the fd on drop.
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,17 +25,20 @@ impl Tun {
|
||||
let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets.
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("TUN send/recv".to_owned())
|
||||
.name("TUN send".to_owned())
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::send_recv_tun(
|
||||
fd,
|
||||
inbound_tx,
|
||||
outbound_rx.into_stream(),
|
||||
read,
|
||||
write,
|
||||
),
|
||||
"Failed to send / recv from TUN device: {}"
|
||||
tun::unix::tun_send(fd, outbound_rx.into_stream(), write),
|
||||
"Failed to send to TUN device: {}"
|
||||
)
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
std::thread::Builder::new()
|
||||
.name("TUN recv".to_owned())
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_recv(fd, inbound_tx, read),
|
||||
"Failed to recv from TUN device: {}"
|
||||
)
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
@@ -1,120 +1,94 @@
|
||||
use anyhow::{Context as _, Result};
|
||||
use futures::future::Either;
|
||||
use anyhow::{bail, Context as _, Result};
|
||||
use futures::StreamExt as _;
|
||||
use ip_packet::{IpPacket, IpPacketBuf};
|
||||
use std::io;
|
||||
use std::os::fd::{AsRawFd, RawFd};
|
||||
use std::pin::pin;
|
||||
use std::os::fd::AsRawFd;
|
||||
use tokio::io::unix::AsyncFd;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub struct TunFd {
|
||||
inner: RawFd,
|
||||
}
|
||||
|
||||
impl TunFd {
|
||||
/// # Safety
|
||||
///
|
||||
/// You must not close this FD yourself.
|
||||
/// [`TunFd`] will close it for you.
|
||||
pub unsafe fn new(fd: RawFd) -> Self {
|
||||
Self { inner: fd }
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawFd for TunFd {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TunFd {
|
||||
fn drop(&mut self) {
|
||||
// Safety: We are the only ones closing the FD.
|
||||
unsafe { libc::close(self.inner) };
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new current-thread [`tokio`] runtime and concurrently reads and writes packets to the given TUN file-descriptor using the provided function pointers for the actual syscall.
|
||||
///
|
||||
/// This function will block until failure and is therefore intended to be called from a new thread.
|
||||
///
|
||||
/// - Every packet received on `outbound_rx` channel will be written to the file descriptor using the `write` syscall.
|
||||
/// - Every packet read using the `read` syscall will be sent into the `inbound_tx` channel.
|
||||
/// - Every time we read a packet from `outbound_rx`, we notify `outbound_capacity_waker` about the newly gained capacity.
|
||||
/// - In case any of the channels close, we exit the task.
|
||||
/// - IO errors are not fallible.
|
||||
pub fn send_recv_tun<T>(
|
||||
pub fn tun_send<T>(
|
||||
fd: T,
|
||||
inbound_tx: mpsc::Sender<IpPacket>,
|
||||
mut outbound_rx: flume::r#async::RecvStream<'static, IpPacket>,
|
||||
read: impl Fn(RawFd, &mut IpPacketBuf) -> io::Result<usize>,
|
||||
write: impl Fn(RawFd, &IpPacket) -> io::Result<usize>,
|
||||
mut outbound_rx: flume::r#async::RecvStream<'_, IpPacket>,
|
||||
write: impl Fn(i32, &IpPacket) -> std::result::Result<usize, io::Error>,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: AsRawFd,
|
||||
T: AsRawFd + Clone,
|
||||
{
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.context("Failed to create runtime")?
|
||||
.block_on(async move {
|
||||
let fd = AsyncFd::new(fd)?;
|
||||
let fd = AsyncFd::with_interest(fd, tokio::io::Interest::WRITABLE)?;
|
||||
|
||||
while let Some(packet) = outbound_rx.next().await {
|
||||
if let Err(e) = fd
|
||||
.async_io(tokio::io::Interest::WRITABLE, |fd| {
|
||||
write(fd.as_raw_fd(), &packet)
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to write to TUN FD: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::Ok(())
|
||||
})?;
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
pub fn tun_recv<T>(
|
||||
fd: T,
|
||||
inbound_tx: mpsc::Sender<IpPacket>,
|
||||
read: impl Fn(i32, &mut IpPacketBuf) -> std::result::Result<usize, io::Error>,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: AsRawFd + Clone,
|
||||
{
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.context("Failed to create runtime")?
|
||||
.block_on(async move {
|
||||
let fd = AsyncFd::with_interest(fd, tokio::io::Interest::READABLE)?;
|
||||
|
||||
loop {
|
||||
let next_inbound_packet = pin!(fd.async_io(tokio::io::Interest::READABLE, |fd| {
|
||||
let mut ip_packet_buf = IpPacketBuf::new();
|
||||
let next_inbound_packet = fd
|
||||
.async_io(tokio::io::Interest::READABLE, |fd| {
|
||||
let mut ip_packet_buf = IpPacketBuf::new();
|
||||
|
||||
let len = read(fd.as_raw_fd(), &mut ip_packet_buf)?;
|
||||
let len = read(fd.as_raw_fd(), &mut ip_packet_buf)?;
|
||||
|
||||
if len == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
if len == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let packet = IpPacket::new(ip_packet_buf, len)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
|
||||
let packet = IpPacket::new(ip_packet_buf, len)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
|
||||
|
||||
Ok(Some(packet))
|
||||
}));
|
||||
let next_outbound_packet = pin!(outbound_rx.next());
|
||||
Ok(Some(packet))
|
||||
})
|
||||
.await;
|
||||
|
||||
match futures::future::select(next_outbound_packet, next_inbound_packet).await {
|
||||
Either::Right((Ok(None), _)) => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
"TUN file descriptor is closed",
|
||||
));
|
||||
}
|
||||
Either::Right((Ok(Some(packet)), _)) => {
|
||||
match next_inbound_packet {
|
||||
Ok(None) => bail!("TUN file descriptor is closed"),
|
||||
Ok(Some(packet)) => {
|
||||
if inbound_tx.send(packet).await.is_err() {
|
||||
tracing::debug!("Inbound packet receiver gone, shutting down task");
|
||||
|
||||
break;
|
||||
};
|
||||
}
|
||||
Either::Right((Err(e), _)) => {
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to read from TUN FD: {e}");
|
||||
continue;
|
||||
}
|
||||
Either::Left((Some(packet), _)) => {
|
||||
if let Err(e) = fd
|
||||
.async_io(tokio::io::Interest::WRITABLE, |fd| {
|
||||
write(fd.as_raw_fd(), &packet)
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to write to TUN FD: {e}");
|
||||
};
|
||||
}
|
||||
Either::Left((None, _)) => {
|
||||
tracing::debug!("Outbound packet sender gone, shutting down task");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
anyhow::Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
@@ -24,6 +24,9 @@ export default function Android() {
|
||||
Fixes a minor memory leak that could occur after being disconnected
|
||||
unexpectedly.
|
||||
</ChangeItem>
|
||||
<ChangeItem pull="8117">
|
||||
Fixes an upload speed performance regression.
|
||||
</ChangeItem>
|
||||
</Unreleased>
|
||||
<Entry version="1.4.1" date={new Date("2025-01-28")}>
|
||||
<ChangeItem pull="7891">
|
||||
|
||||
@@ -8,7 +8,13 @@ export default function GUI({ os }: { os: OS }) {
|
||||
return (
|
||||
<Entries downloadLinks={downloadLinks(os)} title={title(os)}>
|
||||
{/* When you cut a release, remove any solved issues from the "known issues" lists over in `client-apps`. This must not be done when the issue's PR merges. */}
|
||||
<Unreleased></Unreleased>
|
||||
<Unreleased>
|
||||
{os === OS.Linux && (
|
||||
<ChangeItem pull="8117">
|
||||
Fixes an upload speed performance regression.
|
||||
</ChangeItem>
|
||||
)}
|
||||
</Unreleased>
|
||||
<Entry version="1.4.5" date={new Date("2025-02-12")}>
|
||||
<ChangeItem pull="8105">
|
||||
Fixes a visual regression where the Settings and About window lost
|
||||
|
||||
@@ -9,7 +9,13 @@ export default function Headless({ os }: { os: OS }) {
|
||||
return (
|
||||
<Entries downloadLinks={downloadLinks(os)} title={title(os)}>
|
||||
{/* When you cut a release, remove any solved issues from the "known issues" lists over in `client-apps`. This must not be done when the issue's PR merges. */}
|
||||
<Unreleased></Unreleased>
|
||||
<Unreleased>
|
||||
{os === OS.Linux && (
|
||||
<ChangeItem pull="8117">
|
||||
Fixes an upload speed performance regression.
|
||||
</ChangeItem>
|
||||
)}
|
||||
</Unreleased>
|
||||
<Entry version="1.4.3" date={new Date("2025-02-11")}>
|
||||
<ChangeItem pull="8055">
|
||||
Hides the <code>--check</code> and <code>--exit</code> CLI options
|
||||
|
||||
Reference in New Issue
Block a user