fix(connlib): send IO errors from UDP threads to event-loop (#8933)

With #7590, we've moved all UDP IO operations to a separate thread. As a
result, some of the error handling of IO errors within the Client's and
Gateway's event-loop no longer applied as those are now captured within
the respective thread. To fix this, we extend the type-signature of the
receive-channel to also allow for errors and use that to send back
errors from sending AND receiving UDP datagrams.
This commit is contained in:
Thomas Eizinger
2025-04-30 12:00:41 +10:00
committed by GitHub
parent 4881280a3a
commit d19d20da51
2 changed files with 13 additions and 16 deletions

View File

@@ -176,7 +176,7 @@ impl Io {
if let Poll::Ready(network) = self.sockets.poll_recv_from(cx) {
return Poll::Ready(Ok(Input::Network(
network
.context("Failed to receive from UDP sockets")?
.context("UDP socket failed")?
.filter(is_max_wg_packet_size),
)));
}

View File

@@ -152,7 +152,7 @@ where
struct ThreadedUdpSocket {
outbound_tx: flume::r#async::SendSink<'static, DatagramOut>,
inbound_rx: flume::r#async::RecvStream<'static, DatagramSegmentIter>,
inbound_rx: flume::r#async::RecvStream<'static, Result<DatagramSegmentIter>>,
}
impl ThreadedUdpSocket {
@@ -196,7 +196,11 @@ impl ThreadedUdpSocket {
let send = pin!(async {
while let Ok(datagram) = outbound_rx.recv_async().await {
if let Err(e) = socket.send(datagram).await {
tracing::debug!("Failed to send datagram: {e:#}")
// We use the inbound_tx channel to send the error back to the main thread.
if inbound_tx.send_async(Err(e)).await.is_err() {
tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread");
break;
}
};
}
@@ -206,18 +210,11 @@ impl ThreadedUdpSocket {
});
let receive = pin!(async {
loop {
match socket.recv_from().await {
Ok(datagrams) => {
if inbound_tx.send_async(datagrams).await.is_err() {
tracing::debug!(
"Channel for inbound datagrams closed; exiting UDP thread"
);
break;
}
},
Err(e) => {
tracing::debug!("Failed to receive from socket: {e:#}")
},
let result = socket.recv_from().await;
if inbound_tx.send_async(result).await.is_err() {
tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread");
break;
}
}
});
@@ -251,7 +248,7 @@ impl ThreadedUdpSocket {
fn poll_recv_from(&mut self, cx: &mut Context<'_>) -> Poll<Result<DatagramSegmentIter>> {
let iter = ready!(self.inbound_rx.poll_next_unpin(cx)).ok_or(UdpSocketThreadStopped)?;
Poll::Ready(Ok(iter))
Poll::Ready(iter)
}
}