fix(connlib): retry sending packet on ENOBUFS (#10965)

In #9798, we added a check to map `ENOBUFS` to `WOULDBLOCK` on MacOS.
More experimentation on that front revealed that this was actually
incorrect and the UDP sending task will hang as the OS does **not**
notify us once there are new buffers available.

This may explain some random connection hangs that some users have
recently complained about. I've already disabled the feature flag in
production, this PR therefore only removes code that is now inactive.

In order to make this as robust as possible, we implement a retry loop
with an exponential backoff, starting at 2ns. At most, we will be
retrying such a packet for 16ms. Local experiments on my MacBook have
shown that most of the time, new buffer space is available within 1ms.
The exponential backoff ensures we retry very quickly on faster machines
but still successfully send the packet on slower machines.

According to the linked mailing list, the link-speed of the attached
network has nothing to do with this, which makes sense. UDP has no
congestion control so sending packets is merely a function of how fast
the CPU can process them.

Related:
https://lists.freebsd.org/pipermail/freebsd-hackers/2004-January/005369.html

---------

Signed-off-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: Jamil <jamilbk@users.noreply.github.com>
This commit is contained in:
Thomas Eizinger
2025-11-26 15:36:36 +11:00
committed by GitHub
parent 7c91955458
commit e1c13d448a
4 changed files with 97 additions and 42 deletions

1
rust/Cargo.lock generated
View File

@@ -7030,7 +7030,6 @@ dependencies = [
"opentelemetry",
"quinn-udp",
"socket2 0.6.1",
"telemetry",
"tokio",
"tracing",
]

View File

@@ -14,18 +14,17 @@ ip-packet = { workspace = true }
opentelemetry = { workspace = true, features = ["metrics"] }
quinn-udp = { workspace = true }
socket2 = { workspace = true }
tokio = { workspace = true, features = ["net"] }
tokio = { workspace = true, features = ["net", "time"] }
tracing = { workspace = true }
[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
libc = { workspace = true }
[target.'cfg(target_os = "android")'.dependencies]
libc = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
libc = { workspace = true }
[target.'cfg(target_os = "macos")'.dependencies]
libc = { workspace = true }
telemetry = { workspace = true }
[dev-dependencies]
derive_more = { workspace = true, features = ["deref"] }

View File

@@ -1,4 +1,4 @@
use anyhow::{Context as _, Result};
use anyhow::{Context as _, ErrorExt, Result};
use bufferpool::{Buffer, BufferPool};
use bytes::{Buf as _, BytesMut};
use gat_lending_iterator::LendingIterator;
@@ -8,6 +8,7 @@ use quinn_udp::{EcnCodepoint, Transmit, UdpSockRef};
use std::io;
use std::io::IoSliceMut;
use std::ops::Deref;
use std::time::Duration;
use std::{
net::{IpAddr, SocketAddr},
task::{Context, Poll},
@@ -26,6 +27,10 @@ pub const SEND_BUFFER_SIZE: usize = 16 * ONE_MB;
pub const RECV_BUFFER_SIZE: usize = 128 * ONE_MB;
const ONE_MB: usize = 1024 * 1024;
/// How many times we at most try to re-send a packet if we encounter ENOBUFS.
#[cfg(any(target_os = "macos", target_os = "ios", test))]
const MAX_ENOBUFS_RETRIES: u32 = 24;
impl<F, S> SocketFactory<S> for F
where
F: Fn(SocketAddr) -> io::Result<S> + Send + Sync + 'static,
@@ -306,21 +311,21 @@ impl UdpSocket {
datagram.ecn,
)?;
match self.send_transmit(&transmit).await {
Ok(()) => Ok(()),
let mut attempt = 0;
// On Linux and Android, we retry sending once for os error 5.
//
// quinn-udp disables GSO for those but cannot automatically re-send them because we need to split the datagram differently.
#[cfg(any(target_os = "linux", target_os = "android"))]
Err(e) if is_os_error_5(&e) => {
self.send_transmit(&transmit).await?;
loop {
match self.send_transmit(&transmit).await {
Ok(()) => return Ok(()),
Err(e) => {
let backoff = backoff(&e, attempt).ok_or(e)?; // Attempt to get a backoff value or otherwise bail with error.
Ok(())
tracing::debug!(?backoff, dst = %datagram.dst, len = %datagram.packet.len(), "Retrying packet");
tokio::time::sleep(backoff).await;
}
}
// Any other error or other OS returns the error directly.
Err(e) => Err(e),
attempt += 1;
}
}
@@ -364,24 +369,7 @@ impl UdpSocket {
);
self.inner
.async_io(Interest::WRITABLE, || {
match self.state.try_send((&self.inner).into(), &chunk) {
Ok(()) => Ok(()),
#[cfg(target_os = "macos")]
Err(e)
if e.raw_os_error().is_some_and(|e| e == libc::ENOBUFS)
&& telemetry::feature_flags::map_enobufs_to_would_block() =>
{
telemetry::analytics::feature_flag_called(
"map-enobufs-to-wouldblock",
);
tracing::debug!("Encountered ENOBUFS, treating as WouldBlock");
Err(io::Error::from(io::ErrorKind::WouldBlock))
}
Err(e) => Err(e),
}
})
.async_io(Interest::WRITABLE, || self.state.try_send((&self.inner).into(), &chunk))
.await
.with_context(|| format!("Failed to send datagram-batch {batch_num}/{num_batches} with segment_size {segment_size} and total length {num_bytes} to {dst}"))?;
}
@@ -516,13 +504,41 @@ fn is_equal_modulo_scope_for_ipv6_link_local(expected: SocketAddr, actual: Socke
}
}
#[cfg(any(target_os = "linux", target_os = "android"))]
fn is_os_error_5(e: &anyhow::Error) -> bool {
use anyhow::ErrorExt;
/// Decides whether a failed send should be retried and, if so, how long to wait first.
///
/// `attempts` is the number of retries already performed for the packet.
///
/// Returns `None` when `e` does not contain an `io::Error` with a raw OS error code,
/// or when none of the platform-specific retry rules below match.
#[cfg_attr(
not(any(
target_os = "linux",
target_os = "android",
target_os = "macos",
target_os = "ios"
)),
expect(unused_variables, reason = "No backoff strategy for other platforms")
)]
fn backoff(e: &anyhow::Error, attempts: u32) -> Option<Duration> {
let raw_os_error = e.any_downcast_ref::<io::Error>()?.raw_os_error()?;
e.any_downcast_ref::<io::Error>()
.and_then(|e| e.raw_os_error())
.is_some_and(|c| c == libc::EIO)
// On Linux and Android, we retry sending once for os error 5.
//
// quinn-udp disables GSO for those but cannot automatically re-send them because we need to split the datagram differently.
#[cfg(any(target_os = "linux", target_os = "android"))]
if raw_os_error == libc::EIO && attempts < 1 {
return Some(Duration::ZERO);
}
// On MacOS, the kernel may return ENOBUFS if the buffer fills up.
//
// Ideally, we would be able to suspend here but MacOS doesn't support that.
// Thus, we do the next best thing and retry.
#[cfg(any(target_os = "macos", target_os = "ios"))]
if raw_os_error == libc::ENOBUFS && attempts < MAX_ENOBUFS_RETRIES {
return Some(exp_delay(attempts));
}
None
}
/// Computes the exponential backoff delay for the given zero-based retry count.
///
/// The delay is `2^attempts` nanoseconds, i.e. it doubles with every attempt.
/// For `attempts >= 64` the value no longer fits into a `u64`; instead of
/// overflowing (panic in debug builds, wrap-around in release builds) the
/// result saturates at `u64::MAX` nanoseconds.
#[cfg(any(target_os = "macos", target_os = "ios", test))]
fn exp_delay(attempts: u32) -> Duration {
    Duration::from_nanos(2_u64.saturating_pow(attempts))
}
/// An iterator that segments an array of buffers into individual datagrams.
@@ -729,4 +745,41 @@ mod tests {
assert!(is_equal_modulo_scope_for_ipv6_link_local(left, right))
}
/// Pins the delay computed for the retry cap itself so that changing
/// `MAX_ENOBUFS_RETRIES` or the backoff base is a deliberate decision.
#[test]
fn max_enobufs_delay() {
    let delay_at_cap = exp_delay(MAX_ENOBUFS_RETRIES);
    let expected = Duration::from_nanos(16_777_216); // ~16ms

    assert_eq!(delay_at_cap, expected);
}
/// The first `EIO` failure must be retried without any delay.
#[test]
#[cfg(target_os = "linux")]
fn immediate_retry_of_os_error_5() {
    let eio = io::Error::from_raw_os_error(libc::EIO);
    let delay = backoff(&anyhow::Error::new(eio), 0);

    assert_eq!(delay, Some(Duration::ZERO));
}
/// After one retry, a further `EIO` failure must not be retried again.
#[test]
#[cfg(target_os = "linux")]
fn only_one_retry_of_os_error_5() {
    let eio = anyhow::Error::new(io::Error::from_raw_os_error(libc::EIO));

    assert_eq!(backoff(&eio, 1), None);
}
/// `ENOBUFS` is retried for attempts 0 through 23 and rejected from attempt 24 onwards.
#[test]
#[cfg(any(target_os = "macos", target_os = "ios"))]
fn at_most_24_retries_of_enobufs() {
    let enobufs = anyhow::Error::new(io::Error::from_raw_os_error(libc::ENOBUFS));

    let last_allowed = backoff(&enobufs, 23);
    let first_rejected = backoff(&enobufs, 24);

    assert!(last_allowed.is_some());
    assert!(first_rejected.is_none());
}
}

View File

@@ -47,6 +47,10 @@ export default function Apple() {
Fixes an issue where concurrent DNS queries with the same ID would be
dropped.
</ChangeItem>
<ChangeItem pull="10965">
Fixes an issue where some packets would get dropped under high
throughput scenarios.
</ChangeItem>
</Unreleased>
<Entry version="1.5.9" date={new Date("2025-10-20")}>
<ChangeItem pull="10603">