diff --git a/rust/client-shared/src/eventloop.rs b/rust/client-shared/src/eventloop.rs index 7c0bf3619..a37f66f65 100644 --- a/rust/client-shared/src/eventloop.rs +++ b/rust/client-shared/src/eventloop.rs @@ -6,7 +6,9 @@ use firezone_tunnel::messages::client::{ EgressMessages, FailReason, FlowCreated, FlowCreationFailed, GatewayIceCandidates, GatewaysIceCandidates, IngressMessages, InitClient, }; -use firezone_tunnel::{ClientEvent, ClientTunnel, DnsResourceRecord, IpConfig, TunConfig}; +use firezone_tunnel::{ + ClientEvent, ClientTunnel, DnsResourceRecord, IpConfig, TunConfig, TunnelError, +}; use parking_lot::Mutex; use phoenix_channel::{ErrorReply, PhoenixChannel, PublicKeyParam}; use socket_factory::{SocketFactory, TcpSocket, UdpSocket}; @@ -132,7 +134,7 @@ impl Eventloop { enum CombinedEvent { Command(Option), - Tunnel(Result), + Tunnel(ClientEvent), Portal(Option>), } @@ -147,8 +149,7 @@ impl Eventloop { ControlFlow::Break(()) => return Ok(()), } } - CombinedEvent::Tunnel(Ok(event)) => self.handle_tunnel_event(event).await?, - CombinedEvent::Tunnel(Err(e)) => self.handle_tunnel_error(e)?, + CombinedEvent::Tunnel(event) => self.handle_tunnel_event(event).await?, CombinedEvent::Portal(Some(event)) => { let msg = event.context("Connection to portal failed")?; @@ -244,52 +245,55 @@ impl Eventloop { .send(Some(config)) .context("Failed to emit event")?; } - firezone_tunnel::ClientEvent::DnsRecordsChanged { records } => { + ClientEvent::DnsRecordsChanged { records } => { *DNS_RESOURCE_RECORDS_CACHE.lock() = records; } + ClientEvent::Error(error) => self.handle_tunnel_error(error)?, } Ok(()) } - fn handle_tunnel_error(&mut self, e: anyhow::Error) -> Result<()> { - if e.root_cause() - .downcast_ref::() - .is_some_and(is_unreachable) - { - tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed. - return Ok(()); - } - - // Invalid Input can be all sorts of things but we mostly see it with unreachable addresses. - if e.root_cause() - .downcast_ref::() - .is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput) - { - tracing::debug!("{e:#}"); - return Ok(()); - } - - if e.root_cause() - .is::() - { - return Err(e); - } - - if e.root_cause() - .downcast_ref::() - .is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied) - { - if !mem::replace(&mut self.logged_permission_denied, true) { - tracing::info!( - "Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic." - ) + fn handle_tunnel_error(&mut self, mut e: TunnelError) -> Result<()> { + for e in e.drain() { + if e.root_cause() + .downcast_ref::() + .is_some_and(is_unreachable) + { + tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed. + return Ok(()); } - return Ok(()); - } + // Invalid Input can be all sorts of things but we mostly see it with unreachable addresses. + if e.root_cause() + .downcast_ref::() + .is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput) + { + tracing::debug!("{e:#}"); + return Ok(()); + } - tracing::warn!("Tunnel error: {e:#}"); + if e.root_cause() + .is::() + { + return Err(e); + } + + if e.root_cause() + .downcast_ref::() + .is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied) + { + if !mem::replace(&mut self.logged_permission_denied, true) { + tracing::info!( + "Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic." + ) + } + + return Ok(()); + } + + tracing::warn!("Tunnel error: {e:#}"); + } Ok(()) } diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index df9cb66ee..24982efb6 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -3,8 +3,9 @@ mod nameserver_set; mod tcp_dns; mod udp_dns; -use crate::{device_channel::Device, dns, otel, sockets::Sockets}; +use crate::{TunnelError, device_channel::Device, dns, otel, sockets::Sockets}; use anyhow::{Context as _, Result}; +use chrono::{DateTime, Utc}; use futures::FutureExt as _; use futures_bounded::FuturesTupleSet; use gat_lending_iterator::LendingIterator; @@ -82,13 +83,58 @@ impl Default for Buffers { } } -pub enum Input { - Timeout(Instant), - Device(D), - Network(I), - TcpDnsQuery(l4_tcp_dns_server::Query), - UdpDnsQuery(l4_udp_dns_server::Query), - DnsResponse(dns::RecursiveResponse), +/// Represents all IO sources that may be ready during a single event-loop tick. +/// +/// This structure allows us to batch-process multiple ready sources rather than +/// handling them one at a time, improving fairness and preventing starvation. +pub struct Input { + pub now: Instant, + pub now_utc: DateTime, + pub timeout: bool, + pub device: Option, + pub network: Option, + pub tcp_dns_query: Option, + pub udp_dns_query: Option, + pub dns_response: Option, + pub error: TunnelError, +} + +impl Input { + fn error(e: impl Into) -> Self { + Self { + now: Instant::now(), + now_utc: Utc::now(), + timeout: false, + device: None, + network: None, + tcp_dns_query: None, + udp_dns_query: None, + dns_response: None, + error: TunnelError::single(e), + } + } +} + +fn poll_to_option(poll: Poll) -> Option { + match poll { + Poll::Ready(r) => Some(r), + Poll::Pending => None, + } +} + +fn poll_result_to_option(poll: Poll>, error: &mut TunnelError) -> Option +where + anyhow::Error: From, +{ + match poll { + Poll::Ready(Ok(r)) => Some(r), + Poll::Ready(Err(e)) => { + error.push(e); + + None + } + Poll::Pending => None, + } } const DNS_QUERY_TIMEOUT: Duration = Duration::from_secs(5); @@ -157,14 +203,14 @@ impl Io { cx: &mut Context<'_>, buffers: &'b mut Buffers, ) -> Poll< - Result< - Input< - impl Iterator + use<'b>, - impl for<'a> LendingIterator = DatagramIn<'a>> + use<>, - >, + Input< + impl Iterator + use<'b>, + impl for<'a> LendingIterator = DatagramIn<'a>> + use<>, >, > { - ready!(self.flush(cx)?); + if let Err(e) = ready!(self.flush(cx)) { + return Poll::Ready(Input::error(e)); + } if self.reval_nameserver_interval.poll_tick(cx).is_ready() { self.nameservers.evaluate(); @@ -173,93 +219,103 @@ impl Io { // We purposely don't want to block the event loop here because we can do plenty of other work while this is running. let _ = self.nameservers.poll(cx); - if let Poll::Ready(network) = self.sockets.poll_recv_from(cx) { - return Poll::Ready(Ok(Input::Network( + let network = self.sockets.poll_recv_from(cx).map(|network| { + anyhow::Ok( network .context("UDP socket failed")? .filter(is_max_wg_packet_size), - ))); + ) + }); + + let device = self + .tun + .poll_read_many(cx, &mut buffers.ip, MAX_INBOUND_PACKET_BATCH) + .map(|num_packets| { + let num_ipv4 = buffers.ip[..num_packets] + .iter() + .filter(|p| p.ipv4_header().is_some()) + .count(); + let num_ipv6 = num_packets - num_ipv4; + + self.packet_counter.add( + num_ipv4 as u64, + &[ + otel::attr::network_type_ipv4(), + otel::attr::network_io_direction_receive(), + ], + ); + self.packet_counter.add( + num_ipv6 as u64, + &[ + otel::attr::network_type_ipv6(), + otel::attr::network_io_direction_receive(), + ], + ); + + buffers.ip.drain(..num_packets) + }); + + let udp_dns_query = self + .udp_dns_server + .poll(cx) + .map(|query| query.context("Failed to poll UDP DNS server")); + + let tcp_dns_query = self + .tcp_dns_server + .poll(cx) + .map(|query| query.context("Failed to poll TCP DNS server")); + + let dns_response = self + .dns_queries + .poll_unpin(cx) + .map(|(result, meta)| match result { + Ok(result) => dns::RecursiveResponse { + server: meta.server, + query: meta.query, + message: result, + transport: meta.transport, + }, + Err(e @ futures_bounded::Timeout { .. }) => dns::RecursiveResponse { + server: meta.server, + query: meta.query, + message: Err(io::Error::new(io::ErrorKind::TimedOut, e)), + transport: meta.transport, + }, + }); + + let timeout = self + .timeout + .as_mut() + .map(|timeout| timeout.poll_unpin(cx).is_ready()) + .unwrap_or(false); + + if timeout { + self.timeout = None; } - if let Poll::Ready(num_packets) = - self.tun - .poll_read_many(cx, &mut buffers.ip, MAX_INBOUND_PACKET_BATCH) + if !timeout + && device.is_pending() + && network.is_pending() + && tcp_dns_query.is_pending() + && udp_dns_query.is_pending() + && dns_response.is_pending() { - let num_ipv4 = buffers.ip[..num_packets] - .iter() - .filter(|p| p.ipv4_header().is_some()) - .count(); - let num_ipv6 = num_packets - num_ipv4; - - self.packet_counter.add( - num_ipv4 as u64, - &[ - otel::attr::network_type_ipv4(), - otel::attr::network_io_direction_receive(), - ], - ); - self.packet_counter.add( - num_ipv6 as u64, - &[ - otel::attr::network_type_ipv6(), - otel::attr::network_io_direction_receive(), - ], - ); - - return Poll::Ready(Ok(Input::Device(buffers.ip.drain(..num_packets)))); + return Poll::Pending; } - if let Poll::Ready(query) = self.udp_dns_server.poll(cx) { - return Poll::Ready(Ok(Input::UdpDnsQuery( - query.context("Failed to poll UDP DNS server")?, - ))); - } + let mut error = TunnelError::default(); - if let Poll::Ready(query) = self.tcp_dns_server.poll(cx) { - return Poll::Ready(Ok(Input::TcpDnsQuery( - query.context("Failed to poll TCP DNS server")?, - ))); - } - - match self.dns_queries.poll_unpin(cx) { - Poll::Ready((result, meta)) => { - let response = match result { - Ok(result) => dns::RecursiveResponse { - server: meta.server, - query: meta.query, - message: result, - transport: meta.transport, - }, - Err(e @ futures_bounded::Timeout { .. }) => dns::RecursiveResponse { - server: meta.server, - query: meta.query, - message: Err(io::Error::new(io::ErrorKind::TimedOut, e)), - transport: meta.transport, - }, - }; - - return Poll::Ready(Ok(Input::DnsResponse(response))); - } - Poll::Pending => {} - } - - if let Some(timeout) = self.timeout.as_mut() - && timeout.poll_unpin(cx).is_ready() - { - // Always emit `now` as the timeout value. - // This ensures that time within our state machine is always monotonic. - // If we were to use the `deadline` of the timer instead, time may go backwards. - // That is because it is valid to set a `Sleep` to a timestamp in the past. - // It will resolve immediately but it will still report the old timestamp as its deadline. - // To guard against this case, specifically call `Instant::now` here. - let now = Instant::now(); - - self.timeout = None; // Clear the timeout. - - return Poll::Ready(Ok(Input::Timeout(now))); - } - - Poll::Pending + Poll::Ready(Input { + now: Instant::now(), + now_utc: Utc::now(), + timeout, + device: poll_to_option(device), + network: poll_result_to_option(network, &mut error), + tcp_dns_query: poll_result_to_option(tcp_dns_query, &mut error), + udp_dns_query: poll_result_to_option(udp_dns_query, &mut error), + dns_response: poll_to_option(dns_response), + error, + }) } pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { @@ -446,11 +502,11 @@ mod tests { let deadline = Instant::now() + Duration::from_secs(1); io.reset_timeout(deadline, ""); - let Input::Timeout(timeout) = io.next().await else { - panic!("Unexpected result"); - }; + let input = io.next().await; - assert!(timeout >= deadline, "timer expire after deadline"); + assert!(input.timeout); + assert!(input.now >= deadline, "timer expire after deadline"); + drop(input); let poll = io.poll_test(); @@ -465,9 +521,8 @@ mod tests { io.reset_timeout(now - Duration::from_secs(10), ""); - let Input::Timeout(timeout) = io.next().await else { - panic!("Unexpected result"); - }; + let input = io.next().await; + let timeout = input.now; assert!(timeout >= now, "timeout = {timeout:?}, now = {now:?}"); } @@ -501,17 +556,14 @@ mod tests { ) }) .await - .unwrap() } fn poll_test( &mut self, ) -> Poll< - Result< - Input< - impl Iterator + use<>, - impl for<'a> LendingIterator = DatagramIn<'a>> + use<>, - >, + Input< + impl Iterator + use<>, + impl for<'a> LendingIterator = DatagramIn<'a>> + use<>, >, > { self.poll( diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 9f0bf1e03..f8319d68c 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -18,7 +18,7 @@ use ip_packet::Ecn; use socket_factory::{SocketFactory, TcpSocket, UdpSocket}; use std::{ collections::BTreeSet, - fmt, future, + fmt, future, mem, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, sync::Arc, task::{Context, Poll, ready}, @@ -140,42 +140,62 @@ impl ClientTunnel { self.io.reset(); } - pub fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll { for _ in 0..MAX_EVENTLOOP_ITERS { + let mut ready = false; + ready!(self.io.poll_has_sockets(cx)); // Suspend everything if we don't have any sockets. - if let Some(e) = self.role_state.poll_event() { - return Poll::Ready(Ok(e)); + // Pass up existing events. + if let Some(event) = self.role_state.poll_event() { + return Poll::Ready(event); } - if let Some(packet) = self.role_state.poll_packets() { + // Drain all buffered IP packets. + while let Some(packet) = self.role_state.poll_packets() { self.io.send_tun(packet); - continue; + ready = true; } - if let Some(trans) = self.role_state.poll_transmit() { + // Drain all buffered transmits. + while let Some(trans) = self.role_state.poll_transmit() { self.io .send_network(trans.src, trans.dst, &trans.payload, Ecn::NonEct); - continue; + ready = true; } - if let Some(query) = self.role_state.poll_dns_queries() { + // Drain all scheduled DNS queries. + while let Some(query) = self.role_state.poll_dns_queries() { self.io.send_dns_query(query); - continue; + ready = true; } - if let Some((timeout, reason)) = self.role_state.poll_timeout() { - self.io.reset_timeout(timeout, reason); - } + // Process all IO sources that are ready. + if let Poll::Ready(io::Input { + now, + now_utc: _, + timeout, + dns_response, + tcp_dns_query: _, + udp_dns_query: _, + device, + network, + error, + }) = self.io.poll(cx, &mut self.buffers) + { + if let Some(response) = dns_response { + self.role_state.handle_dns_response(response); + self.role_state.handle_timeout(now); - match self.io.poll(cx, &mut self.buffers)? { - Poll::Ready(io::Input::Timeout(timeout)) => { - self.role_state.handle_timeout(timeout); - continue; + ready = true; } - Poll::Ready(io::Input::Device(packets)) => { - let now = Instant::now(); + if timeout { + self.role_state.handle_timeout(now); + ready = true; + } + + if let Some(packets) = device { for packet in packets { if packet.is_fz_p2p_control() { tracing::warn!("Packet matches heuristics of FZ p2p control protocol"); @@ -183,20 +203,25 @@ impl ClientTunnel { let ecn = packet.ecn(); - let Some(transmit) = self.role_state.handle_tun_input(packet, now) else { - self.role_state.handle_timeout(now); - continue; - }; - - self.io - .send_network(transmit.src, transmit.dst, &transmit.payload, ecn); + match self.role_state.handle_tun_input(packet, now) { + Some(transmit) => { + self.io.send_network( + transmit.src, + transmit.dst, + &transmit.payload, + ecn, + ); + } + None => { + self.role_state.handle_timeout(now); + } + } } - continue; + ready = true; } - Poll::Ready(io::Input::Network(mut packets)) => { - let now = Instant::now(); + if let Some(mut packets) = network { while let Some(received) = packets.next() { self.packet_counter.add( 1, @@ -207,35 +232,34 @@ impl ClientTunnel { ], ); - let Some(packet) = self.role_state.handle_network_input( + match self.role_state.handle_network_input( received.local, received.from, received.packet, now, - ) else { - self.role_state.handle_timeout(now); - continue; + ) { + Some(packet) => self + .io + .send_tun(packet.with_ecn_from_transport(received.ecn)), + None => self.role_state.handle_timeout(now), }; - - self.io - .send_tun(packet.with_ecn_from_transport(received.ecn)); } - continue; + ready = true; } - Poll::Ready(io::Input::DnsResponse(packet)) => { - self.role_state.handle_dns_response(packet); - self.role_state.handle_timeout(Instant::now()); - continue; + + // Reset timer for time-based wakeup. + if let Some((timeout, reason)) = self.role_state.poll_timeout() { + self.io.reset_timeout(timeout, reason); } - Poll::Ready(io::Input::UdpDnsQuery(_) | io::Input::TcpDnsQuery(_)) => { - debug_assert!( - false, - "Client does not (yet) use userspace DNS server sockets" - ); - continue; + + if !error.is_empty() { + return Poll::Ready(ClientEvent::Error(error)); } - Poll::Pending => {} + } + + if ready { + continue; } return Poll::Pending; @@ -293,26 +317,39 @@ impl GatewayTunnel { .boxed() } - pub fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll { for _ in 0..MAX_EVENTLOOP_ITERS { + let mut ready = false; + ready!(self.io.poll_has_sockets(cx)); // Suspend everything if we don't have any sockets. + // Pass up existing events. if let Some(other) = self.role_state.poll_event() { - return Poll::Ready(Ok(other)); + return Poll::Ready(other); } - if let Some(trans) = self.role_state.poll_transmit() { + // Drain all buffered transmits. + while let Some(trans) = self.role_state.poll_transmit() { self.io .send_network(trans.src, trans.dst, &trans.payload, Ecn::NonEct); - continue; + + ready = true; } - if let Some((timeout, reason)) = self.role_state.poll_timeout() { - self.io.reset_timeout(timeout, reason); - } - - match self.io.poll(cx, &mut self.buffers)? { - Poll::Ready(io::Input::DnsResponse(response)) => { + // Process all IO sources that are ready. + if let Poll::Ready(io::Input { + now, + now_utc, + timeout, + dns_response, + tcp_dns_query, + udp_dns_query, + device, + network, + mut error, + }) = self.io.poll(cx, &mut self.buffers) + { + if let Some(response) = dns_response { let message = response.message.unwrap_or_else(|e| { tracing::debug!("DNS query failed: {e}"); @@ -321,22 +358,26 @@ impl GatewayTunnel { match response.transport { dns::Transport::Udp { source } => { - self.io.send_udp_dns_response(source, message)?; + if let Err(e) = self.io.send_udp_dns_response(source, message) { + error.push(e); + } } dns::Transport::Tcp { remote, .. } => { - self.io.send_tcp_dns_response(remote, message)?; + if let Err(e) = self.io.send_tcp_dns_response(remote, message) { + error.push(e); + } } } - continue; + ready = true; } - Poll::Ready(io::Input::Timeout(timeout)) => { - self.role_state.handle_timeout(timeout, Utc::now()); - continue; - } - Poll::Ready(io::Input::Device(packets)) => { - let now = Instant::now(); + if timeout { + self.role_state.handle_timeout(now, now_utc); + ready = true; + } + + if let Some(packets) = device { for packet in packets { if packet.is_fz_p2p_control() { tracing::warn!("Packet matches heuristics of FZ p2p control protocol"); @@ -344,21 +385,26 @@ impl GatewayTunnel { let ecn = packet.ecn(); - let Some(transmit) = self.role_state.handle_tun_input(packet, now)? else { - self.role_state.handle_timeout(now, Utc::now()); - continue; - }; - - self.io - .send_network(transmit.src, transmit.dst, &transmit.payload, ecn); + match self.role_state.handle_tun_input(packet, now) { + Ok(Some(transmit)) => { + self.io.send_network( + transmit.src, + transmit.dst, + &transmit.payload, + ecn, + ); + } + Ok(None) => { + self.role_state.handle_timeout(now, Utc::now()); + } + Err(e) => error.push(e), + } } - continue; + ready = true; } - Poll::Ready(io::Input::Network(mut packets)) => { - let now = Instant::now(); - let utc_now = Utc::now(); + if let Some(mut packets) = network { while let Some(received) = packets.next() { self.packet_counter.add( 1, @@ -369,61 +415,78 @@ impl GatewayTunnel { ], ); - let Some(packet) = self.role_state.handle_network_input( + match self.role_state.handle_network_input( received.local, received.from, received.packet, now, - )? - else { - self.role_state.handle_timeout(now, utc_now); - continue; + ) { + Ok(Some(packet)) => self + .io + .send_tun(packet.with_ecn_from_transport(received.ecn)), + Ok(None) => self.role_state.handle_timeout(now, now_utc), + Err(e) => error.push(e), }; - - self.io - .send_tun(packet.with_ecn_from_transport(received.ecn)); } - continue; + ready = true; } - Poll::Ready(io::Input::UdpDnsQuery(query)) => { - let Some(nameserver) = self.io.fastest_nameserver() else { + + if let Some(query) = udp_dns_query { + if let Some(nameserver) = self.io.fastest_nameserver() { + self.io.send_dns_query(dns::RecursiveQuery::via_udp( + query.source, + SocketAddr::new(nameserver, dns::DNS_PORT), + query.message, + )); + } else { tracing::warn!(query = ?query.message, "No nameserver available to handle UDP DNS query"); - self.io.send_udp_dns_response( + if let Err(e) = self.io.send_udp_dns_response( query.source, dns_types::Response::servfail(&query.message), - )?; + ) { + error.push(e); + } + } - continue; - }; - - self.io.send_dns_query(dns::RecursiveQuery::via_udp( - query.source, - SocketAddr::new(nameserver, dns::DNS_PORT), - query.message, - )); + ready = true; } - Poll::Ready(io::Input::TcpDnsQuery(query)) => { - let Some(nameserver) = self.io.fastest_nameserver() else { + + if let Some(query) = tcp_dns_query { + if let Some(nameserver) = self.io.fastest_nameserver() { + self.io.send_dns_query(dns::RecursiveQuery::via_tcp( + query.local, + query.remote, + SocketAddr::new(nameserver, dns::DNS_PORT), + query.message, + )); + } else { tracing::warn!(query = ?query.message, "No nameserver available to handle TCP DNS query"); - self.io.send_tcp_dns_response( + if let Err(e) = self.io.send_tcp_dns_response( query.remote, dns_types::Response::servfail(&query.message), - )?; + ) { + error.push(e); + } + } - continue; - }; - - self.io.send_dns_query(dns::RecursiveQuery::via_tcp( - query.local, - query.remote, - SocketAddr::new(nameserver, dns::DNS_PORT), - query.message, - )); + ready = true; } - Poll::Pending => {} + + // Reset timer for time-based wakeup. + if let Some((timeout, reason)) = self.role_state.poll_timeout() { + self.io.reset_timeout(timeout, reason); + } + + if !error.is_empty() { + return Poll::Ready(GatewayEvent::Error(error)); + } + } + + if ready { + continue; } return Poll::Pending; @@ -435,7 +498,7 @@ impl GatewayTunnel { } } -#[derive(Clone, Debug)] +#[derive(Debug)] pub enum ClientEvent { AddedIceCandidates { conn_id: GatewayId, @@ -457,6 +520,7 @@ pub enum ClientEvent { records: BTreeSet, }, TunInterfaceUpdated(TunConfig), + Error(TunnelError), } #[derive(Clone, derive_more::Debug, PartialEq, Eq)] @@ -509,6 +573,49 @@ pub enum GatewayEvent { candidates: BTreeSet, }, ResolveDns(ResolveDnsRequest), + Error(TunnelError), +} + +/// A collection of errors that occurred during a single event-loop tick. +/// +/// This type purposely doesn't provide a `From` implementation for any errors. +/// We want compile-time safety inside the event-loop that we don't abort processing in the middle of a packet batch. +#[derive(Debug, Default)] +pub struct TunnelError { + errors: Vec, +} + +impl TunnelError { + pub fn single(e: impl Into) -> Self { + Self { + errors: vec![e.into()], + } + } + + pub fn push(&mut self, e: impl Into) { + self.errors.push(e.into()); + } + + pub fn is_empty(&self) -> bool { + self.errors.is_empty() + } + + pub fn drain(&mut self) -> impl Iterator { + mem::take(&mut self.errors).into_iter() + } +} + +impl Drop for TunnelError { + fn drop(&mut self) { + debug_assert!( + self.errors.is_empty(), + "should never drop `TunnelError` without consuming errors" + ); + + if !self.errors.is_empty() { + tracing::error!("should never drop `TunnelError` without consuming errors") + } + } } /// Adapter-struct to [`fmt::Display`] a [`BTreeSet`]. diff --git a/rust/connlib/tunnel/src/tests/sut.rs b/rust/connlib/tunnel/src/tests/sut.rs index d6e93fa43..9b9c66cd7 100644 --- a/rust/connlib/tunnel/src/tests/sut.rs +++ b/rust/connlib/tunnel/src/tests/sut.rs @@ -869,6 +869,7 @@ impl TunnelTest { Ok(()) } + ClientEvent::Error(_) => unreachable!("ClientState never emits `TunnelError`"), } } @@ -1006,5 +1007,6 @@ fn on_gateway_event( .unwrap() }) } + GatewayEvent::Error(_) => unreachable!("GatewayState never emits `TunnelError`"), } } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 856a0bdd9..fb4e29848 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -14,7 +14,7 @@ use firezone_tunnel::messages::gateway::{ use firezone_tunnel::messages::{ConnectionAccepted, GatewayResponse, RelaysPresence}; use firezone_tunnel::{ DnsResourceNatEntry, GatewayEvent, GatewayTunnel, IPV4_TUNNEL, IPV6_TUNNEL, IpConfig, - ResolveDnsRequest, + ResolveDnsRequest, TunnelError, }; use futures::FutureExt; use futures::future::BoxFuture; @@ -116,7 +116,7 @@ impl Eventloop { enum CombinedEvent { SigIntTerm, ShutdownComplete(Result<()>), - Tunnel(Result), + Tunnel(GatewayEvent), Portal(Option>), DomainResolved((Result, Arc>, ResolveTrigger)), } @@ -125,12 +125,9 @@ impl Eventloop { pub async fn run(mut self) -> Result<()> { loop { match future::poll_fn(|cx| self.next_event(cx)).await { - CombinedEvent::Tunnel(Ok(event)) => { + CombinedEvent::Tunnel(event) => { self.handle_tunnel_event(event).await?; } - CombinedEvent::Tunnel(Err(e)) => { - self.handle_tunnel_error(e)?; - } CombinedEvent::Portal(Some(Ok(msg))) => { self.handle_portal_message(msg).await?; } @@ -252,61 +249,64 @@ impl Eventloop { tracing::warn!("Too many dns resolution requests, dropping existing one"); }; } + GatewayEvent::Error(error) => self.handle_tunnel_error(error)?, } Ok(()) } - fn handle_tunnel_error(&mut self, e: anyhow::Error) -> Result<()> { - if e.root_cause() - .downcast_ref::() - .is_some_and(is_unreachable) - { - tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed. - return Ok(()); - } - - // Invalid Input can be all sorts of things but we mostly see it with unreachable addresses. - if e.root_cause() - .downcast_ref::() - .is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput) - { - tracing::debug!("{e:#}"); - return Ok(()); - } - - // Unknown connection just means packets are bouncing on the TUN device because the Client disconnected. - if e.root_cause().is::() { - tracing::debug!("{e:#}"); - return Ok(()); - } - - if e.root_cause() - .downcast_ref::() - .is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied) - { - if !mem::replace(&mut self.logged_permission_denied, true) { - tracing::info!( - "Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic." - ) + fn handle_tunnel_error(&mut self, mut e: TunnelError) -> Result<()> { + for e in e.drain() { + if e.root_cause() + .downcast_ref::() + .is_some_and(is_unreachable) + { + tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed. + return Ok(()); } - return Ok(()); - } + // Invalid Input can be all sorts of things but we mostly see it with unreachable addresses. + if e.root_cause() + .downcast_ref::() + .is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput) + { + tracing::debug!("{e:#}"); + return Ok(()); + } - if e.root_cause().is::() { - // Some IP packets cannot be translated and should be dropped "silently". - // Do so by ignoring the error here. - return Ok(()); - } + // Unknown connection just means packets are bouncing on the TUN device because the Client disconnected. + if e.root_cause().is::() { + tracing::debug!("{e:#}"); + return Ok(()); + } - if e.root_cause() - .is::() - { - return Err(e); - } + if e.root_cause() + .downcast_ref::() + .is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied) + { + if !mem::replace(&mut self.logged_permission_denied, true) { + tracing::info!( + "Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic." + ) + } - tracing::warn!("Tunnel error: {e:#}"); + return Ok(()); + } + + if e.root_cause().is::() { + // Some IP packets cannot be translated and should be dropped "silently". + // Do so by ignoring the error here. + return Ok(()); + } + + if e.root_cause() + .is::() + { + return Err(e); + } + + tracing::warn!("Tunnel error: {e:#}"); + } Ok(()) }