diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ca61633e1..9096eb174 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1975,6 +1975,7 @@ dependencies = [ "snownet", "socket-factory", "static_assertions", + "thiserror", "tokio", "tracing", "tracing-subscriber", diff --git a/rust/gateway/Cargo.toml b/rust/gateway/Cargo.toml index 6118ab79c..716e500c3 100644 --- a/rust/gateway/Cargo.toml +++ b/rust/gateway/Cargo.toml @@ -33,6 +33,7 @@ serde = { workspace = true, features = ["std", "derive"] } snownet = { workspace = true } socket-factory = { workspace = true } static_assertions = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true, features = ["sync", "macros", "fs", "signal", "rt"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 433ff2b22..67216120b 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -3,21 +3,25 @@ use boringtun::x25519::PublicKey; use connlib_model::DomainName; #[cfg(not(target_os = "windows"))] use dns_lookup::{AddrInfoHints, AddrInfoIter, LookupError}; -use firezone_logging::{anyhow_dyn_err, std_dyn_err, telemetry_event, telemetry_span}; +use firezone_bin_shared::TunDeviceManager; +use firezone_logging::{anyhow_dyn_err, telemetry_event, telemetry_span}; use firezone_tunnel::messages::gateway::{ AllowAccess, ClientIceCandidates, ClientsIceCandidates, ConnectionReady, EgressMessages, IngressMessages, RejectAccess, RequestConnection, }; -use firezone_tunnel::messages::{ConnectionAccepted, GatewayResponse, Interface, RelaysPresence}; -use firezone_tunnel::{DnsResourceNatEntry, GatewayTunnel, ResolveDnsRequest}; -use futures::channel::mpsc; +use firezone_tunnel::messages::{ConnectionAccepted, GatewayResponse, RelaysPresence}; +use firezone_tunnel::{ + DnsResourceNatEntry, GatewayTunnel, ResolveDnsRequest, IPV4_PEERS, IPV6_PEERS, +}; use phoenix_channel::{PhoenixChannel, PublicKeyParam}; use std::collections::BTreeSet; use std::convert::Infallible; use std::io; use std::net::IpAddr; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use tokio::sync::Mutex; use tracing::Instrument; pub const PHOENIX_TOPIC: &str = "gateway"; @@ -41,33 +45,32 @@ enum ResolveTrigger { pub struct Eventloop { tunnel: GatewayTunnel, portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - tun_device_channel: mpsc::Sender, + tun_device_manager: Arc>, resolve_tasks: futures_bounded::FuturesTupleSet>, ResolveTrigger>, + set_interface_tasks: futures_bounded::FuturesSet>, } impl Eventloop { pub(crate) fn new( tunnel: GatewayTunnel, mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, - tun_device_channel: mpsc::Sender, + tun_device_manager: TunDeviceManager, ) -> Self { portal.connect(PublicKeyParam(tunnel.public_key().to_bytes())); Self { tunnel, portal, + tun_device_manager: Arc::new(Mutex::new(tun_device_manager)), resolve_tasks: futures_bounded::FuturesTupleSet::new(DNS_RESOLUTION_TIMEOUT, 1000), - tun_device_channel, + set_interface_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(5), 10), } } } impl Eventloop { - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { + pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match self.tunnel.poll_next_event(cx) { Poll::Ready(Ok(event)) => { @@ -132,6 +135,15 @@ impl Eventloop { Poll::Pending => {} } + match self.set_interface_tasks.poll_unpin(cx) { + Poll::Ready(result) => { + result + .unwrap_or_else(|e| Err(anyhow::Error::new(e))) + .context("Failed to update TUN interface")?; + } + Poll::Pending => {} + } + match self.portal.poll(cx)? { Poll::Ready(event) => { self.handle_portal_event(event); @@ -312,12 +324,30 @@ impl Eventloop { Instant::now(), ); - // FIXME(tech-debt): Currently, the `Tunnel` creates the TUN device as part of `set_interface`. - // For the gateway, it doesn't do anything else so in an ideal world, we would cause the side-effect out here and just pass an opaque `Device` to the `Tunnel`. - // That requires more refactoring of other platforms, so for now, we need to rely on the `Tunnel` interface and cause the side-effect separately via the `TunDeviceManager`. - if let Err(e) = self.tun_device_channel.try_send(init.interface) { - tracing::warn!(error = std_dyn_err(&e), "Failed to set interface"); - } + if self + .set_interface_tasks + .try_push({ + let tun_device_manager = self.tun_device_manager.clone(); + + async move { + let mut tun_device_manager = tun_device_manager.lock().await; + + tun_device_manager + .set_ips(init.interface.ipv4, init.interface.ipv6) + .await + .context("Failed to set TUN interface IPs")?; + tun_device_manager + .set_routes(vec![IPV4_PEERS], vec![IPV6_PEERS]) + .await + .context("Failed to set TUN routes")?; + + Ok(()) + } + }) + .is_err() + { + tracing::warn!("Too many 'Update TUN device' tasks"); + }; } phoenix_channel::Event::InboundMessage { msg: IngressMessages::ResourceUpdated(resource_description), @@ -424,6 +454,14 @@ impl Eventloop { } } +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Failed to login to portal: {0}")] + PhoenixChannel(#[from] phoenix_channel::Error), + #[error("Failed to update TUN device: {0:#}")] + UpdateTun(#[from] anyhow::Error), +} + async fn resolve(domain: Option) -> Result> { let Some(domain) = domain.clone() else { return Ok(vec![]); diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index a9c080876..9d5cfff2a 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -9,13 +9,11 @@ use firezone_bin_shared::{ }; use firezone_logging::anyhow_dyn_err; use firezone_telemetry::Telemetry; -use firezone_tunnel::messages::Interface; -use firezone_tunnel::{GatewayTunnel, IPV4_PEERS, IPV6_PEERS}; +use firezone_tunnel::GatewayTunnel; use phoenix_channel::get_user_agent; use phoenix_channel::LoginUrl; -use futures::channel::mpsc; -use futures::{future, StreamExt, TryFutureExt}; +use futures::{future, TryFutureExt}; use phoenix_channel::PhoenixChannel; use secrecy::{Secret, SecretString}; use std::path::Path; @@ -106,8 +104,6 @@ async fn try_main(cli: Cli, telemetry: &mut Telemetry) -> Result { ) .context("Failed to resolve portal URL")?; - let (sender, receiver) = mpsc::channel::(10); - let mut tun_device_manager = TunDeviceManager::new(ip_packet::MAX_IP_SIZE, cli.tun_threads) .context("Failed to create TUN device manager")?; let tun = tun_device_manager @@ -115,10 +111,8 @@ async fn try_main(cli: Cli, telemetry: &mut Telemetry) -> Result { .context("Failed to create TUN device")?; tunnel.set_tun(Box::new(tun)); - tokio::spawn(update_device_task(tun_device_manager, receiver)); - let task = tokio::spawn(future::poll_fn({ - let mut eventloop = Eventloop::new(tunnel, portal, sender); + let mut eventloop = Eventloop::new(tunnel, portal, tun_device_manager); move |cx| eventloop.poll(cx) })) @@ -135,7 +129,7 @@ async fn try_main(cli: Cli, telemetry: &mut Telemetry) -> Result { .map_err(|e| e.factor_first().0)? { future::Either::Left((Err(e), _)) => { - tracing::info!("Failed to login to portal: {e}"); + tracing::info!("{e}"); Ok(ExitCode::FAILURE) } @@ -168,27 +162,6 @@ async fn get_firezone_id(env_id: Option) -> Result { Ok(id) } -async fn update_device_task( - mut tun_device: TunDeviceManager, - mut receiver: mpsc::Receiver, -) { - while let Some(next_interface) = receiver.next().await { - if let Err(e) = tun_device - .set_ips(next_interface.ipv4, next_interface.ipv6) - .await - { - tracing::warn!(error = anyhow_dyn_err(&e), "Failed to set interface"); - } - - if let Err(e) = tun_device - .set_routes(vec![IPV4_PEERS], vec![IPV6_PEERS]) - .await - { - tracing::warn!(error = anyhow_dyn_err(&e), "Failed; to set routes"); - }; - } -} - #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli {