diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index 6dc4ae59c..182195a23 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -176,7 +176,7 @@ impl Io { if let Poll::Ready(network) = self.sockets.poll_recv_from(cx) { return Poll::Ready(Ok(Input::Network( network - .context("Failed to receive from UDP sockets")? + .context("UDP socket failed")? .filter(is_max_wg_packet_size), ))); } diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index 5f344e28b..556bc5b97 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -152,7 +152,7 @@ where struct ThreadedUdpSocket { outbound_tx: flume::r#async::SendSink<'static, DatagramOut>, - inbound_rx: flume::r#async::RecvStream<'static, DatagramSegmentIter>, + inbound_rx: flume::r#async::RecvStream<'static, Result>, } impl ThreadedUdpSocket { @@ -196,7 +196,11 @@ impl ThreadedUdpSocket { let send = pin!(async { while let Ok(datagram) = outbound_rx.recv_async().await { if let Err(e) = socket.send(datagram).await { - tracing::debug!("Failed to send datagram: {e:#}") + // We use the inbound_tx channel to send the error back to the main thread. + if inbound_tx.send_async(Err(e)).await.is_err() { + tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); + break; + } }; } @@ -206,18 +210,11 @@ impl ThreadedUdpSocket { }); let receive = pin!(async { loop { - match socket.recv_from().await { - Ok(datagrams) => { - if inbound_tx.send_async(datagrams).await.is_err() { - tracing::debug!( - "Channel for inbound datagrams closed; exiting UDP thread" - ); - break; - } - }, - Err(e) => { - tracing::debug!("Failed to receive from socket: {e:#}") - }, + let result = socket.recv_from().await; + + if inbound_tx.send_async(result).await.is_err() { + tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); + break; } } }); @@ -251,7 +248,7 @@ impl ThreadedUdpSocket { fn poll_recv_from(&mut self, cx: &mut Context<'_>) -> Poll> { let iter = ready!(self.inbound_rx.poll_next_unpin(cx)).ok_or(UdpSocketThreadStopped)?; - Poll::Ready(Ok(iter)) + Poll::Ready(iter) } }