From a109c1a2ef07083840d86fcf844a4b2919095598 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 21 Aug 2025 05:42:54 +0000 Subject: [PATCH] feat(connlib): discard intermediate resource and TUN updates (#10223) Right now, the Client event-loops have a channel with 1000 items for sending new resource lists and updates to the TUN device to the host app. This is kind of unnecessary as we always only care about the last version of these. Intermediate updates that the host app doesn't process are effectively irrelevant. We've had an issue before where a bug in the portal caused us to receive many updates to resources which ended up crashing Client apps because this channel filled up. To be more resilient on this front, we refactor the Client event loop to use a `watch` channel for this. Watch channels only retain the last value that got sent into them. --- rust/Cargo.lock | 2 + rust/apple-client-ffi/src/lib.rs | 25 ++-- .../src/tun_device_manager/linux.rs | 4 +- .../src/tun_device_manager/macos.rs | 4 +- .../src/tun_device_manager/windows.rs | 8 +- rust/client-ffi/src/lib.rs | 24 ++-- rust/client-shared/Cargo.toml | 1 + rust/client-shared/src/eventloop.rs | 82 +++-------- rust/client-shared/src/lib.rs | 131 +++++++++--------- rust/client-shared/src/serde_routelist.rs | 4 +- rust/connlib/tunnel/src/lib.rs | 6 + rust/gui-client/src-tauri/src/service.rs | 19 ++- rust/headless-client/src/main.rs | 16 +-- 13 files changed, 139 insertions(+), 187 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 769a9f7f5..3fd2749e7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1360,6 +1360,7 @@ dependencies = [ "thiserror 2.0.15", "time", "tokio", + "tokio-stream", "tracing", "tun", "url", @@ -7996,6 +7997,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/rust/apple-client-ffi/src/lib.rs b/rust/apple-client-ffi/src/lib.rs index d49435a2e..6eaf2cd98 100644 --- a/rust/apple-client-ffi/src/lib.rs +++ b/rust/apple-client-ffi/src/lib.rs @@ -142,8 +142,8 @@ impl CallbackHandler { tunnel_address_v6: Ipv6Addr, dns_addresses: Vec, search_domain: Option, - route_list_v4: Vec, - route_list_v6: Vec, + route_list_v4: impl IntoIterator, + route_list_v6: impl IntoIterator, ) { match ( serde_json::to_string(&dns_addresses), @@ -315,21 +315,14 @@ impl WrappedSession { while let Some(event) = event_stream.next().await { match event { - Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - } => { + Event::TunInterfaceUpdated(config) => { callback_handler.on_set_interface_config( - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, + config.ip.v4, + config.ip.v6, + config.dns_sentinel_ips(), + config.search_domain, + config.ipv4_routes, + config.ipv6_routes, ); } Event::ResourcesUpdated(resource_views) => { diff --git a/rust/bin-shared/src/tun_device_manager/linux.rs b/rust/bin-shared/src/tun_device_manager/linux.rs index 5a430a35a..45f0cc41a 100644 --- a/rust/bin-shared/src/tun_device_manager/linux.rs +++ b/rust/bin-shared/src/tun_device_manager/linux.rs @@ -175,8 +175,8 @@ impl TunDeviceManager { pub async fn set_routes( &mut self, - ipv4: Vec, - ipv6: Vec, + ipv4: impl IntoIterator, + ipv6: impl IntoIterator, ) -> Result<()> { let new_routes: HashSet = ipv4 .into_iter() diff --git a/rust/bin-shared/src/tun_device_manager/macos.rs b/rust/bin-shared/src/tun_device_manager/macos.rs index 3e6541b6f..58a69d3f0 100644 --- a/rust/bin-shared/src/tun_device_manager/macos.rs +++ b/rust/bin-shared/src/tun_device_manager/macos.rs @@ -29,8 +29,8 @@ impl TunDeviceManager { )] pub async fn set_routes( &mut self, - _ipv4: Vec, - _ipv6: Vec, + _ipv4: impl IntoIterator, + _ipv6: impl IntoIterator, ) -> Result<()> { bail!("Not implemented") } diff --git a/rust/bin-shared/src/tun_device_manager/windows.rs b/rust/bin-shared/src/tun_device_manager/windows.rs index 67df51867..089f15200 100644 --- a/rust/bin-shared/src/tun_device_manager/windows.rs +++ b/rust/bin-shared/src/tun_device_manager/windows.rs @@ -99,8 +99,12 @@ impl TunDeviceManager { Ok(()) } - #[tracing::instrument(level = "trace", skip(self))] - pub async fn set_routes(&mut self, v4: Vec, v6: Vec) -> Result<()> { + #[expect(clippy::unused_async, reason = "Must match Linux API")] + pub async fn set_routes( + &mut self, + v4: impl IntoIterator, + v6: impl IntoIterator, + ) -> Result<()> { let iface_idx = self .iface_idx .context("Cannot set routes without having created TUN device")?; diff --git a/rust/client-ffi/src/lib.rs b/rust/client-ffi/src/lib.rs index c1b0c6c58..7fcee34bf 100644 --- a/rust/client-ffi/src/lib.rs +++ b/rust/client-ffi/src/lib.rs @@ -169,25 +169,21 @@ impl Session { pub async fn next_event(&self) -> Result, Error> { match self.events.lock().await.next().await { - Some(client_shared::Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - }) => { - let dns = serde_json::to_string(&dns).context("Failed to serialize DNS servers")?; - let ipv4_routes = serde_json::to_string(&V4RouteList::new(ipv4_routes)) + Some(client_shared::Event::TunInterfaceUpdated(config)) => { + let dns = serde_json::to_string( + &config.dns_by_sentinel.left_values().collect::>(), + ) + .context("Failed to serialize DNS servers")?; + let ipv4_routes = serde_json::to_string(&V4RouteList::new(config.ipv4_routes)) .context("Failed to serialize IPv4 routes")?; - let ipv6_routes = serde_json::to_string(&V6RouteList::new(ipv6_routes)) + let ipv6_routes = serde_json::to_string(&V6RouteList::new(config.ipv6_routes)) .context("Failed to serialize IPv6 routes")?; Ok(Some(Event::TunInterfaceUpdated { - ipv4: ipv4.to_string(), - ipv6: ipv6.to_string(), + ipv4: config.ip.v4.to_string(), + ipv6: config.ip.v6.to_string(), dns, - search_domain: search_domain.map(|d| d.to_string()), + search_domain: config.search_domain.map(|d| d.to_string()), ipv4_routes, ipv6_routes, })) diff --git a/rust/client-shared/Cargo.toml b/rust/client-shared/Cargo.toml index 9d34fe61f..5e3b04f3a 100644 --- a/rust/client-shared/Cargo.toml +++ b/rust/client-shared/Cargo.toml @@ -23,6 +23,7 @@ socket-factory = { workspace = true } thiserror = { workspace = true } time = { workspace = true, features = ["formatting"] } tokio = { workspace = true, features = ["rt", "sync"] } +tokio-stream = { workspace = true, features = ["sync"] } tracing = { workspace = true, features = ["std", "attributes"] } tun = { workspace = true } url = { workspace = true, features = ["serde"] } diff --git a/rust/client-shared/src/eventloop.rs b/rust/client-shared/src/eventloop.rs index 33fb42002..e21ce62a7 100644 --- a/rust/client-shared/src/eventloop.rs +++ b/rust/client-shared/src/eventloop.rs @@ -1,16 +1,13 @@ use crate::PHOENIX_TOPIC; use anyhow::{Context as _, Result}; use connlib_model::{PublicKey, ResourceId, ResourceView}; -use dns_types::DomainName; use firezone_tunnel::messages::RelaysPresence; use firezone_tunnel::messages::client::{ EgressMessages, FailReason, FlowCreated, FlowCreationFailed, GatewayIceCandidates, GatewaysIceCandidates, IngressMessages, InitClient, }; use firezone_tunnel::{ClientEvent, ClientTunnel, IpConfig, TunConfig}; -use ip_network::{Ipv4Network, Ipv6Network}; use phoenix_channel::{ErrorReply, PhoenixChannel, PublicKeyParam}; -use std::net::{Ipv4Addr, Ipv6Addr}; use std::ops::ControlFlow; use std::pin::pin; use std::time::Instant; @@ -21,16 +18,17 @@ use std::{ task::{Context, Poll}, }; use std::{future, mem}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tun::Tun; pub struct Eventloop { tunnel: ClientTunnel, cmd_rx: mpsc::UnboundedReceiver, - event_tx: mpsc::Sender, + resource_list_sender: watch::Sender>, + tun_config_sender: watch::Sender>, - portal_event_rx: mpsc::Receiver, + portal_event_rx: mpsc::Receiver>, portal_cmd_tx: mpsc::Sender, logged_permission_denied: bool, @@ -45,46 +43,11 @@ pub enum Command { SetDisabledResources(BTreeSet), } -pub enum Event { - TunInterfaceUpdated { - ipv4: Ipv4Addr, - ipv6: Ipv6Addr, - dns: Vec, - search_domain: Option, - ipv4_routes: Vec, - ipv6_routes: Vec, - }, - ResourcesUpdated(Vec), - Disconnected(DisconnectError), -} - -impl Event { - fn tun_interface_updated(config: TunConfig) -> Self { - Self::TunInterfaceUpdated { - ipv4: config.ip.v4, - ipv6: config.ip.v6, - dns: config.dns_by_sentinel.left_values().copied().collect(), - search_domain: config.search_domain, - ipv4_routes: Vec::from_iter(config.ipv4_routes), - ipv6_routes: Vec::from_iter(config.ipv6_routes), - } - } -} - enum PortalCommand { Connect(PublicKeyParam), Send(EgressMessages), } -#[expect( - clippy::large_enum_variant, - reason = "This type is only sent through a channel so the stack-size doesn't matter much." -)] -enum PortalEvent { - Received(IngressMessages), - Error(phoenix_channel::Error), -} - /// Unified error type to use across connlib. #[derive(thiserror::Error, Debug)] #[error("{0:#}")] @@ -111,7 +74,8 @@ impl Eventloop { tunnel: ClientTunnel, mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, cmd_rx: mpsc::UnboundedReceiver, - event_tx: mpsc::Sender, + resource_list_sender: watch::Sender>, + tun_config_sender: watch::Sender>, ) -> Self { let (portal_event_tx, portal_event_rx) = mpsc::channel(128); let (portal_cmd_tx, portal_cmd_rx) = mpsc::channel(128); @@ -127,10 +91,11 @@ impl Eventloop { Self { tunnel, cmd_rx, - event_tx, logged_permission_denied: false, portal_event_rx, portal_cmd_tx, + resource_list_sender, + tun_config_sender, } } } @@ -138,11 +103,11 @@ impl Eventloop { enum CombinedEvent { Command(Option), Tunnel(Result), - Portal(Option), + Portal(Option>), } impl Eventloop { - pub async fn run(mut self) -> Result<()> { + pub async fn run(mut self) -> Result<(), DisconnectError> { loop { match future::poll_fn(|cx| self.next_event(cx)).await { CombinedEvent::Command(None) => return Ok(()), @@ -154,14 +119,15 @@ impl Eventloop { } CombinedEvent::Tunnel(Ok(event)) => self.handle_tunnel_event(event).await?, CombinedEvent::Tunnel(Err(e)) => self.handle_tunnel_error(e)?, - CombinedEvent::Portal(Some(PortalEvent::Received(msg))) => { + CombinedEvent::Portal(Some(event)) => { + let msg = event.context("Connection to portal failed")?; + self.handle_portal_message(msg).await?; } - CombinedEvent::Portal(Some(PortalEvent::Error(e))) => { - return Err(e).context("Connection to portal failed"); - } CombinedEvent::Portal(None) => { - return Err(anyhow::Error::msg("portal task exited unexpectedly")); + return Err(DisconnectError(anyhow::Error::msg( + "portal task exited unexpectedly", + ))); } } } @@ -238,15 +204,13 @@ impl Eventloop { .context("Failed to send message to portal")?; } ClientEvent::ResourcesChanged { resources } => { - self.event_tx - .send(Event::ResourcesUpdated(resources)) - .await + self.resource_list_sender + .send(resources) .context("Failed to emit event")?; } ClientEvent::TunInterfaceUpdated(config) => { - self.event_tx - .send(Event::tun_interface_updated(config)) - .await + self.tun_config_sender + .send(Some(config)) .context("Failed to emit event")?; } } @@ -431,7 +395,7 @@ impl Eventloop { async fn phoenix_channel_event_loop( mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - event_tx: mpsc::Sender, + event_tx: mpsc::Sender>, mut cmd_rx: mpsc::Receiver, ) { use futures::future::Either; @@ -441,7 +405,7 @@ async fn phoenix_channel_event_loop( loop { match select(poll_fn(|cx| portal.poll(cx)), pin!(cmd_rx.recv())).await { Either::Left((Ok(phoenix_channel::Event::InboundMessage { msg, .. }), _)) => { - if event_tx.send(PortalEvent::Received(msg)).await.is_err() { + if event_tx.send(Ok(msg)).await.is_err() { tracing::debug!("Event channel closed: exiting phoenix-channel event-loop"); break; @@ -479,7 +443,7 @@ async fn phoenix_channel_event_loop( "Hiccup in portal connection: {error:#}" ), Either::Left((Err(e), _)) => { - let _ = event_tx.send(PortalEvent::Error(e)).await; // We don't care about the result because we are exiting anyway. + let _ = event_tx.send(Err(e)).await; // We don't care about the result because we are exiting anyway. break; } diff --git a/rust/client-shared/src/lib.rs b/rust/client-shared/src/lib.rs index 5b27a104a..6371f2c01 100644 --- a/rust/client-shared/src/lib.rs +++ b/rust/client-shared/src/lib.rs @@ -1,21 +1,26 @@ //! Main connlib library for clients. pub use crate::serde_routelist::{V4RouteList, V6RouteList}; pub use connlib_model::StaticSecret; -pub use eventloop::{DisconnectError, Event}; +pub use eventloop::DisconnectError; +pub use firezone_tunnel::TunConfig; pub use firezone_tunnel::messages::client::{IngressMessages, ResourceDescription}; -use anyhow::{Context as _, Result}; -use connlib_model::ResourceId; +use anyhow::Result; +use connlib_model::{ResourceId, ResourceView}; use eventloop::{Command, Eventloop}; use firezone_tunnel::ClientTunnel; +use futures::{FutureExt, StreamExt}; use phoenix_channel::{PhoenixChannel, PublicKeyParam}; use socket_factory::{SocketFactory, TcpSocket, UdpSocket}; use std::collections::BTreeSet; +use std::future; use std::net::IpAddr; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc; +use tokio::sync::watch; use tokio::task::JoinHandle; +use tokio_stream::wrappers::WatchStream; use tun::Tun; mod eventloop; @@ -29,12 +34,21 @@ const PHOENIX_TOPIC: &str = "client"; /// To stop the session, simply drop this struct. #[derive(Clone, Debug)] pub struct Session { - channel: UnboundedSender, + channel: mpsc::UnboundedSender, } #[derive(Debug)] pub struct EventStream { - channel: Receiver, + eventloop: JoinHandle>, + resource_list_receiver: WatchStream>, + tun_config_receiver: WatchStream>, +} + +#[derive(Debug)] +pub enum Event { + TunInterfaceUpdated(TunConfig), + ResourcesUpdated(Vec), + Disconnected(DisconnectError), } impl Session { @@ -47,19 +61,31 @@ impl Session { portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, handle: tokio::runtime::Handle, ) -> (Self, EventStream) { - let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); - let (event_tx, event_rx) = tokio::sync::mpsc::channel(1000); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let connect_handle = handle.spawn(connect( - tcp_socket_factory, - udp_socket_factory, - portal, - cmd_rx, - event_tx.clone(), - )); - handle.spawn(connect_supervisor(connect_handle, event_tx)); + // Use `watch` channels for resource list and TUN config because we are only ever interested in the last value and don't care about intermediate updates. + let (tun_config_sender, tun_config_receiver) = watch::channel(None); + let (resource_list_sender, resource_list_receiver) = watch::channel(Vec::default()); - (Self { channel: cmd_tx }, EventStream { channel: event_rx }) + let eventloop = handle.spawn( + Eventloop::new( + ClientTunnel::new(tcp_socket_factory, udp_socket_factory), + portal, + cmd_rx, + resource_list_sender, + tun_config_sender, + ) + .run(), + ); + + ( + Self { channel: cmd_tx }, + EventStream { + eventloop, + resource_list_receiver: WatchStream::from_changes(resource_list_receiver), + tun_config_receiver: WatchStream::from_changes(tun_config_receiver), + }, + ) } /// Reset a [`Session`]. @@ -114,11 +140,30 @@ impl Session { impl EventStream { pub fn poll_next(&mut self, cx: &mut Context) -> Poll> { - self.channel.poll_recv(cx) + match self.eventloop.poll_unpin(cx) { + Poll::Ready(Ok(Ok(()))) => return Poll::Ready(None), + Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Event::Disconnected(e))), + Poll::Ready(Err(e)) => { + return Poll::Ready(Some(Event::Disconnected(DisconnectError::from( + anyhow::Error::new(e).context("connlib crashed"), + )))); + } + Poll::Pending => {} + } + + if let Poll::Ready(Some(resources)) = self.resource_list_receiver.poll_next_unpin(cx) { + return Poll::Ready(Some(Event::ResourcesUpdated(resources))); + } + + if let Poll::Ready(Some(Some(config))) = self.tun_config_receiver.poll_next_unpin(cx) { + return Poll::Ready(Some(Event::TunInterfaceUpdated(config))); + } + + Poll::Pending } pub async fn next(&mut self) -> Option { - self.channel.recv().await + future::poll_fn(|cx| self.poll_next(cx)).await } } @@ -127,51 +172,3 @@ impl Drop for Session { tracing::debug!("`Session` dropped") } } - -/// Connects to the portal and starts a tunnel. -/// -/// When this function exits, the tunnel failed unrecoverably and you need to call it again. -async fn connect( - tcp_socket_factory: Arc>, - udp_socket_factory: Arc>, - portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - cmd_rx: UnboundedReceiver, - event_tx: Sender, -) -> Result<()> { - Eventloop::new( - ClientTunnel::new(tcp_socket_factory, udp_socket_factory), - portal, - cmd_rx, - event_tx, - ) - .run() - .await?; - - Ok(()) -} - -/// A supervisor task that handles, when [`connect`] exits. -async fn connect_supervisor( - connect_handle: JoinHandle>, - event_tx: tokio::sync::mpsc::Sender, -) { - let task = async { - connect_handle.await.context("connlib crashed")??; - - Ok(()) - }; - - let error = match task.await { - Ok(()) => { - tracing::info!("connlib exited gracefully"); - - return; - } - Err(e) => e, - }; - - match event_tx.send(Event::Disconnected(error)).await { - Ok(()) => (), - Err(_) => tracing::debug!("Event stream closed before we could send disconnected event"), - } -} diff --git a/rust/client-shared/src/serde_routelist.rs b/rust/client-shared/src/serde_routelist.rs index 3ad351162..902a61a61 100644 --- a/rust/client-shared/src/serde_routelist.rs +++ b/rust/client-shared/src/serde_routelist.rs @@ -13,7 +13,7 @@ struct Cidr { pub struct V4RouteList(Vec>); impl V4RouteList { - pub fn new(route: Vec) -> Self { + pub fn new(route: impl IntoIterator) -> Self { Self( route .into_iter() @@ -32,7 +32,7 @@ impl V4RouteList { pub struct V6RouteList(Vec>); impl V6RouteList { - pub fn new(route: Vec) -> Self { + pub fn new(route: impl IntoIterator) -> Self { Self( route .into_iter() diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 1f69d33cb..bca296e72 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -446,6 +446,12 @@ pub struct TunConfig { pub ipv6_routes: BTreeSet, } +impl TunConfig { + pub fn dns_sentinel_ips(&self) -> Vec { + self.dns_by_sentinel.left_values().copied().collect() + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct IpConfig { pub v4: Ipv4Addr, diff --git a/rust/gui-client/src-tauri/src/service.rs b/rust/gui-client/src-tauri/src/service.rs index 56e74a92b..7a35bcc1f 100644 --- a/rust/gui-client/src-tauri/src/service.rs +++ b/rust/gui-client/src-tauri/src/service.rs @@ -473,19 +473,16 @@ impl<'a> Handler<'a> { }) .await? } - client_shared::Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - } => { + client_shared::Event::TunInterfaceUpdated(config) => { self.session.transition_to_connected()?; - self.tun_device.set_ips(ipv4, ipv6).await?; - self.dns_controller.set_dns(dns, search_domain).await?; - self.tun_device.set_routes(ipv4_routes, ipv6_routes).await?; + self.tun_device.set_ips(config.ip.v4, config.ip.v6).await?; + self.dns_controller + .set_dns(config.dns_sentinel_ips(), config.search_domain) + .await?; + self.tun_device + .set_routes(config.ipv4_routes, config.ipv6_routes) + .await?; self.dns_controller.flush()?; } client_shared::Event::ResourcesUpdated(resources) => { diff --git a/rust/headless-client/src/main.rs b/rust/headless-client/src/main.rs index 5b430e61c..4d8de9b6b 100644 --- a/rust/headless-client/src/main.rs +++ b/rust/headless-client/src/main.rs @@ -328,18 +328,10 @@ fn main() -> Result<()> { // On every Resources update, flush DNS to mitigate dns_controller.flush()?; } - client_shared::Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - } => { - tun_device.set_ips(ipv4, ipv6).await?; - tun_device.set_routes(ipv4_routes, ipv6_routes).await?; - - dns_controller.set_dns(dns, search_domain).await?; + client_shared::Event::TunInterfaceUpdated(config) => { + tun_device.set_ips(config.ip.v4, config.ip.v6).await?; + dns_controller.set_dns(config.dns_sentinel_ips(), config.search_domain).await?; + tun_device.set_routes(config.ipv4_routes, config.ipv6_routes).await?; // `on_set_interface_config` is guaranteed to be called when the tunnel is completely ready //