diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b78be7834..aea92aab6 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -328,7 +328,6 @@ dependencies = [ "dns-types", "firezone-logging", "firezone-telemetry", - "flume", "futures", "ip-packet", "ip_network", @@ -342,6 +341,7 @@ dependencies = [ "swift-bridge", "swift-bridge-build", "tokio", + "tokio-util", "tracing", "tracing-appender", "tracing-subscriber", @@ -1327,6 +1327,7 @@ dependencies = [ "socket-factory", "thiserror 2.0.15", "tokio", + "tokio-util", "tracing", "tracing-appender", "tracing-subscriber", @@ -2340,7 +2341,6 @@ dependencies = [ "dirs", "dns-types", "firezone-logging", - "flume", "futures", "gat-lending-iterator", "hex", @@ -2403,7 +2403,6 @@ dependencies = [ "libc", "moka", "nix 0.30.1", - "num_cpus", "opentelemetry", "opentelemetry-otlp", "opentelemetry-stdout", @@ -3336,12 +3335,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" -[[package]] -name = "hermit-abi" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f154ce46856750ed433c8649605bf7ed2de3bc35fd9d2a9f30cddd873c80cb08" - [[package]] name = "hex" version = "0.4.3" @@ -4722,16 +4715,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi 0.5.1", - "libc", -] - [[package]] name = "num_enum" version = "0.7.3" @@ -5473,7 +5456,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi 0.4.0", + "hermit-abi", "pin-project-lite", "rustix 0.38.44", "tracing", @@ -8392,8 +8375,6 @@ name = "tun" version = "0.1.0" dependencies = [ "anyhow", - "flume", - "futures", "ip-packet", "libc", 
"tokio", diff --git a/rust/apple-client-ffi/Cargo.toml b/rust/apple-client-ffi/Cargo.toml index c31f97d41..b34ec8d4d 100644 --- a/rust/apple-client-ffi/Cargo.toml +++ b/rust/apple-client-ffi/Cargo.toml @@ -18,7 +18,6 @@ connlib-model = { workspace = true } dns-types = { workspace = true } firezone-logging = { workspace = true } firezone-telemetry = { workspace = true } -flume = { workspace = true } futures = { workspace = true } ip-packet = { workspace = true } ip_network = { workspace = true } @@ -30,6 +29,7 @@ serde_json = { workspace = true } socket-factory = { workspace = true } swift-bridge = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } +tokio-util = { workspace = true } tracing = { workspace = true } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/rust/apple-client-ffi/src/tun.rs b/rust/apple-client-ffi/src/tun.rs index 2969fe40c..0f18fe647 100644 --- a/rust/apple-client-ffi/src/tun.rs +++ b/rust/apple-client-ffi/src/tun.rs @@ -7,11 +7,14 @@ use std::{ os::fd::{AsRawFd as _, RawFd}, }; use tokio::sync::mpsc; +use tokio_util::sync::PollSender; + +const QUEUE_SIZE: usize = 10_000; #[derive(Debug)] pub struct Tun { name: String, - outbound_tx: flume::r#async::SendSink<'static, IpPacket>, + outbound_tx: PollSender, inbound_rx: mpsc::Receiver, } @@ -21,14 +24,14 @@ impl Tun { set_non_blocking(fd)?; let name = name(fd)?; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); - let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. 
+ let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); std::thread::Builder::new() .name("TUN send".to_owned()) .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + tun::unix::tun_send(fd, outbound_rx, write), "Failed to send to TUN device: {}" ) }) @@ -45,7 +48,7 @@ impl Tun { Ok(Tun { name, - outbound_tx: outbound_tx.into_sink(), + outbound_tx: PollSender::new(outbound_tx), inbound_rx, }) } @@ -78,6 +81,16 @@ impl tun::Tun for Tun { ) -> Poll { self.inbound_rx.poll_recv_many(cx, buf, max) } + + fn queue_lengths(&self) -> (usize, usize) { + ( + self.inbound_rx.len(), + self.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + } } fn get_last_error() -> io::Error { diff --git a/rust/bin-shared/Cargo.toml b/rust/bin-shared/Cargo.toml index 8f4227ca3..9d7b2b979 100644 --- a/rust/bin-shared/Cargo.toml +++ b/rust/bin-shared/Cargo.toml @@ -33,13 +33,13 @@ uuid = { workspace = true, features = ["v4"] } [target.'cfg(target_os = "linux")'.dependencies] atomicwrites = { workspace = true } dirs = { workspace = true } -flume = { workspace = true } libc = { workspace = true } netlink-packet-core = { workspace = true } netlink-packet-route = { workspace = true } nix = { workspace = true, features = ["socket"] } resolv-conf = { workspace = true } rtnetlink = { workspace = true } +tokio-util = { workspace = true } zbus = { workspace = true } # Can't use `zbus`'s `tokio` feature here, or it will break toast popups all the way over in `gui-client`. 
[target.'cfg(windows)'.dependencies] diff --git a/rust/bin-shared/src/tun_device_manager/linux.rs b/rust/bin-shared/src/tun_device_manager/linux.rs index f35ff0a86..5a430a35a 100644 --- a/rust/bin-shared/src/tun_device_manager/linux.rs +++ b/rust/bin-shared/src/tun_device_manager/linux.rs @@ -10,6 +10,7 @@ use libc::{ EEXIST, ENOENT, ESRCH, F_GETFL, F_SETFL, O_NONBLOCK, O_RDWR, S_IFCHR, fcntl, makedev, mknod, open, }; +use netlink_packet_route::link::LinkAttribute; use netlink_packet_route::route::{RouteMessage, RouteProtocol, RouteScope}; use netlink_packet_route::rule::RuleAction; use rtnetlink::{Error::NetlinkError, Handle, RuleAddRequest, new_connection}; @@ -28,6 +29,7 @@ use std::{ os::{fd::RawFd, unix::fs::PermissionsExt}, }; use tokio::sync::mpsc; +use tokio_util::sync::PollSender; use tun::ioctl; const TUNSETIFF: libc::c_ulong = 0x4004_54ca; @@ -41,7 +43,6 @@ const FIREZONE_TABLE: u32 = 0x2021_fd00; /// For lack of a better name pub struct TunDeviceManager { mtu: u32, - num_threads: usize, connection: Connection, routes: HashSet, } @@ -63,7 +64,7 @@ impl TunDeviceManager { /// Creates a new managed tunnel device. /// /// Panics if called without a Tokio runtime. - pub fn new(mtu: usize, num_threads: usize) -> Result { + pub fn new(mtu: usize) -> Result { let (cxn, handle, _) = new_connection().context("Failed to create netlink connection")?; let task = tokio::spawn(cxn); let connection = Connection { handle, task }; @@ -72,12 +73,26 @@ impl TunDeviceManager { connection, routes: Default::default(), mtu: mtu as u32, - num_threads, }) } pub fn make_tun(&mut self) -> Result> { - Ok(Box::new(Tun::new(self.num_threads)?)) + let tun = Box::new(Tun::new()?); + + // Do this in a separate task because: + // a) We want it to be infallible. + // b) We don't want `async` to creep into the API. 
+ tokio::spawn({ + let handle = self.connection.handle.clone(); + + async move { + if let Err(e) = set_txqueue_length(handle, 10_000).await { + tracing::warn!("Failed to set TX queue length: {e}") + } + } + }); + + Ok(tun) } #[tracing::instrument(level = "trace", skip(self))] @@ -197,6 +212,31 @@ impl TunDeviceManager { } } +async fn set_txqueue_length(handle: Handle, queue_len: u32) -> Result<()> { + let index = handle + .link() + .get() + .match_name(TunDeviceManager::IFACE_NAME.to_string()) + .execute() + .try_next() + .await? + .context("No interface")? + .header + .index; + + handle + .link() + .set( + LinkUnspec::new_with_index(index) + .append_extra_attribute(LinkAttribute::TxQueueLen(queue_len)) + .build(), + ) + .execute() + .await?; + + Ok(()) +} + fn make_rule(handle: &Handle) -> RuleAddRequest { let mut rule = handle .rule() @@ -292,54 +332,48 @@ async fn remove_route(route: &IpNetwork, idx: u32, handle: &Handle) { tracing::warn!(%route, "Failed to remove route: {}", err_with_src(&err)); } +const QUEUE_SIZE: usize = 10_000; + #[derive(Debug)] pub struct Tun { - outbound_tx: flume::r#async::SendSink<'static, IpPacket>, + outbound_tx: PollSender, inbound_rx: mpsc::Receiver, } impl Tun { - pub fn new(num_threads: usize) -> Result { + pub fn new() -> Result { create_tun_device()?; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); - let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. 
+ let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); - for n in 0..num_threads { - let fd = Arc::new(open_tun()?); - let outbound_rx = outbound_rx.clone().into_stream(); - let inbound_tx = inbound_tx.clone(); + let fd = Arc::new(open_tun()?); - std::thread::Builder::new() - .name(format!("TUN send {n}/{num_threads}")) - .spawn({ - let fd = fd.clone(); + std::thread::Builder::new() + .name("TUN send".to_owned()) + .spawn({ + let fd = fd.clone(); - move || { - firezone_logging::unwrap_or_warn!( - tun::unix::tun_send(fd, outbound_rx, write), - "Failed to send to TUN device: {}" - ) - } - }) - .map_err(io::Error::other)?; - std::thread::Builder::new() - .name(format!("TUN recv {n}/{num_threads}")) - .spawn({ - let fd = fd.clone(); - - move || { - firezone_logging::unwrap_or_warn!( - tun::unix::tun_recv(fd, inbound_tx, read), - "Failed to recv from TUN device: {}" - ) - } - }) - .map_err(io::Error::other)?; - } + move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_send(fd, outbound_rx, write), + "Failed to send to TUN device: {}" + ) + } + }) + .map_err(io::Error::other)?; + std::thread::Builder::new() + .name("TUN recv".to_owned()) + .spawn(move || { + firezone_logging::unwrap_or_warn!( + tun::unix::tun_recv(fd, inbound_tx, read), + "Failed to recv from TUN device: {}" + ) + }) + .map_err(io::Error::other)?; Ok(Self { - outbound_tx: outbound_tx.into_sink(), + outbound_tx: PollSender::new(outbound_tx), inbound_rx, }) } @@ -400,6 +434,16 @@ impl tun::Tun for Tun { fn name(&self) -> &str { TunDeviceManager::IFACE_NAME } + + fn queue_lengths(&self) -> (usize, usize) { + ( + self.inbound_rx.len(), + self.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + } } fn get_last_error() -> io::Error { diff --git a/rust/bin-shared/src/tun_device_manager/macos.rs b/rust/bin-shared/src/tun_device_manager/macos.rs index 86f45dc05..3e6541b6f 100644 --- 
a/rust/bin-shared/src/tun_device_manager/macos.rs +++ b/rust/bin-shared/src/tun_device_manager/macos.rs @@ -7,7 +7,7 @@ use tun::Tun; pub struct TunDeviceManager {} impl TunDeviceManager { - pub fn new(_mtu: usize, _num_threads: usize) -> Result { + pub fn new(_mtu: usize) -> Result { bail!("Not implemented") } diff --git a/rust/bin-shared/src/tun_device_manager/windows.rs b/rust/bin-shared/src/tun_device_manager/windows.rs index 57267af77..67df51867 100644 --- a/rust/bin-shared/src/tun_device_manager/windows.rs +++ b/rust/bin-shared/src/tun_device_manager/windows.rs @@ -46,6 +46,8 @@ use wintun::Adapter; /// where that is configured. const RING_BUFFER_SIZE: u32 = 0x10_0000; +const QUEUE_SIZE: usize = 1000; + pub struct TunDeviceManager { mtu: u32, @@ -59,7 +61,7 @@ pub struct TunDeviceManager { impl TunDeviceManager { #[expect(clippy::unnecessary_wraps, reason = "Fallible on Linux")] - pub fn new(mtu: usize, _num_threads: usize) -> Result { + pub fn new(mtu: usize) -> Result { Ok(Self { iface_idx: None, luid: None, @@ -290,8 +292,8 @@ impl Tun { .start_session(RING_BUFFER_SIZE) .context("Failed to start session")?, ); - let (outbound_tx, outbound_rx) = mpsc::channel(1000); - let (inbound_tx, inbound_rx) = mpsc::channel(1000); // We want to be able to batch-receive from this. + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); + let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); // We want to be able to batch-receive from this. 
let send_thread = start_send_thread(outbound_rx, Arc::downgrade(&session)) .context("Failed to start send thread")?; let recv_thread = start_recv_thread(inbound_tx, Arc::downgrade(&session)) @@ -359,6 +361,21 @@ impl tun::Tun for Tun { Ok(()) } + + fn queue_lengths(&self) -> (usize, usize) { + self.state + .as_ref() + .map(|s| { + ( + s.inbound_rx.len(), + s.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + }) + .unwrap_or_default() + } } // Moves packets from Internet towards the user diff --git a/rust/bin-shared/tests/dns_control_windows.rs b/rust/bin-shared/tests/dns_control_windows.rs index dfed79147..de967ef2c 100644 --- a/rust/bin-shared/tests/dns_control_windows.rs +++ b/rust/bin-shared/tests/dns_control_windows.rs @@ -14,7 +14,7 @@ fn dns_control() { .build() .unwrap(); - let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280, 1).unwrap(); // Note: num_threads (`1`) is unused on windows. + let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280).unwrap(); let _tun = tun_dev_manager.make_tun().unwrap(); rt.block_on(async { diff --git a/rust/bin-shared/tests/no_packet_loops_tcp.rs b/rust/bin-shared/tests/no_packet_loops_tcp.rs index 3c3ea6531..d48331ff4 100644 --- a/rust/bin-shared/tests/no_packet_loops_tcp.rs +++ b/rust/bin-shared/tests/no_packet_loops_tcp.rs @@ -15,7 +15,7 @@ async fn no_packet_loops_tcp() { let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut device_manager = TunDeviceManager::new(1280).unwrap(); let _tun = device_manager.make_tun().unwrap(); device_manager.set_ips(ipv4, ipv6).await.unwrap(); diff --git a/rust/bin-shared/tests/no_packet_loops_udp.rs b/rust/bin-shared/tests/no_packet_loops_udp.rs index cc5c8e5aa..55148d206 100644 --- a/rust/bin-shared/tests/no_packet_loops_udp.rs +++ 
b/rust/bin-shared/tests/no_packet_loops_udp.rs @@ -24,7 +24,7 @@ async fn no_packet_loops_udp() { let bufferpool = BufferPool::::new(0, "test"); - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut device_manager = TunDeviceManager::new(1280).unwrap(); let _tun = device_manager.make_tun().unwrap(); device_manager.set_ips(ipv4, ipv6).await.unwrap(); diff --git a/rust/bin-shared/tests/tunnel_drop.rs b/rust/bin-shared/tests/tunnel_drop.rs index 3cc779ea6..4e1c8b397 100644 --- a/rust/bin-shared/tests/tunnel_drop.rs +++ b/rust/bin-shared/tests/tunnel_drop.rs @@ -9,7 +9,7 @@ use firezone_bin_shared::TunDeviceManager; async fn tunnel_drop() { firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads. - let mut tun_device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut tun_device_manager = TunDeviceManager::new(1280).unwrap(); // Each cycle takes about half a second, so this will take a fair bit to run. for _ in 0..50 { diff --git a/rust/client-ffi/Cargo.toml b/rust/client-ffi/Cargo.toml index 77a134ddc..c45fa840c 100644 --- a/rust/client-ffi/Cargo.toml +++ b/rust/client-ffi/Cargo.toml @@ -30,6 +30,7 @@ serde_json = { workspace = true } socket-factory = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } +tokio-util = { workspace = true } tracing = { workspace = true, features = ["std", "attributes"] } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/rust/client-ffi/src/platform/android/tun.rs b/rust/client-ffi/src/platform/android/tun.rs index a17c409db..f72c17bb9 100644 --- a/rust/client-ffi/src/platform/android/tun.rs +++ b/rust/client-ffi/src/platform/android/tun.rs @@ -4,12 +4,15 @@ use std::os::fd::{FromRawFd, OwnedFd}; use std::task::{Context, Poll}; use std::{io, os::fd::RawFd}; use tokio::sync::mpsc; +use tokio_util::sync::PollSender; use tun::ioctl; +const 
QUEUE_SIZE: usize = 1000; + #[derive(Debug)] pub struct Tun { name: String, - outbound_tx: flume::r#async::SendSink<'static, IpPacket>, + outbound_tx: PollSender, inbound_rx: mpsc::Receiver, _fd: OwnedFd, } @@ -41,6 +44,16 @@ impl tun::Tun for Tun { ) -> Poll { self.inbound_rx.poll_recv_many(cx, buf, max) } + + fn queue_lengths(&self) -> (usize, usize) { + ( + self.inbound_rx.len(), + self.outbound_tx + .get_ref() + .map(|s| QUEUE_SIZE - s.capacity()) + .unwrap_or_default(), + ) + } } impl Tun { @@ -53,16 +66,14 @@ impl Tun { pub unsafe fn from_fd(fd: RawFd) -> io::Result { let name = unsafe { interface_name(fd)? }; - let (inbound_tx, inbound_rx) = mpsc::channel(1000); - let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets. - - // TODO: Test whether we can set `IFF_MULTI_QUEUE` on Android devices. + let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); + let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE); std::thread::Builder::new() .name("TUN send".to_owned()) .spawn(move || { firezone_logging::unwrap_or_warn!( - tun::unix::tun_send(fd, outbound_rx.into_stream(), write), + tun::unix::tun_send(fd, outbound_rx, write), "Failed to send to TUN device: {}" ) }) @@ -79,7 +90,7 @@ impl Tun { Ok(Tun { name, - outbound_tx: outbound_tx.into_sink(), + outbound_tx: PollSender::new(outbound_tx), inbound_rx, _fd: unsafe { OwnedFd::from_raw_fd(fd) }, // `OwnedFd` will close the fd on drop. 
}) diff --git a/rust/client-ffi/src/platform/fallback.rs b/rust/client-ffi/src/platform/fallback.rs index cfd89287b..df558db4c 100644 --- a/rust/client-ffi/src/platform/fallback.rs +++ b/rust/client-ffi/src/platform/fallback.rs @@ -65,4 +65,8 @@ impl tun::Tun for Tun { fn name(&self) -> &str { todo!() } + + fn queue_lengths(&self) -> (usize, usize) { + todo!() + } } diff --git a/rust/connlib/dns-over-tcp/tests/smoke_server.rs b/rust/connlib/dns-over-tcp/tests/smoke_server.rs index a9f424712..bc5514a7d 100644 --- a/rust/connlib/dns-over-tcp/tests/smoke_server.rs +++ b/rust/connlib/dns-over-tcp/tests/smoke_server.rs @@ -23,7 +23,7 @@ async fn smoke() { let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let mut device_manager = TunDeviceManager::new(1280).unwrap(); let tun = device_manager.make_tun().unwrap(); device_manager.set_ips(ipv4, ipv6).await.unwrap(); device_manager diff --git a/rust/connlib/socket-factory/src/lib.rs b/rust/connlib/socket-factory/src/lib.rs index 3281e7b04..07a71695e 100644 --- a/rust/connlib/socket-factory/src/lib.rs +++ b/rust/connlib/socket-factory/src/lib.rs @@ -163,7 +163,7 @@ pub struct UdpSocket { /// A buffer pool for batches of incoming UDP packets. 
buffer_pool: BufferPool>, - gro_batch_histogram: opentelemetry::metrics::Histogram, + batch_histogram: opentelemetry::metrics::Histogram, port: u16, } @@ -184,7 +184,7 @@ impl UdpSocket { IpAddr::V6(_) => "udp-socket-v6", }, ), - gro_batch_histogram: opentelemetry::global::meter("connlib") + batch_histogram: opentelemetry::global::meter("connlib") .u64_histogram("system.network.packets.batch_count") .with_description( "How many batches of packets we have processed in a single syscall.", @@ -286,7 +286,7 @@ impl UdpSocket { .await .context("Failed to read from socket")?; - self.gro_batch_histogram.record( + self.batch_histogram.record( len as u64, &[ KeyValue::new("network.transport", "udp"), @@ -350,6 +350,16 @@ impl UdpSocket { } async fn send_inner(&self, chunk: Transmit<'_>) -> io::Result<()> { + let batch_size = chunk.contents.len() / chunk.segment_size.unwrap_or(chunk.contents.len()); + + self.batch_histogram.record( + batch_size as u64, + &[ + KeyValue::new("network.transport", "udp"), + KeyValue::new("network.io.direction", "transmit"), + ], + ); + self.inner .async_io(Interest::WRITABLE, || { match self.state.try_send((&self.inner).into(), &chunk) { diff --git a/rust/connlib/tun/Cargo.toml b/rust/connlib/tun/Cargo.toml index b5cd6e922..3f4d35495 100644 --- a/rust/connlib/tun/Cargo.toml +++ b/rust/connlib/tun/Cargo.toml @@ -10,8 +10,6 @@ anyhow = { workspace = true } ip-packet = { workspace = true } [target.'cfg(target_family = "unix")'.dependencies] -flume = { workspace = true } -futures = { workspace = true } libc = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/rust/connlib/tun/src/lib.rs b/rust/connlib/tun/src/lib.rs index d069f7c01..2c6424751 100644 --- a/rust/connlib/tun/src/lib.rs +++ b/rust/connlib/tun/src/lib.rs @@ -26,4 +26,7 @@ pub trait Tun: Send + Sync + 'static { /// The name of the TUN device. fn name(&self) -> &str; + + /// The number of inbound and outbound packets sitting in queues.
+ fn queue_lengths(&self) -> (usize, usize); } diff --git a/rust/connlib/tun/src/unix.rs b/rust/connlib/tun/src/unix.rs index 065dfcc9d..6f3e09ea1 100644 --- a/rust/connlib/tun/src/unix.rs +++ b/rust/connlib/tun/src/unix.rs @@ -1,5 +1,4 @@ use anyhow::{Context as _, Result, bail}; -use futures::StreamExt as _; use ip_packet::{IpPacket, IpPacketBuf}; use std::io; use std::os::fd::AsRawFd; @@ -8,7 +7,7 @@ use tokio::sync::mpsc; pub fn tun_send( fd: T, - mut outbound_rx: flume::r#async::RecvStream<'_, IpPacket>, + mut outbound_rx: mpsc::Receiver, write: impl Fn(i32, &IpPacket) -> std::result::Result, ) -> Result<()> where @@ -21,7 +20,7 @@ where .block_on(async move { let fd = AsyncFd::with_interest(fd, tokio::io::Interest::WRITABLE)?; - while let Some(packet) = outbound_rx.next().await { + while let Some(packet) = outbound_rx.recv().await { if let Err(e) = fd .async_io(tokio::io::Interest::WRITABLE, |fd| { write(fd.as_raw_fd(), &packet) diff --git a/rust/connlib/tunnel/src/device_channel.rs b/rust/connlib/tunnel/src/device_channel.rs index c1de01889..71f6ed20f 100644 --- a/rust/connlib/tunnel/src/device_channel.rs +++ b/rust/connlib/tunnel/src/device_channel.rs @@ -85,6 +85,13 @@ impl Device { Ok(()) } + pub fn queue_lengths(&self) -> (usize, usize) { + self.tun + .as_ref() + .map(|t| t.queue_lengths()) + .unwrap_or_default() + } + fn tun(&mut self) -> io::Result<&mut dyn Tun> { Ok(self .tun diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index 75d86ec68..a0afc68ec 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -61,6 +61,7 @@ pub struct Io { tun: Device, outbound_packet_buffer: VecDeque, packet_counter: opentelemetry::metrics::Counter, + queue_lengths: opentelemetry::metrics::Histogram, } #[derive(Debug)] @@ -127,6 +128,13 @@ impl Io { .u64_counter("system.network.packets") .with_description("The number of packets processed.") .build(), + queue_lengths: opentelemetry::global::meter("connlib") + 
.u64_histogram("system.queue.length") + .with_description("The length of a queue.") + .with_boundaries(vec![ + 1.0, 2.0, 5.0, 10.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, + ]) + .build(), } } @@ -166,6 +174,8 @@ impl Io { > { ready!(self.flush(cx)?); + self.record_queue_lengths(); + if self.reval_nameserver_interval.poll_tick(cx).is_ready() { self.nameservers.evaluate(); } @@ -421,6 +431,33 @@ impl Io { ) -> io::Result<()> { self.tcp_dns_server.send_response(to, message) } + + /// Records the lengths of our RX and TX queues in a histogram. + fn record_queue_lengths(&self) { + let (tun_rx, tun_tx) = self.tun.queue_lengths(); + + self.queue_lengths.record( + tun_rx as u64, + &[ + otel::attr::network_io_direction_receive(), + otel::attr::queue_item_ip_packet(), + ], + ); + self.queue_lengths.record( + tun_tx as u64, + &[ + otel::attr::network_io_direction_transmit(), + otel::attr::queue_item_ip_packet(), + ], + ); + + let (udp_rx, udp_tx) = self.sockets.queue_lengths(); + + self.queue_lengths + .record(udp_rx as u64, &[otel::attr::queue_item_gro_batch()]); + self.queue_lengths + .record(udp_tx as u64, &[otel::attr::queue_item_gso_batch()]); + } } fn is_max_wg_packet_size(d: &DatagramIn) -> bool { @@ -545,5 +582,9 @@ mod tests { fn name(&self) -> &str { "dummy" } + + fn queue_lengths(&self) -> (usize, usize) { + (0, 0) + } } } diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index 73e901d82..73422b6a3 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -117,6 +117,22 @@ impl Sockets { Poll::Ready(Ok(iter)) } + + pub fn queue_lengths(&self) -> (usize, usize) { + let (v4_inbound, v4_outbound) = self + .socket_v4 + .as_ref() + .map(|s| s.queue_lengths()) + .unwrap_or_default(); + + let (v6_inbound, v6_outbound) = self + .socket_v6 + .as_ref() + .map(|s| s.queue_lengths()) + .unwrap_or_default(); + + (v4_inbound + v6_inbound, v4_outbound + v6_outbound) + } } struct PacketIter { @@
-333,6 +349,13 @@ impl ThreadedUdpSocket { fn channels_mut(&mut self) -> Result<&mut Channels> { self.channels.as_mut().context("Missing channels") } + + fn queue_lengths(&self) -> (usize, usize) { + self.channels + .as_ref() + .map(|c| (c.inbound_rx.len(), c.outbound_tx.len())) + .unwrap_or_default() + } } impl Drop for ThreadedUdpSocket { diff --git a/rust/deny.toml b/rust/deny.toml index 9993c7aa5..29aafa062 100644 --- a/rust/deny.toml +++ b/rust/deny.toml @@ -23,13 +23,13 @@ # dependencies not shared by any other crates, would be ignored, as the target # list here is effectively saying which targets you are building for. targets = [ - # The triple can be any string, but only the target triples built in to - # rustc (as of 1.40) can be checked against actual config expressions - #"x86_64-unknown-linux-musl", - # You can also specify which target_features you promise are enabled for a - # particular target. target_features are currently not validated against - # the actual valid features supported by the target architecture. - #{ triple = "wasm32-unknown-unknown", features = ["atomics"] }, + # The triple can be any string, but only the target triples built in to + # rustc (as of 1.40) can be checked against actual config expressions + #"x86_64-unknown-linux-musl", + # You can also specify which target_features you promise are enabled for a + # particular target. target_features are currently not validated against + # the actual valid features supported by the target architecture. + #{ triple = "wasm32-unknown-unknown", features = ["atomics"] }, ] # When creating the dependency graph used as the source of truth when checks are # executed, this field can be used to prune crates from the graph, removing them @@ -70,31 +70,31 @@ feature-depth = 1 # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. 
ignore = [ - "RUSTSEC-2020-0095", # `difference` is unmaintained - "RUSTSEC-2024-0384", # `instant` is unmaintained - "RUSTSEC-2024-0370", # `proc-macro-error` is unmaintained + "RUSTSEC-2020-0095", # `difference` is unmaintained + "RUSTSEC-2024-0384", # `instant` is unmaintained + "RUSTSEC-2024-0370", # `proc-macro-error` is unmaintained - # `gtk-rs` crates are unmaintained - "RUSTSEC-2024-0411", - "RUSTSEC-2024-0412", - "RUSTSEC-2024-0413", - "RUSTSEC-2024-0414", - "RUSTSEC-2024-0415", - "RUSTSEC-2024-0416", - "RUSTSEC-2024-0417", - "RUSTSEC-2024-0418", - "RUSTSEC-2024-0419", - "RUSTSEC-2024-0420", + # `gtk-rs` crates are unmaintained + "RUSTSEC-2024-0411", + "RUSTSEC-2024-0412", + "RUSTSEC-2024-0413", + "RUSTSEC-2024-0414", + "RUSTSEC-2024-0415", + "RUSTSEC-2024-0416", + "RUSTSEC-2024-0417", + "RUSTSEC-2024-0418", + "RUSTSEC-2024-0419", + "RUSTSEC-2024-0420", - "RUSTSEC-2025-0012", # backoff, See #8386 - "RUSTSEC-2024-0436", # paste, See #8387 + "RUSTSEC-2025-0012", # backoff, See #8386 + "RUSTSEC-2024-0436", # paste, See #8387 - "RUSTSEC-2024-0429", # `glib`, need to wait for tauri to upgrade + "RUSTSEC-2024-0429", # `glib`, need to wait for tauri to upgrade - #"RUSTSEC-0000-0000", - #{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" }, - #"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish - #{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" }, + #"RUSTSEC-0000-0000", + #{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" }, + #"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish + #{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" }, ] # If this is true, then cargo deny will use the git executable to fetch advisory database. # If this is false, then it uses a built-in git library. 
@@ -110,19 +110,19 @@ ignore = [ # See https://spdx.org/licenses/ for list of possible licenses # [possible values: any SPDX 3.11 short identifier (+ optional exception)]. allow = [ - "MIT", - "Apache-2.0", - "Apache-2.0 WITH LLVM-exception", - "BSD-2-Clause", - "BSD-3-Clause", - "MPL-2.0", - "ISC", - "0BSD", - "Unicode-3.0", - "BSL-1.0", - "Zlib", - "CDLA-Permissive-2.0", - "NCSA", + "MIT", + "Apache-2.0", + "Apache-2.0 WITH LLVM-exception", + "BSD-2-Clause", + "BSD-3-Clause", + "MPL-2.0", + "ISC", + "0BSD", + "Unicode-3.0", + "BSL-1.0", + "Zlib", + "CDLA-Permissive-2.0", + "NCSA", ] # The confidence threshold for detecting a license from license text. # The higher the value, the more closely the license text must be to the @@ -132,9 +132,9 @@ confidence-threshold = 0.8 # Allow 1 or more licenses on a per-crate basis, so that particular licenses # aren't accepted for every possible crate as with the normal allow list exceptions = [ - # Each entry is the crate and version constraint, and its specific allow - # list - #{ allow = ["Zlib"], crate = "adler32" }, + # Each entry is the crate and version constraint, and its specific allow + # list + #{ allow = ["Zlib"], crate = "adler32" }, ] # Some crates don't have (easily) machine readable licensing information, @@ -146,8 +146,8 @@ crate = "ring" # The SPDX expression for the license requirements of the crate expression = "MIT AND ISC AND OpenSSL" license-files = [ - # Each entry is a crate relative path, and the (opaque) hash of its contents - { path = "LICENSE", hash = 0xbd0eed23 }, + # Each entry is a crate relative path, and the (opaque) hash of its contents + { path = "LICENSE", hash = 0xbd0eed23 }, ] [licenses.private] @@ -165,7 +165,7 @@ ignore = true # is only published to private registries, and ignore is true, the crate will # not have its license(s) checked registries = [ - #"https://sekretz.com/registry + #"https://sekretz.com/registry ] # This section is considered when running `cargo deny check bans`. 
@@ -192,16 +192,16 @@ workspace-default-features = "allow" external-default-features = "allow" # List of crates that are allowed. Use with care! allow = [ - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" }, + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" }, ] # List of crates to deny deny = [ - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" }, - # Wrapper crates can optionally be specified to allow the crate when it - # is a direct dependency of the otherwise banned crate - #{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] }, + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" }, + # Wrapper crates can optionally be specified to allow the crate when it + # is a direct dependency of the otherwise banned crate + #{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] }, ] # List of features to allow/deny @@ -228,69 +228,68 @@ deny = [ #exact = true skip = [ - "base64", - "bitflags", - "core-foundation", - "core-graphics", - "core-graphics-types", - "derive_more", - "getrandom", - "hashbrown", - "heck", - "hermit-abi", - "indexmap", - "itertools", - "libloading", - "linux-raw-sys", - "nix", - "nu-ansi-term", - "phf", - "phf_codegen", - "phf_generator", - "phf_macros", - "phf_shared", - "proc-macro-crate", - "quick-xml", - "rand", - "rand_chacha", - "rand_core", - "raw-window-handle", - "redox_users", - "regex-automata", - "regex-syntax", - "rustix", - "siphasher", - "socket2", - "syn", - "thiserror", - "thiserror-impl", - "toml", - "toml_edit", - "tower", - "wasi", - "webpki-roots", - "windows-sys", - "windows-targets", - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - 
"windows_x86_64_gnullvm", - "windows_x86_64_msvc", - "winnow", - "winreg", - #"ansi_term@0.11.0", - #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" }, + "base64", + "bitflags", + "core-foundation", + "core-graphics", + "core-graphics-types", + "derive_more", + "getrandom", + "hashbrown", + "heck", + "indexmap", + "itertools", + "libloading", + "linux-raw-sys", + "nix", + "nu-ansi-term", + "phf", + "phf_codegen", + "phf_generator", + "phf_macros", + "phf_shared", + "proc-macro-crate", + "quick-xml", + "rand", + "rand_chacha", + "rand_core", + "raw-window-handle", + "redox_users", + "regex-automata", + "regex-syntax", + "rustix", + "siphasher", + "socket2", + "syn", + "thiserror", + "thiserror-impl", + "toml", + "toml_edit", + "tower", + "wasi", + "webpki-roots", + "windows-sys", + "windows-targets", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", + "winnow", + "winreg", + #"ansi_term@0.11.0", + #{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" }, ] # Certain crates/versions that will be skipped when doing duplicate detection. # Similarly to `skip` allows you to skip certain crates during duplicate # detection. Unlike skip, it also includes the entire tree of transitive # dependencies starting at the specified crate, up to a certain depth, which is # by default infinite. skip-tree = [ - #"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies - #{ crate = "ansi_term@0.11.0", depth = 20 }, + #"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies + #{ crate = "ansi_term@0.11.0", depth = 20 }, ] # This section is considered when running `cargo deny check sources`. 
diff --git a/rust/gateway/Cargo.toml b/rust/gateway/Cargo.toml index 3bebd082a..d0da8e57a 100644 --- a/rust/gateway/Cargo.toml +++ b/rust/gateway/Cargo.toml @@ -25,7 +25,6 @@ ip-packet = { workspace = true } ip_network = { workspace = true } libc = { workspace = true, features = ["std", "const-extern-fn", "extra_traits"] } moka = { workspace = true, features = ["future"] } -num_cpus = { workspace = true } opentelemetry = { workspace = true, features = ["metrics"] } opentelemetry-otlp = { workspace = true, features = ["metrics", "grpc-tonic"] } opentelemetry-stdout = { workspace = true, features = ["metrics"] } diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index 3e0fe6efe..9550f5683 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -20,9 +20,9 @@ use phoenix_channel::get_user_agent; use futures::{TryFutureExt, future}; use phoenix_channel::PhoenixChannel; use secrecy::Secret; +use std::pin::pin; +use std::process::ExitCode; use std::{collections::BTreeSet, path::Path}; -use std::{fmt, pin::pin}; -use std::{process::ExitCode, str::FromStr}; use std::{sync::Arc, time::Duration}; use tokio::signal::ctrl_c; use tracing_subscriber::layer; @@ -182,7 +182,7 @@ async fn try_main(cli: Cli, telemetry: &mut Telemetry) -> Result<()> { ) .context("Failed to resolve portal URL")?; - let mut tun_device_manager = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, cli.tun_threads.0) + let mut tun_device_manager = TunDeviceManager::new(ip_packet::MAX_IP_SIZE) .context("Failed to create TUN device manager")?; let tun = tun_device_manager .make_tun() @@ -274,10 +274,6 @@ struct Cli { #[arg(short = 'i', long, env = "FIREZONE_ID")] firezone_id: Option, - /// How many threads to use for reading and writing to the TUN device. - #[arg(long, env = "FIREZONE_NUM_TUN_THREADS", default_value_t)] - tun_threads: NumThreads, - /// Where to export metrics to. /// /// This configuration option is private API and has no stability guarantees. 
@@ -316,33 +312,6 @@ impl Cli { } } -#[derive(Debug, Clone, Copy)] -struct NumThreads(pub usize); - -impl Default for NumThreads { - fn default() -> Self { - if num_cpus::get() < 4 { - return Self(1); - } - - Self(2) - } -} - -impl fmt::Display for NumThreads { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl FromStr for NumThreads { - type Err = ::Err; - - fn from_str(s: &str) -> std::result::Result { - Ok(Self(s.parse()?)) - } -} - /// An adapter struct around [`Tun`] that validates IPv4, UDP and TCP checksums. struct ValidateChecksumAdapter { inner: Box, @@ -403,6 +372,10 @@ impl Tun for ValidateChecksumAdapter { fn name(&self) -> &str { self.inner.name() } + + fn queue_lengths(&self) -> (usize, usize) { + self.inner.queue_lengths() + } } impl ValidateChecksumAdapter { diff --git a/rust/gui-client/src-tauri/src/service.rs b/rust/gui-client/src-tauri/src/service.rs index dbac551b5..286b0e9da 100644 --- a/rust/gui-client/src-tauri/src/service.rs +++ b/rust/gui-client/src-tauri/src/service.rs @@ -290,7 +290,7 @@ impl<'a> Handler<'a> { .next_client_split() .await .context("Failed to wait for incoming IPC connection from a GUI")?; - let tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, 1)?; + let tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE)?; let dns_notifier = new_dns_notifier().await?.boxed(); let network_notifier = new_network_notifier().await?.boxed(); diff --git a/rust/headless-client/src/main.rs b/rust/headless-client/src/main.rs index a8dc75cd0..eeb5e7853 100644 --- a/rust/headless-client/src/main.rs +++ b/rust/headless-client/src/main.rs @@ -276,7 +276,7 @@ fn main() -> Result<()> { let mut terminate = signals::Terminate::new()?; let mut hangup = signals::Hangup::new()?; - let mut tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, 1)?; + let mut tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE)?; let tokio_handle = tokio::runtime::Handle::current(); diff --git 
a/rust/telemetry/src/otel.rs b/rust/telemetry/src/otel.rs index 953c8b77f..a25c5bdcd 100644 --- a/rust/telemetry/src/otel.rs +++ b/rust/telemetry/src/otel.rs @@ -85,6 +85,18 @@ pub mod attr { KeyValue::new("error.type", value) } + pub fn queue_item_ip_packet() -> KeyValue { + KeyValue::new("queue.item", "ip-packet") + } + + pub fn queue_item_gro_batch() -> KeyValue { + KeyValue::new("queue.item", "udp-gro-batch") + } + + pub fn queue_item_gso_batch() -> KeyValue { + KeyValue::new("queue.item", "udp-gso-batch") + } + #[cfg(test)] mod tests { use super::*; diff --git a/website/src/components/Changelog/Gateway.tsx b/website/src/components/Changelog/Gateway.tsx index 4e0c6d7d7..a06ce1e6c 100644 --- a/website/src/components/Changelog/Gateway.tsx +++ b/website/src/components/Changelog/Gateway.tsx @@ -22,7 +22,13 @@ export default function Gateway() { return ( - + + + Remove the FIREZONE_NUM_TUN_THREADS env variable. The Gateway will now + always default to a single TUN thread. Using multiple threads can + cause packet reordering which hurts TCP throughput performance. + + Fixes an issue where connections would fail to establish in