diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ce3ffb979..5434d2317 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -7030,7 +7030,6 @@ dependencies = [ "opentelemetry", "quinn-udp", "socket2 0.6.1", - "telemetry", "tokio", "tracing", ] diff --git a/rust/libs/connlib/socket-factory/Cargo.toml b/rust/libs/connlib/socket-factory/Cargo.toml index 3828e619b..3918e640e 100644 --- a/rust/libs/connlib/socket-factory/Cargo.toml +++ b/rust/libs/connlib/socket-factory/Cargo.toml @@ -14,18 +14,17 @@ ip-packet = { workspace = true } opentelemetry = { workspace = true, features = ["metrics"] } quinn-udp = { workspace = true } socket2 = { workspace = true } -tokio = { workspace = true, features = ["net"] } +tokio = { workspace = true, features = ["net", "time"] } tracing = { workspace = true } +[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] +libc = { workspace = true } + [target.'cfg(target_os = "android")'.dependencies] libc = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] libc = { workspace = true } -[target.'cfg(target_os = "macos")'.dependencies] -libc = { workspace = true } -telemetry = { workspace = true } - [dev-dependencies] derive_more = { workspace = true, features = ["deref"] } diff --git a/rust/libs/connlib/socket-factory/src/lib.rs b/rust/libs/connlib/socket-factory/src/lib.rs index 81ed192bf..b6f9ea990 100644 --- a/rust/libs/connlib/socket-factory/src/lib.rs +++ b/rust/libs/connlib/socket-factory/src/lib.rs @@ -1,4 +1,4 @@ -use anyhow::{Context as _, Result}; +use anyhow::{Context as _, ErrorExt, Result}; use bufferpool::{Buffer, BufferPool}; use bytes::{Buf as _, BytesMut}; use gat_lending_iterator::LendingIterator; @@ -8,6 +8,7 @@ use quinn_udp::{EcnCodepoint, Transmit, UdpSockRef}; use std::io; use std::io::IoSliceMut; use std::ops::Deref; +use std::time::Duration; use std::{ net::{IpAddr, SocketAddr}, task::{Context, Poll}, @@ -26,6 +27,10 @@ pub const SEND_BUFFER_SIZE: usize = 16 * ONE_MB; pub const RECV_BUFFER_SIZE: usize = 128 * ONE_MB; const ONE_MB: usize = 1024 * 1024; +/// How many times we at most try to re-send a packet if we encounter ENOBUFS. +#[cfg(any(target_os = "macos", target_os = "ios", test))] +const MAX_ENOBUFS_RETRIES: u32 = 24; + impl SocketFactory for F where F: Fn(SocketAddr) -> io::Result + Send + Sync + 'static, @@ -306,21 +311,21 @@ impl UdpSocket { datagram.ecn, )?; - match self.send_transmit(&transmit).await { - Ok(()) => Ok(()), + let mut attempt = 0; - // On Linux and Android, we retry sending once for os error 5. - // - // quinn-udp disables GSO for those but cannot automatically re-send them because we need to split the datagram differently. - #[cfg(any(target_os = "linux", target_os = "android"))] - Err(e) if is_os_error_5(&e) => { - self.send_transmit(&transmit).await?; + loop { + match self.send_transmit(&transmit).await { + Ok(()) => return Ok(()), + Err(e) => { + let backoff = backoff(&e, attempt).ok_or(e)?; // Attempt to get a backoff value or otherwise bail with error. - Ok(()) + tracing::debug!(?backoff, dst = %datagram.dst, len = %datagram.packet.len(), "Retrying packet"); + + tokio::time::sleep(backoff).await; + } } - // Any other error or other OS returns the error directly. - Err(e) => Err(e), + attempt += 1; } } @@ -364,24 +369,7 @@ impl UdpSocket { ); self.inner - .async_io(Interest::WRITABLE, || { - match self.state.try_send((&self.inner).into(), &chunk) { - Ok(()) => Ok(()), - #[cfg(target_os = "macos")] - Err(e) - if e.raw_os_error().is_some_and(|e| e == libc::ENOBUFS) - && telemetry::feature_flags::map_enobufs_to_would_block() => - { - telemetry::analytics::feature_flag_called( - "map-enobufs-to-wouldblock", - ); - tracing::debug!("Encountered ENOBUFS, treating as WouldBlock"); - - Err(io::Error::from(io::ErrorKind::WouldBlock)) - } - Err(e) => Err(e), - } - }) + .async_io(Interest::WRITABLE, || self.state.try_send((&self.inner).into(), &chunk)) .await .with_context(|| format!("Failed to send datagram-batch {batch_num}/{num_batches} with segment_size {segment_size} and total length {num_bytes} to {dst}"))?; } @@ -516,13 +504,41 @@ fn is_equal_modulo_scope_for_ipv6_link_local(expected: SocketAddr, actual: Socke } } -#[cfg(any(target_os = "linux", target_os = "android"))] -fn is_os_error_5(e: &anyhow::Error) -> bool { - use anyhow::ErrorExt; +#[cfg_attr( + not(any( + target_os = "linux", + target_os = "android", + target_os = "macos", + target_os = "ios" + )), + expect(unused_variables, reason = "No backoff strategy for other platforms") +)] +fn backoff(e: &anyhow::Error, attempts: u32) -> Option { + let raw_os_error = e.any_downcast_ref::()?.raw_os_error()?; - e.any_downcast_ref::() - .and_then(|e| e.raw_os_error()) - .is_some_and(|c| c == libc::EIO) + // On Linux and Android, we retry sending once for os error 5. + // + // quinn-udp disables GSO for those but cannot automatically re-send them because we need to split the datagram differently. + #[cfg(any(target_os = "linux", target_os = "android"))] + if raw_os_error == libc::EIO && attempts < 1 { + return Some(Duration::ZERO); + } + + // On MacOS, the kernel may return ENOBUFS if the buffer fills up. + // + // Ideally, we would be able to suspend here but MacOS doesn't support that. + // Thus, we do the next best thing and retry. + #[cfg(any(target_os = "macos", target_os = "ios"))] + if raw_os_error == libc::ENOBUFS && attempts < MAX_ENOBUFS_RETRIES { + return Some(exp_delay(attempts)); + } + + None +} + +#[cfg(any(target_os = "macos", target_os = "ios", test))] +fn exp_delay(attempts: u32) -> Duration { + Duration::from_nanos(2_u64.pow(attempts)) } /// An iterator that segments an array of buffers into individual datagrams. @@ -729,4 +745,41 @@ mod tests { assert!(is_equal_modulo_scope_for_ipv6_link_local(left, right)) } + + #[test] + fn max_enobufs_delay() { + assert_eq!( + exp_delay(MAX_ENOBUFS_RETRIES), + Duration::from_nanos(16_777_216) // ~16ms + ) + } + + #[test] + #[cfg(target_os = "linux")] + fn immediate_retry_of_os_error_5() { + let err = anyhow::Error::new(io::Error::from_raw_os_error(libc::EIO)); + + let backoff = backoff(&err, 0); + + assert_eq!(backoff.unwrap(), Duration::ZERO); + } + + #[test] + #[cfg(target_os = "linux")] + fn only_one_retry_of_os_error_5() { + let err = anyhow::Error::new(io::Error::from_raw_os_error(libc::EIO)); + + let backoff = backoff(&err, 1); + + assert!(backoff.is_none()); + } + + #[test] + #[cfg(any(target_os = "macos", target_os = "ios"))] + fn at_most_24_retries_of_enobufs() { + let err = anyhow::Error::new(io::Error::from_raw_os_error(libc::ENOBUFS)); + + assert!(backoff(&err, 23).is_some()); + assert!(backoff(&err, 24).is_none()); + } } diff --git a/website/src/components/Changelog/Apple.tsx b/website/src/components/Changelog/Apple.tsx index af7274059..1729d8af3 100644 --- a/website/src/components/Changelog/Apple.tsx +++ b/website/src/components/Changelog/Apple.tsx @@ -47,6 +47,10 @@ export default function Apple() { Fixes an issue where concurrent DNS queries with the same ID would be dropped. + + Fixes an issue where some packets would get dropped under high + throughput scenarios. +