mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
feat(connlib): improve throughput on higher latencies (#10231)
Turns out the multi-threaded access of the TUN device on the Gateway causes packet reordering which makes the TCP congestion controller throttle the connection. Additionally, the default TX queue length of a TUN device on Linux is only 500 packets. With just a single thread and an increased TX queue length, we get a throughput performance of just over 1 GBit/s for a 20ms link between Client and Gateway with basically no packet drops: ``` Connecting to host 172.20.0.110, port 5201 [ 5] local 100.79.130.70 port 49546 connected to 172.20.0.110 port 5201 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 116 MBytes 977 Mbits/sec 0 6.40 MBytes [ 5] 1.00-2.00 sec 137 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 2.00-3.00 sec 134 MBytes 1.13 Gbits/sec 0 6.40 MBytes [ 5] 3.00-4.00 sec 136 MBytes 1.14 Gbits/sec 47 6.40 MBytes [ 5] 4.00-5.00 sec 137 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 5.00-6.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 6.00-7.00 sec 138 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 7.00-8.00 sec 138 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 8.00-9.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 9.00-10.00 sec 138 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 10.00-11.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 11.00-12.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 12.00-13.00 sec 136 MBytes 1.14 Gbits/sec 0 6.40 MBytes [ 5] 13.00-14.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 14.00-15.00 sec 140 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 15.00-16.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 16.00-17.00 sec 137 MBytes 1.15 Gbits/sec 0 6.40 MBytes [ 5] 17.00-18.00 sec 139 MBytes 1.17 Gbits/sec 0 6.40 MBytes [ 5] 18.00-19.00 sec 138 MBytes 1.16 Gbits/sec 0 6.40 MBytes [ 5] 19.00-20.00 sec 136 MBytes 1.14 Gbits/sec 0 6.40 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-20.00 sec 2.67 GBytes 1.15 Gbits/sec 47 sender [ 5] 0.00-20.02 sec 2.67 GBytes 1.15 Gbits/sec receiver iperf Done. ``` For further debugging in the future, we are now recording the send and receive queue depths of both the TUN device and the UDP sockets. Neither of those showed to be full in my testing which leads me to conclude that it isn't any buffer inside Firezone that is too small here. Related: #7452 --------- Signed-off-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
25
rust/Cargo.lock
generated
25
rust/Cargo.lock
generated
@@ -328,7 +328,6 @@ dependencies = [
|
||||
"dns-types",
|
||||
"firezone-logging",
|
||||
"firezone-telemetry",
|
||||
"flume",
|
||||
"futures",
|
||||
"ip-packet",
|
||||
"ip_network",
|
||||
@@ -342,6 +341,7 @@ dependencies = [
|
||||
"swift-bridge",
|
||||
"swift-bridge-build",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-appender",
|
||||
"tracing-subscriber",
|
||||
@@ -1327,6 +1327,7 @@ dependencies = [
|
||||
"socket-factory",
|
||||
"thiserror 2.0.15",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-appender",
|
||||
"tracing-subscriber",
|
||||
@@ -2340,7 +2341,6 @@ dependencies = [
|
||||
"dirs",
|
||||
"dns-types",
|
||||
"firezone-logging",
|
||||
"flume",
|
||||
"futures",
|
||||
"gat-lending-iterator",
|
||||
"hex",
|
||||
@@ -2403,7 +2403,6 @@ dependencies = [
|
||||
"libc",
|
||||
"moka",
|
||||
"nix 0.30.1",
|
||||
"num_cpus",
|
||||
"opentelemetry",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-stdout",
|
||||
@@ -3336,12 +3335,6 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc"
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f154ce46856750ed433c8649605bf7ed2de3bc35fd9d2a9f30cddd873c80cb08"
|
||||
|
||||
[[package]]
|
||||
name = "hex"
|
||||
version = "0.4.3"
|
||||
@@ -4722,16 +4715,6 @@ dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
|
||||
dependencies = [
|
||||
"hermit-abi 0.5.1",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum"
|
||||
version = "0.7.3"
|
||||
@@ -5473,7 +5456,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"concurrent-queue",
|
||||
"hermit-abi 0.4.0",
|
||||
"hermit-abi",
|
||||
"pin-project-lite",
|
||||
"rustix 0.38.44",
|
||||
"tracing",
|
||||
@@ -8392,8 +8375,6 @@ name = "tun"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"flume",
|
||||
"futures",
|
||||
"ip-packet",
|
||||
"libc",
|
||||
"tokio",
|
||||
|
||||
@@ -18,7 +18,6 @@ connlib-model = { workspace = true }
|
||||
dns-types = { workspace = true }
|
||||
firezone-logging = { workspace = true }
|
||||
firezone-telemetry = { workspace = true }
|
||||
flume = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
ip-packet = { workspace = true }
|
||||
ip_network = { workspace = true }
|
||||
@@ -30,6 +29,7 @@ serde_json = { workspace = true }
|
||||
socket-factory = { workspace = true }
|
||||
swift-bridge = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "sync"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-appender = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
|
||||
@@ -7,11 +7,14 @@ use std::{
|
||||
os::fd::{AsRawFd as _, RawFd},
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::PollSender;
|
||||
|
||||
const QUEUE_SIZE: usize = 10_000;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Tun {
|
||||
name: String,
|
||||
outbound_tx: flume::r#async::SendSink<'static, IpPacket>,
|
||||
outbound_tx: PollSender<IpPacket>,
|
||||
inbound_rx: mpsc::Receiver<IpPacket>,
|
||||
}
|
||||
|
||||
@@ -21,14 +24,14 @@ impl Tun {
|
||||
set_non_blocking(fd)?;
|
||||
let name = name(fd)?;
|
||||
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(1000);
|
||||
let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets.
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE);
|
||||
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("TUN send".to_owned())
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_send(fd, outbound_rx.into_stream(), write),
|
||||
tun::unix::tun_send(fd, outbound_rx, write),
|
||||
"Failed to send to TUN device: {}"
|
||||
)
|
||||
})
|
||||
@@ -45,7 +48,7 @@ impl Tun {
|
||||
|
||||
Ok(Tun {
|
||||
name,
|
||||
outbound_tx: outbound_tx.into_sink(),
|
||||
outbound_tx: PollSender::new(outbound_tx),
|
||||
inbound_rx,
|
||||
})
|
||||
}
|
||||
@@ -78,6 +81,16 @@ impl tun::Tun for Tun {
|
||||
) -> Poll<usize> {
|
||||
self.inbound_rx.poll_recv_many(cx, buf, max)
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
(
|
||||
self.inbound_rx.len(),
|
||||
self.outbound_tx
|
||||
.get_ref()
|
||||
.map(|s| QUEUE_SIZE - s.capacity())
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_last_error() -> io::Error {
|
||||
|
||||
@@ -33,13 +33,13 @@ uuid = { workspace = true, features = ["v4"] }
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
atomicwrites = { workspace = true }
|
||||
dirs = { workspace = true }
|
||||
flume = { workspace = true }
|
||||
libc = { workspace = true }
|
||||
netlink-packet-core = { workspace = true }
|
||||
netlink-packet-route = { workspace = true }
|
||||
nix = { workspace = true, features = ["socket"] }
|
||||
resolv-conf = { workspace = true }
|
||||
rtnetlink = { workspace = true }
|
||||
tokio-util = { workspace = true }
|
||||
zbus = { workspace = true } # Can't use `zbus`'s `tokio` feature here, or it will break toast popups all the way over in `gui-client`.
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
|
||||
@@ -10,6 +10,7 @@ use libc::{
|
||||
EEXIST, ENOENT, ESRCH, F_GETFL, F_SETFL, O_NONBLOCK, O_RDWR, S_IFCHR, fcntl, makedev, mknod,
|
||||
open,
|
||||
};
|
||||
use netlink_packet_route::link::LinkAttribute;
|
||||
use netlink_packet_route::route::{RouteMessage, RouteProtocol, RouteScope};
|
||||
use netlink_packet_route::rule::RuleAction;
|
||||
use rtnetlink::{Error::NetlinkError, Handle, RuleAddRequest, new_connection};
|
||||
@@ -28,6 +29,7 @@ use std::{
|
||||
os::{fd::RawFd, unix::fs::PermissionsExt},
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::PollSender;
|
||||
use tun::ioctl;
|
||||
|
||||
const TUNSETIFF: libc::c_ulong = 0x4004_54ca;
|
||||
@@ -41,7 +43,6 @@ const FIREZONE_TABLE: u32 = 0x2021_fd00;
|
||||
/// For lack of a better name
|
||||
pub struct TunDeviceManager {
|
||||
mtu: u32,
|
||||
num_threads: usize,
|
||||
connection: Connection,
|
||||
routes: HashSet<IpNetwork>,
|
||||
}
|
||||
@@ -63,7 +64,7 @@ impl TunDeviceManager {
|
||||
/// Creates a new managed tunnel device.
|
||||
///
|
||||
/// Panics if called without a Tokio runtime.
|
||||
pub fn new(mtu: usize, num_threads: usize) -> Result<Self> {
|
||||
pub fn new(mtu: usize) -> Result<Self> {
|
||||
let (cxn, handle, _) = new_connection().context("Failed to create netlink connection")?;
|
||||
let task = tokio::spawn(cxn);
|
||||
let connection = Connection { handle, task };
|
||||
@@ -72,12 +73,26 @@ impl TunDeviceManager {
|
||||
connection,
|
||||
routes: Default::default(),
|
||||
mtu: mtu as u32,
|
||||
num_threads,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn make_tun(&mut self) -> Result<Box<dyn tun::Tun>> {
|
||||
Ok(Box::new(Tun::new(self.num_threads)?))
|
||||
let tun = Box::new(Tun::new()?);
|
||||
|
||||
// Do this in a separate task because:
|
||||
// a) We want it to be infallible.
|
||||
// b) We don't want `async` to creep into the API.
|
||||
tokio::spawn({
|
||||
let handle = self.connection.handle.clone();
|
||||
|
||||
async move {
|
||||
if let Err(e) = set_txqueue_length(handle, 10_000).await {
|
||||
tracing::warn!("Failed to set TX queue length: {e}")
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(tun)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
@@ -197,6 +212,31 @@ impl TunDeviceManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn set_txqueue_length(handle: Handle, queue_len: u32) -> Result<()> {
|
||||
let index = handle
|
||||
.link()
|
||||
.get()
|
||||
.match_name(TunDeviceManager::IFACE_NAME.to_string())
|
||||
.execute()
|
||||
.try_next()
|
||||
.await?
|
||||
.context("No interface")?
|
||||
.header
|
||||
.index;
|
||||
|
||||
handle
|
||||
.link()
|
||||
.set(
|
||||
LinkUnspec::new_with_index(index)
|
||||
.append_extra_attribute(LinkAttribute::TxQueueLen(queue_len))
|
||||
.build(),
|
||||
)
|
||||
.execute()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn make_rule(handle: &Handle) -> RuleAddRequest {
|
||||
let mut rule = handle
|
||||
.rule()
|
||||
@@ -292,54 +332,48 @@ async fn remove_route(route: &IpNetwork, idx: u32, handle: &Handle) {
|
||||
tracing::warn!(%route, "Failed to remove route: {}", err_with_src(&err));
|
||||
}
|
||||
|
||||
const QUEUE_SIZE: usize = 10_000;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Tun {
|
||||
outbound_tx: flume::r#async::SendSink<'static, IpPacket>,
|
||||
outbound_tx: PollSender<IpPacket>,
|
||||
inbound_rx: mpsc::Receiver<IpPacket>,
|
||||
}
|
||||
|
||||
impl Tun {
|
||||
pub fn new(num_threads: usize) -> Result<Self> {
|
||||
pub fn new() -> Result<Self> {
|
||||
create_tun_device()?;
|
||||
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(1000);
|
||||
let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets.
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE);
|
||||
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
|
||||
|
||||
for n in 0..num_threads {
|
||||
let fd = Arc::new(open_tun()?);
|
||||
let outbound_rx = outbound_rx.clone().into_stream();
|
||||
let inbound_tx = inbound_tx.clone();
|
||||
let fd = Arc::new(open_tun()?);
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name(format!("TUN send {n}/{num_threads}"))
|
||||
.spawn({
|
||||
let fd = fd.clone();
|
||||
std::thread::Builder::new()
|
||||
.name("TUN send".to_owned())
|
||||
.spawn({
|
||||
let fd = fd.clone();
|
||||
|
||||
move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_send(fd, outbound_rx, write),
|
||||
"Failed to send to TUN device: {}"
|
||||
)
|
||||
}
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
std::thread::Builder::new()
|
||||
.name(format!("TUN recv {n}/{num_threads}"))
|
||||
.spawn({
|
||||
let fd = fd.clone();
|
||||
|
||||
move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_recv(fd, inbound_tx, read),
|
||||
"Failed to recv from TUN device: {}"
|
||||
)
|
||||
}
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
}
|
||||
move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_send(fd, outbound_rx, write),
|
||||
"Failed to send to TUN device: {}"
|
||||
)
|
||||
}
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
std::thread::Builder::new()
|
||||
.name("TUN recv".to_owned())
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_recv(fd, inbound_tx, read),
|
||||
"Failed to recv from TUN device: {}"
|
||||
)
|
||||
})
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
Ok(Self {
|
||||
outbound_tx: outbound_tx.into_sink(),
|
||||
outbound_tx: PollSender::new(outbound_tx),
|
||||
inbound_rx,
|
||||
})
|
||||
}
|
||||
@@ -400,6 +434,16 @@ impl tun::Tun for Tun {
|
||||
fn name(&self) -> &str {
|
||||
TunDeviceManager::IFACE_NAME
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
(
|
||||
self.inbound_rx.len(),
|
||||
self.outbound_tx
|
||||
.get_ref()
|
||||
.map(|s| QUEUE_SIZE - s.capacity())
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn get_last_error() -> io::Error {
|
||||
|
||||
@@ -7,7 +7,7 @@ use tun::Tun;
|
||||
pub struct TunDeviceManager {}
|
||||
|
||||
impl TunDeviceManager {
|
||||
pub fn new(_mtu: usize, _num_threads: usize) -> Result<Self> {
|
||||
pub fn new(_mtu: usize) -> Result<Self> {
|
||||
bail!("Not implemented")
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,8 @@ use wintun::Adapter;
|
||||
/// where that is configured.
|
||||
const RING_BUFFER_SIZE: u32 = 0x10_0000;
|
||||
|
||||
const QUEUE_SIZE: usize = 1000;
|
||||
|
||||
pub struct TunDeviceManager {
|
||||
mtu: u32,
|
||||
|
||||
@@ -59,7 +61,7 @@ pub struct TunDeviceManager {
|
||||
|
||||
impl TunDeviceManager {
|
||||
#[expect(clippy::unnecessary_wraps, reason = "Fallible on Linux")]
|
||||
pub fn new(mtu: usize, _num_threads: usize) -> Result<Self> {
|
||||
pub fn new(mtu: usize) -> Result<Self> {
|
||||
Ok(Self {
|
||||
iface_idx: None,
|
||||
luid: None,
|
||||
@@ -290,8 +292,8 @@ impl Tun {
|
||||
.start_session(RING_BUFFER_SIZE)
|
||||
.context("Failed to start session")?,
|
||||
);
|
||||
let (outbound_tx, outbound_rx) = mpsc::channel(1000);
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(1000); // We want to be able to batch-receive from this.
|
||||
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE); // We want to be able to batch-receive from this.
|
||||
let send_thread = start_send_thread(outbound_rx, Arc::downgrade(&session))
|
||||
.context("Failed to start send thread")?;
|
||||
let recv_thread = start_recv_thread(inbound_tx, Arc::downgrade(&session))
|
||||
@@ -359,6 +361,21 @@ impl tun::Tun for Tun {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
self.state
|
||||
.as_ref()
|
||||
.map(|s| {
|
||||
(
|
||||
s.inbound_rx.len(),
|
||||
s.outbound_tx
|
||||
.get_ref()
|
||||
.map(|s| QUEUE_SIZE - s.capacity())
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
// Moves packets from Internet towards the user
|
||||
|
||||
@@ -14,7 +14,7 @@ fn dns_control() {
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280, 1).unwrap(); // Note: num_threads (`1`) is unused on windows.
|
||||
let mut tun_dev_manager = firezone_bin_shared::TunDeviceManager::new(1280).unwrap();
|
||||
let _tun = tun_dev_manager.make_tun().unwrap();
|
||||
|
||||
rt.block_on(async {
|
||||
|
||||
@@ -15,7 +15,7 @@ async fn no_packet_loops_tcp() {
|
||||
let ipv4 = Ipv4Addr::from([100, 90, 215, 97]);
|
||||
let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]);
|
||||
|
||||
let mut device_manager = TunDeviceManager::new(1280, 1).unwrap();
|
||||
let mut device_manager = TunDeviceManager::new(1280).unwrap();
|
||||
let _tun = device_manager.make_tun().unwrap();
|
||||
device_manager.set_ips(ipv4, ipv6).await.unwrap();
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ async fn no_packet_loops_udp() {
|
||||
|
||||
let bufferpool = BufferPool::<BytesMut>::new(0, "test");
|
||||
|
||||
let mut device_manager = TunDeviceManager::new(1280, 1).unwrap();
|
||||
let mut device_manager = TunDeviceManager::new(1280).unwrap();
|
||||
let _tun = device_manager.make_tun().unwrap();
|
||||
device_manager.set_ips(ipv4, ipv6).await.unwrap();
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ use firezone_bin_shared::TunDeviceManager;
|
||||
async fn tunnel_drop() {
|
||||
firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads.
|
||||
|
||||
let mut tun_device_manager = TunDeviceManager::new(1280, 1).unwrap();
|
||||
let mut tun_device_manager = TunDeviceManager::new(1280).unwrap();
|
||||
|
||||
// Each cycle takes about half a second, so this will take a fair bit to run.
|
||||
for _ in 0..50 {
|
||||
|
||||
@@ -30,6 +30,7 @@ serde_json = { workspace = true }
|
||||
socket-factory = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "sync"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true, features = ["std", "attributes"] }
|
||||
tracing-appender = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
|
||||
@@ -4,12 +4,15 @@ use std::os::fd::{FromRawFd, OwnedFd};
|
||||
use std::task::{Context, Poll};
|
||||
use std::{io, os::fd::RawFd};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::PollSender;
|
||||
use tun::ioctl;
|
||||
|
||||
const QUEUE_SIZE: usize = 1000;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Tun {
|
||||
name: String,
|
||||
outbound_tx: flume::r#async::SendSink<'static, IpPacket>,
|
||||
outbound_tx: PollSender<IpPacket>,
|
||||
inbound_rx: mpsc::Receiver<IpPacket>,
|
||||
_fd: OwnedFd,
|
||||
}
|
||||
@@ -41,6 +44,16 @@ impl tun::Tun for Tun {
|
||||
) -> Poll<usize> {
|
||||
self.inbound_rx.poll_recv_many(cx, buf, max)
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
(
|
||||
self.inbound_rx.len(),
|
||||
self.outbound_tx
|
||||
.get_ref()
|
||||
.map(|s| QUEUE_SIZE - s.capacity())
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Tun {
|
||||
@@ -53,16 +66,14 @@ impl Tun {
|
||||
pub unsafe fn from_fd(fd: RawFd) -> io::Result<Self> {
|
||||
let name = unsafe { interface_name(fd)? };
|
||||
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(1000);
|
||||
let (outbound_tx, outbound_rx) = flume::bounded(1000); // flume is an MPMC channel, therefore perfect for workstealing outbound packets.
|
||||
|
||||
// TODO: Test whether we can set `IFF_MULTI_QUEUE` on Android devices.
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel(QUEUE_SIZE);
|
||||
let (outbound_tx, outbound_rx) = mpsc::channel(QUEUE_SIZE);
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("TUN send".to_owned())
|
||||
.spawn(move || {
|
||||
firezone_logging::unwrap_or_warn!(
|
||||
tun::unix::tun_send(fd, outbound_rx.into_stream(), write),
|
||||
tun::unix::tun_send(fd, outbound_rx, write),
|
||||
"Failed to send to TUN device: {}"
|
||||
)
|
||||
})
|
||||
@@ -79,7 +90,7 @@ impl Tun {
|
||||
|
||||
Ok(Tun {
|
||||
name,
|
||||
outbound_tx: outbound_tx.into_sink(),
|
||||
outbound_tx: PollSender::new(outbound_tx),
|
||||
inbound_rx,
|
||||
_fd: unsafe { OwnedFd::from_raw_fd(fd) }, // `OwnedFd` will close the fd on drop.
|
||||
})
|
||||
|
||||
@@ -65,4 +65,8 @@ impl tun::Tun for Tun {
|
||||
fn name(&self) -> &str {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ async fn smoke() {
|
||||
let ipv4 = Ipv4Addr::from([100, 90, 215, 97]);
|
||||
let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]);
|
||||
|
||||
let mut device_manager = TunDeviceManager::new(1280, 1).unwrap();
|
||||
let mut device_manager = TunDeviceManager::new(1280).unwrap();
|
||||
let tun = device_manager.make_tun().unwrap();
|
||||
device_manager.set_ips(ipv4, ipv6).await.unwrap();
|
||||
device_manager
|
||||
|
||||
@@ -163,7 +163,7 @@ pub struct UdpSocket {
|
||||
/// A buffer pool for batches of incoming UDP packets.
|
||||
buffer_pool: BufferPool<Vec<u8>>,
|
||||
|
||||
gro_batch_histogram: opentelemetry::metrics::Histogram<u64>,
|
||||
batch_histogram: opentelemetry::metrics::Histogram<u64>,
|
||||
port: u16,
|
||||
}
|
||||
|
||||
@@ -184,7 +184,7 @@ impl UdpSocket {
|
||||
IpAddr::V6(_) => "udp-socket-v6",
|
||||
},
|
||||
),
|
||||
gro_batch_histogram: opentelemetry::global::meter("connlib")
|
||||
batch_histogram: opentelemetry::global::meter("connlib")
|
||||
.u64_histogram("system.network.packets.batch_count")
|
||||
.with_description(
|
||||
"How many batches of packets we have processed in a single syscall.",
|
||||
@@ -286,7 +286,7 @@ impl UdpSocket {
|
||||
.await
|
||||
.context("Failed to read from socket")?;
|
||||
|
||||
self.gro_batch_histogram.record(
|
||||
self.batch_histogram.record(
|
||||
len as u64,
|
||||
&[
|
||||
KeyValue::new("network.transport", "udp"),
|
||||
@@ -350,6 +350,16 @@ impl UdpSocket {
|
||||
}
|
||||
|
||||
async fn send_inner(&self, chunk: Transmit<'_>) -> io::Result<()> {
|
||||
let batch_size = chunk.contents.len() / chunk.segment_size.unwrap_or(chunk.contents.len());
|
||||
|
||||
self.batch_histogram.record(
|
||||
batch_size as u64,
|
||||
&[
|
||||
KeyValue::new("network.transport", "udp"),
|
||||
KeyValue::new("network.io.direction", "transmit"),
|
||||
],
|
||||
);
|
||||
|
||||
self.inner
|
||||
.async_io(Interest::WRITABLE, || {
|
||||
match self.state.try_send((&self.inner).into(), &chunk) {
|
||||
|
||||
@@ -10,8 +10,6 @@ anyhow = { workspace = true }
|
||||
ip-packet = { workspace = true }
|
||||
|
||||
[target.'cfg(target_family = "unix")'.dependencies]
|
||||
flume = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
libc = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -26,4 +26,7 @@ pub trait Tun: Send + Sync + 'static {
|
||||
|
||||
/// The name of the TUN device.
|
||||
fn name(&self) -> &str;
|
||||
|
||||
/// The number of inbounds and outbound packets sitting in queues.
|
||||
fn queue_lengths(&self) -> (usize, usize);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use anyhow::{Context as _, Result, bail};
|
||||
use futures::StreamExt as _;
|
||||
use ip_packet::{IpPacket, IpPacketBuf};
|
||||
use std::io;
|
||||
use std::os::fd::AsRawFd;
|
||||
@@ -8,7 +7,7 @@ use tokio::sync::mpsc;
|
||||
|
||||
pub fn tun_send<T>(
|
||||
fd: T,
|
||||
mut outbound_rx: flume::r#async::RecvStream<'_, IpPacket>,
|
||||
mut outbound_rx: mpsc::Receiver<IpPacket>,
|
||||
write: impl Fn(i32, &IpPacket) -> std::result::Result<usize, io::Error>,
|
||||
) -> Result<()>
|
||||
where
|
||||
@@ -21,7 +20,7 @@ where
|
||||
.block_on(async move {
|
||||
let fd = AsyncFd::with_interest(fd, tokio::io::Interest::WRITABLE)?;
|
||||
|
||||
while let Some(packet) = outbound_rx.next().await {
|
||||
while let Some(packet) = outbound_rx.recv().await {
|
||||
if let Err(e) = fd
|
||||
.async_io(tokio::io::Interest::WRITABLE, |fd| {
|
||||
write(fd.as_raw_fd(), &packet)
|
||||
|
||||
@@ -85,6 +85,13 @@ impl Device {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn queue_lengths(&self) -> (usize, usize) {
|
||||
self.tun
|
||||
.as_ref()
|
||||
.map(|t| t.queue_lengths())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn tun(&mut self) -> io::Result<&mut dyn Tun> {
|
||||
Ok(self
|
||||
.tun
|
||||
|
||||
@@ -61,6 +61,7 @@ pub struct Io {
|
||||
tun: Device,
|
||||
outbound_packet_buffer: VecDeque<IpPacket>,
|
||||
packet_counter: opentelemetry::metrics::Counter<u64>,
|
||||
queue_lengths: opentelemetry::metrics::Histogram<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -127,6 +128,13 @@ impl Io {
|
||||
.u64_counter("system.network.packets")
|
||||
.with_description("The number of packets processed.")
|
||||
.build(),
|
||||
queue_lengths: opentelemetry::global::meter("connlib")
|
||||
.u64_histogram("system.queue.length")
|
||||
.with_description("The length of a queue.")
|
||||
.with_boundaries(vec![
|
||||
1.0, 2.0, 5.0, 10.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0,
|
||||
])
|
||||
.build(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,6 +174,8 @@ impl Io {
|
||||
> {
|
||||
ready!(self.flush(cx)?);
|
||||
|
||||
self.record_queue_lengths();
|
||||
|
||||
if self.reval_nameserver_interval.poll_tick(cx).is_ready() {
|
||||
self.nameservers.evaluate();
|
||||
}
|
||||
@@ -421,6 +431,33 @@ impl Io {
|
||||
) -> io::Result<()> {
|
||||
self.tcp_dns_server.send_response(to, message)
|
||||
}
|
||||
|
||||
/// Records the lengths of our RX und TX queues in a histogram.
|
||||
fn record_queue_lengths(&self) {
|
||||
let (tun_rx, tun_tx) = self.tun.queue_lengths();
|
||||
|
||||
self.queue_lengths.record(
|
||||
tun_rx as u64,
|
||||
&[
|
||||
otel::attr::network_io_direction_receive(),
|
||||
otel::attr::queue_item_ip_packet(),
|
||||
],
|
||||
);
|
||||
self.queue_lengths.record(
|
||||
tun_tx as u64,
|
||||
&[
|
||||
otel::attr::network_io_direction_transmit(),
|
||||
otel::attr::queue_item_ip_packet(),
|
||||
],
|
||||
);
|
||||
|
||||
let (udp_rx, udp_tx) = self.sockets.queue_lengths();
|
||||
|
||||
self.queue_lengths
|
||||
.record(udp_rx as u64, &[otel::attr::queue_item_gro_batch()]);
|
||||
self.queue_lengths
|
||||
.record(udp_tx as u64, &[otel::attr::queue_item_gso_batch()]);
|
||||
}
|
||||
}
|
||||
|
||||
fn is_max_wg_packet_size(d: &DatagramIn) -> bool {
|
||||
@@ -545,5 +582,9 @@ mod tests {
|
||||
fn name(&self) -> &str {
|
||||
"dummy"
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
(0, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,6 +117,22 @@ impl Sockets {
|
||||
|
||||
Poll::Ready(Ok(iter))
|
||||
}
|
||||
|
||||
pub fn queue_lengths(&self) -> (usize, usize) {
|
||||
let (v4_inbound, v4_outbound) = self
|
||||
.socket_v4
|
||||
.as_ref()
|
||||
.map(|s| s.queue_lengths())
|
||||
.unwrap_or_default();
|
||||
|
||||
let (v6_inbound, v6_outbound) = self
|
||||
.socket_v6
|
||||
.as_ref()
|
||||
.map(|s| s.queue_lengths())
|
||||
.unwrap_or_default();
|
||||
|
||||
(v4_inbound + v6_inbound, v4_outbound + v6_outbound)
|
||||
}
|
||||
}
|
||||
|
||||
struct PacketIter<T4, T6> {
|
||||
@@ -333,6 +349,13 @@ impl ThreadedUdpSocket {
|
||||
fn channels_mut(&mut self) -> Result<&mut Channels> {
|
||||
self.channels.as_mut().context("Missing channels")
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
self.channels
|
||||
.as_ref()
|
||||
.map(|c| (c.inbound_rx.len(), c.outbound_tx.len()))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ThreadedUdpSocket {
|
||||
|
||||
221
rust/deny.toml
221
rust/deny.toml
@@ -23,13 +23,13 @@
|
||||
# dependencies not shared by any other crates, would be ignored, as the target
|
||||
# list here is effectively saying which targets you are building for.
|
||||
targets = [
|
||||
# The triple can be any string, but only the target triples built in to
|
||||
# rustc (as of 1.40) can be checked against actual config expressions
|
||||
#"x86_64-unknown-linux-musl",
|
||||
# You can also specify which target_features you promise are enabled for a
|
||||
# particular target. target_features are currently not validated against
|
||||
# the actual valid features supported by the target architecture.
|
||||
#{ triple = "wasm32-unknown-unknown", features = ["atomics"] },
|
||||
# The triple can be any string, but only the target triples built in to
|
||||
# rustc (as of 1.40) can be checked against actual config expressions
|
||||
#"x86_64-unknown-linux-musl",
|
||||
# You can also specify which target_features you promise are enabled for a
|
||||
# particular target. target_features are currently not validated against
|
||||
# the actual valid features supported by the target architecture.
|
||||
#{ triple = "wasm32-unknown-unknown", features = ["atomics"] },
|
||||
]
|
||||
# When creating the dependency graph used as the source of truth when checks are
|
||||
# executed, this field can be used to prune crates from the graph, removing them
|
||||
@@ -70,31 +70,31 @@ feature-depth = 1
|
||||
# A list of advisory IDs to ignore. Note that ignored advisories will still
|
||||
# output a note when they are encountered.
|
||||
ignore = [
|
||||
"RUSTSEC-2020-0095", # `difference` is unmaintained
|
||||
"RUSTSEC-2024-0384", # `instant` is unmaintained
|
||||
"RUSTSEC-2024-0370", # `proc-macro-error` is unmaintained
|
||||
"RUSTSEC-2020-0095", # `difference` is unmaintained
|
||||
"RUSTSEC-2024-0384", # `instant` is unmaintained
|
||||
"RUSTSEC-2024-0370", # `proc-macro-error` is unmaintained
|
||||
|
||||
# `gtk-rs` crates are unmaintained
|
||||
"RUSTSEC-2024-0411",
|
||||
"RUSTSEC-2024-0412",
|
||||
"RUSTSEC-2024-0413",
|
||||
"RUSTSEC-2024-0414",
|
||||
"RUSTSEC-2024-0415",
|
||||
"RUSTSEC-2024-0416",
|
||||
"RUSTSEC-2024-0417",
|
||||
"RUSTSEC-2024-0418",
|
||||
"RUSTSEC-2024-0419",
|
||||
"RUSTSEC-2024-0420",
|
||||
# `gtk-rs` crates are unmaintained
|
||||
"RUSTSEC-2024-0411",
|
||||
"RUSTSEC-2024-0412",
|
||||
"RUSTSEC-2024-0413",
|
||||
"RUSTSEC-2024-0414",
|
||||
"RUSTSEC-2024-0415",
|
||||
"RUSTSEC-2024-0416",
|
||||
"RUSTSEC-2024-0417",
|
||||
"RUSTSEC-2024-0418",
|
||||
"RUSTSEC-2024-0419",
|
||||
"RUSTSEC-2024-0420",
|
||||
|
||||
"RUSTSEC-2025-0012", # backoff, See #8386
|
||||
"RUSTSEC-2024-0436", # paste, See #8387
|
||||
"RUSTSEC-2025-0012", # backoff, See #8386
|
||||
"RUSTSEC-2024-0436", # paste, See #8387
|
||||
|
||||
"RUSTSEC-2024-0429", # `glib`, need to wait for tauri to upgrade
|
||||
"RUSTSEC-2024-0429", # `glib`, need to wait for tauri to upgrade
|
||||
|
||||
#"RUSTSEC-0000-0000",
|
||||
#{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" },
|
||||
#"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish
|
||||
#{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" },
|
||||
#"RUSTSEC-0000-0000",
|
||||
#{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" },
|
||||
#"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish
|
||||
#{ crate = "a-crate-that-is-yanked@0.1.1", reason = "you can specify why you are ignoring the yanked crate" },
|
||||
]
|
||||
# If this is true, then cargo deny will use the git executable to fetch advisory database.
|
||||
# If this is false, then it uses a built-in git library.
|
||||
@@ -110,19 +110,19 @@ ignore = [
|
||||
# See https://spdx.org/licenses/ for list of possible licenses
|
||||
# [possible values: any SPDX 3.11 short identifier (+ optional exception)].
|
||||
allow = [
|
||||
"MIT",
|
||||
"Apache-2.0",
|
||||
"Apache-2.0 WITH LLVM-exception",
|
||||
"BSD-2-Clause",
|
||||
"BSD-3-Clause",
|
||||
"MPL-2.0",
|
||||
"ISC",
|
||||
"0BSD",
|
||||
"Unicode-3.0",
|
||||
"BSL-1.0",
|
||||
"Zlib",
|
||||
"CDLA-Permissive-2.0",
|
||||
"NCSA",
|
||||
"MIT",
|
||||
"Apache-2.0",
|
||||
"Apache-2.0 WITH LLVM-exception",
|
||||
"BSD-2-Clause",
|
||||
"BSD-3-Clause",
|
||||
"MPL-2.0",
|
||||
"ISC",
|
||||
"0BSD",
|
||||
"Unicode-3.0",
|
||||
"BSL-1.0",
|
||||
"Zlib",
|
||||
"CDLA-Permissive-2.0",
|
||||
"NCSA",
|
||||
]
|
||||
# The confidence threshold for detecting a license from license text.
|
||||
# The higher the value, the more closely the license text must be to the
|
||||
@@ -132,9 +132,9 @@ confidence-threshold = 0.8
|
||||
# Allow 1 or more licenses on a per-crate basis, so that particular licenses
|
||||
# aren't accepted for every possible crate as with the normal allow list
|
||||
exceptions = [
|
||||
# Each entry is the crate and version constraint, and its specific allow
|
||||
# list
|
||||
#{ allow = ["Zlib"], crate = "adler32" },
|
||||
# Each entry is the crate and version constraint, and its specific allow
|
||||
# list
|
||||
#{ allow = ["Zlib"], crate = "adler32" },
|
||||
]
|
||||
|
||||
# Some crates don't have (easily) machine readable licensing information,
|
||||
@@ -146,8 +146,8 @@ crate = "ring"
|
||||
# The SPDX expression for the license requirements of the crate
|
||||
expression = "MIT AND ISC AND OpenSSL"
|
||||
license-files = [
|
||||
# Each entry is a crate relative path, and the (opaque) hash of its contents
|
||||
{ path = "LICENSE", hash = 0xbd0eed23 },
|
||||
# Each entry is a crate relative path, and the (opaque) hash of its contents
|
||||
{ path = "LICENSE", hash = 0xbd0eed23 },
|
||||
]
|
||||
|
||||
[licenses.private]
|
||||
@@ -165,7 +165,7 @@ ignore = true
|
||||
# is only published to private registries, and ignore is true, the crate will
|
||||
# not have its license(s) checked
|
||||
registries = [
|
||||
#"https://sekretz.com/registry
|
||||
#"https://sekretz.com/registry
|
||||
]
|
||||
|
||||
# This section is considered when running `cargo deny check bans`.
|
||||
@@ -192,16 +192,16 @@ workspace-default-features = "allow"
|
||||
external-default-features = "allow"
|
||||
# List of crates that are allowed. Use with care!
|
||||
allow = [
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" },
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is allowed" },
|
||||
]
|
||||
# List of crates to deny
|
||||
deny = [
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" },
|
||||
# Wrapper crates can optionally be specified to allow the crate when it
|
||||
# is a direct dependency of the otherwise banned crate
|
||||
#{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] },
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason it is banned" },
|
||||
# Wrapper crates can optionally be specified to allow the crate when it
|
||||
# is a direct dependency of the otherwise banned crate
|
||||
#{ crate = "ansi_term@0.11.0", wrappers = ["this-crate-directly-depends-on-ansi_term"] },
|
||||
]
|
||||
|
||||
# List of features to allow/deny
|
||||
@@ -228,69 +228,68 @@ deny = [
|
||||
#exact = true
|
||||
|
||||
skip = [
|
||||
"base64",
|
||||
"bitflags",
|
||||
"core-foundation",
|
||||
"core-graphics",
|
||||
"core-graphics-types",
|
||||
"derive_more",
|
||||
"getrandom",
|
||||
"hashbrown",
|
||||
"heck",
|
||||
"hermit-abi",
|
||||
"indexmap",
|
||||
"itertools",
|
||||
"libloading",
|
||||
"linux-raw-sys",
|
||||
"nix",
|
||||
"nu-ansi-term",
|
||||
"phf",
|
||||
"phf_codegen",
|
||||
"phf_generator",
|
||||
"phf_macros",
|
||||
"phf_shared",
|
||||
"proc-macro-crate",
|
||||
"quick-xml",
|
||||
"rand",
|
||||
"rand_chacha",
|
||||
"rand_core",
|
||||
"raw-window-handle",
|
||||
"redox_users",
|
||||
"regex-automata",
|
||||
"regex-syntax",
|
||||
"rustix",
|
||||
"siphasher",
|
||||
"socket2",
|
||||
"syn",
|
||||
"thiserror",
|
||||
"thiserror-impl",
|
||||
"toml",
|
||||
"toml_edit",
|
||||
"tower",
|
||||
"wasi",
|
||||
"webpki-roots",
|
||||
"windows-sys",
|
||||
"windows-targets",
|
||||
"windows_aarch64_gnullvm",
|
||||
"windows_aarch64_msvc",
|
||||
"windows_i686_gnu",
|
||||
"windows_i686_gnullvm",
|
||||
"windows_i686_msvc",
|
||||
"windows_x86_64_gnu",
|
||||
"windows_x86_64_gnullvm",
|
||||
"windows_x86_64_msvc",
|
||||
"winnow",
|
||||
"winreg",
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" },
|
||||
"base64",
|
||||
"bitflags",
|
||||
"core-foundation",
|
||||
"core-graphics",
|
||||
"core-graphics-types",
|
||||
"derive_more",
|
||||
"getrandom",
|
||||
"hashbrown",
|
||||
"heck",
|
||||
"indexmap",
|
||||
"itertools",
|
||||
"libloading",
|
||||
"linux-raw-sys",
|
||||
"nix",
|
||||
"nu-ansi-term",
|
||||
"phf",
|
||||
"phf_codegen",
|
||||
"phf_generator",
|
||||
"phf_macros",
|
||||
"phf_shared",
|
||||
"proc-macro-crate",
|
||||
"quick-xml",
|
||||
"rand",
|
||||
"rand_chacha",
|
||||
"rand_core",
|
||||
"raw-window-handle",
|
||||
"redox_users",
|
||||
"regex-automata",
|
||||
"regex-syntax",
|
||||
"rustix",
|
||||
"siphasher",
|
||||
"socket2",
|
||||
"syn",
|
||||
"thiserror",
|
||||
"thiserror-impl",
|
||||
"toml",
|
||||
"toml_edit",
|
||||
"tower",
|
||||
"wasi",
|
||||
"webpki-roots",
|
||||
"windows-sys",
|
||||
"windows-targets",
|
||||
"windows_aarch64_gnullvm",
|
||||
"windows_aarch64_msvc",
|
||||
"windows_i686_gnu",
|
||||
"windows_i686_gnullvm",
|
||||
"windows_i686_msvc",
|
||||
"windows_x86_64_gnu",
|
||||
"windows_x86_64_gnullvm",
|
||||
"windows_x86_64_msvc",
|
||||
"winnow",
|
||||
"winreg",
|
||||
#"ansi_term@0.11.0",
|
||||
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" },
|
||||
] # Certain crates/versions that will be skipped when doing duplicate detection.
|
||||
# Similarly to `skip` allows you to skip certain crates during duplicate
|
||||
# detection. Unlike skip, it also includes the entire tree of transitive
|
||||
# dependencies starting at the specified crate, up to a certain depth, which is
|
||||
# by default infinite.
|
||||
skip-tree = [
|
||||
#"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies
|
||||
#{ crate = "ansi_term@0.11.0", depth = 20 },
|
||||
#"ansi_term@0.11.0", # will be skipped along with _all_ of its direct and transitive dependencies
|
||||
#{ crate = "ansi_term@0.11.0", depth = 20 },
|
||||
]
|
||||
|
||||
# This section is considered when running `cargo deny check sources`.
|
||||
|
||||
@@ -25,7 +25,6 @@ ip-packet = { workspace = true }
|
||||
ip_network = { workspace = true }
|
||||
libc = { workspace = true, features = ["std", "const-extern-fn", "extra_traits"] }
|
||||
moka = { workspace = true, features = ["future"] }
|
||||
num_cpus = { workspace = true }
|
||||
opentelemetry = { workspace = true, features = ["metrics"] }
|
||||
opentelemetry-otlp = { workspace = true, features = ["metrics", "grpc-tonic"] }
|
||||
opentelemetry-stdout = { workspace = true, features = ["metrics"] }
|
||||
|
||||
@@ -20,9 +20,9 @@ use phoenix_channel::get_user_agent;
|
||||
use futures::{TryFutureExt, future};
|
||||
use phoenix_channel::PhoenixChannel;
|
||||
use secrecy::Secret;
|
||||
use std::pin::pin;
|
||||
use std::process::ExitCode;
|
||||
use std::{collections::BTreeSet, path::Path};
|
||||
use std::{fmt, pin::pin};
|
||||
use std::{process::ExitCode, str::FromStr};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::signal::ctrl_c;
|
||||
use tracing_subscriber::layer;
|
||||
@@ -182,7 +182,7 @@ async fn try_main(cli: Cli, telemetry: &mut Telemetry) -> Result<()> {
|
||||
)
|
||||
.context("Failed to resolve portal URL")?;
|
||||
|
||||
let mut tun_device_manager = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, cli.tun_threads.0)
|
||||
let mut tun_device_manager = TunDeviceManager::new(ip_packet::MAX_IP_SIZE)
|
||||
.context("Failed to create TUN device manager")?;
|
||||
let tun = tun_device_manager
|
||||
.make_tun()
|
||||
@@ -274,10 +274,6 @@ struct Cli {
|
||||
#[arg(short = 'i', long, env = "FIREZONE_ID")]
|
||||
firezone_id: Option<String>,
|
||||
|
||||
/// How many threads to use for reading and writing to the TUN device.
|
||||
#[arg(long, env = "FIREZONE_NUM_TUN_THREADS", default_value_t)]
|
||||
tun_threads: NumThreads,
|
||||
|
||||
/// Where to export metrics to.
|
||||
///
|
||||
/// This configuration option is private API and has no stability guarantees.
|
||||
@@ -316,33 +312,6 @@ impl Cli {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct NumThreads(pub usize);
|
||||
|
||||
impl Default for NumThreads {
|
||||
fn default() -> Self {
|
||||
if num_cpus::get() < 4 {
|
||||
return Self(1);
|
||||
}
|
||||
|
||||
Self(2)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for NumThreads {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for NumThreads {
|
||||
type Err = <usize as FromStr>::Err;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
Ok(Self(s.parse()?))
|
||||
}
|
||||
}
|
||||
|
||||
/// An adapter struct around [`Tun`] that validates IPv4, UDP and TCP checksums.
|
||||
struct ValidateChecksumAdapter {
|
||||
inner: Box<dyn Tun>,
|
||||
@@ -403,6 +372,10 @@ impl Tun for ValidateChecksumAdapter {
|
||||
fn name(&self) -> &str {
|
||||
self.inner.name()
|
||||
}
|
||||
|
||||
fn queue_lengths(&self) -> (usize, usize) {
|
||||
self.inner.queue_lengths()
|
||||
}
|
||||
}
|
||||
|
||||
impl ValidateChecksumAdapter {
|
||||
|
||||
@@ -290,7 +290,7 @@ impl<'a> Handler<'a> {
|
||||
.next_client_split()
|
||||
.await
|
||||
.context("Failed to wait for incoming IPC connection from a GUI")?;
|
||||
let tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, 1)?;
|
||||
let tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE)?;
|
||||
let dns_notifier = new_dns_notifier().await?.boxed();
|
||||
let network_notifier = new_network_notifier().await?.boxed();
|
||||
|
||||
|
||||
@@ -276,7 +276,7 @@ fn main() -> Result<()> {
|
||||
let mut terminate = signals::Terminate::new()?;
|
||||
let mut hangup = signals::Hangup::new()?;
|
||||
|
||||
let mut tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, 1)?;
|
||||
let mut tun_device = TunDeviceManager::new(ip_packet::MAX_IP_SIZE)?;
|
||||
|
||||
let tokio_handle = tokio::runtime::Handle::current();
|
||||
|
||||
|
||||
@@ -85,6 +85,18 @@ pub mod attr {
|
||||
KeyValue::new("error.type", value)
|
||||
}
|
||||
|
||||
pub fn queue_item_ip_packet() -> KeyValue {
|
||||
KeyValue::new("queue.item", "ip-packet")
|
||||
}
|
||||
|
||||
pub fn queue_item_gro_batch() -> KeyValue {
|
||||
KeyValue::new("queue.item", "udp-gro-batch")
|
||||
}
|
||||
|
||||
pub fn queue_item_gso_batch() -> KeyValue {
|
||||
KeyValue::new("queue.item", "udp-gso-batch")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -22,7 +22,13 @@ export default function Gateway() {
|
||||
|
||||
return (
|
||||
<Entries downloadLinks={downloadLinks} title="Gateway">
|
||||
<Unreleased></Unreleased>
|
||||
<Unreleased>
|
||||
<ChangeItem pull="10231">
|
||||
Remove the FIREZONE_NUM_TUN_THREADS env variable. The Gateway will now
|
||||
always default to a single TUN thread. Using multiple threads can
|
||||
cause packet reordering which hurts TCP throughput performance.
|
||||
</ChangeItem>
|
||||
</Unreleased>
|
||||
<Entry version="1.4.15" date={new Date("2025-08-05")}>
|
||||
<ChangeItem pull="10109">
|
||||
Fixes an issue where connections would fail to establish in
|
||||
|
||||
Reference in New Issue
Block a user