fix(connlib): run all callbacks on a separate thread (#8126)

At present, `connlib` communicates with its host app via callbacks.
These callbacks are executed synchronously as part of `connlib`s
event-loop, meaning `connlib` cannot do anything else whilst the
callback is executing in the host app. Additionally, this callback runs
within the `Future` that represents `connlib` and thus runs on a `tokio`
worker thread.

Attempting to interact with the session from within the callback can
lead to panics, for example when `Session::disconnect` is called which
uses `Runtime::block_on`. This isn't allowed by `tokio`: You cannot
block on the execution of an async task from within one of the worker
threads.

To solve both of these problems, we introduce a thread-pool of size 1
that is responsible for executing `connlib` callbacks. Not only does
this allow `connlib` to perform more work such as routing packets or
process portal messages, it also means that it is not possible for the
host app to cause these panics within the `tokio` runtime because the
callbacks run on a different thread.
This commit is contained in:
Thomas Eizinger
2025-02-14 17:54:35 +11:00
committed by GitHub
parent 10ba02e341
commit 8f0db6ad47
8 changed files with 118 additions and 7 deletions

40
rust/Cargo.lock generated
View File

@@ -1111,6 +1111,7 @@ dependencies = [
"firezone-tunnel",
"ip_network",
"phoenix-channel",
"rayon",
"secrecy",
"serde",
"serde_json",
@@ -1268,6 +1269,25 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@@ -5118,6 +5138,26 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
[[package]]
name = "rayon"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.7"

View File

@@ -89,6 +89,7 @@ quinn-udp = { version = "0.5.8", features = ["fast-apple-datapath"] }
rand = "0.8.5"
rand_core = "0.6.4"
rangemap = "1.5.1"
rayon = "1.10.0"
reqwest = { version = "0.12.9", default-features = false }
rtnetlink = { version = "0.14.1", default-features = false, features = ["tokio_socket"] }
rustls = { version = "0.23.21", default-features = false, features = ["ring"] }

View File

@@ -246,7 +246,7 @@ impl Callbacks for CallbackHandler {
.expect("onUpdateResources callback failed")
}
fn on_disconnect(&self, error: &DisconnectError) {
fn on_disconnect(&self, error: DisconnectError) {
self.env(|mut env| {
let error = env
.new_string(serde_json::to_string(&error.to_string())?)

View File

@@ -163,7 +163,7 @@ impl Callbacks for CallbackHandler {
self.inner.on_update_resources(resource_list);
}
fn on_disconnect(&self, error: &DisconnectError) {
fn on_disconnect(&self, error: DisconnectError) {
self.inner.on_disconnect(error.to_string());
}
}

View File

@@ -13,6 +13,7 @@ firezone-logging = { workspace = true }
firezone-tunnel = { workspace = true }
ip_network = { workspace = true }
phoenix-channel = { workspace = true }
rayon = { workspace = true }
secrecy = { workspace = true }
serde = { workspace = true, features = ["std", "derive"] }
snownet = { workspace = true }

View File

@@ -1,6 +1,9 @@
use connlib_model::ResourceView;
use ip_network::{Ipv4Network, Ipv6Network};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::Arc,
};
/// Traits that will be used by connlib to callback the client upper layers.
pub trait Callbacks: Clone + Send + Sync {
@@ -27,7 +30,7 @@ pub trait Callbacks: Clone + Send + Sync {
fn on_update_resources(&self, _: Vec<ResourceView>) {}
/// Called when the tunnel is disconnected.
fn on_disconnect(&self, _: &DisconnectError) {}
fn on_disconnect(&self, _: DisconnectError) {}
}
/// Unified error type to use across connlib.
@@ -49,3 +52,66 @@ impl DisconnectError {
e.is_authentication_error()
}
}
#[derive(Debug, Clone)]
pub struct BackgroundCallbacks<C> {
inner: C,
threadpool: Arc<rayon::ThreadPool>,
}
impl<C> BackgroundCallbacks<C> {
pub fn new(callbacks: C) -> Self {
Self {
inner: callbacks,
threadpool: Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|_| "connlib callbacks".to_owned())
.build()
.expect("Unable to create thread-pool"),
),
}
}
}
impl<C> Callbacks for BackgroundCallbacks<C>
where
C: Callbacks + 'static,
{
fn on_set_interface_config(
&self,
ipv4_addr: Ipv4Addr,
ipv6_addr: Ipv6Addr,
dns_addresses: Vec<IpAddr>,
route_list_4: Vec<Ipv4Network>,
route_list_6: Vec<Ipv6Network>,
) {
let callbacks = self.inner.clone();
self.threadpool.spawn(move || {
callbacks.on_set_interface_config(
ipv4_addr,
ipv6_addr,
dns_addresses,
route_list_4,
route_list_6,
);
});
}
fn on_update_resources(&self, resources: Vec<ResourceView>) {
let callbacks = self.inner.clone();
self.threadpool.spawn(move || {
callbacks.on_update_resources(resources);
});
}
fn on_disconnect(&self, error: DisconnectError) {
let callbacks = self.inner.clone();
self.threadpool.spawn(move || {
callbacks.on_disconnect(error);
});
}
}

View File

@@ -1,5 +1,6 @@
//! Main connlib library for clients.
pub use crate::serde_routelist::{V4RouteList, V6RouteList};
use callbacks::BackgroundCallbacks;
pub use callbacks::{Callbacks, DisconnectError};
pub use connlib_model::StaticSecret;
pub use eventloop::Eventloop;
@@ -42,6 +43,8 @@ impl Session {
portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
handle: tokio::runtime::Handle,
) -> Self {
let callbacks = BackgroundCallbacks::new(callbacks); // Run all callbacks on a background thread to avoid blocking the main connlib task.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let connect_handle = handle.spawn(connect(
@@ -141,7 +144,7 @@ async fn connect_supervisor<CB>(
Ok(Ok(())) => {
tracing::info!("connlib exited gracefully");
}
Ok(Err(e)) => callbacks.on_disconnect(&DisconnectError::PortalConnectionFailed(e)),
Err(e) => callbacks.on_disconnect(&DisconnectError::Crash(e)),
Ok(Err(e)) => callbacks.on_disconnect(DisconnectError::PortalConnectionFailed(e)),
Err(e) => callbacks.on_disconnect(DisconnectError::Crash(e)),
}
}

View File

@@ -97,7 +97,7 @@ pub struct CallbackHandler {
}
impl Callbacks for CallbackHandler {
fn on_disconnect(&self, error: &connlib_client_shared::DisconnectError) {
fn on_disconnect(&self, error: connlib_client_shared::DisconnectError) {
self.cb_tx
.try_send(ConnlibMsg::OnDisconnect {
error_msg: error.to_string(),