diff --git a/rust/Cargo.lock b/rust/Cargo.lock index c75fcd8c3..0f8d455c1 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1347,6 +1347,7 @@ dependencies = [ "dns-types", "firezone-logging", "firezone-tunnel", + "futures", "ip_network", "libc", "phoenix-channel", diff --git a/rust/client-shared/Cargo.toml b/rust/client-shared/Cargo.toml index 327c88785..9d34fe61f 100644 --- a/rust/client-shared/Cargo.toml +++ b/rust/client-shared/Cargo.toml @@ -12,6 +12,7 @@ connlib-model = { workspace = true } dns-types = { workspace = true } firezone-logging = { workspace = true } firezone-tunnel = { workspace = true } +futures = { workspace = true } ip_network = { workspace = true } libc = { workspace = true } phoenix-channel = { workspace = true } diff --git a/rust/client-shared/src/eventloop.rs b/rust/client-shared/src/eventloop.rs index ae753d301..be3eb0a97 100644 --- a/rust/client-shared/src/eventloop.rs +++ b/rust/client-shared/src/eventloop.rs @@ -7,11 +7,12 @@ use firezone_tunnel::messages::client::{ EgressMessages, FailReason, FlowCreated, FlowCreationFailed, GatewayIceCandidates, GatewaysIceCandidates, IngressMessages, InitClient, }; -use firezone_tunnel::{ClientTunnel, IpConfig}; +use firezone_tunnel::{ClientEvent, ClientTunnel, IpConfig, TunConfig}; use ip_network::{Ipv4Network, Ipv6Network}; -use phoenix_channel::{ErrorReply, OutboundRequestId, PhoenixChannel, PublicKeyParam}; -use std::mem; +use phoenix_channel::{ErrorReply, PhoenixChannel, PublicKeyParam}; use std::net::{Ipv4Addr, Ipv6Addr}; +use std::ops::ControlFlow; +use std::pin::pin; use std::time::Instant; use std::{ collections::BTreeSet, @@ -19,15 +20,18 @@ use std::{ net::IpAddr, task::{Context, Poll}, }; -use tokio::sync::mpsc::error::TrySendError; +use std::{future, mem}; +use tokio::sync::mpsc; use tun::Tun; pub struct Eventloop { tunnel: ClientTunnel, - portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - cmd_rx: tokio::sync::mpsc::UnboundedReceiver, - event_tx: tokio::sync::mpsc::Sender, + cmd_rx: mpsc::UnboundedReceiver, + event_tx: mpsc::Sender, + + portal_event_rx: mpsc::Receiver, + portal_cmd_tx: mpsc::Sender, logged_permission_denied: bool, } @@ -54,6 +58,33 @@ pub enum Event { Disconnected(DisconnectError), } +impl Event { + fn tun_interface_updated(config: TunConfig) -> Self { + Self::TunInterfaceUpdated { + ipv4: config.ip.v4, + ipv6: config.ip.v6, + dns: config.dns_by_sentinel.left_values().copied().collect(), + search_domain: config.search_domain, + ipv4_routes: Vec::from_iter(config.ipv4_routes), + ipv6_routes: Vec::from_iter(config.ipv6_routes), + } + } +} + +enum PortalCommand { + Connect(PublicKeyParam), + Send(EgressMessages), +} + +#[expect( + clippy::large_enum_variant, + reason = "This type is only sent through a channel so the stack-size doesn't matter much." +)] +enum PortalEvent { + Received(IngressMessages), + Error(phoenix_channel::Error), +} + /// Unified error type to use across connlib. #[derive(thiserror::Error, Debug)] #[error("{0:#}")] @@ -79,216 +110,193 @@ impl Eventloop { pub(crate) fn new( tunnel: ClientTunnel, mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - cmd_rx: tokio::sync::mpsc::UnboundedReceiver, - event_tx: tokio::sync::mpsc::Sender, + cmd_rx: mpsc::UnboundedReceiver, + event_tx: mpsc::Sender, ) -> Self { + let (portal_event_tx, portal_event_rx) = mpsc::channel(128); + let (portal_cmd_tx, portal_cmd_rx) = mpsc::channel(128); + portal.connect(PublicKeyParam(tunnel.public_key().to_bytes())); + tokio::spawn(phoenix_channel_event_loop( + portal, + portal_event_tx, + portal_cmd_rx, + )); + Self { tunnel, - portal, cmd_rx, event_tx, logged_permission_denied: false, + portal_event_rx, + portal_cmd_tx, } } } +enum CombinedEvent { + Command(Option), + Tunnel(Result), + Portal(Option), +} + impl Eventloop { - pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + pub async fn run(mut self) -> Result<()> { loop { - match self.cmd_rx.poll_recv(cx) { - Poll::Ready(None | Some(Command::Stop)) => return Poll::Ready(Ok(())), - Poll::Ready(Some(Command::SetDns(dns))) => { - self.tunnel.state_mut().update_system_resolvers(dns); - - continue; + match future::poll_fn(|cx| self.next_event(cx)).await { + CombinedEvent::Command(None) => return Ok(()), + CombinedEvent::Command(Some(cmd)) => { + match self.handle_eventloop_command(cmd).await? { + ControlFlow::Continue(()) => {} + ControlFlow::Break(()) => return Ok(()), + } } - Poll::Ready(Some(Command::SetDisabledResources(resources))) => { - self.tunnel.state_mut().set_disabled_resources(resources); - continue; + CombinedEvent::Tunnel(Ok(event)) => self.handle_tunnel_event(event).await?, + CombinedEvent::Tunnel(Err(e)) => self.handle_tunnel_error(e)?, + CombinedEvent::Portal(Some(PortalEvent::Received(msg))) => { + self.handle_portal_message(msg).await?; } - Poll::Ready(Some(Command::SetTun(tun))) => { - self.tunnel.set_tun(tun); - continue; + CombinedEvent::Portal(Some(PortalEvent::Error(e))) => { + return Err(e).context("Connection to portal failed"); } - Poll::Ready(Some(Command::Reset(reason))) => { - self.tunnel.reset(&reason); - self.portal - .connect(PublicKeyParam(self.tunnel.public_key().to_bytes())); - - continue; + CombinedEvent::Portal(None) => { + return Err(anyhow::Error::msg("portal task exited unexpectedly")); } - Poll::Pending => {} } - - match self.tunnel.poll_next_event(cx) { - Poll::Ready(Ok(event)) => { - let Some(e) = self.handle_tunnel_event(event) else { - continue; - }; - - match self.event_tx.try_send(e) { - Ok(()) => {} - Err(TrySendError::Closed(_)) => { - tracing::debug!("Event receiver dropped, exiting event loop"); - - return Poll::Ready(Ok(())); - } - Err(TrySendError::Full(_)) => { - tracing::warn!("App cannot keep up with connlib events, dropping"); - } - }; - - continue; - } - Poll::Ready(Err(e)) => { - if e.root_cause() - .downcast_ref::() - .is_some_and(is_unreachable) - { - tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed. - continue; - } - - // Invalid Input can be all sorts of things but we mostly see it with unreachable addresses. - if e.root_cause() - .downcast_ref::() - .is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput) - { - tracing::debug!("{e:#}"); - continue; - } - - if e.root_cause() - .is::() - { - return Poll::Ready(Err(e)); - } - - if e.root_cause() - .downcast_ref::() - .is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied) - { - if !mem::replace(&mut self.logged_permission_denied, true) { - tracing::info!( - "Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic." - ) - } - - continue; - } - - tracing::warn!("Tunnel error: {e:#}"); - continue; - } - Poll::Pending => {} - } - - match self.portal.poll(cx) { - Poll::Ready(result) => { - let event = result.context("connection to the portal failed")?; - self.handle_portal_event(event); - continue; - } - Poll::Pending => {} - } - - return Poll::Pending; } } - fn handle_tunnel_event(&mut self, event: firezone_tunnel::ClientEvent) -> Option { + async fn handle_eventloop_command(&mut self, command: Command) -> Result> { + match command { + Command::Stop => return Ok(ControlFlow::Break(())), + Command::SetDns(dns) => self.tunnel.state_mut().update_system_resolvers(dns), + Command::SetDisabledResources(resources) => { + self.tunnel.state_mut().set_disabled_resources(resources) + } + Command::SetTun(tun) => { + self.tunnel.set_tun(tun); + } + Command::Reset(reason) => { + self.tunnel.reset(&reason); + self.portal_cmd_tx + .send(PortalCommand::Connect(PublicKeyParam( + self.tunnel.public_key().to_bytes(), + ))) + .await + .context("Failed to connect phoenix-channel")?; + } + } + + Ok(ControlFlow::Continue(())) + } + + async fn handle_tunnel_event(&mut self, event: ClientEvent) -> Result<()> { match event { - firezone_tunnel::ClientEvent::AddedIceCandidates { + ClientEvent::AddedIceCandidates { conn_id: gid, candidates, } => { tracing::debug!(%gid, ?candidates, "Sending new ICE candidates to gateway"); - self.portal.send( - PHOENIX_TOPIC, - EgressMessages::BroadcastIceCandidates(GatewaysIceCandidates { - gateway_ids: vec![gid], - candidates, - }), - ); - - None + self.portal_cmd_tx + .send(PortalCommand::Send(EgressMessages::BroadcastIceCandidates( + GatewaysIceCandidates { + gateway_ids: vec![gid], + candidates, + }, + ))) + .await + .context("Failed to send message to portal")?; } - firezone_tunnel::ClientEvent::RemovedIceCandidates { + ClientEvent::RemovedIceCandidates { conn_id: gid, candidates, } => { tracing::debug!(%gid, ?candidates, "Sending invalidated ICE candidates to gateway"); - self.portal.send( - PHOENIX_TOPIC, - EgressMessages::BroadcastInvalidatedIceCandidates(GatewaysIceCandidates { - gateway_ids: vec![gid], - candidates, - }), - ); - - None + self.portal_cmd_tx + .send(PortalCommand::Send( + EgressMessages::BroadcastInvalidatedIceCandidates(GatewaysIceCandidates { + gateway_ids: vec![gid], + candidates, + }), + )) + .await + .context("Failed to send message to portal")?; } - firezone_tunnel::ClientEvent::ConnectionIntent { + ClientEvent::ConnectionIntent { connected_gateway_ids, resource, } => { - self.portal.send( - PHOENIX_TOPIC, - EgressMessages::CreateFlow { + self.portal_cmd_tx + .send(PortalCommand::Send(EgressMessages::CreateFlow { resource_id: resource, connected_gateway_ids, - }, - ); - - None + })) + .await + .context("Failed to send message to portal")?; } - firezone_tunnel::ClientEvent::ResourcesChanged { resources } => { - Some(Event::ResourcesUpdated(resources)) + ClientEvent::ResourcesChanged { resources } => { + self.event_tx + .send(Event::ResourcesUpdated(resources)) + .await + .context("Failed to emit event")?; } - firezone_tunnel::ClientEvent::TunInterfaceUpdated(config) => { - Some(Event::TunInterfaceUpdated { - ipv4: config.ip.v4, - ipv6: config.ip.v6, - dns: config.dns_by_sentinel.left_values().copied().collect(), - search_domain: config.search_domain, - ipv4_routes: Vec::from_iter(config.ipv4_routes), - ipv6_routes: Vec::from_iter(config.ipv6_routes), - }) + ClientEvent::TunInterfaceUpdated(config) => { + self.event_tx + .send(Event::tun_interface_updated(config)) + .await + .context("Failed to emit event")?; } } + + Ok(()) } - fn handle_portal_event(&mut self, event: phoenix_channel::Event) { - match event { - phoenix_channel::Event::InboundMessage { msg, .. } => { - self.handle_portal_inbound_message(msg); - } - phoenix_channel::Event::SuccessResponse { res: (), .. } => {} - phoenix_channel::Event::ErrorResponse { res, req_id, topic } => { - self.handle_portal_error_reply(res, topic, req_id); - } - phoenix_channel::Event::HeartbeatSent => {} - phoenix_channel::Event::JoinedRoom { .. } => {} - phoenix_channel::Event::Closed => { - unimplemented!("Client never actively closes the portal connection") - } - phoenix_channel::Event::Hiccup { - backoff, - max_elapsed_time, - error, - } => tracing::info!( - ?backoff, - ?max_elapsed_time, - "Hiccup in portal connection: {error:#}" - ), + fn handle_tunnel_error(&mut self, e: anyhow::Error) -> Result<()> { + if e.root_cause() + .downcast_ref::() + .is_some_and(is_unreachable) + { + tracing::debug!("{e:#}"); // Log these on DEBUG so they don't go completely unnoticed. + return Ok(()); } + + // Invalid Input can be all sorts of things but we mostly see it with unreachable addresses. + if e.root_cause() + .downcast_ref::() + .is_some_and(|e| e.kind() == io::ErrorKind::InvalidInput) + { + tracing::debug!("{e:#}"); + return Ok(()); + } + + if e.root_cause() + .is::() + { + return Err(e); + } + + if e.root_cause() + .downcast_ref::() + .is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied) + { + if !mem::replace(&mut self.logged_permission_denied, true) { + tracing::info!( + "Encountered `PermissionDenied` IO error. Check your local firewall rules to allow outbound STUN/TURN/WireGuard and general UDP traffic." + ) + } + + return Ok(()); + } + + tracing::warn!("Tunnel error: {e:#}"); + + Ok(()) } - fn handle_portal_inbound_message(&mut self, msg: IngressMessages) { + async fn handle_portal_message(&mut self, msg: IngressMessages) -> Result<()> { match msg { IngressMessages::ConfigChanged(config) => self .tunnel @@ -377,8 +385,12 @@ impl Eventloop { ); // Re-connecting to the portal means we will receive another `init` and thus new TURN servers. - self.portal - .connect(PublicKeyParam(self.tunnel.public_key().to_bytes())); + self.portal_cmd_tx + .send(PortalCommand::Connect(PublicKeyParam( + self.tunnel.public_key().to_bytes(), + ))) + .await + .context("Failed to connect phoenix-channel")?; } Err(e) => { tracing::warn!("Failed to request new connection: {e:#}"); @@ -396,23 +408,92 @@ impl Eventloop { tracing::debug!("Failed to create flow: {reason:?}") } } + + Ok(()) } - fn handle_portal_error_reply( - &mut self, - res: ErrorReply, - topic: String, - req_id: OutboundRequestId, - ) { - match res { - ErrorReply::Disabled => { - tracing::debug!(%req_id, "Functionality is disabled"); + fn next_event(&mut self, cx: &mut Context) -> Poll { + if let Poll::Ready(cmd) = self.cmd_rx.poll_recv(cx) { + return Poll::Ready(CombinedEvent::Command(cmd)); + } + + if let Poll::Ready(event) = self.portal_event_rx.poll_recv(cx) { + return Poll::Ready(CombinedEvent::Portal(event)); + } + + if let Poll::Ready(event) = self.tunnel.poll_next_event(cx) { + return Poll::Ready(CombinedEvent::Tunnel(event)); + } + + Poll::Pending + } +} + +async fn phoenix_channel_event_loop( + mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, + event_tx: mpsc::Sender, + mut cmd_rx: mpsc::Receiver, +) { + use futures::future::Either; + use futures::future::select; + use std::future::poll_fn; + + loop { + match select(poll_fn(|cx| portal.poll(cx)), pin!(cmd_rx.recv())).await { + Either::Left((Ok(phoenix_channel::Event::InboundMessage { msg, .. }), _)) => { + if event_tx.send(PortalEvent::Received(msg)).await.is_err() { + tracing::debug!("Event channel closed: exiting phoenix-channel event-loop"); + + break; + } } - ErrorReply::UnmatchedTopic => { - self.portal.join(topic, ()); + Either::Left((Ok(phoenix_channel::Event::SuccessResponse { res: (), .. }), _)) => {} + Either::Left((Ok(phoenix_channel::Event::ErrorResponse { res, req_id, topic }), _)) => { + match res { + ErrorReply::Disabled => { + tracing::debug!(%req_id, "Functionality is disabled"); + } + ErrorReply::UnmatchedTopic => { + portal.join(topic, ()); + } + reason @ (ErrorReply::InvalidVersion | ErrorReply::Other) => { + tracing::debug!(%req_id, %reason, "Request failed"); + } + } } - reason @ (ErrorReply::InvalidVersion | ErrorReply::Other) => { - tracing::debug!(%req_id, %reason, "Request failed"); + Either::Left((Ok(phoenix_channel::Event::HeartbeatSent), _)) => {} + Either::Left((Ok(phoenix_channel::Event::JoinedRoom { .. }), _)) => {} + Either::Left((Ok(phoenix_channel::Event::Closed), _)) => { + unimplemented!("Client never actively closes the portal connection") + } + Either::Left(( + Ok(phoenix_channel::Event::Hiccup { + backoff, + max_elapsed_time, + error, + }), + _, + )) => tracing::info!( + ?backoff, + ?max_elapsed_time, + "Hiccup in portal connection: {error:#}" + ), + Either::Left((Err(e), _)) => { + if event_tx.send(PortalEvent::Error(e)).await.is_err() { + tracing::debug!("Event channel closed: exiting phoenix-channel event-loop"); + break; + } + } + Either::Right((Some(PortalCommand::Send(msg)), _)) => { + portal.send(PHOENIX_TOPIC, msg); + } + Either::Right((Some(PortalCommand::Connect(param)), _)) => { + portal.connect(param); + } + Either::Right((None, _)) => { + tracing::debug!("Command channel closed: exiting phoenix-channel event-loop"); + + break; } } } diff --git a/rust/client-shared/src/lib.rs b/rust/client-shared/src/lib.rs index 5b43e95f7..5b27a104a 100644 --- a/rust/client-shared/src/lib.rs +++ b/rust/client-shared/src/lib.rs @@ -138,10 +138,14 @@ async fn connect( cmd_rx: UnboundedReceiver, event_tx: Sender, ) -> Result<()> { - let tunnel = ClientTunnel::new(tcp_socket_factory, udp_socket_factory); - let mut eventloop = Eventloop::new(tunnel, portal, cmd_rx, event_tx); - - std::future::poll_fn(|cx| eventloop.poll(cx)).await?; + Eventloop::new( + ClientTunnel::new(tcp_socket_factory, udp_socket_factory), + portal, + cmd_rx, + event_tx, + ) + .run() + .await?; Ok(()) }