diff --git a/rust/bin-shared/src/tun_device_manager.rs b/rust/bin-shared/src/tun_device_manager.rs index 185dee436..725107c44 100644 --- a/rust/bin-shared/src/tun_device_manager.rs +++ b/rust/bin-shared/src/tun_device_manager.rs @@ -12,121 +12,3 @@ pub use windows as platform; #[cfg(any(target_os = "linux", target_os = "windows"))] pub use platform::TunDeviceManager; - -#[cfg(test)] -#[cfg(any(target_os = "linux", target_os = "windows"))] -mod tests { - use super::*; - use ip_network::Ipv4Network; - use socket_factory::DatagramOut; - use std::{ - net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}, - time::Duration, - }; - use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; - - #[tokio::test] - #[ignore = "Needs admin / sudo and Internet"] - async fn tunnel() { - let _guard = firezone_logging::test("debug"); - - no_packet_loops_tcp().await; - no_packet_loops_udp().await; - tunnel_drop(); - } - - // Starts up a WinTun device, claims all routes, and checks if we can still make - // TCP connections outside of our tunnel. - async fn no_packet_loops_tcp() { - let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); - let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); - - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); - let _tun = device_manager.make_tun().unwrap(); - device_manager.set_ips(ipv4, ipv6).await.unwrap(); - - // Configure `0.0.0.0/0` route. - device_manager - .set_routes( - vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()], - vec![], - ) - .await - .unwrap(); - - let remote = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from([1, 1, 1, 1]), 80)); - let socket = crate::platform::tcp_socket_factory(&remote).unwrap(); - let mut stream = socket.connect(remote).await.unwrap(); - - // Send an HTTP request - stream.write_all("GET /\r\n\r\n".as_bytes()).await.unwrap(); - let mut bytes = vec![]; - stream.read_to_end(&mut bytes).await.unwrap(); - let s = String::from_utf8(bytes).unwrap(); - assert_eq!(s, "\r\n400 Bad Request\r\n\r\n

400 Bad Request

\r\n
cloudflare
\r\n\r\n\r\n"); - } - - // Starts up a WinTUN device, adds a "full-route" (`0.0.0.0/0`), and checks if we can still send packets to IPs outside of our tunnel. - async fn no_packet_loops_udp() { - let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); - let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); - - let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); - let _tun = device_manager.make_tun().unwrap(); - device_manager.set_ips(ipv4, ipv6).await.unwrap(); - - // Configure `0.0.0.0/0` route. - device_manager - .set_routes( - vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()], - vec![], - ) - .await - .unwrap(); - - // Make a socket. - let mut socket = crate::platform::udp_socket_factory(&SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::UNSPECIFIED, - 0, - ))) - .unwrap(); - - std::future::poll_fn(|cx| socket.poll_send_ready(cx)) - .await - .unwrap(); - - // Send a STUN request. - socket - .send(DatagramOut { - src: None, - dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(141, 101, 90, 0), 3478)), // stun.cloudflare.com, - packet: &hex_literal::hex!("000100002112A4420123456789abcdef01234567").as_ref(), - segment_size: None, - }) - .unwrap(); - - let task = std::future::poll_fn(|cx| { - let mut buf = [0u8; 1000]; - let result = std::task::ready!(socket.poll_recv_from(&mut buf, cx)); - - let _response = result.unwrap().next().unwrap(); - - std::task::Poll::Ready(()) - }); - - tokio::time::timeout(Duration::from_secs(10), task) - .await - .unwrap(); - } - - /// Checks for regressions in issue #4765, un-initializing Wintun - /// Redundant but harmless on Linux. - fn tunnel_drop() { - let mut tun_device_manager = TunDeviceManager::new(1280, 1).unwrap(); - - // Each cycle takes about half a second, so this will take a fair bit to run. - for _ in 0..50 { - let _tun = tun_device_manager.make_tun().unwrap(); // This will panic if we don't correctly clean-up the wintun interface. - } - } -} diff --git a/rust/bin-shared/src/tun_device_manager/windows.rs b/rust/bin-shared/src/tun_device_manager/windows.rs index 023fe11f3..96ff8d506 100644 --- a/rust/bin-shared/src/tun_device_manager/windows.rs +++ b/rust/bin-shared/src/tun_device_manager/windows.rs @@ -7,8 +7,9 @@ use ip_network::{IpNetwork, Ipv4Network, Ipv6Network}; use ip_packet::{IpPacket, IpPacketBuf}; use ring::digest; use std::net::IpAddr; +use std::sync::Weak; use std::task::ready; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{ collections::HashSet, io::{self, Read as _}, @@ -193,38 +194,60 @@ pub struct Tun { /// The index of our network adapter, we can use this when asking Windows to add / remove routes / DNS rules /// It's stable across app restarts and I'm assuming across system reboots too. iface_idx: u32, - outbound_tx: PollSender, - inbound_rx: mpsc::Receiver, + luid: wintun::NET_LUID_LH, + + state: Option, + send_thread: Option>, recv_thread: Option>, - luid: wintun::NET_LUID_LH, +} + +/// All state relevant to the WinTUN device. +struct TunState { + #[expect( + dead_code, + reason = "The send/recv threads have `Weak` references to this which we need to keep alive" + )] + session: Arc, + + outbound_tx: PollSender, + inbound_rx: mpsc::Receiver, } impl Drop for Tun { fn drop(&mut self) { - tracing::debug!( - channel_capacity = self.inbound_rx.capacity(), - "Shutting down packet channel..." - ); - - // Close channels to avoid deadlocks, see . - self.outbound_tx.close(); - self.inbound_rx.close(); - - if let Err(error) = self + let recv_thread = self .recv_thread .take() - .expect("`recv_thread` should always be `Some` until `Tun` drops") - .join() - { - tracing::error!("`Tun::recv_thread` panicked: {error:?}"); - } - if let Err(error) = self + .expect("`recv_thread` should always be `Some` until `Tun` drops"); + + let send_thread = self .send_thread .take() - .expect("`send_thread` should always be `Some` until `Tun` drops") - .join() - { + .expect("`send_thread` should always be `Some` until `Tun` drops"); + + let _ = self.state.take(); // Drop all channel / tunnel state, allowing the worker threads to exit gracefully. + + let start = Instant::now(); + + while !recv_thread.is_finished() || !send_thread.is_finished() { + std::thread::sleep(Duration::from_millis(100)); + + if start.elapsed() > Duration::from_secs(5) { + tracing::warn!(recv_thread_finished = %recv_thread.is_finished(), send_thread_finished = %send_thread.is_finished(), "TUN worker threads did not exit gracefully in 5s"); + return; + } + } + + tracing::debug!( + "Worker threads exited gracefully after {:?}", + start.elapsed() + ); + + if let Err(error) = recv_thread.join() { + tracing::error!("`Tun::recv_thread` panicked: {error:?}"); + } + if let Err(error) = send_thread.join() { tracing::error!("`Tun::send_thread` panicked: {error:?}"); } } @@ -258,18 +281,21 @@ impl Tun { ); let (outbound_tx, outbound_rx) = mpsc::channel(1000); let (inbound_tx, inbound_rx) = mpsc::channel(1000); // We want to be able to batch-receive from this. - let send_thread = start_send_thread(outbound_rx, Arc::clone(&session)) + let send_thread = start_send_thread(outbound_rx, Arc::downgrade(&session)) .context("Failed to start send thread")?; - let recv_thread = start_recv_thread(inbound_tx, Arc::clone(&session)) + let recv_thread = start_recv_thread(inbound_tx, Arc::downgrade(&session)) .context("Failed to start recv thread")?; Ok(Self { iface_idx, luid, + state: Some(TunState { + session, + outbound_tx: PollSender::new(outbound_tx), + inbound_rx, + }), send_thread: Some(send_thread), recv_thread: Some(recv_thread), - outbound_tx: PollSender::new(outbound_tx), - inbound_rx, }) } @@ -286,7 +312,11 @@ impl tun::Tun for Tun { buf: &mut Vec, max: usize, ) -> Poll { - self.inbound_rx.poll_recv_many(cx, buf, max) + self.state + .as_mut() + .expect("`tun_state` to always be `Some` until we drop") + .inbound_rx + .poll_recv_many(cx, buf, max) } fn name(&self) -> &str { @@ -296,6 +326,9 @@ impl tun::Tun for Tun { /// Check if more packets can be sent. fn poll_send_ready(&mut self, cx: &mut Context) -> Poll> { ready!(self + .state + .as_mut() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Internal state gone"))? .outbound_tx .poll_reserve(cx) .map_err(io::Error::other)?); @@ -305,7 +338,10 @@ impl tun::Tun for Tun { /// Send a packet. fn send(&mut self, packet: IpPacket) -> io::Result<()> { - self.outbound_tx + self.state + .as_mut() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Internal state gone"))? + .outbound_tx .send_item(packet) .map_err(io::Error::other)?; @@ -316,67 +352,77 @@ impl tun::Tun for Tun { // Moves packets from Internet towards the user fn start_send_thread( mut packet_rx: mpsc::Receiver, - session: Arc, + session: Weak, ) -> io::Result> { // See . const ERROR_BUFFER_OVERFLOW: i32 = 0x6F; std::thread::Builder::new() .name("TUN send".into()) - .spawn(move || { - 'recv: loop { - let Some(packet) = packet_rx.blocking_recv() else { - tracing::info!( - "Stopping TUN send worker thread because the packet channel closed" + .spawn(move || loop { + let Some(packet) = packet_rx.blocking_recv() else { + tracing::debug!( + "Stopping TUN send worker thread because the packet channel closed" + ); + break; + }; + + let bytes = packet.packet(); + + let Ok(len) = bytes.len().try_into() else { + tracing::warn!("Packet too large; length does not fit into u16"); + continue; + }; + + loop { + let Some(session) = session.upgrade() else { + tracing::debug!( + "Stopping TUN send worker thread because the `wintun::Session` was dropped" ); - break; + return; }; - let bytes = packet.packet(); - - let Ok(len) = bytes.len().try_into() else { - tracing::warn!("Packet too large; length does not fit into u16"); - continue; - }; - - let mut pkt = loop { - match session.allocate_send_packet(len) { - Ok(pkt) => break pkt, - Err(wintun::Error::Io(e)) - if e.raw_os_error() - .is_some_and(|code| code == ERROR_BUFFER_OVERFLOW) => - { - tracing::debug!("WinTUN ring buffer is full"); - std::thread::sleep(Duration::from_millis(10)); // Suspend for a bit to avoid busy-looping. - } - Err(e) => { - tracing::error!("Failed to allocate WinTUN packet: {e}"); - continue 'recv; - } + match session.allocate_send_packet(len) { + Ok(mut pkt) => { + pkt.bytes_mut().copy_from_slice(bytes); + // `send_packet` cannot fail to enqueue the packet, since we already allocated + // space in the ring buffer. + session.send_packet(pkt); } - }; - - pkt.bytes_mut().copy_from_slice(bytes); - // `send_packet` cannot fail to enqueue the packet, since we already allocated - // space in the ring buffer. - session.send_packet(pkt); + Err(wintun::Error::Io(e)) + if e.raw_os_error() + .is_some_and(|code| code == ERROR_BUFFER_OVERFLOW) => + { + tracing::debug!("WinTUN ring buffer is full"); + std::thread::sleep(Duration::from_millis(10)); // Suspend for a bit to avoid busy-looping. + } + Err(e) => { + tracing::error!("Failed to allocate WinTUN packet: {e}"); + break; + } + } } }) } fn start_recv_thread( packet_tx: mpsc::Sender, - session: Arc, + session: Weak, ) -> io::Result> { std::thread::Builder::new() .name("TUN recv".into()) .spawn(move || loop { - let pkt = match session.receive_blocking() { + let Some(receive_result) = session.upgrade().map(|s| s.receive_blocking()) else { + tracing::debug!( + "Stopping TUN recv worker thread because the `wintun::Session` was dropped" + ); + break; + }; + + let pkt = match receive_result { Ok(pkt) => pkt, Err(wintun::Error::ShuttingDown) => { - tracing::info!( - "Stopping outbound worker thread because Wintun is shutting down" - ); + tracing::debug!("Stopping recv worker thread because Wintun is shutting down"); break; } Err(e) => { @@ -413,8 +459,8 @@ fn start_recv_thread( match packet_tx.blocking_send(pkt) { Ok(()) => {} Err(_) => { - tracing::info!( - "Stopping outbound worker thread because the packet channel closed" + tracing::debug!( + "Stopping TUN recv worker thread because the packet channel closed" ); break; } diff --git a/rust/bin-shared/tests/no_packet_loops_tcp.rs b/rust/bin-shared/tests/no_packet_loops_tcp.rs new file mode 100644 index 000000000..6a63245e6 --- /dev/null +++ b/rust/bin-shared/tests/no_packet_loops_tcp.rs @@ -0,0 +1,41 @@ +#![allow(clippy::unwrap_used)] + +use firezone_bin_shared::{platform::tcp_socket_factory, TunDeviceManager}; +use ip_network::Ipv4Network; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + +// Starts up a WinTun device, claims all routes, and checks if we can still make +// TCP connections outside of our tunnel. +#[tokio::test] +#[ignore = "Needs admin / sudo and Internet"] +async fn no_packet_loops_tcp() { + firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads. + + let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); + let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); + + let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let _tun = device_manager.make_tun().unwrap(); + device_manager.set_ips(ipv4, ipv6).await.unwrap(); + + // Configure `0.0.0.0/0` route. + device_manager + .set_routes( + vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()], + vec![], + ) + .await + .unwrap(); + + let remote = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from([1, 1, 1, 1]), 80)); + let socket = tcp_socket_factory(&remote).unwrap(); + let mut stream = socket.connect(remote).await.unwrap(); + + // Send an HTTP request + stream.write_all("GET /\r\n\r\n".as_bytes()).await.unwrap(); + let mut bytes = vec![]; + stream.read_to_end(&mut bytes).await.unwrap(); + let s = String::from_utf8(bytes).unwrap(); + assert_eq!(s, "\r\n400 Bad Request\r\n\r\n

400 Bad Request

\r\n
cloudflare
\r\n\r\n\r\n"); +} diff --git a/rust/bin-shared/tests/no_packet_loops_udp.rs b/rust/bin-shared/tests/no_packet_loops_udp.rs new file mode 100644 index 000000000..d0ee47867 --- /dev/null +++ b/rust/bin-shared/tests/no_packet_loops_udp.rs @@ -0,0 +1,63 @@ +#![allow(clippy::unwrap_used)] + +use firezone_bin_shared::{platform::udp_socket_factory, TunDeviceManager}; +use ip_network::Ipv4Network; +use socket_factory::DatagramOut; +use std::{ + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}, + time::Duration, +}; + +// Starts up a WinTUN device, adds a "full-route" (`0.0.0.0/0`), and checks if we can still send packets to IPs outside of our tunnel. +#[tokio::test] +#[ignore = "Needs admin / sudo and Internet"] +async fn no_packet_loops_udp() { + firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads. + + let ipv4 = Ipv4Addr::from([100, 90, 215, 97]); + let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]); + + let mut device_manager = TunDeviceManager::new(1280, 1).unwrap(); + let _tun = device_manager.make_tun().unwrap(); + device_manager.set_ips(ipv4, ipv6).await.unwrap(); + + // Configure `0.0.0.0/0` route. + device_manager + .set_routes( + vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()], + vec![], + ) + .await + .unwrap(); + + // Make a socket. + let mut socket = + udp_socket_factory(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).unwrap(); + + std::future::poll_fn(|cx| socket.poll_send_ready(cx)) + .await + .unwrap(); + + // Send a STUN request. + socket + .send(DatagramOut { + src: None, + dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(141, 101, 90, 0), 3478)), // stun.cloudflare.com, + packet: &hex_literal::hex!("000100002112A4420123456789abcdef01234567").as_ref(), + segment_size: None, + }) + .unwrap(); + + let task = std::future::poll_fn(|cx| { + let mut buf = [0u8; 1000]; + let result = std::task::ready!(socket.poll_recv_from(&mut buf, cx)); + + let _response = result.unwrap().next().unwrap(); + + std::task::Poll::Ready(()) + }); + + tokio::time::timeout(Duration::from_secs(10), task) + .await + .unwrap(); +} diff --git a/rust/bin-shared/tests/tunnel_drop.rs b/rust/bin-shared/tests/tunnel_drop.rs new file mode 100644 index 000000000..3cc779ea6 --- /dev/null +++ b/rust/bin-shared/tests/tunnel_drop.rs @@ -0,0 +1,18 @@ +#![allow(clippy::unwrap_used)] + +use firezone_bin_shared::TunDeviceManager; + +/// Checks for regressions in issue #4765, un-initializing Wintun +/// Redundant but harmless on Linux. +#[tokio::test] // Needs a runtime. +#[ignore = "Needs admin / sudo and Internet"] +async fn tunnel_drop() { + firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads. + + let mut tun_device_manager = TunDeviceManager::new(1280, 1).unwrap(); + + // Each cycle takes about half a second, so this will take a fair bit to run. + for _ in 0..50 { + let _tun = tun_device_manager.make_tun().unwrap(); // This will panic if we don't correctly clean-up the wintun interface. + } +} diff --git a/website/src/components/Changelog/GUI.tsx b/website/src/components/Changelog/GUI.tsx index ef91e3fd0..caad09401 100644 --- a/website/src/components/Changelog/GUI.tsx +++ b/website/src/components/Changelog/GUI.tsx @@ -14,6 +14,12 @@ export default function GUI({ os }: { os: OS }) { Configures the IPC service to log to journald. )} + {os === OS.Windows && ( + + Fixes a dead-lock that could occur during shutdown of the TUN + device if there were still packets queued for sending. + + )} {os === OS.Linux && (