From d19d20da51a2bea2fdfca29808ae3c6ffa190911 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 30 Apr 2025 12:00:41 +1000 Subject: [PATCH] fix(connlib): send IO errors from UDP threads to event-loop (#8933) With #7590, we've moved all UDP IO operations to a separate thread. As a result, some of the error handling of IO errors within the Client's and Gateway's event-loop no longer applied as those are now captured within the respective thread. To fix this, we extend the type-signature of the receive-channel to also allow for errors and use that to send back errors from sending AND receiving UDP datagrams. --- rust/connlib/tunnel/src/io.rs | 2 +- rust/connlib/tunnel/src/sockets.rs | 27 ++++++++++++--------------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index 6dc4ae59c..182195a23 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -176,7 +176,7 @@ impl Io { if let Poll::Ready(network) = self.sockets.poll_recv_from(cx) { return Poll::Ready(Ok(Input::Network( network - .context("Failed to receive from UDP sockets")? + .context("UDP socket failed")? .filter(is_max_wg_packet_size), ))); } diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index 5f344e28b..556bc5b97 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -152,7 +152,7 @@ where struct ThreadedUdpSocket { outbound_tx: flume::r#async::SendSink<'static, DatagramOut>, - inbound_rx: flume::r#async::RecvStream<'static, DatagramSegmentIter>, + inbound_rx: flume::r#async::RecvStream<'static, Result>, } impl ThreadedUdpSocket { @@ -196,7 +196,11 @@ impl ThreadedUdpSocket { let send = pin!(async { while let Ok(datagram) = outbound_rx.recv_async().await { if let Err(e) = socket.send(datagram).await { - tracing::debug!("Failed to send datagram: {e:#}") + // We use the inbound_tx channel to send the error back to the main thread. + if inbound_tx.send_async(Err(e)).await.is_err() { + tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); + break; + } }; } @@ -206,18 +210,11 @@ impl ThreadedUdpSocket { }); let receive = pin!(async { loop { - match socket.recv_from().await { - Ok(datagrams) => { - if inbound_tx.send_async(datagrams).await.is_err() { - tracing::debug!( - "Channel for inbound datagrams closed; exiting UDP thread" - ); - break; - } - }, - Err(e) => { - tracing::debug!("Failed to receive from socket: {e:#}") - }, + let result = socket.recv_from().await; + + if inbound_tx.send_async(result).await.is_err() { + tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); + break; } } }); @@ -251,7 +248,7 @@ impl ThreadedUdpSocket { fn poll_recv_from(&mut self, cx: &mut Context<'_>) -> Poll> { let iter = ready!(self.inbound_rx.poll_next_unpin(cx)).ok_or(UdpSocketThreadStopped)?; - Poll::Ready(Ok(iter)) + Poll::Ready(iter) } }