From 215883caecb1f74749ca6352d3ccb28b46638614 Mon Sep 17 00:00:00 2001 From: Reactor Scram Date: Mon, 22 Jan 2024 15:48:28 -0600 Subject: [PATCH] test(windows): multi-process test for IPC (#3310) I tested this by temporarily putting panics in `test_ipc_manager` and `test_ipc_worker`. It looks like, if a process crashes, Windows will clean up its named pipe, and the process waiting on the other side of the named pipe will get an error. This is good but it's not air-tight - ~~We could still have a situation where a worker process locks up, and the main process crashes, and the worker process then leaks.~~ #3311 will fix that For that case I'll try this https://stackoverflow.com/questions/53208/how-do-i-automatically-destroy-child-processes-in-windows --------- Signed-off-by: Reactor Scram --- .github/workflows/_rust.yml | 8 + .../src-tauri/src/client/debug_commands.rs | 30 +- .../src-tauri/src/client/ipc.rs | 393 ++++++++++++------ 3 files changed, 289 insertions(+), 142 deletions(-) diff --git a/.github/workflows/_rust.yml b/.github/workflows/_rust.yml index ac29f08e2..825158285 100644 --- a/.github/workflows/_rust.yml +++ b/.github/workflows/_rust.yml @@ -90,6 +90,14 @@ jobs: run: | pnpm install -g @tauri-apps/cli + # Set logs to debug + $env:RUST_LOG = "debug" + + # I'm running the multi-process test here because I don't think it can be + # embedded in a test binary. It requires the client to call subcommands + # from its own exe. + cargo run -p firezone-windows-client -- debug test-ipc + # PNPM installs tauri-cli to somewhere in $PATH tauri build diff --git a/rust/windows-client/src-tauri/src/client/debug_commands.rs b/rust/windows-client/src-tauri/src/client/debug_commands.rs index f289c251f..c2611d709 100644 --- a/rust/windows-client/src-tauri/src/client/debug_commands.rs +++ b/rust/windows-client/src-tauri/src/client/debug_commands.rs @@ -1,6 +1,7 @@ //! CLI subcommands used to test features / dependencies before integrating //! them with the GUI, or to exercise features programmatically. +use crate::client; use anyhow::Result; #[derive(clap::Subcommand)] @@ -8,37 +9,26 @@ pub enum Cmd { Crash, Hostname, NetworkChanges, - Test { + TestIpc { #[command(subcommand)] - command: Test, + cmd: Option, }, Wintun, } -#[derive(clap::Subcommand)] -pub enum Test { - Ipc, -} - pub fn run(cmd: Cmd) -> Result<()> { match cmd { Cmd::Crash => crash(), Cmd::Hostname => hostname(), - Cmd::NetworkChanges => crate::client::network_changes::run_debug(), - Cmd::Test { command } => run_test(command), + Cmd::NetworkChanges => client::network_changes::run_debug(), + Cmd::TestIpc { cmd } => client::ipc::test_subcommand(cmd), Cmd::Wintun => wintun(), } } -fn run_test(cmd: Test) -> Result<()> { - match cmd { - Test::Ipc => test_ipc(), - } -} - fn crash() -> Result<()> { // `_` doesn't seem to work here, the log files end up empty - let _handles = crate::client::logging::setup("debug")?; + let _handles = client::logging::setup("debug")?; tracing::info!("started log (DebugCrash)"); panic!("purposely crashing to see if it shows up in logs"); @@ -52,16 +42,10 @@ fn hostname() -> Result<()> { Ok(()) } -fn test_ipc() -> Result<()> { - // TODO: Add multi-process IPC test here. This would be difficult to implement - // idiomatically with `cargo test` alone. - Ok(()) -} - fn wintun() -> Result<()> { tracing_subscriber::fmt::init(); - if crate::client::elevation::check()? { + if client::elevation::check()? { tracing::info!("Elevated"); } else { tracing::warn!("Not elevated") diff --git a/rust/windows-client/src-tauri/src/client/ipc.rs b/rust/windows-client/src-tauri/src/client/ipc.rs index 2d43761dd..118bb02d6 100755 --- a/rust/windows-client/src-tauri/src/client/ipc.rs +++ b/rust/windows-client/src-tauri/src/client/ipc.rs @@ -1,125 +1,280 @@ //! Inter-process communication for the connlib subprocess +//! +//! To run the unit tests and multi-process tests, use +//! ```bash +//! cargo test --all-features -p firezone-windows-client && \ +//! RUST_LOG=debug cargo run -p firezone-windows-client debug test-ipc +//! ``` + +use anyhow::{Context, Result}; +use connlib_client_shared::ResourceDescription; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{ + marker::Unpin, + process::{self, Child}, + time::Duration, +}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::windows::named_pipe, +}; + +#[derive(clap::Subcommand)] +pub enum Subcommand { + Manager { pipe_id: String }, + Worker { pipe_id: String }, +} + +pub fn test_subcommand(cmd: Option) -> Result<()> { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async move { + match cmd { + None => test_harness(), + Some(Subcommand::Manager { pipe_id }) => test_manager_process(pipe_id).await, + Some(Subcommand::Worker { pipe_id }) => test_worker_process(pipe_id).await, + } + })?; + Ok(()) +} + +fn test_harness() -> Result<()> { + tracing_subscriber::fmt::init(); + + let id = random_pipe_id(); + + let _manager = SubcommandChild::new(&["debug", "test-ipc", "manager", &id]); + let _worker = SubcommandChild::new(&["debug", "test-ipc", "worker", &id]); + + std::thread::sleep(Duration::from_secs(10)); + Ok(()) +} + +async fn test_manager_process(pipe_id: String) -> Result<()> { + tracing_subscriber::fmt::init(); + let server = UnconnectedServer::new_with_id(&pipe_id)?; + + // TODO: The real manager would spawn the worker subprocess here, but + // for this case, the test harness spawns it for us. + + let mut server = server.connect().await?; + + let start_time = std::time::Instant::now(); + assert_eq!(server.request(Request::Connect).await?, Response::Connected); + assert_eq!( + server.request(Request::AwaitCallback).await?, + Response::CallbackOnUpdateResources(vec![]) + ); + assert_eq!( + server.request(Request::Disconnect).await?, + Response::Disconnected + ); + + let elapsed = start_time.elapsed(); + assert!( + elapsed < std::time::Duration::from_millis(6), + "{:?}", + elapsed + ); + tracing::info!(?elapsed, "made 3 IPC requests"); + + Ok(()) +} + +async fn test_worker_process(pipe_id: String) -> Result<()> { + tracing_subscriber::fmt::init(); + let mut client = Client::new(&pipe_id)?; + // panic!("Pretending the worker crashed right after connecting"); + + // Handle requests from the main process + loop { + let (req, responder) = client.next_request().await?; + let resp = match &req { + Request::AwaitCallback => Response::CallbackOnUpdateResources(vec![]), + Request::Connect => Response::Connected, + Request::Disconnect => Response::Disconnected, + }; + responder.respond(resp).await?; + + if let Request::Disconnect = req { + break; + } + } + + Ok(()) +} + +/// Returns a random valid named pipe ID based on a UUIDv4 +/// +/// e.g. "\\.\pipe\dev.firezone.client\9508e87c-1c92-4630-bb20-839325d169bd" +/// +/// Normally you don't need to call this directly. Tests may need it to inject +/// a known pipe ID into a process controlled by the test. +fn random_pipe_id() -> String { + format!(r"\\.\pipe\dev.firezone.client\{}", uuid::Uuid::new_v4()) +} + +/// A server that accepts only one client +pub(crate) struct UnconnectedServer { + pipe: named_pipe::NamedPipeServer, +} + +impl UnconnectedServer { + /// Requires a Tokio context + /// + /// Will be used for production code soon + pub fn _new() -> anyhow::Result<(Self, String)> { + let id = random_pipe_id(); + let this = Self::new_with_id(&id)?; + Ok((this, id)) + } + + fn new_with_id(id: &str) -> anyhow::Result { + let pipe = named_pipe::ServerOptions::new() + .first_pipe_instance(true) + .create(id)?; + + Ok(Self { pipe }) + } + + pub async fn connect(self) -> anyhow::Result { + self.pipe.connect().await?; + Ok(Server { pipe: self.pipe }) + } +} + +/// A server that's connected to a client +pub(crate) struct Server { + pipe: named_pipe::NamedPipeServer, +} + +/// A client that's connected to a server +pub(crate) struct Client { + pipe: named_pipe::NamedPipeClient, +} + +#[derive(Deserialize, Serialize)] +pub(crate) enum Request { + AwaitCallback, + Connect, + Disconnect, +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +pub(crate) enum Response { + CallbackOnUpdateResources(Vec), + Connected, + Disconnected, +} + +#[must_use] +pub(crate) struct Responder<'a> { + client: &'a mut Client, +} + +impl Server { + pub async fn request(&mut self, req: Request) -> Result { + write_bincode(&mut self.pipe, &req) + .await + .context("couldn't send request")?; + read_bincode(&mut self.pipe) + .await + .context("couldn't read response") + } +} + +impl Client { + /// Creates a `Client`. Requires a Tokio context + pub fn new(server_id: &str) -> Result { + let pipe = named_pipe::ClientOptions::new().open(server_id)?; + Ok(Self { pipe }) + } + + pub async fn next_request(&mut self) -> Result<(Request, Responder)> { + let req = read_bincode(&mut self.pipe).await?; + let responder = Responder { client: self }; + Ok((req, responder)) + } +} + +impl<'a> Responder<'a> { + pub async fn respond(self, resp: Response) -> Result<()> { + write_bincode(&mut self.client.pipe, &resp).await?; + Ok(()) + } +} + +/// Reads a message from an async reader, with a 32-bit little-endian length prefix +async fn read_bincode(reader: &mut R) -> Result { + let mut len_buf = [0u8; 4]; + reader.read_exact(&mut len_buf).await?; + let len = u32::from_le_bytes(len_buf); + let mut buf = vec![0u8; usize::try_from(len)?]; + reader.read_exact(&mut buf).await?; + let msg = bincode::deserialize(&buf)?; + Ok(msg) +} + +/// Writes a message to an async writer, with a 32-bit little-endian length prefix +async fn write_bincode(writer: &mut W, msg: &T) -> Result<()> { + let buf = bincode::serialize(msg)?; + let len = u32::try_from(buf.len())?.to_le_bytes(); + writer.write_all(&len).await?; + writer.write_all(&buf).await?; + Ok(()) +} + +/// `std::process::Child` but for a subcommand running from the same exe as +/// the current process. +/// +/// Unlike `std::process::Child`, `Drop` tries to join the process, and kills it +/// if it can't. +pub(crate) struct SubcommandChild { + process: Child, +} + +impl SubcommandChild { + /// Launches the current exe as a subprocess with new arguments + /// + /// # Parameters + /// + /// * `args` - e.g. `["debug", "test", "ipc-worker"]` + pub fn new(args: &[&str]) -> Result { + let mut process = process::Command::new(std::env::current_exe()?); + for arg in args { + process.arg(arg); + } + let process = process.spawn()?; + Ok(SubcommandChild { process }) + } + + /// Joins the subprocess, returning an error if the process doesn't stop + pub fn wait_or_kill(&mut self) -> Result<()> { + if let Ok(Some(status)) = self.process.try_wait() { + if status.success() { + tracing::info!("process exited with success code"); + } else { + tracing::warn!("process exited with non-success code"); + } + } else { + self.process.kill()?; + tracing::error!("process was killed"); + } + Ok(()) + } +} + +impl Drop for SubcommandChild { + fn drop(&mut self) { + if let Err(error) = self.wait_or_kill() { + tracing::error!(?error, "SubcommandChild could not be joined or killed"); + } + } +} #[cfg(test)] mod tests { - use connlib_client_shared::ResourceDescription; - use serde::{de::DeserializeOwned, Deserialize, Serialize}; - use std::marker::Unpin; - use tokio::{ - io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - net::windows::named_pipe, - runtime::Runtime, - }; - - /// Returns a random valid named pipe ID based on a UUIDv4 - /// - /// e.g. "\\.\pipe\dev.firezone.client\9508e87c-1c92-4630-bb20-839325d169bd" - fn random_pipe_id() -> String { - format!(r"\\.\pipe\dev.firezone.client\{}", uuid::Uuid::new_v4()) - } - - /// A server that accepts only one client - struct UnconnectedServer { - pipe: named_pipe::NamedPipeServer, - } - - /// A server that's connected to a client - struct Server { - pipe: named_pipe::NamedPipeServer, - } - - /// A client that's connected to a server - struct Client { - pipe: named_pipe::NamedPipeClient, - } - - #[derive(Deserialize, Serialize)] - enum Request { - AwaitCallback, - Connect, - Disconnect, - } - - #[derive(Debug, Deserialize, PartialEq, Serialize)] - enum Response { - CallbackOnUpdateResources(Vec), - Connected, - Disconnected, - } - - #[must_use] - struct Responder<'a> { - client: &'a mut Client, - } - - impl UnconnectedServer { - pub fn new() -> anyhow::Result<(Self, String)> { - let id = random_pipe_id(); - let pipe = named_pipe::ServerOptions::new() - .first_pipe_instance(true) - .create(&id)?; - - let this = Self { pipe }; - Ok((this, id)) - } - - pub async fn connect(self) -> anyhow::Result { - self.pipe.connect().await?; - Ok(Server { pipe: self.pipe }) - } - } - - /// Reads a message from an async reader, with a 32-bit little-endian length prefix - async fn read_bincode( - reader: &mut R, - ) -> anyhow::Result { - let mut len_buf = [0u8; 4]; - reader.read_exact(&mut len_buf).await?; - let len = u32::from_le_bytes(len_buf); - let mut buf = vec![0u8; usize::try_from(len)?]; - reader.read_exact(&mut buf).await?; - let msg = bincode::deserialize(&buf)?; - Ok(msg) - } - - /// Writes a message to an async writer, with a 32-bit little-endian length prefix - async fn write_bincode( - writer: &mut W, - msg: &T, - ) -> anyhow::Result<()> { - let buf = bincode::serialize(msg)?; - let len = u32::try_from(buf.len())?.to_le_bytes(); - writer.write_all(&len).await?; - writer.write_all(&buf).await?; - Ok(()) - } - - impl Server { - pub async fn request(&mut self, req: Request) -> anyhow::Result { - write_bincode(&mut self.pipe, &req).await?; - read_bincode(&mut self.pipe).await - } - } - - impl Client { - pub fn new(server_id: &str) -> anyhow::Result { - let pipe = named_pipe::ClientOptions::new().open(server_id)?; - Ok(Self { pipe }) - } - - pub async fn next_request(&mut self) -> anyhow::Result<(Request, Responder)> { - let req = read_bincode(&mut self.pipe).await?; - let responder = Responder { client: self }; - Ok((req, responder)) - } - } - - impl<'a> Responder<'a> { - pub async fn respond(self, resp: Response) -> anyhow::Result<()> { - write_bincode(&mut self.client.pipe, &resp).await?; - Ok(()) - } - } + use super::*; + use tokio::runtime::Runtime; /// Test just the happy path /// It's hard to simulate a process crash because: @@ -132,7 +287,7 @@ mod tests { let rt = Runtime::new()?; rt.block_on(async move { // Pretend we're in the main process - let (server, server_id) = UnconnectedServer::new()?; + let (server, server_id) = UnconnectedServer::_new()?; let worker_task = tokio::spawn(async move { // Pretend we're in a worker process