diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index b8f547446..5554aa66a 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -154,7 +154,6 @@ struct ThreadedUdpSocket { } impl ThreadedUdpSocket { - #[expect(clippy::unwrap_in_result, reason = "We unwrap in the new thread.")] fn new(sf: Arc>, addr: SocketAddr) -> io::Result { let (outbound_tx, outbound_rx) = flume::bounded(10); let (inbound_tx, inbound_rx) = flume::bounded(10); @@ -166,83 +165,105 @@ impl ThreadedUdpSocket { SocketAddr::V6(_) => "UDP IPv6".to_owned(), }) .spawn(move || { - tokio::runtime::Builder::new_current_thread() + let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .expect("Failed to spawn tokio runtime on UDP thread") - .block_on(async move { - let mut socket = match sf(&addr) { - Ok(s) => { - let _ = error_tx.send(Ok(())); + { + Ok(r) => r, + Err(e) => { + let _ = error_tx.send(Err(e)); + return; + } + }; - s - } - Err(e) => { - let _ = error_tx.send(Err(e)); - return; - } - }; - - let io_error_counter = opentelemetry::global::meter("connlib") - .u64_counter("system.network.errors") - .with_description("Number of IO errors encountered") - .with_unit("{error}") - .build(); - - match socket.set_buffer_sizes(socket_factory::SEND_BUFFER_SIZE, socket_factory::RECV_BUFFER_SIZE) { - Ok(()) => {}, - Err(e) => { - let _ = error_tx.send(Err(e)); - return; - }, + runtime.block_on(async move { + let mut socket = match sf(&addr) { + Ok(s) => s, + Err(e) => { + let _ = error_tx.send(Err(e)); + return; } + }; - let send = pin!(async { - while let Ok(datagram) = outbound_rx.recv_async().await { - if let Err(e) = socket.send(datagram).await { - if let Some(io) = e.downcast_ref::() { - io_error_counter.add(1, &[ + let io_error_counter = opentelemetry::global::meter("connlib") + .u64_counter("system.network.errors") + .with_description("Number of IO errors encountered") + .with_unit("{error}") + .build(); + + match socket.set_buffer_sizes( + socket_factory::SEND_BUFFER_SIZE, + socket_factory::RECV_BUFFER_SIZE, + ) { + Ok(()) => {} + Err(e) => { + let _ = error_tx.send(Err(e)); + return; + } + } + + let send = pin!(async { + while let Ok(datagram) = outbound_rx.recv_async().await { + if let Err(e) = socket.send(datagram).await { + if let Some(io) = e.downcast_ref::() { + io_error_counter.add( + 1, + &[ otel::attr::network_io_direction_transmit(), otel::attr::network_type_for_addr(addr), otel::attr::io_error_type(io), - otel::attr::io_error_code(io) - ]); - } + otel::attr::io_error_code(io), + ], + ); + } - // We use the inbound_tx channel to send the error back to the main thread. - if inbound_tx.send_async(Err(e)).await.is_err() { - tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); - break; - } - }; - } + // We use the inbound_tx channel to send the error back to the main thread. + if inbound_tx.send_async(Err(e)).await.is_err() { + tracing::debug!( + "Channel for inbound datagrams closed; exiting UDP thread" + ); + break; + } + }; + } - tracing::debug!( - "Channel for outbound datagrams closed; exiting UDP thread" - ); - }); - let receive = pin!(async { - loop { - let result = socket.recv_from().await; + tracing::debug!( + "Channel for outbound datagrams closed; exiting UDP thread" + ); + }); + let receive = pin!(async { + loop { + let result = socket.recv_from().await; - if let Some(io) = result.as_ref().err().and_then(|e| e.downcast_ref::()) { - io_error_counter.add(1, &[ + if let Some(io) = result + .as_ref() + .err() + .and_then(|e| e.downcast_ref::()) + { + io_error_counter.add( + 1, + &[ otel::attr::network_io_direction_receive(), otel::attr::network_type_for_addr(addr), otel::attr::io_error_type(io), - otel::attr::io_error_code(io) - ]); - } - - if inbound_tx.send_async(result).await.is_err() { - tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread"); - break; - } + otel::attr::io_error_code(io), + ], + ); } - }); - futures::future::select(send, receive).await; - }) + if inbound_tx.send_async(result).await.is_err() { + tracing::debug!( + "Channel for inbound datagrams closed; exiting UDP thread" + ); + break; + } + } + }); + + let _ = error_tx.send(Ok(())); + + futures::future::select(send, receive).await; + }) })?; error_rx.recv().map_err(io::Error::other)??;