diff --git a/.github/workflows/_rust.yml b/.github/workflows/_rust.yml index ac29f08e2..825158285 100644 --- a/.github/workflows/_rust.yml +++ b/.github/workflows/_rust.yml @@ -90,6 +90,14 @@ jobs: run: | pnpm install -g @tauri-apps/cli + # Set logs to debug + $env:RUST_LOG = "debug" + + # I'm running the multi-process test here because I don't think it can be + # embedded in a test binary. It requires the client to call subcommands + # from its own exe. + cargo run -p firezone-windows-client -- debug test-ipc + # PNPM installs tauri-cli to somewhere in $PATH tauri build diff --git a/rust/windows-client/src-tauri/src/client/debug_commands.rs b/rust/windows-client/src-tauri/src/client/debug_commands.rs index f289c251f..c2611d709 100644 --- a/rust/windows-client/src-tauri/src/client/debug_commands.rs +++ b/rust/windows-client/src-tauri/src/client/debug_commands.rs @@ -1,6 +1,7 @@ //! CLI subcommands used to test features / dependencies before integrating //! them with the GUI, or to exercise features programmatically. +use crate::client; use anyhow::Result; #[derive(clap::Subcommand)] @@ -8,37 +9,26 @@ pub enum Cmd { Crash, Hostname, NetworkChanges, - Test { + TestIpc { #[command(subcommand)] - command: Test, + cmd: Option, }, Wintun, } -#[derive(clap::Subcommand)] -pub enum Test { - Ipc, -} - pub fn run(cmd: Cmd) -> Result<()> { match cmd { Cmd::Crash => crash(), Cmd::Hostname => hostname(), - Cmd::NetworkChanges => crate::client::network_changes::run_debug(), - Cmd::Test { command } => run_test(command), + Cmd::NetworkChanges => client::network_changes::run_debug(), + Cmd::TestIpc { cmd } => client::ipc::test_subcommand(cmd), Cmd::Wintun => wintun(), } } -fn run_test(cmd: Test) -> Result<()> { - match cmd { - Test::Ipc => test_ipc(), - } -} - fn crash() -> Result<()> { // `_` doesn't seem to work here, the log files end up empty - let _handles = crate::client::logging::setup("debug")?; + let _handles = client::logging::setup("debug")?; tracing::info!("started log (DebugCrash)"); panic!("purposely crashing to see if it shows up in logs"); @@ -52,16 +42,10 @@ fn hostname() -> Result<()> { Ok(()) } -fn test_ipc() -> Result<()> { - // TODO: Add multi-process IPC test here. This would be difficult to implement - // idiomatically with `cargo test` alone. - Ok(()) -} - fn wintun() -> Result<()> { tracing_subscriber::fmt::init(); - if crate::client::elevation::check()? { + if client::elevation::check()? { tracing::info!("Elevated"); } else { tracing::warn!("Not elevated") diff --git a/rust/windows-client/src-tauri/src/client/ipc.rs b/rust/windows-client/src-tauri/src/client/ipc.rs index 2d43761dd..118bb02d6 100755 --- a/rust/windows-client/src-tauri/src/client/ipc.rs +++ b/rust/windows-client/src-tauri/src/client/ipc.rs @@ -1,125 +1,280 @@ //! Inter-process communication for the connlib subprocess +//! +//! To run the unit tests and multi-process tests, use +//! ```bash +//! cargo test --all-features -p firezone-windows-client && \ +//! RUST_LOG=debug cargo run -p firezone-windows-client debug test-ipc +//! ``` + +use anyhow::{Context, Result}; +use connlib_client_shared::ResourceDescription; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{ + marker::Unpin, + process::{self, Child}, + time::Duration, +}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::windows::named_pipe, +}; + +#[derive(clap::Subcommand)] +pub enum Subcommand { + Manager { pipe_id: String }, + Worker { pipe_id: String }, +} + +pub fn test_subcommand(cmd: Option) -> Result<()> { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async move { + match cmd { + None => test_harness(), + Some(Subcommand::Manager { pipe_id }) => test_manager_process(pipe_id).await, + Some(Subcommand::Worker { pipe_id }) => test_worker_process(pipe_id).await, + } + })?; + Ok(()) +} + +fn test_harness() -> Result<()> { + tracing_subscriber::fmt::init(); + + let id = random_pipe_id(); + + let _manager = SubcommandChild::new(&["debug", "test-ipc", "manager", &id]); + let _worker = SubcommandChild::new(&["debug", "test-ipc", "worker", &id]); + + std::thread::sleep(Duration::from_secs(10)); + Ok(()) +} + +async fn test_manager_process(pipe_id: String) -> Result<()> { + tracing_subscriber::fmt::init(); + let server = UnconnectedServer::new_with_id(&pipe_id)?; + + // TODO: The real manager would spawn the worker subprocess here, but + // for this case, the test harness spawns it for us. + + let mut server = server.connect().await?; + + let start_time = std::time::Instant::now(); + assert_eq!(server.request(Request::Connect).await?, Response::Connected); + assert_eq!( + server.request(Request::AwaitCallback).await?, + Response::CallbackOnUpdateResources(vec![]) + ); + assert_eq!( + server.request(Request::Disconnect).await?, + Response::Disconnected + ); + + let elapsed = start_time.elapsed(); + assert!( + elapsed < std::time::Duration::from_millis(6), + "{:?}", + elapsed + ); + tracing::info!(?elapsed, "made 3 IPC requests"); + + Ok(()) +} + +async fn test_worker_process(pipe_id: String) -> Result<()> { + tracing_subscriber::fmt::init(); + let mut client = Client::new(&pipe_id)?; + // panic!("Pretending the worker crashed right after connecting"); + + // Handle requests from the main process + loop { + let (req, responder) = client.next_request().await?; + let resp = match &req { + Request::AwaitCallback => Response::CallbackOnUpdateResources(vec![]), + Request::Connect => Response::Connected, + Request::Disconnect => Response::Disconnected, + }; + responder.respond(resp).await?; + + if let Request::Disconnect = req { + break; + } + } + + Ok(()) +} + +/// Returns a random valid named pipe ID based on a UUIDv4 +/// +/// e.g. "\\.\pipe\dev.firezone.client\9508e87c-1c92-4630-bb20-839325d169bd" +/// +/// Normally you don't need to call this directly. Tests may need it to inject +/// a known pipe ID into a process controlled by the test. +fn random_pipe_id() -> String { + format!(r"\\.\pipe\dev.firezone.client\{}", uuid::Uuid::new_v4()) +} + +/// A server that accepts only one client +pub(crate) struct UnconnectedServer { + pipe: named_pipe::NamedPipeServer, +} + +impl UnconnectedServer { + /// Requires a Tokio context + /// + /// Will be used for production code soon + pub fn _new() -> anyhow::Result<(Self, String)> { + let id = random_pipe_id(); + let this = Self::new_with_id(&id)?; + Ok((this, id)) + } + + fn new_with_id(id: &str) -> anyhow::Result { + let pipe = named_pipe::ServerOptions::new() + .first_pipe_instance(true) + .create(id)?; + + Ok(Self { pipe }) + } + + pub async fn connect(self) -> anyhow::Result { + self.pipe.connect().await?; + Ok(Server { pipe: self.pipe }) + } +} + +/// A server that's connected to a client +pub(crate) struct Server { + pipe: named_pipe::NamedPipeServer, +} + +/// A client that's connected to a server +pub(crate) struct Client { + pipe: named_pipe::NamedPipeClient, +} + +#[derive(Deserialize, Serialize)] +pub(crate) enum Request { + AwaitCallback, + Connect, + Disconnect, +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +pub(crate) enum Response { + CallbackOnUpdateResources(Vec), + Connected, + Disconnected, +} + +#[must_use] +pub(crate) struct Responder<'a> { + client: &'a mut Client, +} + +impl Server { + pub async fn request(&mut self, req: Request) -> Result { + write_bincode(&mut self.pipe, &req) + .await + .context("couldn't send request")?; + read_bincode(&mut self.pipe) + .await + .context("couldn't read response") + } +} + +impl Client { + /// Creates a `Client`. Requires a Tokio context + pub fn new(server_id: &str) -> Result { + let pipe = named_pipe::ClientOptions::new().open(server_id)?; + Ok(Self { pipe }) + } + + pub async fn next_request(&mut self) -> Result<(Request, Responder)> { + let req = read_bincode(&mut self.pipe).await?; + let responder = Responder { client: self }; + Ok((req, responder)) + } +} + +impl<'a> Responder<'a> { + pub async fn respond(self, resp: Response) -> Result<()> { + write_bincode(&mut self.client.pipe, &resp).await?; + Ok(()) + } +} + +/// Reads a message from an async reader, with a 32-bit little-endian length prefix +async fn read_bincode(reader: &mut R) -> Result { + let mut len_buf = [0u8; 4]; + reader.read_exact(&mut len_buf).await?; + let len = u32::from_le_bytes(len_buf); + let mut buf = vec![0u8; usize::try_from(len)?]; + reader.read_exact(&mut buf).await?; + let msg = bincode::deserialize(&buf)?; + Ok(msg) +} + +/// Writes a message to an async writer, with a 32-bit little-endian length prefix +async fn write_bincode(writer: &mut W, msg: &T) -> Result<()> { + let buf = bincode::serialize(msg)?; + let len = u32::try_from(buf.len())?.to_le_bytes(); + writer.write_all(&len).await?; + writer.write_all(&buf).await?; + Ok(()) +} + +/// `std::process::Child` but for a subcommand running from the same exe as +/// the current process. +/// +/// Unlike `std::process::Child`, `Drop` tries to join the process, and kills it +/// if it can't. +pub(crate) struct SubcommandChild { + process: Child, +} + +impl SubcommandChild { + /// Launches the current exe as a subprocess with new arguments + /// + /// # Parameters + /// + /// * `args` - e.g. `["debug", "test", "ipc-worker"]` + pub fn new(args: &[&str]) -> Result { + let mut process = process::Command::new(std::env::current_exe()?); + for arg in args { + process.arg(arg); + } + let process = process.spawn()?; + Ok(SubcommandChild { process }) + } + + /// Joins the subprocess, returning an error if the process doesn't stop + pub fn wait_or_kill(&mut self) -> Result<()> { + if let Ok(Some(status)) = self.process.try_wait() { + if status.success() { + tracing::info!("process exited with success code"); + } else { + tracing::warn!("process exited with non-success code"); + } + } else { + self.process.kill()?; + tracing::error!("process was killed"); + } + Ok(()) + } +} + +impl Drop for SubcommandChild { + fn drop(&mut self) { + if let Err(error) = self.wait_or_kill() { + tracing::error!(?error, "SubcommandChild could not be joined or killed"); + } + } +} #[cfg(test)] mod tests { - use connlib_client_shared::ResourceDescription; - use serde::{de::DeserializeOwned, Deserialize, Serialize}; - use std::marker::Unpin; - use tokio::{ - io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - net::windows::named_pipe, - runtime::Runtime, - }; - - /// Returns a random valid named pipe ID based on a UUIDv4 - /// - /// e.g. "\\.\pipe\dev.firezone.client\9508e87c-1c92-4630-bb20-839325d169bd" - fn random_pipe_id() -> String { - format!(r"\\.\pipe\dev.firezone.client\{}", uuid::Uuid::new_v4()) - } - - /// A server that accepts only one client - struct UnconnectedServer { - pipe: named_pipe::NamedPipeServer, - } - - /// A server that's connected to a client - struct Server { - pipe: named_pipe::NamedPipeServer, - } - - /// A client that's connected to a server - struct Client { - pipe: named_pipe::NamedPipeClient, - } - - #[derive(Deserialize, Serialize)] - enum Request { - AwaitCallback, - Connect, - Disconnect, - } - - #[derive(Debug, Deserialize, PartialEq, Serialize)] - enum Response { - CallbackOnUpdateResources(Vec), - Connected, - Disconnected, - } - - #[must_use] - struct Responder<'a> { - client: &'a mut Client, - } - - impl UnconnectedServer { - pub fn new() -> anyhow::Result<(Self, String)> { - let id = random_pipe_id(); - let pipe = named_pipe::ServerOptions::new() - .first_pipe_instance(true) - .create(&id)?; - - let this = Self { pipe }; - Ok((this, id)) - } - - pub async fn connect(self) -> anyhow::Result { - self.pipe.connect().await?; - Ok(Server { pipe: self.pipe }) - } - } - - /// Reads a message from an async reader, with a 32-bit little-endian length prefix - async fn read_bincode( - reader: &mut R, - ) -> anyhow::Result { - let mut len_buf = [0u8; 4]; - reader.read_exact(&mut len_buf).await?; - let len = u32::from_le_bytes(len_buf); - let mut buf = vec![0u8; usize::try_from(len)?]; - reader.read_exact(&mut buf).await?; - let msg = bincode::deserialize(&buf)?; - Ok(msg) - } - - /// Writes a message to an async writer, with a 32-bit little-endian length prefix - async fn write_bincode( - writer: &mut W, - msg: &T, - ) -> anyhow::Result<()> { - let buf = bincode::serialize(msg)?; - let len = u32::try_from(buf.len())?.to_le_bytes(); - writer.write_all(&len).await?; - writer.write_all(&buf).await?; - Ok(()) - } - - impl Server { - pub async fn request(&mut self, req: Request) -> anyhow::Result { - write_bincode(&mut self.pipe, &req).await?; - read_bincode(&mut self.pipe).await - } - } - - impl Client { - pub fn new(server_id: &str) -> anyhow::Result { - let pipe = named_pipe::ClientOptions::new().open(server_id)?; - Ok(Self { pipe }) - } - - pub async fn next_request(&mut self) -> anyhow::Result<(Request, Responder)> { - let req = read_bincode(&mut self.pipe).await?; - let responder = Responder { client: self }; - Ok((req, responder)) - } - } - - impl<'a> Responder<'a> { - pub async fn respond(self, resp: Response) -> anyhow::Result<()> { - write_bincode(&mut self.client.pipe, &resp).await?; - Ok(()) - } - } + use super::*; + use tokio::runtime::Runtime; /// Test just the happy path /// It's hard to simulate a process crash because: @@ -132,7 +287,7 @@ mod tests { let rt = Runtime::new()?; rt.block_on(async move { // Pretend we're in the main process - let (server, server_id) = UnconnectedServer::new()?; + let (server, server_id) = UnconnectedServer::_new()?; let worker_task = tokio::spawn(async move { // Pretend we're in a worker process