diff --git a/rust/headless-client/src/lib.rs b/rust/headless-client/src/lib.rs index 26877af65..e62d3e920 100644 --- a/rust/headless-client/src/lib.rs +++ b/rust/headless-client/src/lib.rs @@ -422,7 +422,10 @@ async fn ipc_listen() -> Result { let mut server = platform::IpcServer::new().await?; loop { dns_control::deactivate()?; - let stream = server.next_client().await?; + let stream = server + .next_client() + .await + .context("Failed to wait for incoming IPC connection from a GUI")?; if let Err(error) = handle_ipc_client(stream).await { tracing::error!(?error, "Error while handling IPC client"); } @@ -580,8 +583,10 @@ pub fn debug_command_setup() -> Result<()> { #[cfg(test)] mod tests { use super::{Cli, CliIpcService, CmdIpc}; + use anyhow::Context as _; use clap::Parser; - use std::path::PathBuf; + use std::{path::PathBuf, time::Duration}; + use tokio::time::timeout; use url::Url; // Can't remember how Clap works sometimes @@ -592,6 +597,7 @@ mod tests { let actual = Cli::parse_from([exe_name, "--api-url", "wss://api.firez.one"]); assert_eq!(actual.api_url, Url::parse("wss://api.firez.one")?); + assert!(!actual.check); let actual = Cli::parse_from([exe_name, "--check", "--log-dir", "bogus_log_dir"]); assert!(actual.check); @@ -611,4 +617,24 @@ mod tests { Ok(()) } + + /// Replicate #5143 + /// + /// When the IPC service has disconnected from a GUI and loops over, sometimes + /// the named pipe is not ready. If our IPC code doesn't handle this right, + /// this test will fail. + #[tokio::test] + async fn ipc_server() -> anyhow::Result<()> { + let _ = tracing_subscriber::fmt().with_test_writer().try_init(); + + let mut server = crate::platform::IpcServer::new_for_test().await?; + for i in 0..5 { + if let Ok(Err(err)) = timeout(Duration::from_secs(1), server.next_client()).await { + Err(err).with_context(|| { + format!("Couldn't listen for next IPC client, iteration {i}") + })?; + } + } + Ok(()) + } } diff --git a/rust/headless-client/src/linux.rs b/rust/headless-client/src/linux.rs index f7ae75eca..4d074747e 100644 --- a/rust/headless-client/src/linux.rs +++ b/rust/headless-client/src/linux.rs @@ -126,12 +126,28 @@ pub(crate) type IpcStream = UnixStream; impl IpcServer { /// Platform-specific setup pub(crate) async fn new() -> Result { + Self::new_with_path(&sock_path()).await + } + + /// Uses a test path instead of what prod uses + /// + /// The test path doesn't need admin powers and won't conflict with the prod + /// IPC service on a dev machine. + #[cfg(test)] + pub(crate) async fn new_for_test() -> Result { + let dir = crate::known_dirs::runtime().context("Can't find runtime dir")?; + // On a CI runner, the dir might not exist yet + tokio::fs::create_dir_all(&dir).await?; + let sock_path = dir.join("ipc_test.sock"); + Self::new_with_path(&sock_path).await + } + + async fn new_with_path(sock_path: &Path) -> Result { // Remove the socket if a previous run left it there - let sock_path = sock_path(); - tokio::fs::remove_file(&sock_path).await.ok(); - let listener = UnixListener::bind(&sock_path).context("Couldn't bind UDS")?; + tokio::fs::remove_file(sock_path).await.ok(); + let listener = UnixListener::bind(sock_path).context("Couldn't bind UDS")?; let perms = std::fs::Permissions::from_mode(0o660); - std::fs::set_permissions(sock_path, perms)?; + tokio::fs::set_permissions(sock_path, perms).await?; sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?; Ok(Self { listener }) } diff --git a/rust/headless-client/src/windows.rs b/rust/headless-client/src/windows.rs index 6441996a6..cbe699401 100644 --- a/rust/headless-client/src/windows.rs +++ b/rust/headless-client/src/windows.rs @@ -5,7 +5,7 @@ //! We must tell Windows explicitly when our service is stopping. use crate::{CliCommon, SignalKind}; -use anyhow::{Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use connlib_client_shared::file_logger; use connlib_shared::BUNDLE_ID; use std::{ @@ -174,7 +174,7 @@ fn fallible_windows_service_run(arguments: Vec) -> Result<()> { } pub(crate) struct IpcServer { - // On Linux this has some fields + pipe_path: String, } /// Opaque wrapper around platform-specific IPC stream @@ -186,12 +186,39 @@ impl IpcServer { /// This is async on Linux #[allow(clippy::unused_async)] pub(crate) async fn new() -> Result { - setup_before_connlib()?; - Ok(Self {}) + Self::new_with_path(pipe_path()) } + /// Uses a test path instead of what prod uses + /// + /// The test path doesn't need admin powers and won't conflict with the prod + /// IPC service on a dev machine. + /// + /// This is async on Linux + #[allow(clippy::unused_async)] + #[cfg(test)] + pub(crate) async fn new_for_test() -> Result { + let pipe_path = named_pipe_path(&format!("{BUNDLE_ID}_test.ipc_service")); + Self::new_with_path(pipe_path) + } + + pub(crate) fn new_with_path(pipe_path: String) -> Result { + setup_before_connlib()?; + Ok(Self { pipe_path }) + } + + // `&mut self` needed to match the Linux signature pub(crate) async fn next_client(&mut self) -> Result { - let server = create_pipe_server()?; + // Fixes #5143. In the IPC service, if we close the pipe and immediately re-open + // it, Tokio may not get a chance to clean up the pipe. Yielding seems to fix + // this in tests, but `yield_now` doesn't make any such guarantees, so + // we also do a loop. + tokio::task::yield_now().await; + + let server = self + .bind_to_pipe() + .await + .context("Couldn't bind to named pipe")?; tracing::info!("Listening for GUI to connect over IPC..."); server .connect() @@ -199,9 +226,35 @@ impl IpcServer { .context("Couldn't accept IPC connection from GUI")?; Ok(server) } + + async fn bind_to_pipe(&self) -> Result { + const NUM_ITERS: usize = 10; + // This loop is defense-in-depth. The `yield_now` in `next_client` is enough + // to fix #5143, but Tokio doesn't guarantee any behavior when yielding, so + // the loop will catch it even if yielding doesn't. + for i in 0..NUM_ITERS { + match create_pipe_server(&self.pipe_path) { + Ok(server) => return Ok(server), + Err(PipeError::AccessDenied) => { + tracing::warn!("PipeError::AccessDenied, sleeping... (loop {i})"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + Err(error) => Err(error)?, + } + } + bail!("Tried {NUM_ITERS} times to bind the pipe and failed"); + } } -fn create_pipe_server() -> Result { +#[derive(Debug, thiserror::Error)] +enum PipeError { + #[error("Access denied - Is another process using this pipe path?")] + AccessDenied, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +fn create_pipe_server(pipe_path: &str) -> Result { let mut server_options = named_pipe::ServerOptions::new(); server_options.first_pipe_instance(true); @@ -231,9 +284,17 @@ fn create_pipe_server() -> Result { let sa_ptr = &mut sa as *mut _ as *mut c_void; // SAFETY: Unsafe needed to call Win32 API. We only pass pointers to local vars, and Win32 shouldn't store them, so there shouldn't be any threading of lifetime problems. - let server = unsafe { server_options.create_with_security_attributes_raw(pipe_path(), sa_ptr) } - .context("Failed to listen on named pipe")?; - Ok(server) + match unsafe { server_options.create_with_security_attributes_raw(pipe_path, sa_ptr) } { + Ok(x) => Ok(x), + Err(err) => { + if err.kind() == std::io::ErrorKind::PermissionDenied { + tracing::warn!(?pipe_path, "Named pipe `PermissionDenied`"); + Err(PipeError::AccessDenied) + } else { + Err(anyhow::Error::from(err).into()) + } + } + } } /// Named pipe for IPC between GUI client and IPC service