refactor(gateway): introduce Eventloop (#2244)

Thomas Eizinger
2023-10-07 09:05:52 +11:00
committed by GitHub
parent 24aaa4fb7e
commit dde98f1985
11 changed files with 424 additions and 475 deletions

rust/Cargo.lock (generated)

@@ -156,9 +156,9 @@ dependencies = [
[[package]]
name = "anstream"
version = "0.5.0"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c"
checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -194,9 +194,9 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
version = "2.1.0"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd"
checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628"
dependencies = [
"anstyle",
"windows-sys 0.48.0",
@@ -693,9 +693,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.4.4"
version = "4.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136"
checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956"
dependencies = [
"clap_builder",
"clap_derive",
@@ -703,9 +703,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.4.4"
version = "4.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56"
checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45"
dependencies = [
"anstream",
"anstyle",
@@ -814,25 +814,6 @@ dependencies = [
"webrtc",
]
[[package]]
name = "connlib-gateway-shared"
version = "1.20231001.0"
dependencies = [
"async-trait",
"backoff",
"chrono",
"connlib-shared",
"firezone-tunnel",
"secrecy",
"serde",
"serde_json",
"tokio",
"tokio-tungstenite",
"tracing",
"url",
"webrtc",
]
[[package]]
name = "connlib-shared"
version = "1.20231001.0"
@@ -1256,12 +1237,26 @@ name = "firezone-gateway"
version = "1.20231001.0"
dependencies = [
"anyhow",
"async-trait",
"backoff",
"boringtun",
"chrono",
"clap",
"connlib-gateway-shared",
"connlib-shared",
"firezone-tunnel",
"futures",
"futures-bounded",
"headless-utils",
"phoenix-channel",
"secrecy",
"serde",
"serde_json",
"tokio",
"tokio-tungstenite",
"tracing",
"tracing-subscriber",
"url",
"webrtc",
]
[[package]]
@@ -1340,6 +1335,16 @@ dependencies = [
"futures-util",
]
[[package]]
name = "futures-bounded"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b07bbbe7d7e78809544c6f718d875627addc73a7c3582447abc052cd3dc67e0"
dependencies = [
"futures-timer",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
@@ -1396,6 +1401,12 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.28"

@@ -3,7 +3,6 @@ members = [
"connlib/clients/android",
"connlib/clients/apple",
"connlib/clients/shared",
"connlib/gateway-shared",
"connlib/shared",
"connlib/tunnel",
"gateway",
@@ -27,12 +26,12 @@ secrecy = "0.8"
connlib-client-android = { path = "connlib/clients/android"}
connlib-client-apple = { path = "connlib/clients/apple"}
connlib-client-shared = { path = "connlib/clients/shared"}
connlib-gateway-shared = { path = "connlib/gateway-shared"}
firezone-gateway = { path = "gateway"}
firezone-headless-client = { path = "headless-client"}
headless-utils = { path = "headless-utils"}
connlib-shared = { path = "connlib/shared"}
firezone-tunnel = { path = "connlib/tunnel"}
phoenix-channel = { path = "phoenix-channel"}
# Patched to use https://github.com/rust-lang/cc-rs/pull/708
# (the `patch` section can't be used for build deps...)

@@ -1,22 +0,0 @@
[package]
name = "connlib-gateway-shared"
# mark:automatic-version
version = "1.20231001.0"
edition = "2021"
[dependencies]
secrecy = { workspace = true }
connlib-shared = { workspace = true }
async-trait = { version = "0.1", default-features = false }
firezone-tunnel = { workspace = true }
tokio = { version = "1.32", default-features = false, features = ["sync"] }
tracing = { workspace = true }
serde = { version = "1.0", default-features = false, features = ["std", "derive"] }
chrono = { workspace = true }
backoff = { workspace = true }
webrtc = "0.8"
url = { version = "2.4.1", default-features = false }
tokio-tungstenite = { version = "0.20", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots"] }
[dev-dependencies]
serde_json = { version = "1.0", default-features = false, features = ["std"] }

@@ -1,166 +0,0 @@
use super::messages::{
ConnectionReady, EgressMessages, IngressMessages, InitGateway, RequestConnection,
};
use crate::messages::{AllowAccess, BroadcastClientIceCandidates, ClientIceCandidates};
use async_trait::async_trait;
use connlib_shared::Error::ControlProtocolError;
use connlib_shared::{
control::PhoenixSenderWithTopic,
messages::{GatewayId, ResourceDescription},
Callbacks, Result,
};
use firezone_tunnel::{ConnId, ControlSignal, Tunnel};
use std::sync::Arc;
use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
pub struct ControlPlane<CB: Callbacks> {
pub tunnel: Arc<Tunnel<ControlSignaler, CB>>,
pub control_signaler: ControlSignaler,
}
#[derive(Clone)]
pub struct ControlSignaler {
pub control_signal: PhoenixSenderWithTopic,
}
#[async_trait]
impl ControlSignal for ControlSignaler {
async fn signal_connection_to(
&self,
resource: &ResourceDescription,
_connected_gateway_ids: &[GatewayId],
_: usize,
) -> Result<()> {
tracing::warn!("A message to network resource: {resource:?} was discarded, gateways aren't meant to be used as clients.");
Ok(())
}
async fn signal_ice_candidate(
&self,
ice_candidate: RTCIceCandidate,
conn_id: ConnId,
) -> Result<()> {
// TODO: We probably want to have different signal_ice_candidate
// functions for gateway/client but ultimately we just want
// separate control_plane modules
if let ConnId::Client(id) = conn_id {
self.control_signal
.clone()
.send(EgressMessages::BroadcastIceCandidates(
BroadcastClientIceCandidates {
client_ids: vec![id],
candidates: vec![ice_candidate.to_json()?],
},
))
.await?;
Ok(())
} else {
Err(ControlProtocolError)
}
}
}
impl<CB: Callbacks + 'static> ControlPlane<CB> {
#[tracing::instrument(level = "trace", skip(self))]
pub async fn init(&mut self, init: InitGateway) -> Result<()> {
if let Err(e) = self.tunnel.set_interface(&init.interface).await {
tracing::error!("Couldn't initialize interface: {e}");
Err(e)
} else {
// TODO: Enable masquerading here.
tracing::info!("Firezoned Started!");
Ok(())
}
}
#[tracing::instrument(level = "trace", skip(self))]
pub fn connection_request(&self, connection_request: RequestConnection) {
let tunnel = Arc::clone(&self.tunnel);
let mut control_signaler = self.control_signaler.clone();
tokio::spawn(async move {
match tunnel
.set_peer_connection_request(
connection_request.client.rtc_session_description,
connection_request.client.peer.into(),
connection_request.relays,
connection_request.client.id,
connection_request.expires_at,
connection_request.resource,
)
.await
{
Ok(gateway_rtc_session_description) => {
if let Err(err) = control_signaler
.control_signal
.send(EgressMessages::ConnectionReady(ConnectionReady {
reference: connection_request.reference,
gateway_rtc_session_description,
}))
.await
{
tunnel.cleanup_connection(connection_request.client.id.into());
let _ = tunnel.callbacks().on_error(&err);
}
}
Err(err) => {
tunnel.cleanup_connection(connection_request.client.id.into());
let _ = tunnel.callbacks().on_error(&err);
}
}
});
}
#[tracing::instrument(level = "trace", skip(self))]
pub fn allow_access(
&self,
AllowAccess {
client_id,
resource,
expires_at,
}: AllowAccess,
) {
self.tunnel.allow_access(resource, client_id, expires_at)
}
async fn add_ice_candidate(
&self,
ClientIceCandidates {
client_id,
candidates,
}: ClientIceCandidates,
) {
for candidate in candidates {
if let Err(e) = self
.tunnel
.add_ice_candidate(client_id.into(), candidate)
.await
{
tracing::error!(err = ?e, "add_ice_candidate");
let _ = self.tunnel.callbacks().on_error(&e);
}
}
}
#[tracing::instrument(level = "trace", skip(self))]
pub async fn handle_message(&mut self, msg: IngressMessages) -> Result<()> {
match msg {
IngressMessages::Init(init) => self.init(init).await?,
IngressMessages::RequestConnection(connection_request) => {
self.connection_request(connection_request)
}
IngressMessages::AllowAccess(allow_access) => {
self.allow_access(allow_access);
}
IngressMessages::IceCandidates(ice_candidate) => {
self.add_ice_candidate(ice_candidate).await
}
}
Ok(())
}
pub async fn stats_event(&mut self) {
tracing::debug!(target: "tunnel_state", stats = ?self.tunnel.stats());
}
}

@@ -1,238 +0,0 @@
//! Main connlib library for gateway.
pub use connlib_shared::{get_device_id, messages::ResourceDescription, Callbacks, Error};
use crate::control::ControlSignaler;
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use connlib_shared::control::SecureUrl;
use connlib_shared::{control::PhoenixChannel, login_url, CallbackErrorFacade, Mode, Result};
use control::ControlPlane;
use firezone_tunnel::Tunnel;
use messages::IngressMessages;
use secrecy::{Secret, SecretString};
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;
use url::Url;
mod control;
mod messages;
struct StopRuntime;
/// A session is the entry point for connlib; it maintains the runtime and the tunnel.
///
/// A session is created with [Session::connect] and stopped with [Session::disconnect].
pub struct Session<CB: Callbacks> {
runtime_stopper: tokio::sync::mpsc::Sender<StopRuntime>,
pub callbacks: CallbackErrorFacade<CB>,
}
macro_rules! fatal_error {
($result:expr, $rt:expr, $cb:expr) => {
match $result {
Ok(res) => res,
Err(err) => {
Self::disconnect_inner($rt, $cb, Some(err));
return;
}
}
};
}
impl<CB> Session<CB>
where
CB: Callbacks + 'static,
{
/// Starts a session in the background.
///
/// This will:
/// 1. Create and start a tokio runtime
/// 2. Connect the control plane to the portal
/// 3. Start the tunnel in the background and forward control plane messages to it.
///
/// The generic parameter `CB` should implement all the handlers and that's how errors will be surfaced.
///
/// On a fatal error you should call `[Session::disconnect]` and start a new one.
// TODO: token should be something like SecretString but we need to think about FFI compatibility
pub fn connect(
portal_url: impl TryInto<Url>,
token: SecretString,
device_id: String,
callbacks: CB,
) -> Result<Self> {
// TODO: We could use tokio::runtime::Handle::current() to get the current runtime,
// which could work with swift-rust, which already runs a runtime. But it's unclear whether
// that works on all platforms, and a couple of new threads shouldn't bother anyone.
// The big question is how we get the result: we could block here, awaiting the result, and spawn a new task,
// but then platforms would need to know that this function is blocking.
let callbacks = CallbackErrorFacade(callbacks);
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let this = Self {
runtime_stopper: tx.clone(),
callbacks,
};
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
{
let callbacks = this.callbacks.clone();
let default_panic_hook = std::panic::take_hook();
std::panic::set_hook(Box::new({
let tx = tx.clone();
move |info| {
let tx = tx.clone();
let err = info
.payload()
.downcast_ref::<&str>()
.map(|s| Error::Panic(s.to_string()))
.unwrap_or(Error::PanicNonStringPayload);
Self::disconnect_inner(tx, &callbacks, Some(err));
default_panic_hook(info);
}
}));
}
Self::connect_inner(
&runtime,
tx,
portal_url.try_into().map_err(|_| Error::UriError)?,
token,
device_id,
this.callbacks.clone(),
);
std::thread::spawn(move || {
rx.blocking_recv();
runtime.shutdown_background();
});
Ok(this)
}
fn connect_inner(
runtime: &Runtime,
runtime_stopper: tokio::sync::mpsc::Sender<StopRuntime>,
portal_url: Url,
token: SecretString,
device_id: String,
callbacks: CallbackErrorFacade<CB>,
) {
runtime.spawn(async move {
let (connect_url, private_key) = fatal_error!(
login_url(Mode::Gateway, portal_url, token, device_id),
runtime_stopper,
&callbacks
);
// This is a bit hacky: the buffer size is 1 so that we process one message
// at a time, blocking if the previous message hasn't been processed yet,
// to force queue ordering.
let (control_plane_sender, mut control_plane_receiver) = tokio::sync::mpsc::channel(1);
let mut connection = PhoenixChannel::<_, IngressMessages, IngressMessages, IngressMessages>::new(Secret::new(SecureUrl::from_url(connect_url)), move |msg, reference| {
let control_plane_sender = control_plane_sender.clone();
async move {
tracing::trace!("Received message: {msg:?}");
if let Err(e) = control_plane_sender.send((msg, reference)).await {
tracing::warn!("Received a message after handler already closed: {e}. Probably message received during session clean up.");
}
}
});
// Used to send internal messages
let control_signaler = ControlSignaler { control_signal: connection.sender_with_topic("gateway".to_owned()) };
let tunnel = fatal_error!(
Tunnel::new(private_key, control_signaler.clone(), callbacks.clone()).await,
runtime_stopper,
&callbacks
);
let mut control_plane = ControlPlane {
tunnel: Arc::new(tunnel),
control_signaler,
};
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
tokio::select! {
Some((msg, _)) = control_plane_receiver.recv() => {
match msg {
Ok(msg) => control_plane.handle_message(msg).await?,
Err(_msg_reply) => todo!(),
}
},
_ = interval.tick() => control_plane.stats_event().await,
else => break
}
}
Result::Ok(())
});
tokio::spawn(async move {
let mut exponential_backoff = ExponentialBackoffBuilder::default()
.with_max_elapsed_time(None)
.build();
loop {
// `connection.start` calls the callback only after connecting
tracing::debug!("Attempting connection to portal...");
let result = connection.start(vec!["gateway".to_owned()], || exponential_backoff.reset()).await;
tracing::warn!("Disconnected from the portal");
if let Err(e) = &result {
tracing::warn!(error = ?e, "Portal connection error");
}
if let Some(t) = exponential_backoff.next_backoff() {
tracing::warn!("Error connecting to portal, retrying in {} seconds", t.as_secs());
let _ = callbacks.on_error(&result.err().unwrap_or(Error::PortalConnectionError(tokio_tungstenite::tungstenite::Error::ConnectionClosed)));
tokio::time::sleep(t).await;
} else {
tracing::error!("Connection to portal failed, giving up");
fatal_error!(
result.and(Err(Error::PortalConnectionError(tokio_tungstenite::tungstenite::Error::ConnectionClosed))),
runtime_stopper,
&callbacks
);
}
}
});
});
}
fn disconnect_inner(
runtime_stopper: tokio::sync::mpsc::Sender<StopRuntime>,
callbacks: &CallbackErrorFacade<CB>,
error: Option<Error>,
) {
// 1. Close the websocket connection
// 2. Free the device handle (Linux)
// 3. Close the file descriptor (Linux/Android)
// 4. Remove the mapping
// The way we clean up the tasks is by dropping the runtime;
// this means we don't need to keep track of the different tasks,
// but if any of the tasks never yields, this will block forever!
// So always yield, and if you spawn a blocking task, rewrite this.
// Furthermore, we depend on Drop impls to do the list above, so
// implement them :)
// If there's no receiver, the runtime is already stopped.
// There's an edge case where this is called before the thread is listening for stop signals,
// but I believe in that case the channel will be in a signaled state, achieving the same result.
if let Err(err) = runtime_stopper.try_send(StopRuntime) {
tracing::error!("Couldn't stop runtime: {err}");
}
let _ = callbacks.on_disconnect(error.as_ref());
}
/// Cleanup a [Session].
///
/// For now this just drops the runtime, which should drop all pending tasks.
/// Further cleanup should be done here. (Otherwise we can just drop [Session]).
pub fn disconnect(&mut self, error: Option<Error>) {
Self::disconnect_inner(self.runtime_stopper.clone(), &self.callbacks, error)
}
}

@@ -43,6 +43,12 @@ impl fmt::Display for ResourceId {
}
}
impl fmt::Display for ClientId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
/// Represents a wireguard peer.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Peer {

@@ -7,10 +7,26 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
secrecy = { workspace = true }
connlib-gateway-shared = { workspace = true }
anyhow = "1.0.75"
async-trait = { version = "0.1", default-features = false }
backoff = { workspace = true }
boringtun = { workspace = true }
chrono = { workspace = true }
clap = "4.4.5"
connlib-shared = { workspace = true }
firezone-tunnel = { workspace = true }
futures = "0.3.28"
futures-bounded = "0.1.0"
headless-utils = { workspace = true }
anyhow = { version = "1.0" }
phoenix-channel = { workspace = true }
secrecy = { workspace = true }
serde = { version = "1.0", default-features = false, features = ["std", "derive"] }
tokio = { version = "1.32", default-features = false, features = ["sync", "macros"] }
tokio-tungstenite = { version = "0.20", default-features = false, features = ["connect", "handshake", "rustls-tls-webpki-roots"] }
tracing = { workspace = true }
clap = { version = "4.3", features = ["derive", "env"] }
tracing-subscriber = "0.3.17"
url = { version = "2.4.1", default-features = false }
webrtc = "0.8"
[dev-dependencies]
serde_json = { version = "1.0", default-features = false, features = ["std"] }

@@ -0,0 +1,50 @@
use async_trait::async_trait;
use connlib_shared::messages::ClientId;
use connlib_shared::Error::ControlProtocolError;
use connlib_shared::{
messages::{GatewayId, ResourceDescription},
Result,
};
use firezone_tunnel::{ConnId, ControlSignal};
use tokio::sync::mpsc;
use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
#[derive(Clone)]
pub struct ControlSignaler {
tx: mpsc::Sender<(ClientId, RTCIceCandidate)>,
}
impl ControlSignaler {
pub fn new(tx: mpsc::Sender<(ClientId, RTCIceCandidate)>) -> Self {
Self { tx }
}
}
#[async_trait]
impl ControlSignal for ControlSignaler {
async fn signal_connection_to(
&self,
resource: &ResourceDescription,
_connected_gateway_ids: &[GatewayId],
_: usize,
) -> Result<()> {
tracing::warn!("A message to network resource: {resource:?} was discarded, gateways aren't meant to be used as clients.");
Ok(())
}
async fn signal_ice_candidate(
&self,
ice_candidate: RTCIceCandidate,
conn_id: ConnId,
) -> Result<()> {
// TODO: We probably want to have different signal_ice_candidate
// functions for gateway/client but ultimately we just want
// separate control_plane modules
if let ConnId::Client(id) = conn_id {
let _ = self.tx.send((id, ice_candidate)).await;
Ok(())
} else {
Err(ControlProtocolError)
}
}
}
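
For context: the new ControlSignaler is now just a thin bridge from the tunnel's callback-style ControlSignal trait onto an mpsc channel; the event loop owns the receiving end and forwards candidates to the portal. A minimal, self-contained sketch of that bridging pattern, with u64 and String as hypothetical stand-ins for ClientId and the candidate type:

use tokio::sync::mpsc;

#[derive(Clone)]
struct Signaler {
    // (client id, ICE candidate); simplified stand-ins for the real types.
    tx: mpsc::Sender<(u64, String)>,
}

impl Signaler {
    // The tunnel calls this from its own tasks; the event loop drains the receiver.
    async fn signal_ice_candidate(&self, client: u64, candidate: String) {
        // As in the commit, a send error is ignored: if the receiver is gone,
        // the event loop has shut down and the candidate is moot.
        let _ = self.tx.send((client, candidate)).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    let signaler = Signaler { tx };

    tokio::spawn(async move {
        signaler
            .signal_ice_candidate(1, "candidate:example".to_owned())
            .await;
    });

    // Loop ends once the sender side is dropped.
    while let Some((client, candidate)) = rx.recv().await {
        println!("forward {candidate} for client {client} to the portal");
    }
}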

@@ -0,0 +1,221 @@
use crate::control::ControlSignaler;
use crate::messages::{
AllowAccess, BroadcastClientIceCandidates, ClientIceCandidates, ConnectionReady,
EgressMessages, IngressMessages,
};
use crate::CallbackHandler;
use anyhow::Result;
use connlib_shared::messages::ClientId;
use connlib_shared::Error;
use firezone_tunnel::Tunnel;
use phoenix_channel::PhoenixChannel;
use std::convert::Infallible;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::mpsc;
use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
pub const PHOENIX_TOPIC: &str = "gateway";
pub struct Eventloop {
tunnel: Arc<Tunnel<ControlSignaler, CallbackHandler>>,
control_rx: mpsc::Receiver<(ClientId, RTCIceCandidate)>,
portal: PhoenixChannel<IngressMessages, ()>,
// TODO: Strongly type request reference (currently `String`)
connection_request_tasks:
futures_bounded::FuturesMap<(ClientId, String), Result<RTCSessionDescription, Error>>,
add_ice_candidate_tasks: futures_bounded::FuturesSet<Result<(), Error>>,
print_stats_timer: tokio::time::Interval,
}
impl Eventloop {
pub(crate) fn new(
tunnel: Arc<Tunnel<ControlSignaler, CallbackHandler>>,
control_rx: mpsc::Receiver<(ClientId, RTCIceCandidate)>,
portal: PhoenixChannel<IngressMessages, ()>,
) -> Self {
Self {
tunnel,
control_rx,
portal,
// TODO: Pick sane values for timeouts and size.
connection_request_tasks: futures_bounded::FuturesMap::new(
Duration::from_secs(60),
100,
),
add_ice_candidate_tasks: futures_bounded::FuturesSet::new(Duration::from_secs(60), 100),
print_stats_timer: tokio::time::interval(Duration::from_secs(10)),
}
}
}
impl Eventloop {
#[tracing::instrument(name = "Eventloop::poll", skip_all, level = "debug")]
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Infallible>> {
loop {
if let Poll::Ready(Some((client, ice_candidate))) = self.control_rx.poll_recv(cx) {
let ice_candidate = match ice_candidate.to_json() {
Ok(ice_candidate) => ice_candidate,
Err(e) => {
tracing::warn!(
"Failed to serialize ICE candidate to JSON: {:#}",
anyhow::Error::new(e)
);
continue;
}
};
tracing::debug!(%client, candidate = %ice_candidate.candidate, "Sending ICE candidate to client");
let _id = self.portal.send(
PHOENIX_TOPIC,
EgressMessages::BroadcastIceCandidates(BroadcastClientIceCandidates {
client_ids: vec![client],
candidates: vec![ice_candidate],
}),
);
continue;
}
match self.connection_request_tasks.poll_unpin(cx) {
Poll::Ready(((client, reference), Ok(Ok(gateway_rtc_session_description)))) => {
tracing::debug!(%client, %reference, "Connection is ready");
let _id = self.portal.send(
PHOENIX_TOPIC,
EgressMessages::ConnectionReady(ConnectionReady {
reference,
gateway_rtc_session_description,
}),
);
// TODO: If outbound request fails, cleanup connection.
continue;
}
Poll::Ready(((client, _), Ok(Err(e)))) => {
self.tunnel.cleanup_connection(client.into());
tracing::debug!(%client, "Connection request failed: {:#}", anyhow::Error::new(e));
continue;
}
Poll::Ready(((client, reference), Err(e))) => {
tracing::debug!(
%client,
%reference,
"Failed to establish connection: {:#}", anyhow::Error::new(e)
);
continue;
}
Poll::Pending => {}
}
match self.add_ice_candidate_tasks.poll_unpin(cx) {
Poll::Ready(Ok(Ok(()))) => {
continue;
}
Poll::Ready(Ok(Err(e))) => {
tracing::error!("Failed to add ICE candidate: {:#}", anyhow::Error::new(e));
continue;
}
Poll::Ready(Err(e)) => {
tracing::error!("Failed to add ICE candidate: {e}");
continue;
}
Poll::Pending => {}
}
match self.portal.poll(cx)? {
Poll::Ready(phoenix_channel::Event::InboundMessage {
msg: IngressMessages::Init(_),
..
}) => {
tracing::warn!("Received init message during operation");
debug_assert!(false, "Received init message during operation");
}
Poll::Ready(phoenix_channel::Event::InboundMessage {
msg: IngressMessages::RequestConnection(req),
..
}) => {
let tunnel = Arc::clone(&self.tunnel);
match self.connection_request_tasks.try_push(
(req.client.id, req.reference.clone()),
async move {
tunnel
.set_peer_connection_request(
req.client.rtc_session_description,
req.client.peer.into(),
req.relays,
req.client.id,
req.expires_at,
req.resource,
)
.await
},
) {
Err(futures_bounded::PushError::BeyondCapacity(_)) => {
tracing::warn!("Too many connections requests, dropping existing one");
}
Err(futures_bounded::PushError::ReplacedFuture(_)) => {
debug_assert!(false, "Received a 2nd connection requested with the same reference from the same client");
}
Ok(()) => {}
};
continue;
}
Poll::Ready(phoenix_channel::Event::InboundMessage {
msg:
IngressMessages::AllowAccess(AllowAccess {
client_id,
resource,
expires_at,
}),
..
}) => {
tracing::debug!(client = %client_id, resource = %resource.id(), expires = %expires_at.to_rfc3339(), "Allowing access to resource");
self.tunnel.allow_access(resource, client_id, expires_at);
continue;
}
Poll::Ready(phoenix_channel::Event::InboundMessage {
msg:
IngressMessages::IceCandidates(ClientIceCandidates {
client_id,
candidates,
}),
..
}) => {
for candidate in candidates {
tracing::debug!(client = %client_id, candidate = %candidate.candidate, "Adding ICE candidate from client");
let tunnel = Arc::clone(&self.tunnel);
if self
.add_ice_candidate_tasks
.try_push(async move {
tunnel.add_ice_candidate(client_id.into(), candidate).await
})
.is_err()
{
tracing::debug!("Received too many ICE candidates, dropping some");
}
}
continue;
}
_ => {}
}
if self.print_stats_timer.poll_tick(cx).is_ready() {
tracing::debug!(target: "tunnel_state", stats = ?self.tunnel.stats());
continue;
}
return Poll::Pending;
}
}
}
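
The event loop leans on futures-bounded to cap in-flight work and time it out instead of spawning unbounded tasks. A minimal sketch of the FuturesMap pattern used above; the key type, capacity, and timeout are arbitrary demo values (the commit uses 100 tasks and 60 seconds):

use std::future;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // At most 2 futures in flight, each timed out after 5 seconds.
    let mut tasks: futures_bounded::FuturesMap<&str, u32> =
        futures_bounded::FuturesMap::new(Duration::from_secs(5), 2);

    // try_push fails with PushError::BeyondCapacity when the map is full
    // (handing the rejected future back) and PushError::ReplacedFuture when
    // the key is already present; the event loop above logs both cases.
    let _ = tasks.try_push("request-1", async { 1 });
    let _ = tasks.try_push("request-2", async { 2 });

    // Like Eventloop::poll, drain completed futures with poll_unpin;
    // results come back keyed, as Ok(output) or Err(timeout).
    let (key, result) = future::poll_fn(|cx| tasks.poll_unpin(cx)).await;
    match result {
        Ok(value) => println!("{key} completed with {value}"),
        Err(timeout) => println!("{key} timed out: {timeout}"),
    }
}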

@@ -1,30 +1,102 @@
use anyhow::Result;
use crate::control::ControlSignaler;
use crate::eventloop::{Eventloop, PHOENIX_TOPIC};
use crate::messages::IngressMessages;
use anyhow::{Context as _, Result};
use backoff::ExponentialBackoffBuilder;
use boringtun::x25519::StaticSecret;
use clap::Parser;
use connlib_gateway_shared::{get_device_id, Callbacks, Session};
use headless_utils::{block_on_ctrl_c, setup_global_subscriber, CommonArgs};
use secrecy::SecretString;
use connlib_shared::{get_device_id, get_user_agent, login_url, Callbacks, Mode};
use firezone_tunnel::Tunnel;
use futures::{future, TryFutureExt};
use headless_utils::{setup_global_subscriber, CommonArgs};
use phoenix_channel::SecureUrl;
use secrecy::{Secret, SecretString};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tracing_subscriber::layer;
fn main() -> Result<()> {
mod control;
mod eventloop;
mod messages;
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
setup_global_subscriber(layer::Identity::new());
let device_id = get_device_id();
let mut session = Session::connect(
let (connect_url, private_key) = login_url(
Mode::Gateway,
cli.common.url,
SecretString::from(cli.common.secret),
device_id,
CallbackHandler,
)
.unwrap();
tracing::info!("new_session");
SecretString::new(cli.common.secret),
get_device_id(),
)?;
block_on_ctrl_c();
tokio::spawn(backoff::future::retry_notify(
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(None)
.build(),
move || {
connect(
private_key.clone(),
Secret::new(SecureUrl::from_url(connect_url.clone())),
)
.map_err(backoff::Error::transient)
},
|error, t: Duration| {
tracing::warn!(retry_in = ?t, "Error connecting to portal: {error}");
},
));
tokio::signal::ctrl_c().await?;
session.disconnect(None);
Ok(())
}
async fn connect(private_key: StaticSecret, connect_url: Secret<SecureUrl>) -> Result<Infallible> {
// Note: This is only needed because [`Tunnel`] does not (yet) have a synchronous, poll-like interface. If it did, ICE candidates would be emitted as events and we could hand them straight to the phoenix channel.
let (control_tx, control_rx) = tokio::sync::mpsc::channel(1);
let signaler = ControlSignaler::new(control_tx);
let tunnel = Arc::new(Tunnel::new(private_key, signaler, CallbackHandler).await?);
tracing::debug!("Attempting connection to portal...");
let mut channel =
phoenix_channel::PhoenixChannel::connect(connect_url, get_user_agent()).await?;
channel.join(PHOENIX_TOPIC, ());
let channel = loop {
match future::poll_fn(|cx| channel.poll(cx))
.await
.context("portal connection failed")?
{
phoenix_channel::Event::JoinedRoom { topic } if topic == PHOENIX_TOPIC => {
tracing::info!("Joined gateway room on portal")
}
phoenix_channel::Event::InboundMessage {
topic,
msg: IngressMessages::Init(init),
} => {
tracing::info!("Received init message from portal on topic {topic}");
tunnel
.set_interface(&init.interface)
.await
.context("Failed to set interface")?;
break channel;
}
other => {
tracing::debug!("Unhandled message from portal: {other:?}");
}
}
};
let mut eventloop = Eventloop::new(tunnel, control_rx, channel);
future::poll_fn(|cx| eventloop.poll(cx)).await
}
#[derive(Clone)]
struct CallbackHandler;
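
With the Session type gone, the reconnect policy lives in main: backoff::future::retry_notify re-runs connect with exponential backoff and no overall deadline, logging before each retry. A minimal sketch of that pattern, with a hypothetical flaky operation standing in for connect (an atomic counter is used because the FnMut closure can't hold a mutable borrow across the futures it returns):

use backoff::ExponentialBackoffBuilder;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let attempts = AtomicU32::new(0);

    backoff::future::retry_notify(
        // No max elapsed time: keep retrying forever, as the gateway does.
        ExponentialBackoffBuilder::default()
            .with_max_elapsed_time(None)
            .build(),
        || async {
            // Stand-in for connect(): fail twice, then succeed. Marking the
            // error as transient tells backoff the operation may be retried.
            if attempts.fetch_add(1, Ordering::SeqCst) < 2 {
                Err(backoff::Error::transient("portal unreachable"))
            } else {
                Ok(())
            }
        },
        |error, t: Duration| eprintln!("retrying in {t:?}: {error}"),
    )
    .await
}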