diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 769a9f7f5..3fd2749e7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1360,6 +1360,7 @@ dependencies = [ "thiserror 2.0.15", "time", "tokio", + "tokio-stream", "tracing", "tun", "url", @@ -7996,6 +7997,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/rust/apple-client-ffi/src/lib.rs b/rust/apple-client-ffi/src/lib.rs index d49435a2e..6eaf2cd98 100644 --- a/rust/apple-client-ffi/src/lib.rs +++ b/rust/apple-client-ffi/src/lib.rs @@ -142,8 +142,8 @@ impl CallbackHandler { tunnel_address_v6: Ipv6Addr, dns_addresses: Vec, search_domain: Option, - route_list_v4: Vec, - route_list_v6: Vec, + route_list_v4: impl IntoIterator, + route_list_v6: impl IntoIterator, ) { match ( serde_json::to_string(&dns_addresses), @@ -315,21 +315,14 @@ impl WrappedSession { while let Some(event) = event_stream.next().await { match event { - Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - } => { + Event::TunInterfaceUpdated(config) => { callback_handler.on_set_interface_config( - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, + config.ip.v4, + config.ip.v6, + config.dns_sentinel_ips(), + config.search_domain, + config.ipv4_routes, + config.ipv6_routes, ); } Event::ResourcesUpdated(resource_views) => { diff --git a/rust/bin-shared/src/tun_device_manager/linux.rs b/rust/bin-shared/src/tun_device_manager/linux.rs index 5a430a35a..45f0cc41a 100644 --- a/rust/bin-shared/src/tun_device_manager/linux.rs +++ b/rust/bin-shared/src/tun_device_manager/linux.rs @@ -175,8 +175,8 @@ impl TunDeviceManager { pub async fn set_routes( &mut self, - ipv4: Vec, - ipv6: Vec, + ipv4: impl IntoIterator, + ipv6: impl IntoIterator, ) -> Result<()> { let new_routes: HashSet = ipv4 .into_iter() diff --git a/rust/bin-shared/src/tun_device_manager/macos.rs b/rust/bin-shared/src/tun_device_manager/macos.rs index 3e6541b6f..58a69d3f0 100644 --- a/rust/bin-shared/src/tun_device_manager/macos.rs +++ b/rust/bin-shared/src/tun_device_manager/macos.rs @@ -29,8 +29,8 @@ impl TunDeviceManager { )] pub async fn set_routes( &mut self, - _ipv4: Vec, - _ipv6: Vec, + _ipv4: impl IntoIterator, + _ipv6: impl IntoIterator, ) -> Result<()> { bail!("Not implemented") } diff --git a/rust/bin-shared/src/tun_device_manager/windows.rs b/rust/bin-shared/src/tun_device_manager/windows.rs index 67df51867..089f15200 100644 --- a/rust/bin-shared/src/tun_device_manager/windows.rs +++ b/rust/bin-shared/src/tun_device_manager/windows.rs @@ -99,8 +99,12 @@ impl TunDeviceManager { Ok(()) } - #[tracing::instrument(level = "trace", skip(self))] - pub async fn set_routes(&mut self, v4: Vec, v6: Vec) -> Result<()> { + #[expect(clippy::unused_async, reason = "Must match Linux API")] + pub async fn set_routes( + &mut self, + v4: impl IntoIterator, + v6: impl IntoIterator, + ) -> Result<()> { let iface_idx = self .iface_idx .context("Cannot set routes without having created TUN device")?; diff --git a/rust/client-ffi/src/lib.rs b/rust/client-ffi/src/lib.rs index c1b0c6c58..7fcee34bf 100644 --- a/rust/client-ffi/src/lib.rs +++ b/rust/client-ffi/src/lib.rs @@ -169,25 +169,21 @@ impl Session { pub async fn next_event(&self) -> Result, Error> { match self.events.lock().await.next().await { - Some(client_shared::Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - }) => { - let dns = serde_json::to_string(&dns).context("Failed to serialize DNS servers")?; - let ipv4_routes = serde_json::to_string(&V4RouteList::new(ipv4_routes)) + Some(client_shared::Event::TunInterfaceUpdated(config)) => { + let dns = serde_json::to_string( + &config.dns_by_sentinel.left_values().collect::>(), + ) + .context("Failed to serialize DNS servers")?; + let ipv4_routes = serde_json::to_string(&V4RouteList::new(config.ipv4_routes)) .context("Failed to serialize IPv4 routes")?; - let ipv6_routes = serde_json::to_string(&V6RouteList::new(ipv6_routes)) + let ipv6_routes = serde_json::to_string(&V6RouteList::new(config.ipv6_routes)) .context("Failed to serialize IPv6 routes")?; Ok(Some(Event::TunInterfaceUpdated { - ipv4: ipv4.to_string(), - ipv6: ipv6.to_string(), + ipv4: config.ip.v4.to_string(), + ipv6: config.ip.v6.to_string(), dns, - search_domain: search_domain.map(|d| d.to_string()), + search_domain: config.search_domain.map(|d| d.to_string()), ipv4_routes, ipv6_routes, })) diff --git a/rust/client-shared/Cargo.toml b/rust/client-shared/Cargo.toml index 9d34fe61f..5e3b04f3a 100644 --- a/rust/client-shared/Cargo.toml +++ b/rust/client-shared/Cargo.toml @@ -23,6 +23,7 @@ socket-factory = { workspace = true } thiserror = { workspace = true } time = { workspace = true, features = ["formatting"] } tokio = { workspace = true, features = ["rt", "sync"] } +tokio-stream = { workspace = true, features = ["sync"] } tracing = { workspace = true, features = ["std", "attributes"] } tun = { workspace = true } url = { workspace = true, features = ["serde"] } diff --git a/rust/client-shared/src/eventloop.rs b/rust/client-shared/src/eventloop.rs index 33fb42002..e21ce62a7 100644 --- a/rust/client-shared/src/eventloop.rs +++ b/rust/client-shared/src/eventloop.rs @@ -1,16 +1,13 @@ use crate::PHOENIX_TOPIC; use anyhow::{Context as _, Result}; use connlib_model::{PublicKey, ResourceId, ResourceView}; -use dns_types::DomainName; use firezone_tunnel::messages::RelaysPresence; use firezone_tunnel::messages::client::{ EgressMessages, FailReason, FlowCreated, FlowCreationFailed, GatewayIceCandidates, GatewaysIceCandidates, IngressMessages, InitClient, }; use firezone_tunnel::{ClientEvent, ClientTunnel, IpConfig, TunConfig}; -use ip_network::{Ipv4Network, Ipv6Network}; use phoenix_channel::{ErrorReply, PhoenixChannel, PublicKeyParam}; -use std::net::{Ipv4Addr, Ipv6Addr}; use std::ops::ControlFlow; use std::pin::pin; use std::time::Instant; @@ -21,16 +18,17 @@ use std::{ task::{Context, Poll}, }; use std::{future, mem}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tun::Tun; pub struct Eventloop { tunnel: ClientTunnel, cmd_rx: mpsc::UnboundedReceiver, - event_tx: mpsc::Sender, + resource_list_sender: watch::Sender>, + tun_config_sender: watch::Sender>, - portal_event_rx: mpsc::Receiver, + portal_event_rx: mpsc::Receiver>, portal_cmd_tx: mpsc::Sender, logged_permission_denied: bool, @@ -45,46 +43,11 @@ pub enum Command { SetDisabledResources(BTreeSet), } -pub enum Event { - TunInterfaceUpdated { - ipv4: Ipv4Addr, - ipv6: Ipv6Addr, - dns: Vec, - search_domain: Option, - ipv4_routes: Vec, - ipv6_routes: Vec, - }, - ResourcesUpdated(Vec), - Disconnected(DisconnectError), -} - -impl Event { - fn tun_interface_updated(config: TunConfig) -> Self { - Self::TunInterfaceUpdated { - ipv4: config.ip.v4, - ipv6: config.ip.v6, - dns: config.dns_by_sentinel.left_values().copied().collect(), - search_domain: config.search_domain, - ipv4_routes: Vec::from_iter(config.ipv4_routes), - ipv6_routes: Vec::from_iter(config.ipv6_routes), - } - } -} - enum PortalCommand { Connect(PublicKeyParam), Send(EgressMessages), } -#[expect( - clippy::large_enum_variant, - reason = "This type is only sent through a channel so the stack-size doesn't matter much." -)] -enum PortalEvent { - Received(IngressMessages), - Error(phoenix_channel::Error), -} - /// Unified error type to use across connlib. #[derive(thiserror::Error, Debug)] #[error("{0:#}")] @@ -111,7 +74,8 @@ impl Eventloop { tunnel: ClientTunnel, mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, cmd_rx: mpsc::UnboundedReceiver, - event_tx: mpsc::Sender, + resource_list_sender: watch::Sender>, + tun_config_sender: watch::Sender>, ) -> Self { let (portal_event_tx, portal_event_rx) = mpsc::channel(128); let (portal_cmd_tx, portal_cmd_rx) = mpsc::channel(128); @@ -127,10 +91,11 @@ impl Eventloop { Self { tunnel, cmd_rx, - event_tx, logged_permission_denied: false, portal_event_rx, portal_cmd_tx, + resource_list_sender, + tun_config_sender, } } } @@ -138,11 +103,11 @@ impl Eventloop { enum CombinedEvent { Command(Option), Tunnel(Result), - Portal(Option), + Portal(Option>), } impl Eventloop { - pub async fn run(mut self) -> Result<()> { + pub async fn run(mut self) -> Result<(), DisconnectError> { loop { match future::poll_fn(|cx| self.next_event(cx)).await { CombinedEvent::Command(None) => return Ok(()), @@ -154,14 +119,15 @@ impl Eventloop { } CombinedEvent::Tunnel(Ok(event)) => self.handle_tunnel_event(event).await?, CombinedEvent::Tunnel(Err(e)) => self.handle_tunnel_error(e)?, - CombinedEvent::Portal(Some(PortalEvent::Received(msg))) => { + CombinedEvent::Portal(Some(event)) => { + let msg = event.context("Connection to portal failed")?; + self.handle_portal_message(msg).await?; } - CombinedEvent::Portal(Some(PortalEvent::Error(e))) => { - return Err(e).context("Connection to portal failed"); - } CombinedEvent::Portal(None) => { - return Err(anyhow::Error::msg("portal task exited unexpectedly")); + return Err(DisconnectError(anyhow::Error::msg( + "portal task exited unexpectedly", + ))); } } } @@ -238,15 +204,13 @@ impl Eventloop { .context("Failed to send message to portal")?; } ClientEvent::ResourcesChanged { resources } => { - self.event_tx - .send(Event::ResourcesUpdated(resources)) - .await + self.resource_list_sender + .send(resources) .context("Failed to emit event")?; } ClientEvent::TunInterfaceUpdated(config) => { - self.event_tx - .send(Event::tun_interface_updated(config)) - .await + self.tun_config_sender + .send(Some(config)) .context("Failed to emit event")?; } } @@ -431,7 +395,7 @@ impl Eventloop { async fn phoenix_channel_event_loop( mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - event_tx: mpsc::Sender, + event_tx: mpsc::Sender>, mut cmd_rx: mpsc::Receiver, ) { use futures::future::Either; @@ -441,7 +405,7 @@ async fn phoenix_channel_event_loop( loop { match select(poll_fn(|cx| portal.poll(cx)), pin!(cmd_rx.recv())).await { Either::Left((Ok(phoenix_channel::Event::InboundMessage { msg, .. }), _)) => { - if event_tx.send(PortalEvent::Received(msg)).await.is_err() { + if event_tx.send(Ok(msg)).await.is_err() { tracing::debug!("Event channel closed: exiting phoenix-channel event-loop"); break; @@ -479,7 +443,7 @@ async fn phoenix_channel_event_loop( "Hiccup in portal connection: {error:#}" ), Either::Left((Err(e), _)) => { - let _ = event_tx.send(PortalEvent::Error(e)).await; // We don't care about the result because we are exiting anyway. + let _ = event_tx.send(Err(e)).await; // We don't care about the result because we are exiting anyway. break; } diff --git a/rust/client-shared/src/lib.rs b/rust/client-shared/src/lib.rs index 5b27a104a..6371f2c01 100644 --- a/rust/client-shared/src/lib.rs +++ b/rust/client-shared/src/lib.rs @@ -1,21 +1,26 @@ //! Main connlib library for clients. pub use crate::serde_routelist::{V4RouteList, V6RouteList}; pub use connlib_model::StaticSecret; -pub use eventloop::{DisconnectError, Event}; +pub use eventloop::DisconnectError; +pub use firezone_tunnel::TunConfig; pub use firezone_tunnel::messages::client::{IngressMessages, ResourceDescription}; -use anyhow::{Context as _, Result}; -use connlib_model::ResourceId; +use anyhow::Result; +use connlib_model::{ResourceId, ResourceView}; use eventloop::{Command, Eventloop}; use firezone_tunnel::ClientTunnel; +use futures::{FutureExt, StreamExt}; use phoenix_channel::{PhoenixChannel, PublicKeyParam}; use socket_factory::{SocketFactory, TcpSocket, UdpSocket}; use std::collections::BTreeSet; +use std::future; use std::net::IpAddr; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio::sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc; +use tokio::sync::watch; use tokio::task::JoinHandle; +use tokio_stream::wrappers::WatchStream; use tun::Tun; mod eventloop; @@ -29,12 +34,21 @@ const PHOENIX_TOPIC: &str = "client"; /// To stop the session, simply drop this struct. #[derive(Clone, Debug)] pub struct Session { - channel: UnboundedSender, + channel: mpsc::UnboundedSender, } #[derive(Debug)] pub struct EventStream { - channel: Receiver, + eventloop: JoinHandle>, + resource_list_receiver: WatchStream>, + tun_config_receiver: WatchStream>, +} + +#[derive(Debug)] +pub enum Event { + TunInterfaceUpdated(TunConfig), + ResourcesUpdated(Vec), + Disconnected(DisconnectError), } impl Session { @@ -47,19 +61,31 @@ impl Session { portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, handle: tokio::runtime::Handle, ) -> (Self, EventStream) { - let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel(); - let (event_tx, event_rx) = tokio::sync::mpsc::channel(1000); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let connect_handle = handle.spawn(connect( - tcp_socket_factory, - udp_socket_factory, - portal, - cmd_rx, - event_tx.clone(), - )); - handle.spawn(connect_supervisor(connect_handle, event_tx)); + // Use `watch` channels for resource list and TUN config because we are only ever interested in the last value and don't care about intermediate updates. + let (tun_config_sender, tun_config_receiver) = watch::channel(None); + let (resource_list_sender, resource_list_receiver) = watch::channel(Vec::default()); - (Self { channel: cmd_tx }, EventStream { channel: event_rx }) + let eventloop = handle.spawn( + Eventloop::new( + ClientTunnel::new(tcp_socket_factory, udp_socket_factory), + portal, + cmd_rx, + resource_list_sender, + tun_config_sender, + ) + .run(), + ); + + ( + Self { channel: cmd_tx }, + EventStream { + eventloop, + resource_list_receiver: WatchStream::from_changes(resource_list_receiver), + tun_config_receiver: WatchStream::from_changes(tun_config_receiver), + }, + ) } /// Reset a [`Session`]. @@ -114,11 +140,30 @@ impl Session { impl EventStream { pub fn poll_next(&mut self, cx: &mut Context) -> Poll> { - self.channel.poll_recv(cx) + match self.eventloop.poll_unpin(cx) { + Poll::Ready(Ok(Ok(()))) => return Poll::Ready(None), + Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Event::Disconnected(e))), + Poll::Ready(Err(e)) => { + return Poll::Ready(Some(Event::Disconnected(DisconnectError::from( + anyhow::Error::new(e).context("connlib crashed"), + )))); + } + Poll::Pending => {} + } + + if let Poll::Ready(Some(resources)) = self.resource_list_receiver.poll_next_unpin(cx) { + return Poll::Ready(Some(Event::ResourcesUpdated(resources))); + } + + if let Poll::Ready(Some(Some(config))) = self.tun_config_receiver.poll_next_unpin(cx) { + return Poll::Ready(Some(Event::TunInterfaceUpdated(config))); + } + + Poll::Pending } pub async fn next(&mut self) -> Option { - self.channel.recv().await + future::poll_fn(|cx| self.poll_next(cx)).await } } @@ -127,51 +172,3 @@ impl Drop for Session { tracing::debug!("`Session` dropped") } } - -/// Connects to the portal and starts a tunnel. -/// -/// When this function exits, the tunnel failed unrecoverably and you need to call it again. -async fn connect( - tcp_socket_factory: Arc>, - udp_socket_factory: Arc>, - portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - cmd_rx: UnboundedReceiver, - event_tx: Sender, -) -> Result<()> { - Eventloop::new( - ClientTunnel::new(tcp_socket_factory, udp_socket_factory), - portal, - cmd_rx, - event_tx, - ) - .run() - .await?; - - Ok(()) -} - -/// A supervisor task that handles, when [`connect`] exits. -async fn connect_supervisor( - connect_handle: JoinHandle>, - event_tx: tokio::sync::mpsc::Sender, -) { - let task = async { - connect_handle.await.context("connlib crashed")??; - - Ok(()) - }; - - let error = match task.await { - Ok(()) => { - tracing::info!("connlib exited gracefully"); - - return; - } - Err(e) => e, - }; - - match event_tx.send(Event::Disconnected(error)).await { - Ok(()) => (), - Err(_) => tracing::debug!("Event stream closed before we could send disconnected event"), - } -} diff --git a/rust/client-shared/src/serde_routelist.rs b/rust/client-shared/src/serde_routelist.rs index 3ad351162..902a61a61 100644 --- a/rust/client-shared/src/serde_routelist.rs +++ b/rust/client-shared/src/serde_routelist.rs @@ -13,7 +13,7 @@ struct Cidr { pub struct V4RouteList(Vec>); impl V4RouteList { - pub fn new(route: Vec) -> Self { + pub fn new(route: impl IntoIterator) -> Self { Self( route .into_iter() @@ -32,7 +32,7 @@ impl V4RouteList { pub struct V6RouteList(Vec>); impl V6RouteList { - pub fn new(route: Vec) -> Self { + pub fn new(route: impl IntoIterator) -> Self { Self( route .into_iter() diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 1f69d33cb..bca296e72 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -446,6 +446,12 @@ pub struct TunConfig { pub ipv6_routes: BTreeSet, } +impl TunConfig { + pub fn dns_sentinel_ips(&self) -> Vec { + self.dns_by_sentinel.left_values().copied().collect() + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct IpConfig { pub v4: Ipv4Addr, diff --git a/rust/gui-client/src-tauri/src/service.rs b/rust/gui-client/src-tauri/src/service.rs index 56e74a92b..7a35bcc1f 100644 --- a/rust/gui-client/src-tauri/src/service.rs +++ b/rust/gui-client/src-tauri/src/service.rs @@ -473,19 +473,16 @@ impl<'a> Handler<'a> { }) .await? } - client_shared::Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - } => { + client_shared::Event::TunInterfaceUpdated(config) => { self.session.transition_to_connected()?; - self.tun_device.set_ips(ipv4, ipv6).await?; - self.dns_controller.set_dns(dns, search_domain).await?; - self.tun_device.set_routes(ipv4_routes, ipv6_routes).await?; + self.tun_device.set_ips(config.ip.v4, config.ip.v6).await?; + self.dns_controller + .set_dns(config.dns_sentinel_ips(), config.search_domain) + .await?; + self.tun_device + .set_routes(config.ipv4_routes, config.ipv6_routes) + .await?; self.dns_controller.flush()?; } client_shared::Event::ResourcesUpdated(resources) => { diff --git a/rust/headless-client/src/main.rs b/rust/headless-client/src/main.rs index 5b430e61c..4d8de9b6b 100644 --- a/rust/headless-client/src/main.rs +++ b/rust/headless-client/src/main.rs @@ -328,18 +328,10 @@ fn main() -> Result<()> { // On every Resources update, flush DNS to mitigate dns_controller.flush()?; } - client_shared::Event::TunInterfaceUpdated { - ipv4, - ipv6, - dns, - search_domain, - ipv4_routes, - ipv6_routes, - } => { - tun_device.set_ips(ipv4, ipv6).await?; - tun_device.set_routes(ipv4_routes, ipv6_routes).await?; - - dns_controller.set_dns(dns, search_domain).await?; + client_shared::Event::TunInterfaceUpdated(config) => { + tun_device.set_ips(config.ip.v4, config.ip.v6).await?; + dns_controller.set_dns(config.dns_sentinel_ips(), config.search_domain).await?; + tun_device.set_routes(config.ipv4_routes, config.ipv6_routes).await?; // `on_set_interface_config` is guaranteed to be called when the tunnel is completely ready //