diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b3fc626cf..00de3f323 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1111,6 +1111,7 @@ dependencies = [ "firezone-tunnel", "ip_network", "phoenix-channel", + "rayon", "secrecy", "serde", "serde_json", @@ -1268,6 +1269,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -5118,6 +5138,26 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539" +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.7" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index b2b943060..4174e8a79 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -89,6 +89,7 @@ quinn-udp = { version = "0.5.8", features = ["fast-apple-datapath"] } rand = "0.8.5" rand_core = "0.6.4" rangemap = "1.5.1" +rayon = "1.10.0" reqwest = { version = "0.12.9", default-features = false } rtnetlink = { version = "0.14.1", default-features = false, features = ["tokio_socket"] } rustls = { version = "0.23.21", default-features = false, features = ["ring"] } diff --git a/rust/connlib/clients/android/src/lib.rs b/rust/connlib/clients/android/src/lib.rs index e7fa421b2..06acb1f8f 100644 --- a/rust/connlib/clients/android/src/lib.rs +++ b/rust/connlib/clients/android/src/lib.rs @@ -246,7 +246,7 @@ impl Callbacks for CallbackHandler { .expect("onUpdateResources callback failed") } - fn on_disconnect(&self, error: &DisconnectError) { + fn on_disconnect(&self, error: DisconnectError) { self.env(|mut env| { let error = env .new_string(serde_json::to_string(&error.to_string())?) diff --git a/rust/connlib/clients/apple/src/lib.rs b/rust/connlib/clients/apple/src/lib.rs index 4ebe46b74..d270b23c9 100644 --- a/rust/connlib/clients/apple/src/lib.rs +++ b/rust/connlib/clients/apple/src/lib.rs @@ -163,7 +163,7 @@ impl Callbacks for CallbackHandler { self.inner.on_update_resources(resource_list); } - fn on_disconnect(&self, error: &DisconnectError) { + fn on_disconnect(&self, error: DisconnectError) { self.inner.on_disconnect(error.to_string()); } } diff --git a/rust/connlib/clients/shared/Cargo.toml b/rust/connlib/clients/shared/Cargo.toml index ca676720c..9b4220be7 100644 --- a/rust/connlib/clients/shared/Cargo.toml +++ b/rust/connlib/clients/shared/Cargo.toml @@ -13,6 +13,7 @@ firezone-logging = { workspace = true } firezone-tunnel = { workspace = true } ip_network = { workspace = true } phoenix-channel = { workspace = true } +rayon = { workspace = true } secrecy = { workspace = true } serde = { workspace = true, features = ["std", "derive"] } snownet = { workspace = true } diff --git a/rust/connlib/clients/shared/src/callbacks.rs b/rust/connlib/clients/shared/src/callbacks.rs index b8f96a1fd..d241e70c2 100644 --- a/rust/connlib/clients/shared/src/callbacks.rs +++ b/rust/connlib/clients/shared/src/callbacks.rs @@ -1,6 +1,9 @@ use connlib_model::ResourceView; use ip_network::{Ipv4Network, Ipv6Network}; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; /// Traits that will be used by connlib to callback the client upper layers. pub trait Callbacks: Clone + Send + Sync { @@ -27,7 +30,7 @@ pub trait Callbacks: Clone + Send + Sync { fn on_update_resources(&self, _: Vec) {} /// Called when the tunnel is disconnected. - fn on_disconnect(&self, _: &DisconnectError) {} + fn on_disconnect(&self, _: DisconnectError) {} } /// Unified error type to use across connlib. @@ -49,3 +52,66 @@ impl DisconnectError { e.is_authentication_error() } } + +#[derive(Debug, Clone)] +pub struct BackgroundCallbacks { + inner: C, + threadpool: Arc, +} + +impl BackgroundCallbacks { + pub fn new(callbacks: C) -> Self { + Self { + inner: callbacks, + threadpool: Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(1) + .thread_name(|_| "connlib callbacks".to_owned()) + .build() + .expect("Unable to create thread-pool"), + ), + } + } +} + +impl Callbacks for BackgroundCallbacks +where + C: Callbacks + 'static, +{ + fn on_set_interface_config( + &self, + ipv4_addr: Ipv4Addr, + ipv6_addr: Ipv6Addr, + dns_addresses: Vec, + route_list_4: Vec, + route_list_6: Vec, + ) { + let callbacks = self.inner.clone(); + + self.threadpool.spawn(move || { + callbacks.on_set_interface_config( + ipv4_addr, + ipv6_addr, + dns_addresses, + route_list_4, + route_list_6, + ); + }); + } + + fn on_update_resources(&self, resources: Vec) { + let callbacks = self.inner.clone(); + + self.threadpool.spawn(move || { + callbacks.on_update_resources(resources); + }); + } + + fn on_disconnect(&self, error: DisconnectError) { + let callbacks = self.inner.clone(); + + self.threadpool.spawn(move || { + callbacks.on_disconnect(error); + }); + } +} diff --git a/rust/connlib/clients/shared/src/lib.rs b/rust/connlib/clients/shared/src/lib.rs index d5973ca7d..1d5fbe017 100644 --- a/rust/connlib/clients/shared/src/lib.rs +++ b/rust/connlib/clients/shared/src/lib.rs @@ -1,5 +1,6 @@ //! Main connlib library for clients. pub use crate::serde_routelist::{V4RouteList, V6RouteList}; +use callbacks::BackgroundCallbacks; pub use callbacks::{Callbacks, DisconnectError}; pub use connlib_model::StaticSecret; pub use eventloop::Eventloop; @@ -42,6 +43,8 @@ impl Session { portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>, handle: tokio::runtime::Handle, ) -> Self { + let callbacks = BackgroundCallbacks::new(callbacks); // Run all callbacks on a background thread to avoid blocking the main connlib task. + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let connect_handle = handle.spawn(connect( @@ -141,7 +144,7 @@ async fn connect_supervisor( Ok(Ok(())) => { tracing::info!("connlib exited gracefully"); } - Ok(Err(e)) => callbacks.on_disconnect(&DisconnectError::PortalConnectionFailed(e)), - Err(e) => callbacks.on_disconnect(&DisconnectError::Crash(e)), + Ok(Err(e)) => callbacks.on_disconnect(DisconnectError::PortalConnectionFailed(e)), + Err(e) => callbacks.on_disconnect(DisconnectError::Crash(e)), } } diff --git a/rust/headless-client/src/lib.rs b/rust/headless-client/src/lib.rs index 07962cc04..91cd61b84 100644 --- a/rust/headless-client/src/lib.rs +++ b/rust/headless-client/src/lib.rs @@ -97,7 +97,7 @@ pub struct CallbackHandler { } impl Callbacks for CallbackHandler { - fn on_disconnect(&self, error: &connlib_client_shared::DisconnectError) { + fn on_disconnect(&self, error: connlib_client_shared::DisconnectError) { self.cb_tx .try_send(ConnlibMsg::OnDisconnect { error_msg: error.to_string(),