diff --git a/rust/connlib/tunnel/src/device_channel/tun_windows.rs b/rust/connlib/tunnel/src/device_channel/tun_windows.rs index 8270aaf6e..56094544c 100644 --- a/rust/connlib/tunnel/src/device_channel/tun_windows.rs +++ b/rust/connlib/tunnel/src/device_channel/tun_windows.rs @@ -14,7 +14,7 @@ use std::{ sync::Arc, task::{ready, Context, Poll}, }; -use tokio::sync::mpsc::{self, error::TrySendError}; +use tokio::sync::mpsc; use windows::Win32::{ NetworkManagement::{ IpHelper::{ @@ -92,7 +92,9 @@ impl Tun { set_iface_config(adapter.get_luid(), MTU as u32)?; let session = Arc::new(adapter.start_session(wintun::MAX_RING_CAPACITY)?); - let (packet_tx, packet_rx) = mpsc::channel(5); + // 4 is a nice power of two. Wintun already queues packets for us, so we don't + // need much capacity here. + let (packet_tx, packet_rx) = mpsc::channel(4); let recv_thread = start_recv_thread(packet_tx, Arc::clone(&session))?; Ok(Self { @@ -128,6 +130,7 @@ impl Tun { Ok(()) } + // Moves packets from the user towards the Internet pub fn poll_read(&mut self, buf: &mut [u8], cx: &mut Context<'_>) -> Poll> { let pkt = ready!(self.packet_rx.poll_recv(cx)); @@ -163,6 +166,7 @@ impl Tun { self.write(bytes) } + // Moves packets from the Internet towards the user #[allow(clippy::unnecessary_wraps)] // Fn signature must align with other platform implementations. fn write(&self, bytes: &[u8]) -> io::Result { let len = bytes @@ -241,6 +245,7 @@ pub(crate) fn flush_dns() -> Result<()> { Ok(()) } +// Moves packets from the user towards the Internet fn start_recv_thread( packet_tx: mpsc::Sender, session: Arc, @@ -251,27 +256,29 @@ fn start_recv_thread( loop { match session.receive_blocking() { Ok(pkt) => { - match packet_tx.try_send(pkt) { + // Use `blocking_send` so that if connlib is behind by a few packets, + // Wintun will queue up new packets in its ring buffer while we + // wait for our MPSC channel to clear. + match packet_tx.blocking_send(pkt) { Ok(()) => {} - Err(TrySendError::Closed(_)) => { - // This is redundant since we aren't using - // `blocking_send` anymore but it's defense in depth. + Err(_) => { tracing::info!( - "Closing worker thread because packet channel closed" + "Stopping outbound worker thread because the packet channel closed" ); break; } - Err(TrySendError::Full(_)) => {} // Just drop the packet, it's IP } } - Err(wintun::Error::ShuttingDown) => break, + Err(wintun::Error::ShuttingDown) => { + tracing::info!("Stopping outbound worker thread because Wintun is shutting down"); + break; + } Err(e) => { tracing::error!("wintun::Session::receive_blocking: {e:#?}"); break; } } } - tracing::info!("recv_task exiting gracefully"); }) }