From 29f8dd8688bf241cdcf06c15c0be7fa2e18ac951 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 2 Jun 2025 20:37:38 +0800 Subject: [PATCH] fix(connlib): block until UDP thread has been set up (#9363) Internally, `connlib` spawns a new thread for handling IO on the UDP socket. In order to make sure that this thread is operational, we intended to block `connlib`'s main thread until the setup of the UDP thread has successfully completed. Unfortunately, this isn't quite the case because we already send an `Ok(())` value into the channel once we've successfully bound the socket. Following the binding, we also try to increase the maximum buffer size of the socket. Even though the intention here was to also log this error, the error value sent into the channel there is never read because we only ever read one value from the channel (via `error_rx`). To fix this, we move the sending of the `Ok(())` value to the very bottom of the UDP thread, just before we kick off its send and receive loops. Whilst this does not fix the actual issue as to why the setup of the UDP thread fails, these changes will at least surface the error. 
--- rust/connlib/tunnel/src/sockets.rs | 145 +++++++++++++++++------------ 1 file changed, 83 insertions(+), 62 deletions(-) diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index b8f547446..5554aa66a 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -154,7 +154,6 @@ struct ThreadedUdpSocket { } impl ThreadedUdpSocket { - #[expect(clippy::unwrap_in_result, reason = "We unwrap in the new thread.")] fn new(sf: Arc>, addr: SocketAddr) -> io::Result { let (outbound_tx, outbound_rx) = flume::bounded(10); let (inbound_tx, inbound_rx) = flume::bounded(10); @@ -166,83 +165,105 @@ impl ThreadedUdpSocket { SocketAddr::V6(_) => "UDP IPv6".to_owned(), }) .spawn(move || { - tokio::runtime::Builder::new_current_thread() + let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .expect("Failed to spawn tokio runtime on UDP thread") - .block_on(async move { - let mut socket = match sf(&addr) { - Ok(s) => { - let _ = error_tx.send(Ok(())); + { + Ok(r) => r, + Err(e) => { + let _ = error_tx.send(Err(e)); + return; + } + }; - s - } - Err(e) => { - let _ = error_tx.send(Err(e)); - return; - } - }; - - let io_error_counter = opentelemetry::global::meter("connlib") - .u64_counter("system.network.errors") - .with_description("Number of IO errors encountered") - .with_unit("{error}") - .build(); - - match socket.set_buffer_sizes(socket_factory::SEND_BUFFER_SIZE, socket_factory::RECV_BUFFER_SIZE) { - Ok(()) => {}, - Err(e) => { - let _ = error_tx.send(Err(e)); - return; - }, + runtime.block_on(async move { + let mut socket = match sf(&addr) { + Ok(s) => s, + Err(e) => { + let _ = error_tx.send(Err(e)); + return; } + }; - let send = pin!(async { - while let Ok(datagram) = outbound_rx.recv_async().await { - if let Err(e) = socket.send(datagram).await { - if let Some(io) = e.downcast_ref::() { - io_error_counter.add(1, &[ + let io_error_counter = 
opentelemetry::global::meter("connlib") + .u64_counter("system.network.errors") + .with_description("Number of IO errors encountered") + .with_unit("{error}") + .build(); + + match socket.set_buffer_sizes( + socket_factory::SEND_BUFFER_SIZE, + socket_factory::RECV_BUFFER_SIZE, + ) { + Ok(()) => {} + Err(e) => { + let _ = error_tx.send(Err(e)); + return; + } + } + + let send = pin!(async { + while let Ok(datagram) = outbound_rx.recv_async().await { + if let Err(e) = socket.send(datagram).await { + if let Some(io) = e.downcast_ref::() { + io_error_counter.add( + 1, + &[ otel::attr::network_io_direction_transmit(), otel::attr::network_type_for_addr(addr), otel::attr::io_error_type(io), - otel::attr::io_error_code(io) - ]); - } + otel::attr::io_error_code(io), + ], + ); + } - // We use the inbound_tx channel to send the error back to the main thread. - if inbound_tx.send_async(Err(e)).await.is_err() { - tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); - break; - } - }; - } + // We use the inbound_tx channel to send the error back to the main thread. 
+ if inbound_tx.send_async(Err(e)).await.is_err() { + tracing::debug!( + "Channel for inbound datagrams closed; exiting UDP thread" + ); + break; + } + }; + } - tracing::debug!( - "Channel for outbound datagrams closed; exiting UDP thread" - ); - }); - let receive = pin!(async { - loop { - let result = socket.recv_from().await; + tracing::debug!( + "Channel for outbound datagrams closed; exiting UDP thread" + ); + }); + let receive = pin!(async { + loop { + let result = socket.recv_from().await; - if let Some(io) = result.as_ref().err().and_then(|e| e.downcast_ref::()) { - io_error_counter.add(1, &[ + if let Some(io) = result + .as_ref() + .err() + .and_then(|e| e.downcast_ref::()) + { + io_error_counter.add( + 1, + &[ otel::attr::network_io_direction_receive(), otel::attr::network_type_for_addr(addr), otel::attr::io_error_type(io), - otel::attr::io_error_code(io) - ]); - } - - if inbound_tx.send_async(result).await.is_err() { - tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); - break; - } + otel::attr::io_error_code(io), + ], + ); } - }); - futures::future::select(send, receive).await; - }) + if inbound_tx.send_async(result).await.is_err() { + tracing::debug!( + "Channel for inbound datagrams closed; exiting UDP thread" + ); + break; + } + } + }); + + let _ = error_tx.send(Ok(())); + + futures::future::select(send, receive).await; + }) })?; error_rx.recv().map_err(io::Error::other)??;