From dde98f198597066a2d0f1ce60b65c902708ba137 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sat, 7 Oct 2023 09:05:52 +1100 Subject: [PATCH] refactor(gateway): introduce `Eventloop` (#2244) --- rust/Cargo.lock | 67 ++--- rust/Cargo.toml | 3 +- rust/connlib/gateway-shared/Cargo.toml | 22 -- rust/connlib/gateway-shared/src/control.rs | 166 ------------ rust/connlib/gateway-shared/src/lib.rs | 238 ------------------ rust/connlib/shared/src/messages.rs | 6 + rust/gateway/Cargo.toml | 24 +- rust/gateway/src/control.rs | 50 ++++ rust/gateway/src/eventloop.rs | 221 ++++++++++++++++ rust/gateway/src/main.rs | 102 ++++++-- .../src/messages.rs | 0 11 files changed, 424 insertions(+), 475 deletions(-) delete mode 100644 rust/connlib/gateway-shared/Cargo.toml delete mode 100644 rust/connlib/gateway-shared/src/control.rs delete mode 100644 rust/connlib/gateway-shared/src/lib.rs create mode 100644 rust/gateway/src/control.rs create mode 100644 rust/gateway/src/eventloop.rs rename rust/{connlib/gateway-shared => gateway}/src/messages.rs (100%) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8a131f570..3dac829fb 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -156,9 +156,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -194,9 +194,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -693,9 +693,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.4" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -703,9 +703,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.4" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -814,25 +814,6 @@ dependencies = [ "webrtc", ] -[[package]] -name = "connlib-gateway-shared" -version = "1.20231001.0" -dependencies = [ - "async-trait", - "backoff", - "chrono", - "connlib-shared", - "firezone-tunnel", - "secrecy", - "serde", - "serde_json", - "tokio", - "tokio-tungstenite", - "tracing", - "url", - "webrtc", -] - [[package]] name = "connlib-shared" version = "1.20231001.0" @@ -1256,12 +1237,26 @@ name = "firezone-gateway" version = "1.20231001.0" dependencies = [ "anyhow", + "async-trait", + "backoff", + "boringtun", + "chrono", "clap", - "connlib-gateway-shared", + "connlib-shared", + "firezone-tunnel", + "futures", + "futures-bounded", "headless-utils", + "phoenix-channel", "secrecy", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite", "tracing", "tracing-subscriber", + "url", + "webrtc", ] [[package]] @@ -1340,6 +1335,16 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-bounded" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b07bbbe7d7e78809544c6f718d875627addc73a7c3582447abc052cd3dc67e0" +dependencies = [ + "futures-timer", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -1396,6 +1401,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 18ae24f50..19c3cbc20 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -3,7 +3,6 @@ members = [ "connlib/clients/android", "connlib/clients/apple", "connlib/clients/shared", - "connlib/gateway-shared", "connlib/shared", "connlib/tunnel", "gateway", @@ -27,12 +26,12 @@ secrecy = "0.8" connlib-client-android = { path = "connlib/clients/android"} connlib-client-apple = { path = "connlib/clients/apple"} connlib-client-shared = { path = "connlib/clients/shared"} -connlib-gateway-shared = { path = "connlib/gateway-shared"} firezone-gateway = { path = "gateway"} firezone-headless-client = { path = "headless-client"} headless-utils = { path = "headless-utils"} connlib-shared = { path = "connlib/shared"} firezone-tunnel = { path = "connlib/tunnel"} +phoenix-channel = { path = "phoenix-channel"} # Patched to use https://github.com/rust-lang/cc-rs/pull/708 # (the `patch` section can't be used for build deps...) diff --git a/rust/connlib/gateway-shared/Cargo.toml b/rust/connlib/gateway-shared/Cargo.toml deleted file mode 100644 index 036fd5efc..000000000 --- a/rust/connlib/gateway-shared/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "connlib-gateway-shared" -# mark:automatic-version -version = "1.20231001.0" -edition = "2021" - -[dependencies] -secrecy = { workspace = true } -connlib-shared = { workspace = true } -async-trait = { version = "0.1", default-features = false } -firezone-tunnel = { workspace = true } -tokio = { version = "1.32", default-features = false, features = ["sync"] } -tracing = { workspace = true } -serde = { version = "1.0", default-features = false, features = ["std", "derive"] } -chrono = { workspace = true } -backoff = { workspace = true } -webrtc = "0.8" -url = { version = "2.4.1", default-features = false } -tokio-tungstenite = { version = "0.20", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots"] } - -[dev-dependencies] -serde_json = { version = "1.0", default-features = false, features = ["std"] } diff --git a/rust/connlib/gateway-shared/src/control.rs b/rust/connlib/gateway-shared/src/control.rs deleted file mode 100644 index 49534e959..000000000 --- a/rust/connlib/gateway-shared/src/control.rs +++ /dev/null @@ -1,166 +0,0 @@ -use super::messages::{ - ConnectionReady, EgressMessages, IngressMessages, InitGateway, RequestConnection, -}; -use crate::messages::{AllowAccess, BroadcastClientIceCandidates, ClientIceCandidates}; -use async_trait::async_trait; -use connlib_shared::Error::ControlProtocolError; -use connlib_shared::{ - control::PhoenixSenderWithTopic, - messages::{GatewayId, ResourceDescription}, - Callbacks, Result, -}; -use firezone_tunnel::{ConnId, ControlSignal, Tunnel}; -use std::sync::Arc; -use webrtc::ice_transport::ice_candidate::RTCIceCandidate; - -pub struct ControlPlane { - pub tunnel: Arc>, - pub control_signaler: ControlSignaler, -} - -#[derive(Clone)] -pub struct ControlSignaler { - pub control_signal: PhoenixSenderWithTopic, -} - -#[async_trait] -impl ControlSignal for ControlSignaler { - async fn signal_connection_to( - &self, - resource: &ResourceDescription, - _connected_gateway_ids: &[GatewayId], - _: usize, - ) -> Result<()> { - tracing::warn!("A message to network resource: {resource:?} was discarded, gateways aren't meant to be used as clients."); - Ok(()) - } - - async fn signal_ice_candidate( - &self, - ice_candidate: RTCIceCandidate, - conn_id: ConnId, - ) -> Result<()> { - // TODO: We probably want to have different signal_ice_candidate - // functions for gateway/client but ultimately we just want - // separate control_plane modules - if let ConnId::Client(id) = conn_id { - self.control_signal - .clone() - .send(EgressMessages::BroadcastIceCandidates( - BroadcastClientIceCandidates { - client_ids: vec![id], - candidates: vec![ice_candidate.to_json()?], - }, - )) - .await?; - - Ok(()) - } else { - Err(ControlProtocolError) - } - } -} - -impl ControlPlane { - #[tracing::instrument(level = "trace", skip(self))] - pub async fn init(&mut self, init: InitGateway) -> Result<()> { - if let Err(e) = self.tunnel.set_interface(&init.interface).await { - tracing::error!("Couldn't initialize interface: {e}"); - Err(e) - } else { - // TODO: Enable masquerading here. - tracing::info!("Firezoned Started!"); - Ok(()) - } - } - - #[tracing::instrument(level = "trace", skip(self))] - pub fn connection_request(&self, connection_request: RequestConnection) { - let tunnel = Arc::clone(&self.tunnel); - let mut control_signaler = self.control_signaler.clone(); - tokio::spawn(async move { - match tunnel - .set_peer_connection_request( - connection_request.client.rtc_session_description, - connection_request.client.peer.into(), - connection_request.relays, - connection_request.client.id, - connection_request.expires_at, - connection_request.resource, - ) - .await - { - Ok(gateway_rtc_session_description) => { - if let Err(err) = control_signaler - .control_signal - .send(EgressMessages::ConnectionReady(ConnectionReady { - reference: connection_request.reference, - gateway_rtc_session_description, - })) - .await - { - tunnel.cleanup_connection(connection_request.client.id.into()); - let _ = tunnel.callbacks().on_error(&err); - } - } - Err(err) => { - tunnel.cleanup_connection(connection_request.client.id.into()); - let _ = tunnel.callbacks().on_error(&err); - } - } - }); - } - - #[tracing::instrument(level = "trace", skip(self))] - pub fn allow_access( - &self, - AllowAccess { - client_id, - resource, - expires_at, - }: AllowAccess, - ) { - self.tunnel.allow_access(resource, client_id, expires_at) - } - - async fn add_ice_candidate( - &self, - ClientIceCandidates { - client_id, - candidates, - }: ClientIceCandidates, - ) { - for candidate in candidates { - if let Err(e) = self - .tunnel - .add_ice_candidate(client_id.into(), candidate) - .await - { - tracing::error!(err = ?e,"add_ice_candidate"); - let _ = self.tunnel.callbacks().on_error(&e); - } - } - } - - #[tracing::instrument(level = "trace", skip(self))] - pub async fn handle_message(&mut self, msg: IngressMessages) -> Result<()> { - match msg { - IngressMessages::Init(init) => self.init(init).await?, - IngressMessages::RequestConnection(connection_request) => { - self.connection_request(connection_request) - } - IngressMessages::AllowAccess(allow_access) => { - self.allow_access(allow_access); - } - - IngressMessages::IceCandidates(ice_candidate) => { - self.add_ice_candidate(ice_candidate).await - } - } - Ok(()) - } - - pub async fn stats_event(&mut self) { - tracing::debug!(target: "tunnel_state", stats = ?self.tunnel.stats()); - } -} diff --git a/rust/connlib/gateway-shared/src/lib.rs b/rust/connlib/gateway-shared/src/lib.rs deleted file mode 100644 index 017a16d87..000000000 --- a/rust/connlib/gateway-shared/src/lib.rs +++ /dev/null @@ -1,238 +0,0 @@ -//! Main connlib library for gateway. -pub use connlib_shared::{get_device_id, messages::ResourceDescription, Callbacks, Error}; - -use crate::control::ControlSignaler; -use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; -use connlib_shared::control::SecureUrl; -use connlib_shared::{control::PhoenixChannel, login_url, CallbackErrorFacade, Mode, Result}; -use control::ControlPlane; -use firezone_tunnel::Tunnel; -use messages::IngressMessages; -use secrecy::{Secret, SecretString}; -use std::sync::Arc; -use std::time::Duration; -use tokio::runtime::Runtime; -use url::Url; - -mod control; -mod messages; - -struct StopRuntime; - -/// A session is the entry-point for connlib, maintains the runtime and the tunnel. -/// -/// A session is created using [Session::connect], then to stop a session we use [Session::disconnect]. -pub struct Session { - runtime_stopper: tokio::sync::mpsc::Sender, - pub callbacks: CallbackErrorFacade, -} - -macro_rules! fatal_error { - ($result:expr, $rt:expr, $cb:expr) => { - match $result { - Ok(res) => res, - Err(err) => { - Self::disconnect_inner($rt, $cb, Some(err)); - return; - } - } - }; -} - -impl Session -where - CB: Callbacks + 'static, -{ - /// Starts a session in the background. - /// - /// This will: - /// 1. Create and start a tokio runtime - /// 2. Connect to the control plane to the portal - /// 3. Start the tunnel in the background and forward control plane messages to it. - /// - /// The generic parameter `CB` should implement all the handlers and that's how errors will be surfaced. - /// - /// On a fatal error you should call `[Session::disconnect]` and start a new one. - // TODO: token should be something like SecretString but we need to think about FFI compatibility - pub fn connect( - portal_url: impl TryInto, - token: SecretString, - device_id: String, - callbacks: CB, - ) -> Result { - // TODO: We could use tokio::runtime::current() to get the current runtime - // which could work with swift-rust that already runs a runtime. But IDK if that will work - // in all platforms, a couple of new threads shouldn't bother none. - // Big question here however is how do we get the result? We could block here await the result and spawn a new task. - // but then platforms should know that this function is blocking. - - let callbacks = CallbackErrorFacade(callbacks); - let (tx, mut rx) = tokio::sync::mpsc::channel(1); - let this = Self { - runtime_stopper: tx.clone(), - callbacks, - }; - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build()?; - { - let callbacks = this.callbacks.clone(); - let default_panic_hook = std::panic::take_hook(); - std::panic::set_hook(Box::new({ - let tx = tx.clone(); - move |info| { - let tx = tx.clone(); - let err = info - .payload() - .downcast_ref::<&str>() - .map(|s| Error::Panic(s.to_string())) - .unwrap_or(Error::PanicNonStringPayload); - Self::disconnect_inner(tx, &callbacks, Some(err)); - default_panic_hook(info); - } - })); - } - - Self::connect_inner( - &runtime, - tx, - portal_url.try_into().map_err(|_| Error::UriError)?, - token, - device_id, - this.callbacks.clone(), - ); - std::thread::spawn(move || { - rx.blocking_recv(); - runtime.shutdown_background(); - }); - - Ok(this) - } - - fn connect_inner( - runtime: &Runtime, - runtime_stopper: tokio::sync::mpsc::Sender, - portal_url: Url, - token: SecretString, - device_id: String, - callbacks: CallbackErrorFacade, - ) { - runtime.spawn(async move { - let (connect_url, private_key) = fatal_error!( - login_url(Mode::Gateway, portal_url, token, device_id), - runtime_stopper, - &callbacks - ); - - // This is kinda hacky, the buffer size is 1 so that we make sure that we - // process one message at a time, blocking if a previous message haven't been processed - // to force queue ordering. - let (control_plane_sender, mut control_plane_receiver) = tokio::sync::mpsc::channel(1); - - let mut connection = PhoenixChannel::<_, IngressMessages, IngressMessages, IngressMessages>::new(Secret::new(SecureUrl::from_url(connect_url)), move |msg, reference| { - let control_plane_sender = control_plane_sender.clone(); - async move { - tracing::trace!("Received message: {msg:?}"); - if let Err(e) = control_plane_sender.send((msg, reference)).await { - tracing::warn!("Received a message after handler already closed: {e}. Probably message received during session clean up."); - } - } - }); - - // Used to send internal messages - let control_signaler = ControlSignaler { control_signal: connection.sender_with_topic("gateway".to_owned()) }; - let tunnel = fatal_error!( - Tunnel::new(private_key, control_signaler.clone(), callbacks.clone()).await, - runtime_stopper, - &callbacks - ); - - let mut control_plane = ControlPlane { - tunnel: Arc::new(tunnel), - control_signaler, - }; - - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(10)); - loop { - tokio::select! { - Some((msg, _)) = control_plane_receiver.recv() => { - match msg { - Ok(msg) => control_plane.handle_message(msg).await?, - Err(_msg_reply) => todo!(), - } - }, - _ = interval.tick() => control_plane.stats_event().await, - else => break - } - } - - Result::Ok(()) - }); - - tokio::spawn(async move { - let mut exponential_backoff = ExponentialBackoffBuilder::default() - .with_max_elapsed_time(None) - .build(); - loop { - // `connection.start` calls the callback only after connecting - tracing::debug!("Attempting connection to portal..."); - let result = connection.start(vec!["gateway".to_owned()], || exponential_backoff.reset()).await; - tracing::warn!("Disconnected from the portal"); - if let Err(e) = &result { - tracing::warn!(error = ?e, "Portal connection error"); - } - if let Some(t) = exponential_backoff.next_backoff() { - tracing::warn!("Error connecting to portal, retrying in {} seconds", t.as_secs()); - let _ = callbacks.on_error(&result.err().unwrap_or(Error::PortalConnectionError(tokio_tungstenite::tungstenite::Error::ConnectionClosed))); - tokio::time::sleep(t).await; - } else { - tracing::error!("Connection to portal failed, giving up"); - fatal_error!( - result.and(Err(Error::PortalConnectionError(tokio_tungstenite::tungstenite::Error::ConnectionClosed))), - runtime_stopper, - &callbacks - ); - } - } - - }); - - }); - } - - fn disconnect_inner( - runtime_stopper: tokio::sync::mpsc::Sender, - callbacks: &CallbackErrorFacade, - error: Option, - ) { - // 1. Close the websocket connection - // 2. Free the device handle (Linux) - // 3. Close the file descriptor (Linux/Android) - // 4. Remove the mapping - - // The way we cleanup the tasks is we drop the runtime - // this means we don't need to keep track of different tasks - // but if any of the tasks never yields this will block forever! - // So always yield and if you spawn a blocking tasks rewrite this. - // Furthermore, we will depend on Drop impls to do the list above so, - // implement them :) - // if there's no receiver the runtime is already stopped - // there's an edge case where this is called before the thread is listening for stop threads. - // but I believe in that case the channel will be in a signaled state achieving the same result - - if let Err(err) = runtime_stopper.try_send(StopRuntime) { - tracing::error!("Couldn't stop runtime: {err}"); - } - - let _ = callbacks.on_disconnect(error.as_ref()); - } - - /// Cleanup a [Session]. - /// - /// For now this just drops the runtime, which should drop all pending tasks. - /// Further cleanup should be done here. (Otherwise we can just drop [Session]). - pub fn disconnect(&mut self, error: Option) { - Self::disconnect_inner(self.runtime_stopper.clone(), &self.callbacks, error) - } -} diff --git a/rust/connlib/shared/src/messages.rs b/rust/connlib/shared/src/messages.rs index ef1eb81ab..cbdf504a4 100644 --- a/rust/connlib/shared/src/messages.rs +++ b/rust/connlib/shared/src/messages.rs @@ -43,6 +43,12 @@ impl fmt::Display for ResourceId { } } +impl fmt::Display for ClientId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + /// Represents a wireguard peer. #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Peer { diff --git a/rust/gateway/Cargo.toml b/rust/gateway/Cargo.toml index ec1257230..dfb66d6f8 100644 --- a/rust/gateway/Cargo.toml +++ b/rust/gateway/Cargo.toml @@ -7,10 +7,26 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -secrecy = { workspace = true } -connlib-gateway-shared = { workspace = true } +anyhow = "1.0.75" +async-trait = { version = "0.1", default-features = false } +backoff = { workspace = true } +boringtun = { workspace = true } +chrono = { workspace = true } +clap = "4.4.5" +connlib-shared = { workspace = true } +firezone-tunnel = { workspace = true } +futures = "0.3.28" +futures-bounded = "0.1.0" headless-utils = { workspace = true } -anyhow = { version = "1.0" } +phoenix-channel = { workspace = true } +secrecy = { workspace = true } +serde = { version = "1.0", default-features = false, features = ["std", "derive"] } +tokio = { version = "1.32", default-features = false, features = ["sync", "macros"] } +tokio-tungstenite = { version = "0.20", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots"] } tracing = { workspace = true } -clap = { version = "4.3", features = ["derive", "env"] } tracing-subscriber = "0.3.17" +url = { version = "2.4.1", default-features = false } +webrtc = "0.8" + +[dev-dependencies] +serde_json = { version = "1.0", default-features = false, features = ["std"] } diff --git a/rust/gateway/src/control.rs b/rust/gateway/src/control.rs new file mode 100644 index 000000000..659bbc4b4 --- /dev/null +++ b/rust/gateway/src/control.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; +use connlib_shared::messages::ClientId; +use connlib_shared::Error::ControlProtocolError; +use connlib_shared::{ + messages::{GatewayId, ResourceDescription}, + Result, +}; +use firezone_tunnel::{ConnId, ControlSignal}; +use tokio::sync::mpsc; +use webrtc::ice_transport::ice_candidate::RTCIceCandidate; + +#[derive(Clone)] +pub struct ControlSignaler { + tx: mpsc::Sender<(ClientId, RTCIceCandidate)>, +} + +impl ControlSignaler { + pub fn new(tx: mpsc::Sender<(ClientId, RTCIceCandidate)>) -> Self { + Self { tx } + } +} + +#[async_trait] +impl ControlSignal for ControlSignaler { + async fn signal_connection_to( + &self, + resource: &ResourceDescription, + _connected_gateway_ids: &[GatewayId], + _: usize, + ) -> Result<()> { + tracing::warn!("A message to network resource: {resource:?} was discarded, gateways aren't meant to be used as clients."); + Ok(()) + } + + async fn signal_ice_candidate( + &self, + ice_candidate: RTCIceCandidate, + conn_id: ConnId, + ) -> Result<()> { + // TODO: We probably want to have different signal_ice_candidate + // functions for gateway/client but ultimately we just want + // separate control_plane modules + if let ConnId::Client(id) = conn_id { + let _ = self.tx.send((id, ice_candidate)).await; + Ok(()) + } else { + Err(ControlProtocolError) + } + } +} diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs new file mode 100644 index 000000000..6531e602b --- /dev/null +++ b/rust/gateway/src/eventloop.rs @@ -0,0 +1,221 @@ +use crate::control::ControlSignaler; +use crate::messages::{ + AllowAccess, BroadcastClientIceCandidates, ClientIceCandidates, ConnectionReady, + EgressMessages, IngressMessages, +}; +use crate::CallbackHandler; +use anyhow::Result; +use connlib_shared::messages::ClientId; +use connlib_shared::Error; +use firezone_tunnel::Tunnel; +use phoenix_channel::PhoenixChannel; +use std::convert::Infallible; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; +use tokio::sync::mpsc; +use webrtc::ice_transport::ice_candidate::RTCIceCandidate; +use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + +pub const PHOENIX_TOPIC: &str = "gateway"; + +pub struct Eventloop { + tunnel: Arc>, + control_rx: mpsc::Receiver<(ClientId, RTCIceCandidate)>, + portal: PhoenixChannel, + + // TODO: Strongly type request reference (currently `String`) + connection_request_tasks: + futures_bounded::FuturesMap<(ClientId, String), Result>, + add_ice_candidate_tasks: futures_bounded::FuturesSet>, + + print_stats_timer: tokio::time::Interval, +} + +impl Eventloop { + pub(crate) fn new( + tunnel: Arc>, + control_rx: mpsc::Receiver<(ClientId, RTCIceCandidate)>, + portal: PhoenixChannel, + ) -> Self { + Self { + tunnel, + control_rx, + portal, + + // TODO: Pick sane values for timeouts and size. + connection_request_tasks: futures_bounded::FuturesMap::new( + Duration::from_secs(60), + 100, + ), + add_ice_candidate_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(60), 100), + print_stats_timer: tokio::time::interval(Duration::from_secs(10)), + } + } +} + +impl Eventloop { + #[tracing::instrument(name = "Eventloop::poll", skip_all, level = "debug")] + pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + if let Poll::Ready(Some((client, ice_candidate))) = self.control_rx.poll_recv(cx) { + let ice_candidate = match ice_candidate.to_json() { + Ok(ice_candidate) => ice_candidate, + Err(e) => { + tracing::warn!( + "Failed to serialize ICE candidate to JSON: {:#}", + anyhow::Error::new(e) + ); + continue; + } + }; + + tracing::debug!(%client, candidate = %ice_candidate.candidate, "Sending ICE candidate to client"); + + let _id = self.portal.send( + PHOENIX_TOPIC, + EgressMessages::BroadcastIceCandidates(BroadcastClientIceCandidates { + client_ids: vec![client], + candidates: vec![ice_candidate], + }), + ); + continue; + } + + match self.connection_request_tasks.poll_unpin(cx) { + Poll::Ready(((client, reference), Ok(Ok(gateway_rtc_session_description)))) => { + tracing::debug!(%client, %reference, "Connection is ready"); + + let _id = self.portal.send( + PHOENIX_TOPIC, + EgressMessages::ConnectionReady(ConnectionReady { + reference, + gateway_rtc_session_description, + }), + ); + + // TODO: If outbound request fails, cleanup connection. + continue; + } + Poll::Ready(((client, _), Ok(Err(e)))) => { + self.tunnel.cleanup_connection(client.into()); + tracing::debug!(%client, "Connection request failed: {:#}", anyhow::Error::new(e)); + + continue; + } + Poll::Ready(((client, reference), Err(e))) => { + tracing::debug!( + %client, + %reference, + "Failed to establish connection: {:#}", anyhow::Error::new(e) + ); + continue; + } + Poll::Pending => {} + } + + match self.add_ice_candidate_tasks.poll_unpin(cx) { + Poll::Ready(Ok(Ok(()))) => { + continue; + } + Poll::Ready(Ok(Err(e))) => { + tracing::error!("Failed to add ICE candidate: {:#}", anyhow::Error::new(e)); + + continue; + } + Poll::Ready(Err(e)) => { + tracing::error!("Failed to add ICE candidate: {e}"); + continue; + } + Poll::Pending => {} + } + + match self.portal.poll(cx)? { + Poll::Ready(phoenix_channel::Event::InboundMessage { + msg: IngressMessages::Init(_), + .. + }) => { + tracing::warn!("Received init message during operation"); + debug_assert!(false, "Received init message during operation"); + } + Poll::Ready(phoenix_channel::Event::InboundMessage { + msg: IngressMessages::RequestConnection(req), + .. + }) => { + let tunnel = Arc::clone(&self.tunnel); + + match self.connection_request_tasks.try_push( + (req.client.id, req.reference.clone()), + async move { + tunnel + .set_peer_connection_request( + req.client.rtc_session_description, + req.client.peer.into(), + req.relays, + req.client.id, + req.expires_at, + req.resource, + ) + .await + }, + ) { + Err(futures_bounded::PushError::BeyondCapacity(_)) => { + tracing::warn!("Too many connections requests, dropping existing one"); + } + Err(futures_bounded::PushError::ReplacedFuture(_)) => { + debug_assert!(false, "Received a 2nd connection requested with the same reference from the same client"); + } + Ok(()) => {} + }; + continue; + } + Poll::Ready(phoenix_channel::Event::InboundMessage { + msg: + IngressMessages::AllowAccess(AllowAccess { + client_id, + resource, + expires_at, + }), + .. + }) => { + tracing::debug!(client = %client_id, resource = %resource.id(), expires = %expires_at.to_rfc3339() ,"Allowing access to resource"); + + self.tunnel.allow_access(resource, client_id, expires_at); + continue; + } + Poll::Ready(phoenix_channel::Event::InboundMessage { + msg: + IngressMessages::IceCandidates(ClientIceCandidates { + client_id, + candidates, + }), + .. + }) => { + for candidate in candidates { + tracing::debug!(client = %client_id, candidate = %candidate.candidate, "Adding ICE candidate from client"); + + let tunnel = Arc::clone(&self.tunnel); + if self + .add_ice_candidate_tasks + .try_push(async move { + tunnel.add_ice_candidate(client_id.into(), candidate).await + }) + .is_err() + { + tracing::debug!("Received too many ICE candidates, dropping some"); + } + } + continue; + } + _ => {} + } + + if self.print_stats_timer.poll_tick(cx).is_ready() { + tracing::debug!(target: "tunnel_state", stats = ?self.tunnel.stats()); + continue; + } + + return Poll::Pending; + } + } +} diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index ad51d5959..03df56640 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -1,30 +1,102 @@ -use anyhow::Result; +use crate::control::ControlSignaler; +use crate::eventloop::{Eventloop, PHOENIX_TOPIC}; +use crate::messages::IngressMessages; +use anyhow::{Context as _, Result}; +use backoff::ExponentialBackoffBuilder; +use boringtun::x25519::StaticSecret; use clap::Parser; -use connlib_gateway_shared::{get_device_id, Callbacks, Session}; -use headless_utils::{block_on_ctrl_c, setup_global_subscriber, CommonArgs}; -use secrecy::SecretString; +use connlib_shared::{get_device_id, get_user_agent, login_url, Callbacks, Mode}; +use firezone_tunnel::Tunnel; +use futures::{future, TryFutureExt}; +use headless_utils::{setup_global_subscriber, CommonArgs}; +use phoenix_channel::SecureUrl; +use secrecy::{Secret, SecretString}; +use std::convert::Infallible; +use std::sync::Arc; +use std::time::Duration; use tracing_subscriber::layer; -fn main() -> Result<()> { +mod control; +mod eventloop; +mod messages; + +#[tokio::main] +async fn main() -> Result<()> { let cli = Cli::parse(); setup_global_subscriber(layer::Identity::new()); - let device_id = get_device_id(); - let mut session = Session::connect( + let (connect_url, private_key) = login_url( + Mode::Gateway, cli.common.url, - SecretString::from(cli.common.secret), - device_id, - CallbackHandler, - ) - .unwrap(); - tracing::info!("new_session"); + SecretString::new(cli.common.secret), + get_device_id(), + )?; - block_on_ctrl_c(); + tokio::spawn(backoff::future::retry_notify( + ExponentialBackoffBuilder::default() + .with_max_elapsed_time(None) + .build(), + move || { + connect( + private_key.clone(), + Secret::new(SecureUrl::from_url(connect_url.clone())), + ) + .map_err(backoff::Error::transient) + }, + |error, t: Duration| { + tracing::warn!(retry_in = ?t, "Error connecting to portal: {error}"); + }, + )); + + tokio::signal::ctrl_c().await?; - session.disconnect(None); Ok(()) } +async fn connect(private_key: StaticSecret, connect_url: Secret) -> Result { + // Note: This is only needed because [`Tunnel`] does not (yet) have a synchronous, poll-like interface. If it would have, ICE candidates would be emitted as events and we could just hand them to the phoenix channel. + let (control_tx, control_rx) = tokio::sync::mpsc::channel(1); + let signaler = ControlSignaler::new(control_tx); + let tunnel = Arc::new(Tunnel::new(private_key, signaler, CallbackHandler).await?); + + tracing::debug!("Attempting connection to portal..."); + + let mut channel = + phoenix_channel::PhoenixChannel::connect(connect_url, get_user_agent()).await?; + channel.join(PHOENIX_TOPIC, ()); + + let channel = loop { + match future::poll_fn(|cx| channel.poll(cx)) + .await + .context("portal connection failed")? + { + phoenix_channel::Event::JoinedRoom { topic } if topic == PHOENIX_TOPIC => { + tracing::info!("Joined gateway room on portal") + } + phoenix_channel::Event::InboundMessage { + topic, + msg: IngressMessages::Init(init), + } => { + tracing::info!("Received init message from portal on topic {topic}"); + + tunnel + .set_interface(&init.interface) + .await + .context("Failed to set interface")?; + + break channel; + } + other => { + tracing::debug!("Unhandled message from portal: {other:?}"); + } + } + }; + + let mut eventloop = Eventloop::new(tunnel, control_rx, channel); + + future::poll_fn(|cx| eventloop.poll(cx)).await +} + #[derive(Clone)] struct CallbackHandler; diff --git a/rust/connlib/gateway-shared/src/messages.rs b/rust/gateway/src/messages.rs similarity index 100% rename from rust/connlib/gateway-shared/src/messages.rs rename to rust/gateway/src/messages.rs