mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
fix(rust): don't block runtime shutdown (#10204)
By default, dropping a `tokio` runtime waits until all tasks have finished. The tasks we spawn within `connlib` can have complex dependencies with each other. To ensure that we can shut down in any case and don't hang, we apply a timeout of 1s to the runtime.
This commit is contained in:
@@ -118,7 +118,7 @@ mod ffi {
|
||||
/// This is used by the apple client to interact with our code.
|
||||
pub struct WrappedSession {
|
||||
inner: Session,
|
||||
runtime: Runtime,
|
||||
runtime: Option<Runtime>,
|
||||
|
||||
telemetry: Telemetry,
|
||||
}
|
||||
@@ -344,7 +344,7 @@ impl WrappedSession {
|
||||
|
||||
Ok(Self {
|
||||
inner: session,
|
||||
runtime,
|
||||
runtime: Some(runtime),
|
||||
telemetry,
|
||||
})
|
||||
}
|
||||
@@ -386,7 +386,12 @@ impl WrappedSession {
|
||||
|
||||
impl Drop for WrappedSession {
|
||||
fn drop(&mut self) {
|
||||
self.runtime.block_on(self.telemetry.stop());
|
||||
let Some(runtime) = self.runtime.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
runtime.block_on(self.telemetry.stop());
|
||||
runtime.shutdown_timeout(Duration::from_secs(1)); // Ensure we don't block forever on a task in the blocking pool.
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::{
|
||||
os::fd::{AsRawFd as _, RawFd},
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
@@ -26,7 +27,7 @@ pub struct Session {
|
||||
inner: client_shared::Session,
|
||||
events: Mutex<client_shared::EventStream>,
|
||||
telemetry: Mutex<Telemetry>,
|
||||
runtime: tokio::runtime::Runtime,
|
||||
runtime: Option<tokio::runtime::Runtime>,
|
||||
}
|
||||
|
||||
#[derive(uniffi::Object, thiserror::Error, Debug)]
|
||||
@@ -113,11 +114,15 @@ impl Session {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn disconnect(&self) {
|
||||
self.runtime.block_on(async {
|
||||
pub fn disconnect(&self) -> Result<(), Error> {
|
||||
let runtime = self.runtime.as_ref().context("No runtime")?;
|
||||
|
||||
runtime.block_on(async {
|
||||
self.telemetry.lock().await.stop().await;
|
||||
});
|
||||
self.inner.stop();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn set_disabled_resources(&self, disabled_resources: String) -> Result<(), Error> {
|
||||
@@ -153,7 +158,7 @@ impl Session {
|
||||
}
|
||||
|
||||
pub fn set_tun(&self, fd: RawFd) -> Result<(), Error> {
|
||||
let _guard = self.runtime.enter();
|
||||
let _guard = self.runtime.as_ref().context("No runtime")?.enter();
|
||||
// SAFETY: FD must be open.
|
||||
let tun = unsafe { platform::Tun::from_fd(fd).context("Failed to create new Tun")? };
|
||||
|
||||
@@ -203,8 +208,12 @@ impl Session {
|
||||
|
||||
impl Drop for Session {
|
||||
fn drop(&mut self) {
|
||||
self.runtime
|
||||
.block_on(async { self.telemetry.lock().await.stop_on_crash().await })
|
||||
let Some(runtime) = self.runtime.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
runtime.block_on(async { self.telemetry.lock().await.stop_on_crash().await });
|
||||
runtime.shutdown_timeout(Duration::from_secs(1)); // Ensure we don't block forever on a task in the blocking pool.
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,7 +288,7 @@ fn connect(
|
||||
inner: session,
|
||||
events: Mutex::new(events),
|
||||
telemetry: Mutex::new(telemetry),
|
||||
runtime,
|
||||
runtime: Some(runtime),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::path::PathBuf;
|
||||
use std::{path::PathBuf, time::Duration};
|
||||
|
||||
use anyhow::{Result, bail};
|
||||
use firezone_bin_shared::{DnsControlMethod, signals};
|
||||
@@ -22,7 +22,11 @@ pub fn run(log_dir: Option<PathBuf>, dns_control: DnsControlMethod) -> Result<()
|
||||
&log_filter_reloader,
|
||||
&mut signals,
|
||||
))
|
||||
.inspect_err(|e| tracing::error!("IPC service failed: {e:#}"))
|
||||
.inspect_err(|e| tracing::error!("IPC service failed: {e:#}"))?;
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1)); // Ensure we don't block forever on a task in the blocking pool.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if the Tunnel service can run properly
|
||||
|
||||
@@ -298,6 +298,8 @@ fn run_service(arguments: Vec<OsString>) {
|
||||
wait_hint: Duration::default(),
|
||||
process_id: None,
|
||||
});
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1)); // Ensure we don't block forever on a task in the blocking pool.
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -20,6 +20,7 @@ use secrecy::{Secret, SecretString};
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::time::Instant;
|
||||
|
||||
@@ -355,7 +356,11 @@ fn main() -> Result<()> {
|
||||
drop(session);
|
||||
|
||||
result
|
||||
})
|
||||
})?;
|
||||
|
||||
rt.shutdown_timeout(Duration::from_secs(1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the token from disk if it was not in the environment
|
||||
|
||||
Reference in New Issue
Block a user