fix(gui-client): mitigate deadlock when shutting down TUN device (#8268)

In #8159, we introduced a regression that could lead to a deadlock when
shutting down the TUN device. Whilst we did close the channel before
waiting for the thread to exit, we failed to notice that _another_
instance of the sender could still be alive inside the `PollSender`: it
internally stores a "send permit" whenever another packet is queued for
sending. We need to explicitly call `abort_send` to release that permit.
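
As a standalone illustration (not the `Tun` code itself; the `tokio` /
`tokio-util` dependency setup is assumed), the sketch below shows why closing
the `PollSender` alone is not enough: a reserved send permit keeps a `Sender`
clone alive, so the receiver never observes the channel as closed until
`abort_send` (or dropping the sender entirely) releases the permit.

```rust
use tokio_util::sync::PollSender;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(1);
    let mut poll_tx = PollSender::new(tx);

    // Reserve a send slot, as `poll_send_ready` does before `send` is called.
    std::future::poll_fn(|cx| poll_tx.poll_reserve(cx))
        .await
        .unwrap();

    // Closing the sender does NOT release the reserved permit, so a
    // `rx.recv().await` here would still block instead of returning `None`.
    poll_tx.close();

    // Releasing the permit is what actually lets the receiver observe the
    // channel as closed.
    poll_tx.abort_send();
    assert!(rx.recv().await.is_none());
}
```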

Judging from the comment and a prior bug, this shutdown logic has been
buggy before. To further guard against this deadlock, we introduce two changes:

- The worker threads only receive a `Weak` reference to the
`wintun::Session`
- We move all device-related state into a dedicated `TunState` struct
that we can drop prior to joining the threads

The combination of these changes means that all strong references to the
channels and the session are guaranteed to be dropped without having to
wait for anything. To still provide a clean and synchronous shutdown, we
wait at most 5s for the worker threads to finish. If they haven't exited
by then, we log a warning and return anyway.
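
As a rough, self-contained sketch of that bounded wait (the name
`join_with_timeout` is made up for illustration; the real logic lives in
`Tun::drop` in the diff below):

```rust
use std::time::{Duration, Instant};

/// Waits up to `timeout` for all threads to finish, then joins them to
/// surface panics; if they don't finish in time, we give up instead of
/// blocking forever.
fn join_with_timeout(handles: Vec<std::thread::JoinHandle<()>>, timeout: Duration) {
    let start = Instant::now();

    while handles.iter().any(|h| !h.is_finished()) {
        if start.elapsed() > timeout {
            eprintln!("worker threads did not exit gracefully within {timeout:?}");
            return; // Worst case: a few zombie threads, but no deadlock.
        }
        std::thread::sleep(Duration::from_millis(100));
    }

    // All threads have finished, so `join` returns immediately and only
    // serves to surface panics.
    for handle in handles {
        if let Err(panic) = handle.join() {
            eprintln!("worker thread panicked: {panic:?}");
        }
    }
}

fn main() {
    let worker = std::thread::spawn(|| std::thread::sleep(Duration::from_millis(300)));
    join_with_timeout(vec![worker], Duration::from_secs(5));
}
```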

This should greatly reduce the risk of future bugs here because the
session (and thus the WinTUN device) gets shut down in any case; at
worst, we are left with a few zombie threads.

Resolves: #8265
Thomas Eizinger
2025-02-26 11:46:12 +11:00
committed by GitHub
parent 06c51f6840
commit 96170be082
6 changed files with 244 additions and 188 deletions


@@ -12,121 +12,3 @@ pub use windows as platform;
#[cfg(any(target_os = "linux", target_os = "windows"))]
pub use platform::TunDeviceManager;
#[cfg(test)]
#[cfg(any(target_os = "linux", target_os = "windows"))]
mod tests {
use super::*;
use ip_network::Ipv4Network;
use socket_factory::DatagramOut;
use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4},
time::Duration,
};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
#[tokio::test]
#[ignore = "Needs admin / sudo and Internet"]
async fn tunnel() {
let _guard = firezone_logging::test("debug");
no_packet_loops_tcp().await;
no_packet_loops_udp().await;
tunnel_drop();
}
// Starts up a WinTun device, claims all routes, and checks if we can still make
// TCP connections outside of our tunnel.
async fn no_packet_loops_tcp() {
let ipv4 = Ipv4Addr::from([100, 90, 215, 97]);
let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]);
let mut device_manager = TunDeviceManager::new(1280, 1).unwrap();
let _tun = device_manager.make_tun().unwrap();
device_manager.set_ips(ipv4, ipv6).await.unwrap();
// Configure `0.0.0.0/0` route.
device_manager
.set_routes(
vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()],
vec![],
)
.await
.unwrap();
let remote = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from([1, 1, 1, 1]), 80));
let socket = crate::platform::tcp_socket_factory(&remote).unwrap();
let mut stream = socket.connect(remote).await.unwrap();
// Send an HTTP request
stream.write_all("GET /\r\n\r\n".as_bytes()).await.unwrap();
let mut bytes = vec![];
stream.read_to_end(&mut bytes).await.unwrap();
let s = String::from_utf8(bytes).unwrap();
assert_eq!(s, "<html>\r\n<head><title>400 Bad Request</title></head>\r\n<body>\r\n<center><h1>400 Bad Request</h1></center>\r\n<hr><center>cloudflare</center>\r\n</body>\r\n</html>\r\n");
}
// Starts up a WinTUN device, adds a "full-route" (`0.0.0.0/0`), and checks if we can still send packets to IPs outside of our tunnel.
async fn no_packet_loops_udp() {
let ipv4 = Ipv4Addr::from([100, 90, 215, 97]);
let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]);
let mut device_manager = TunDeviceManager::new(1280, 1).unwrap();
let _tun = device_manager.make_tun().unwrap();
device_manager.set_ips(ipv4, ipv6).await.unwrap();
// Configure `0.0.0.0/0` route.
device_manager
.set_routes(
vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()],
vec![],
)
.await
.unwrap();
// Make a socket.
let mut socket = crate::platform::udp_socket_factory(&SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
0,
)))
.unwrap();
std::future::poll_fn(|cx| socket.poll_send_ready(cx))
.await
.unwrap();
// Send a STUN request.
socket
.send(DatagramOut {
src: None,
dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(141, 101, 90, 0), 3478)), // stun.cloudflare.com,
packet: &hex_literal::hex!("000100002112A4420123456789abcdef01234567").as_ref(),
segment_size: None,
})
.unwrap();
let task = std::future::poll_fn(|cx| {
let mut buf = [0u8; 1000];
let result = std::task::ready!(socket.poll_recv_from(&mut buf, cx));
let _response = result.unwrap().next().unwrap();
std::task::Poll::Ready(())
});
tokio::time::timeout(Duration::from_secs(10), task)
.await
.unwrap();
}
/// Checks for regressions in issue #4765, un-initializing Wintun
/// Redundant but harmless on Linux.
fn tunnel_drop() {
let mut tun_device_manager = TunDeviceManager::new(1280, 1).unwrap();
// Each cycle takes about half a second, so this will take a fair bit to run.
for _ in 0..50 {
let _tun = tun_device_manager.make_tun().unwrap(); // This will panic if we don't correctly clean-up the wintun interface.
}
}
}


@@ -7,8 +7,9 @@ use ip_network::{IpNetwork, Ipv4Network, Ipv6Network};
use ip_packet::{IpPacket, IpPacketBuf};
use ring::digest;
use std::net::IpAddr;
use std::sync::Weak;
use std::task::ready;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{
collections::HashSet,
io::{self, Read as _},
@@ -193,38 +194,60 @@ pub struct Tun {
/// The index of our network adapter, we can use this when asking Windows to add / remove routes / DNS rules
/// It's stable across app restarts and I'm assuming across system reboots too.
iface_idx: u32,
outbound_tx: PollSender<IpPacket>,
inbound_rx: mpsc::Receiver<IpPacket>,
luid: wintun::NET_LUID_LH,
state: Option<TunState>,
send_thread: Option<std::thread::JoinHandle<()>>,
recv_thread: Option<std::thread::JoinHandle<()>>,
luid: wintun::NET_LUID_LH,
}
/// All state relevant to the WinTUN device.
struct TunState {
#[expect(
dead_code,
reason = "The send/recv threads have `Weak` references to this which we need to keep alive"
)]
session: Arc<wintun::Session>,
outbound_tx: PollSender<IpPacket>,
inbound_rx: mpsc::Receiver<IpPacket>,
}
impl Drop for Tun {
fn drop(&mut self) {
tracing::debug!(
channel_capacity = self.inbound_rx.capacity(),
"Shutting down packet channel..."
);
// Close channels to avoid deadlocks, see <https://github.com/firezone/firezone/pull/5571>.
self.outbound_tx.close();
self.inbound_rx.close();
if let Err(error) = self
let recv_thread = self
.recv_thread
.take()
.expect("`recv_thread` should always be `Some` until `Tun` drops")
.join()
{
tracing::error!("`Tun::recv_thread` panicked: {error:?}");
}
if let Err(error) = self
.expect("`recv_thread` should always be `Some` until `Tun` drops");
let send_thread = self
.send_thread
.take()
.expect("`send_thread` should always be `Some` until `Tun` drops")
.join()
{
.expect("`send_thread` should always be `Some` until `Tun` drops");
let _ = self.state.take(); // Drop all channel / tunnel state, allowing the worker threads to exit gracefully.
let start = Instant::now();
while !recv_thread.is_finished() || !send_thread.is_finished() {
std::thread::sleep(Duration::from_millis(100));
if start.elapsed() > Duration::from_secs(5) {
tracing::warn!(recv_thread_finished = %recv_thread.is_finished(), send_thread_finished = %send_thread.is_finished(), "TUN worker threads did not exit gracefully in 5s");
return;
}
}
tracing::debug!(
"Worker threads exited gracefully after {:?}",
start.elapsed()
);
if let Err(error) = recv_thread.join() {
tracing::error!("`Tun::recv_thread` panicked: {error:?}");
}
if let Err(error) = send_thread.join() {
tracing::error!("`Tun::send_thread` panicked: {error:?}");
}
}
@@ -258,18 +281,21 @@ impl Tun {
);
let (outbound_tx, outbound_rx) = mpsc::channel(1000);
let (inbound_tx, inbound_rx) = mpsc::channel(1000); // We want to be able to batch-receive from this.
let send_thread = start_send_thread(outbound_rx, Arc::clone(&session))
let send_thread = start_send_thread(outbound_rx, Arc::downgrade(&session))
.context("Failed to start send thread")?;
let recv_thread = start_recv_thread(inbound_tx, Arc::clone(&session))
let recv_thread = start_recv_thread(inbound_tx, Arc::downgrade(&session))
.context("Failed to start recv thread")?;
Ok(Self {
iface_idx,
luid,
state: Some(TunState {
session,
outbound_tx: PollSender::new(outbound_tx),
inbound_rx,
}),
send_thread: Some(send_thread),
recv_thread: Some(recv_thread),
outbound_tx: PollSender::new(outbound_tx),
inbound_rx,
})
}
@@ -286,7 +312,11 @@ impl tun::Tun for Tun {
buf: &mut Vec<IpPacket>,
max: usize,
) -> Poll<usize> {
self.inbound_rx.poll_recv_many(cx, buf, max)
self.state
.as_mut()
.expect("`tun_state` to always be `Some` until we drop")
.inbound_rx
.poll_recv_many(cx, buf, max)
}
fn name(&self) -> &str {
@@ -296,6 +326,9 @@ impl tun::Tun for Tun {
/// Check if more packets can be sent.
fn poll_send_ready(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
ready!(self
.state
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Internal state gone"))?
.outbound_tx
.poll_reserve(cx)
.map_err(io::Error::other)?);
@@ -305,7 +338,10 @@ impl tun::Tun for Tun {
/// Send a packet.
fn send(&mut self, packet: IpPacket) -> io::Result<()> {
self.outbound_tx
self.state
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Internal state gone"))?
.outbound_tx
.send_item(packet)
.map_err(io::Error::other)?;
@@ -316,67 +352,77 @@ impl tun::Tun for Tun {
// Moves packets from Internet towards the user
fn start_send_thread(
mut packet_rx: mpsc::Receiver<IpPacket>,
session: Arc<wintun::Session>,
session: Weak<wintun::Session>,
) -> io::Result<std::thread::JoinHandle<()>> {
// See <https://learn.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499->.
const ERROR_BUFFER_OVERFLOW: i32 = 0x6F;
std::thread::Builder::new()
.name("TUN send".into())
.spawn(move || {
'recv: loop {
let Some(packet) = packet_rx.blocking_recv() else {
tracing::info!(
"Stopping TUN send worker thread because the packet channel closed"
.spawn(move || loop {
let Some(packet) = packet_rx.blocking_recv() else {
tracing::debug!(
"Stopping TUN send worker thread because the packet channel closed"
);
break;
};
let bytes = packet.packet();
let Ok(len) = bytes.len().try_into() else {
tracing::warn!("Packet too large; length does not fit into u16");
continue;
};
loop {
let Some(session) = session.upgrade() else {
tracing::debug!(
"Stopping TUN send worker thread because the `wintun::Session` was dropped"
);
break;
return;
};
let bytes = packet.packet();
let Ok(len) = bytes.len().try_into() else {
tracing::warn!("Packet too large; length does not fit into u16");
continue;
};
let mut pkt = loop {
match session.allocate_send_packet(len) {
Ok(pkt) => break pkt,
Err(wintun::Error::Io(e))
if e.raw_os_error()
.is_some_and(|code| code == ERROR_BUFFER_OVERFLOW) =>
{
tracing::debug!("WinTUN ring buffer is full");
std::thread::sleep(Duration::from_millis(10)); // Suspend for a bit to avoid busy-looping.
}
Err(e) => {
tracing::error!("Failed to allocate WinTUN packet: {e}");
continue 'recv;
}
match session.allocate_send_packet(len) {
Ok(mut pkt) => {
pkt.bytes_mut().copy_from_slice(bytes);
// `send_packet` cannot fail to enqueue the packet, since we already allocated
// space in the ring buffer.
session.send_packet(pkt);
}
};
pkt.bytes_mut().copy_from_slice(bytes);
// `send_packet` cannot fail to enqueue the packet, since we already allocated
// space in the ring buffer.
session.send_packet(pkt);
Err(wintun::Error::Io(e))
if e.raw_os_error()
.is_some_and(|code| code == ERROR_BUFFER_OVERFLOW) =>
{
tracing::debug!("WinTUN ring buffer is full");
std::thread::sleep(Duration::from_millis(10)); // Suspend for a bit to avoid busy-looping.
}
Err(e) => {
tracing::error!("Failed to allocate WinTUN packet: {e}");
break;
}
}
}
})
}
fn start_recv_thread(
packet_tx: mpsc::Sender<IpPacket>,
session: Arc<wintun::Session>,
session: Weak<wintun::Session>,
) -> io::Result<std::thread::JoinHandle<()>> {
std::thread::Builder::new()
.name("TUN recv".into())
.spawn(move || loop {
let pkt = match session.receive_blocking() {
let Some(receive_result) = session.upgrade().map(|s| s.receive_blocking()) else {
tracing::debug!(
"Stopping TUN recv worker thread because the `wintun::Session` was dropped"
);
break;
};
let pkt = match receive_result {
Ok(pkt) => pkt,
Err(wintun::Error::ShuttingDown) => {
tracing::info!(
"Stopping outbound worker thread because Wintun is shutting down"
);
tracing::debug!("Stopping recv worker thread because Wintun is shutting down");
break;
}
Err(e) => {
@@ -413,8 +459,8 @@ fn start_recv_thread(
match packet_tx.blocking_send(pkt) {
Ok(()) => {}
Err(_) => {
tracing::info!(
"Stopping outbound worker thread because the packet channel closed"
tracing::debug!(
"Stopping TUN recv worker thread because the packet channel closed"
);
break;
}


@@ -0,0 +1,41 @@
#![allow(clippy::unwrap_used)]
use firezone_bin_shared::{platform::tcp_socket_factory, TunDeviceManager};
use ip_network::Ipv4Network;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
// Starts up a WinTun device, claims all routes, and checks if we can still make
// TCP connections outside of our tunnel.
#[tokio::test]
#[ignore = "Needs admin / sudo and Internet"]
async fn no_packet_loops_tcp() {
firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads.
let ipv4 = Ipv4Addr::from([100, 90, 215, 97]);
let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]);
let mut device_manager = TunDeviceManager::new(1280, 1).unwrap();
let _tun = device_manager.make_tun().unwrap();
device_manager.set_ips(ipv4, ipv6).await.unwrap();
// Configure `0.0.0.0/0` route.
device_manager
.set_routes(
vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()],
vec![],
)
.await
.unwrap();
let remote = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from([1, 1, 1, 1]), 80));
let socket = tcp_socket_factory(&remote).unwrap();
let mut stream = socket.connect(remote).await.unwrap();
// Send an HTTP request
stream.write_all("GET /\r\n\r\n".as_bytes()).await.unwrap();
let mut bytes = vec![];
stream.read_to_end(&mut bytes).await.unwrap();
let s = String::from_utf8(bytes).unwrap();
assert_eq!(s, "<html>\r\n<head><title>400 Bad Request</title></head>\r\n<body>\r\n<center><h1>400 Bad Request</h1></center>\r\n<hr><center>cloudflare</center>\r\n</body>\r\n</html>\r\n");
}


@@ -0,0 +1,63 @@
#![allow(clippy::unwrap_used)]
use firezone_bin_shared::{platform::udp_socket_factory, TunDeviceManager};
use ip_network::Ipv4Network;
use socket_factory::DatagramOut;
use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4},
time::Duration,
};
// Starts up a WinTUN device, adds a "full-route" (`0.0.0.0/0`), and checks if we can still send packets to IPs outside of our tunnel.
#[tokio::test]
#[ignore = "Needs admin / sudo and Internet"]
async fn no_packet_loops_udp() {
firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads.
let ipv4 = Ipv4Addr::from([100, 90, 215, 97]);
let ipv6 = Ipv6Addr::from([0xfd00, 0x2021, 0x1111, 0x0, 0x0, 0x0, 0x0016, 0x588f]);
let mut device_manager = TunDeviceManager::new(1280, 1).unwrap();
let _tun = device_manager.make_tun().unwrap();
device_manager.set_ips(ipv4, ipv6).await.unwrap();
// Configure `0.0.0.0/0` route.
device_manager
.set_routes(
vec![Ipv4Network::new(Ipv4Addr::UNSPECIFIED, 0).unwrap()],
vec![],
)
.await
.unwrap();
// Make a socket.
let mut socket =
udp_socket_factory(&SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))).unwrap();
std::future::poll_fn(|cx| socket.poll_send_ready(cx))
.await
.unwrap();
// Send a STUN request.
socket
.send(DatagramOut {
src: None,
dst: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(141, 101, 90, 0), 3478)), // stun.cloudflare.com,
packet: &hex_literal::hex!("000100002112A4420123456789abcdef01234567").as_ref(),
segment_size: None,
})
.unwrap();
let task = std::future::poll_fn(|cx| {
let mut buf = [0u8; 1000];
let result = std::task::ready!(socket.poll_recv_from(&mut buf, cx));
let _response = result.unwrap().next().unwrap();
std::task::Poll::Ready(())
});
tokio::time::timeout(Duration::from_secs(10), task)
.await
.unwrap();
}


@@ -0,0 +1,18 @@
#![allow(clippy::unwrap_used)]
use firezone_bin_shared::TunDeviceManager;
/// Checks for regressions in issue #4765, un-initializing Wintun
/// Redundant but harmless on Linux.
#[tokio::test] // Needs a runtime.
#[ignore = "Needs admin / sudo and Internet"]
async fn tunnel_drop() {
firezone_logging::test_global("debug"); // `Tun` uses threads and we want to see the logs of all threads.
let mut tun_device_manager = TunDeviceManager::new(1280, 1).unwrap();
// Each cycle takes about half a second, so this will take a fair bit to run.
for _ in 0..50 {
let _tun = tun_device_manager.make_tun().unwrap(); // This will panic if we don't correctly clean-up the wintun interface.
}
}


@@ -14,6 +14,12 @@ export default function GUI({ os }: { os: OS }) {
Configures the IPC service to log to journald.
</ChangeItem>
)}
{os === OS.Windows && (
<ChangeItem pull="8268">
Fixes a deadlock that could occur during shutdown of the TUN
device if there were still packets queued for sending.
</ChangeItem>
)}
</Unreleased>
<Entry version="1.4.6" date={new Date("2025-02-20")}>
{os === OS.Linux && (