fix(connlib): block until UDP thread has been set up (#9363)

Internally, `connlib` spawns a new thread for handling IO on the UDP
socket. In order to make sure that this thread is operational, we
intended to block `connlib`s main thread until the setup of the UDP
thread has successfully completed.

Unfortunately, this isn't quite the case because we already send an
`Ok(())` value into the channel once we've successfully bound the
socket. Following the binding, we also try to increase the maximum
buffer size of the socket. Even though the intention here was to also
log this error, the error value sent into the channel there is never
read because we only ever read one value from the `error_tx` channel.

To fix this, we move the sending of the `Ok(())` value to the very
bottom of the UDP thread, just before we kick it off. Whilst this does
not fix the actual issue as to why the setup of the UDP thread fails,
these changes will at least surface the error.
This commit is contained in:
Thomas Eizinger
2025-06-02 20:37:38 +08:00
committed by GitHub
parent 1914ea7076
commit 29f8dd8688

View File

@@ -154,7 +154,6 @@ struct ThreadedUdpSocket {
}
impl ThreadedUdpSocket {
#[expect(clippy::unwrap_in_result, reason = "We unwrap in the new thread.")]
fn new(sf: Arc<dyn SocketFactory<UdpSocket>>, addr: SocketAddr) -> io::Result<Self> {
let (outbound_tx, outbound_rx) = flume::bounded(10);
let (inbound_tx, inbound_rx) = flume::bounded(10);
@@ -166,83 +165,105 @@ impl ThreadedUdpSocket {
SocketAddr::V6(_) => "UDP IPv6".to_owned(),
})
.spawn(move || {
tokio::runtime::Builder::new_current_thread()
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to spawn tokio runtime on UDP thread")
.block_on(async move {
let mut socket = match sf(&addr) {
Ok(s) => {
let _ = error_tx.send(Ok(()));
{
Ok(r) => r,
Err(e) => {
let _ = error_tx.send(Err(e));
return;
}
};
s
}
Err(e) => {
let _ = error_tx.send(Err(e));
return;
}
};
let io_error_counter = opentelemetry::global::meter("connlib")
.u64_counter("system.network.errors")
.with_description("Number of IO errors encountered")
.with_unit("{error}")
.build();
match socket.set_buffer_sizes(socket_factory::SEND_BUFFER_SIZE, socket_factory::RECV_BUFFER_SIZE) {
Ok(()) => {},
Err(e) => {
let _ = error_tx.send(Err(e));
return;
},
runtime.block_on(async move {
let mut socket = match sf(&addr) {
Ok(s) => s,
Err(e) => {
let _ = error_tx.send(Err(e));
return;
}
};
let send = pin!(async {
while let Ok(datagram) = outbound_rx.recv_async().await {
if let Err(e) = socket.send(datagram).await {
if let Some(io) = e.downcast_ref::<io::Error>() {
io_error_counter.add(1, &[
let io_error_counter = opentelemetry::global::meter("connlib")
.u64_counter("system.network.errors")
.with_description("Number of IO errors encountered")
.with_unit("{error}")
.build();
match socket.set_buffer_sizes(
socket_factory::SEND_BUFFER_SIZE,
socket_factory::RECV_BUFFER_SIZE,
) {
Ok(()) => {}
Err(e) => {
let _ = error_tx.send(Err(e));
return;
}
}
let send = pin!(async {
while let Ok(datagram) = outbound_rx.recv_async().await {
if let Err(e) = socket.send(datagram).await {
if let Some(io) = e.downcast_ref::<io::Error>() {
io_error_counter.add(
1,
&[
otel::attr::network_io_direction_transmit(),
otel::attr::network_type_for_addr(addr),
otel::attr::io_error_type(io),
otel::attr::io_error_code(io)
]);
}
otel::attr::io_error_code(io),
],
);
}
// We use the inbound_tx channel to send the error back to the main thread.
if inbound_tx.send_async(Err(e)).await.is_err() {
tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread");
break;
}
};
}
// We use the inbound_tx channel to send the error back to the main thread.
if inbound_tx.send_async(Err(e)).await.is_err() {
tracing::debug!(
"Channel for inbound datagrams closed; exiting UDP thread"
);
break;
}
};
}
tracing::debug!(
"Channel for outbound datagrams closed; exiting UDP thread"
);
});
let receive = pin!(async {
loop {
let result = socket.recv_from().await;
tracing::debug!(
"Channel for outbound datagrams closed; exiting UDP thread"
);
});
let receive = pin!(async {
loop {
let result = socket.recv_from().await;
if let Some(io) = result.as_ref().err().and_then(|e| e.downcast_ref::<io::Error>()) {
io_error_counter.add(1, &[
if let Some(io) = result
.as_ref()
.err()
.and_then(|e| e.downcast_ref::<io::Error>())
{
io_error_counter.add(
1,
&[
otel::attr::network_io_direction_receive(),
otel::attr::network_type_for_addr(addr),
otel::attr::io_error_type(io),
otel::attr::io_error_code(io)
]);
}
if inbound_tx.send_async(result).await.is_err() {
tracing::debug!("Channel for inbound datagrams closed; exiting UDP thread");
break;
}
otel::attr::io_error_code(io),
],
);
}
});
futures::future::select(send, receive).await;
})
if inbound_tx.send_async(result).await.is_err() {
tracing::debug!(
"Channel for inbound datagrams closed; exiting UDP thread"
);
break;
}
}
});
let _ = error_tx.send(Ok(()));
futures::future::select(send, receive).await;
})
})?;
error_rx.recv().map_err(io::Error::other)??;