fix(tauri_client/windows): close and re-open the named pipe properly, and back off if needed (#5156)

Closes #5143 

The initial half-second backoff should typically be enough, and if the
user is manually re-opening the GUI after a GUI crash, I don't think
they'll notice. If they do, they can open the GUI again and it should
all work.
This commit is contained in:
Reactor Scram
2024-06-05 15:32:00 -05:00
committed by GitHub
parent d561e0ee0d
commit f161fd290e
3 changed files with 118 additions and 15 deletions

View File

@@ -422,7 +422,10 @@ async fn ipc_listen() -> Result<std::convert::Infallible> {
let mut server = platform::IpcServer::new().await?;
loop {
dns_control::deactivate()?;
let stream = server.next_client().await?;
let stream = server
.next_client()
.await
.context("Failed to wait for incoming IPC connection from a GUI")?;
if let Err(error) = handle_ipc_client(stream).await {
tracing::error!(?error, "Error while handling IPC client");
}
@@ -580,8 +583,10 @@ pub fn debug_command_setup() -> Result<()> {
#[cfg(test)]
mod tests {
use super::{Cli, CliIpcService, CmdIpc};
use anyhow::Context as _;
use clap::Parser;
use std::path::PathBuf;
use std::{path::PathBuf, time::Duration};
use tokio::time::timeout;
use url::Url;
// Can't remember how Clap works sometimes
@@ -592,6 +597,7 @@ mod tests {
let actual = Cli::parse_from([exe_name, "--api-url", "wss://api.firez.one"]);
assert_eq!(actual.api_url, Url::parse("wss://api.firez.one")?);
assert!(!actual.check);
let actual = Cli::parse_from([exe_name, "--check", "--log-dir", "bogus_log_dir"]);
assert!(actual.check);
@@ -611,4 +617,24 @@ mod tests {
Ok(())
}
/// Replicate #5143
///
/// When the IPC service has disconnected from a GUI and loops over, sometimes
/// the named pipe is not ready. If our IPC code doesn't handle this right,
/// this test will fail.
#[tokio::test]
async fn ipc_server() -> anyhow::Result<()> {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let mut server = crate::platform::IpcServer::new_for_test().await?;
for i in 0..5 {
if let Ok(Err(err)) = timeout(Duration::from_secs(1), server.next_client()).await {
Err(err).with_context(|| {
format!("Couldn't listen for next IPC client, iteration {i}")
})?;
}
}
Ok(())
}
}

View File

@@ -126,12 +126,28 @@ pub(crate) type IpcStream = UnixStream;
impl IpcServer {
/// Platform-specific setup
pub(crate) async fn new() -> Result<Self> {
Self::new_with_path(&sock_path()).await
}
/// Uses a test path instead of what prod uses
///
/// The test path doesn't need admin powers and won't conflict with the prod
/// IPC service on a dev machine.
#[cfg(test)]
pub(crate) async fn new_for_test() -> Result<Self> {
let dir = crate::known_dirs::runtime().context("Can't find runtime dir")?;
// On a CI runner, the dir might not exist yet
tokio::fs::create_dir_all(&dir).await?;
let sock_path = dir.join("ipc_test.sock");
Self::new_with_path(&sock_path).await
}
async fn new_with_path(sock_path: &Path) -> Result<Self> {
// Remove the socket if a previous run left it there
let sock_path = sock_path();
tokio::fs::remove_file(&sock_path).await.ok();
let listener = UnixListener::bind(&sock_path).context("Couldn't bind UDS")?;
tokio::fs::remove_file(sock_path).await.ok();
let listener = UnixListener::bind(sock_path).context("Couldn't bind UDS")?;
let perms = std::fs::Permissions::from_mode(0o660);
std::fs::set_permissions(sock_path, perms)?;
tokio::fs::set_permissions(sock_path, perms).await?;
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?;
Ok(Self { listener })
}

View File

@@ -5,7 +5,7 @@
//! We must tell Windows explicitly when our service is stopping.
use crate::{CliCommon, SignalKind};
use anyhow::{Context as _, Result};
use anyhow::{bail, Context as _, Result};
use connlib_client_shared::file_logger;
use connlib_shared::BUNDLE_ID;
use std::{
@@ -174,7 +174,7 @@ fn fallible_windows_service_run(arguments: Vec<OsString>) -> Result<()> {
}
pub(crate) struct IpcServer {
// On Linux this has some fields
pipe_path: String,
}
/// Opaque wrapper around platform-specific IPC stream
@@ -186,12 +186,39 @@ impl IpcServer {
/// This is async on Linux
#[allow(clippy::unused_async)]
pub(crate) async fn new() -> Result<Self> {
setup_before_connlib()?;
Ok(Self {})
Self::new_with_path(pipe_path())
}
/// Uses a test path instead of what prod uses
///
/// The test path doesn't need admin powers and won't conflict with the prod
/// IPC service on a dev machine.
///
/// This is async on Linux
#[allow(clippy::unused_async)]
#[cfg(test)]
pub(crate) async fn new_for_test() -> Result<Self> {
let pipe_path = named_pipe_path(&format!("{BUNDLE_ID}_test.ipc_service"));
Self::new_with_path(pipe_path)
}
pub(crate) fn new_with_path(pipe_path: String) -> Result<Self> {
setup_before_connlib()?;
Ok(Self { pipe_path })
}
// `&mut self` needed to match the Linux signature
pub(crate) async fn next_client(&mut self) -> Result<IpcStream> {
let server = create_pipe_server()?;
// Fixes #5143. In the IPC service, if we close the pipe and immediately re-open
// it, Tokio may not get a chance to clean up the pipe. Yielding seems to fix
// this in tests, but `yield_now` doesn't make any such guarantees, so
// we also do a loop.
tokio::task::yield_now().await;
let server = self
.bind_to_pipe()
.await
.context("Couldn't bind to named pipe")?;
tracing::info!("Listening for GUI to connect over IPC...");
server
.connect()
@@ -199,9 +226,35 @@ impl IpcServer {
.context("Couldn't accept IPC connection from GUI")?;
Ok(server)
}
async fn bind_to_pipe(&self) -> Result<IpcStream> {
const NUM_ITERS: usize = 10;
// This loop is defense-in-depth. The `yield_now` in `next_client` is enough
// to fix #5143, but Tokio doesn't guarantee any behavior when yielding, so
// the loop will catch it even if yielding doesn't.
for i in 0..NUM_ITERS {
match create_pipe_server(&self.pipe_path) {
Ok(server) => return Ok(server),
Err(PipeError::AccessDenied) => {
tracing::warn!("PipeError::AccessDenied, sleeping... (loop {i})");
tokio::time::sleep(Duration::from_secs(1)).await;
}
Err(error) => Err(error)?,
}
}
bail!("Tried {NUM_ITERS} times to bind the pipe and failed");
}
}
fn create_pipe_server() -> Result<named_pipe::NamedPipeServer> {
#[derive(Debug, thiserror::Error)]
enum PipeError {
#[error("Access denied - Is another process using this pipe path?")]
AccessDenied,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
fn create_pipe_server(pipe_path: &str) -> Result<named_pipe::NamedPipeServer, PipeError> {
let mut server_options = named_pipe::ServerOptions::new();
server_options.first_pipe_instance(true);
@@ -231,9 +284,17 @@ fn create_pipe_server() -> Result<named_pipe::NamedPipeServer> {
let sa_ptr = &mut sa as *mut _ as *mut c_void;
// SAFETY: Unsafe needed to call Win32 API. We only pass pointers to local vars, and Win32 shouldn't store them, so there shouldn't be any threading of lifetime problems.
let server = unsafe { server_options.create_with_security_attributes_raw(pipe_path(), sa_ptr) }
.context("Failed to listen on named pipe")?;
Ok(server)
match unsafe { server_options.create_with_security_attributes_raw(pipe_path, sa_ptr) } {
Ok(x) => Ok(x),
Err(err) => {
if err.kind() == std::io::ErrorKind::PermissionDenied {
tracing::warn!(?pipe_path, "Named pipe `PermissionDenied`");
Err(PipeError::AccessDenied)
} else {
Err(anyhow::Error::from(err).into())
}
}
}
}
/// Named pipe for IPC between GUI client and IPC service