diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 230fd902b..57f921321 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -6858,8 +6858,10 @@ dependencies = [ "bufferpool", "bytes", "derive_more 2.0.1", + "firezone-telemetry", "gat-lending-iterator", "ip-packet", + "libc", "opentelemetry", "parking_lot", "quinn-udp", diff --git a/rust/connlib/socket-factory/Cargo.toml b/rust/connlib/socket-factory/Cargo.toml index cc1b849da..509b15212 100644 --- a/rust/connlib/socket-factory/Cargo.toml +++ b/rust/connlib/socket-factory/Cargo.toml @@ -18,5 +18,9 @@ socket2 = { workspace = true } tokio = { workspace = true, features = ["net"] } tracing = { workspace = true } +[target.'cfg(target_os = "macos")'.dependencies] +firezone-telemetry = { workspace = true } +libc = { workspace = true } + [dev-dependencies] derive_more = { workspace = true, features = ["deref"] } diff --git a/rust/connlib/socket-factory/src/lib.rs b/rust/connlib/socket-factory/src/lib.rs index dc3134332..7d0d0f910 100644 --- a/rust/connlib/socket-factory/src/lib.rs +++ b/rust/connlib/socket-factory/src/lib.rs @@ -327,10 +327,7 @@ impl UdpSocket { tracing::trace!(target: "wire::net::send", src = ?datagram.src, %dst, ecn = ?chunk.ecn, %num_packets, %segment_size); - self.inner - .async_io(Interest::WRITABLE, || { - self.state.try_send((&self.inner).into(), &chunk) - }) + self.send_inner(chunk) .await .with_context(|| format!("Failed to send datagram-batch {batch_num}/{num_batches} with segment_size {segment_size} and total length {num_bytes} to {dst}"))?; } @@ -340,10 +337,7 @@ impl UdpSocket { tracing::trace!(target: "wire::net::send", src = ?datagram.src, %dst, ecn = ?transmit.ecn, %num_bytes); - self.inner - .async_io(Interest::WRITABLE, || { - self.state.try_send((&self.inner).into(), &transmit) - }) + self.send_inner(transmit) .await .with_context(|| format!("Failed to send single-datagram to {dst}"))?; } @@ -352,6 +346,26 @@ impl UdpSocket { Ok(()) } + async fn send_inner(&self, chunk: Transmit<'_>) -> io::Result<()> { + self.inner + .async_io(Interest::WRITABLE, || { + match self.state.try_send((&self.inner).into(), &chunk) { + Ok(()) => Ok(()), + #[cfg(target_os = "macos")] + Err(e) + if e.raw_os_error().is_some_and(|e| e == libc::ENOBUFS) + && firezone_telemetry::feature_flags::map_enobufs_to_would_block() => + { + tracing::debug!("Encountered ENOBUFS, treating as WouldBlock"); + + Err(io::Error::from(io::ErrorKind::WouldBlock)) + } + Err(e) => Err(e), + } + }) + .await + } + /// Calculate the chunk size for a given segment size. /// /// At most, an IP packet can 65535 (`u16::MAX`) bytes. diff --git a/rust/telemetry/src/feature_flags.rs b/rust/telemetry/src/feature_flags.rs index 1efb75d7f..91b6bbf31 100644 --- a/rust/telemetry/src/feature_flags.rs +++ b/rust/telemetry/src/feature_flags.rs @@ -35,6 +35,10 @@ pub fn stream_logs() -> bool { FEATURE_FLAGS.stream_logs() } +pub fn map_enobufs_to_would_block() -> bool { + FEATURE_FLAGS.map_enobufs_to_wouldblock() +} + pub(crate) async fn evaluate_now(user_id: String, env: Env) { if user_id.is_empty() { return; @@ -146,6 +150,8 @@ struct FeatureFlagsResponse { drop_llmnr_nxdomain_responses: bool, #[serde(default)] stream_logs: bool, + #[serde(default)] + map_enobufs_to_wouldblock: bool, } #[derive(Debug, Default)] @@ -153,6 +159,7 @@ struct FeatureFlags { icmp_unreachable_instead_of_nat64: AtomicBool, drop_llmnr_nxdomain_responses: AtomicBool, stream_logs: AtomicBool, + map_enobufs_to_wouldblock: AtomicBool, } /// Accessors to the actual feature flags. @@ -168,6 +175,7 @@ impl FeatureFlags { icmp_unreachable_instead_of_nat64, drop_llmnr_nxdomain_responses, stream_logs, + map_enobufs_to_wouldblock, }: FeatureFlagsResponse, ) { self.icmp_unreachable_instead_of_nat64 @@ -175,6 +183,8 @@ impl FeatureFlags { self.drop_llmnr_nxdomain_responses .store(drop_llmnr_nxdomain_responses, Ordering::Relaxed); self.stream_logs.store(stream_logs, Ordering::Relaxed); + self.map_enobufs_to_wouldblock + .store(map_enobufs_to_wouldblock, Ordering::Relaxed); } fn icmp_unreachable_instead_of_nat64(&self) -> bool { @@ -189,6 +199,10 @@ impl FeatureFlags { fn stream_logs(&self) -> bool { self.stream_logs.load(Ordering::Relaxed) } + + fn map_enobufs_to_wouldblock(&self) -> bool { + self.map_enobufs_to_wouldblock.load(Ordering::Relaxed) + } } fn sentry_flag_context(flags: FeatureFlagsResponse) -> sentry::protocol::Context { @@ -198,6 +212,7 @@ fn sentry_flag_context(flags: FeatureFlagsResponse) -> sentry::protocol::Context IcmpUnreachableInsteadOfNat64 { result: bool }, DropLlmnrNxdomainResponses { result: bool }, StreamLogs { result: bool }, + MapENOBUFSToWouldBlock { result: bool }, } // Exhaustive destruction so we don't forget to update this when we add a flag. @@ -205,6 +220,7 @@ fn sentry_flag_context(flags: FeatureFlagsResponse) -> sentry::protocol::Context icmp_unreachable_instead_of_nat64, drop_llmnr_nxdomain_responses, stream_logs, + map_enobufs_to_wouldblock, } = flags; let value = serde_json::json!({ @@ -213,7 +229,8 @@ fn sentry_flag_context(flags: FeatureFlagsResponse) -> sentry::protocol::Context result: icmp_unreachable_instead_of_nat64, }, SentryFlag::DropLlmnrNxdomainResponses { result: drop_llmnr_nxdomain_responses }, - SentryFlag::StreamLogs { result: stream_logs } + SentryFlag::StreamLogs { result: stream_logs }, + SentryFlag::MapENOBUFSToWouldBlock { result: map_enobufs_to_wouldblock }, ] });