From 9d1b14debe5090826f6ab6fb4817ddff491e00dd Mon Sep 17 00:00:00 2001 From: Reactor Scram Date: Tue, 28 May 2024 10:07:44 -0500 Subject: [PATCH] refactor(gui-client): de-dupe IPC client code (#5091) Same as in #5074, almost all of the IPC code turned out to be platform-independent. Yak shaving towards #5022 --- rust/gui-client/src-tauri/src/client/ipc.rs | 78 ++++++++++- .../src-tauri/src/client/ipc/linux.rs | 94 ++----------- .../src-tauri/src/client/ipc/windows.rs | 130 ++---------------- 3 files changed, 103 insertions(+), 199 deletions(-) diff --git a/rust/gui-client/src-tauri/src/client/ipc.rs b/rust/gui-client/src-tauri/src/client/ipc.rs index 7ffb84200..6404c83e4 100644 --- a/rust/gui-client/src-tauri/src/client/ipc.rs +++ b/rust/gui-client/src-tauri/src/client/ipc.rs @@ -1,16 +1,16 @@ use crate::client::gui::{ControllerRequest, CtlrTx}; use anyhow::{Context as _, Result}; use arc_swap::ArcSwap; -use connlib_client_shared::callbacks::ResourceDescription; -use firezone_headless_client::IpcClientMsg; - +use connlib_client_shared::{callbacks::ResourceDescription, Callbacks}; +use firezone_headless_client::{IpcClientMsg, IpcServerMsg}; +use futures::{SinkExt, StreamExt}; +use secrecy::{ExposeSecret, SecretString}; use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr}, sync::Arc, }; use tokio::sync::Notify; - -pub(crate) use platform::Client; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; #[cfg(target_os = "linux")] #[path = "ipc/linux.rs"] @@ -58,7 +58,75 @@ impl connlib_client_shared::Callbacks for CallbackHandler { } } +pub(crate) struct Client { + task: tokio::task::JoinHandle>, + // Needed temporarily to avoid a big refactor. We can remove this in the future. + tx: FramedWrite, LengthDelimitedCodec>, +} + impl Client { + pub(crate) async fn disconnect(mut self) -> Result<()> { + self.send_msg(&IpcClientMsg::Disconnect) + .await + .context("Couldn't send Disconnect")?; + self.tx.close().await?; + self.task.abort(); + Ok(()) + } + + pub(crate) async fn send_msg(&mut self, msg: &IpcClientMsg) -> Result<()> { + self.tx + .send( + serde_json::to_string(msg) + .context("Couldn't encode IPC message as JSON")? + .into(), + ) + .await + .context("Couldn't send IPC message")?; + Ok(()) + } + + pub(crate) async fn connect( + api_url: &str, + token: SecretString, + callback_handler: CallbackHandler, + tokio_handle: tokio::runtime::Handle, + ) -> Result { + tracing::info!(pid = std::process::id(), "Connecting to IPC service..."); + let stream = platform::connect_to_service().await?; + let (rx, tx) = tokio::io::split(stream); + // Receives messages from the IPC service + let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + let task = tokio_handle.spawn(async move { + while let Some(msg) = rx.next().await.transpose()? { + match serde_json::from_slice::(&msg)? { + IpcServerMsg::Ok => {} + IpcServerMsg::OnDisconnect => callback_handler.on_disconnect( + &connlib_client_shared::Error::Other("errors can't be serialized"), + ), + IpcServerMsg::OnUpdateResources(v) => callback_handler.on_update_resources(v), + IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns } => { + callback_handler.on_set_interface_config(ipv4, ipv6, dns); + } + } + } + Ok(()) + }); + + let mut client = Self { task, tx }; + let token = token.expose_secret().clone(); + client + .send_msg(&IpcClientMsg::Connect { + api_url: api_url.to_string(), + token, + }) + .await + .context("Couldn't send Connect message")?; + Ok(client) + } + pub(crate) async fn reconnect(&mut self) -> Result<()> { self.send_msg(&IpcClientMsg::Reconnect) .await diff --git a/rust/gui-client/src-tauri/src/client/ipc/linux.rs b/rust/gui-client/src-tauri/src/client/ipc/linux.rs index a1a9ca8e3..ca9ac805c 100644 --- a/rust/gui-client/src-tauri/src/client/ipc/linux.rs +++ b/rust/gui-client/src-tauri/src/client/ipc/linux.rs @@ -1,82 +1,18 @@ use anyhow::{Context as _, Result}; -use connlib_client_shared::Callbacks; -use firezone_headless_client::{platform::sock_path, IpcClientMsg, IpcServerMsg}; -use futures::{SinkExt, StreamExt}; -use secrecy::{ExposeSecret, SecretString}; -use tokio::net::{unix::OwnedWriteHalf, UnixStream}; -use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; +use firezone_headless_client::platform::sock_path; +use tokio::net::UnixStream; -/// Forwards events to and from connlib -pub(crate) struct Client { - recv_task: tokio::task::JoinHandle>, - tx: FramedWrite, -} - -impl Client { - pub(crate) async fn disconnect(mut self) -> Result<()> { - self.send_msg(&IpcClientMsg::Disconnect) - .await - .context("Couldn't send Disconnect")?; - self.tx.close().await?; - self.recv_task.abort(); - Ok(()) - } - - pub(crate) async fn send_msg(&mut self, msg: &IpcClientMsg) -> Result<()> { - self.tx - .send( - serde_json::to_string(msg) - .context("Couldn't encode IPC message as JSON")? - .into(), - ) - .await - .context("Couldn't send IPC message")?; - Ok(()) - } - - pub(crate) async fn connect( - api_url: &str, - token: SecretString, - callback_handler: super::CallbackHandler, - tokio_handle: tokio::runtime::Handle, - ) -> Result { - tracing::info!(pid = std::process::id(), "Connecting to IPC service..."); - let stream = UnixStream::connect(sock_path()) - .await - .context("Couldn't connect to UDS")?; - let (rx, tx) = stream.into_split(); - // Receives messages from the IPC service - let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new()); - let tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - - // TODO: Make sure this joins / drops somewhere - let recv_task = tokio_handle.spawn(async move { - while let Some(msg) = rx.next().await.transpose()? { - let msg: IpcServerMsg = serde_json::from_slice(&msg)?; - match msg { - IpcServerMsg::Ok => {} - IpcServerMsg::OnDisconnect => callback_handler.on_disconnect( - &connlib_client_shared::Error::Other("errors can't be serialized"), - ), - IpcServerMsg::OnUpdateResources(v) => callback_handler.on_update_resources(v), - IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns } => { - callback_handler.on_set_interface_config(ipv4, ipv6, dns); - } - } - } - Ok(()) - }); - - let mut client = Self { recv_task, tx }; - let token = token.expose_secret().clone(); - client - .send_msg(&IpcClientMsg::Connect { - api_url: api_url.to_string(), - token, - }) - .await - .context("Couldn't send Connect message")?; - - Ok(client) - } +/// A type alias to abstract over the Windows and Unix IPC primitives +pub(crate) type IpcStream = UnixStream; + +/// Connect to the IPC service +pub(crate) async fn connect_to_service() -> Result { + let path = sock_path(); + let stream = UnixStream::connect(&path).await.with_context(|| { + format!( + "Couldn't connect to Unix domain socket at `{}`", + path.display() + ) + })?; + Ok(stream) } diff --git a/rust/gui-client/src-tauri/src/client/ipc/windows.rs b/rust/gui-client/src-tauri/src/client/ipc/windows.rs index 8a7a1d183..55a76e507 100644 --- a/rust/gui-client/src-tauri/src/client/ipc/windows.rs +++ b/rust/gui-client/src-tauri/src/client/ipc/windows.rs @@ -1,117 +1,17 @@ -use anyhow::{anyhow, Context as _, Result}; -use connlib_client_shared::Callbacks; -use firezone_headless_client::{IpcClientMsg, IpcServerMsg}; -use futures::{SinkExt, Stream}; -use secrecy::{ExposeSecret, SecretString}; -use std::{pin::pin, task::Poll}; -use tokio::{net::windows::named_pipe, sync::mpsc}; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use anyhow::{Context as _, Result}; +use tokio::net::windows::named_pipe; -pub(crate) struct Client { - task: tokio::task::JoinHandle>, - // Needed temporarily to avoid a big refactor. We can remove this in the future. - tx: mpsc::Sender, -} - -impl Client { - pub(crate) async fn disconnect(mut self) -> Result<()> { - self.send_msg(&IpcClientMsg::Disconnect) - .await - .context("Couldn't send Disconnect")?; - self.task.abort(); - Ok(()) - } - - #[allow(clippy::unused_async)] - pub(crate) async fn send_msg(&mut self, msg: &IpcClientMsg) -> Result<()> { - self.tx - .send(serde_json::to_string(msg).context("Couldn't encode IPC message as JSON")?) - .await - .context("Couldn't send IPC message")?; - Ok(()) - } - - pub(crate) async fn connect( - api_url: &str, - token: SecretString, - callback_handler: super::CallbackHandler, - tokio_handle: tokio::runtime::Handle, - ) -> Result { - tracing::info!(pid = std::process::id(), "Connecting to IPC service..."); - let ipc = named_pipe::ClientOptions::new() - .open(firezone_headless_client::windows::pipe_path()) - .context("Couldn't connect to named pipe server")?; - let ipc = Framed::new(ipc, LengthDelimitedCodec::new()); - // This channel allows us to communicate with the GUI even though NamedPipeClient - // doesn't have `into_split`. - let (tx, mut rx) = mpsc::channel(1); - - let task = tokio_handle.spawn(async move { - let mut ipc = pin!(ipc); - loop { - let ev = std::future::poll_fn(|cx| { - match rx.poll_recv(cx) { - Poll::Ready(Some(msg)) => return Poll::Ready(Ok(IpcEvent::Gui(msg))), - Poll::Ready(None) => { - return Poll::Ready(Err(anyhow!("MPSC channel from GUI closed"))) - } - Poll::Pending => {} - } - - match ipc.as_mut().poll_next(cx) { - Poll::Ready(Some(msg)) => { - let msg = serde_json::from_slice(&msg?)?; - return Poll::Ready(Ok(IpcEvent::Connlib(msg))); - } - Poll::Ready(None) => { - return Poll::Ready(Err(anyhow!("IPC service disconnected from us"))) - } - Poll::Pending => {} - } - - Poll::Pending - }) - .await; - - match ev { - Ok(IpcEvent::Gui(msg)) => ipc.send(msg.into()).await?, - Ok(IpcEvent::Connlib(msg)) => match msg { - IpcServerMsg::Ok => {} - IpcServerMsg::OnDisconnect => callback_handler.on_disconnect( - &connlib_client_shared::Error::Other("errors can't be serialized"), - ), - IpcServerMsg::OnUpdateResources(v) => { - callback_handler.on_update_resources(v) - } - IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns } => { - callback_handler.on_set_interface_config(ipv4, ipv6, dns); - } - }, - Err(error) => { - tracing::error!(?error, "Error while waiting for IPC tx/rx"); - // TODO: Catch that error when the task is joined - Err(error)? - } - } - } - }); - - let mut client = Self { task, tx }; - let token = token.expose_secret().clone(); - client - .send_msg(&IpcClientMsg::Connect { - api_url: api_url.to_string(), - token, - }) - .await - .context("Couldn't send Connect message")?; - Ok(client) - } -} - -enum IpcEvent { - /// The GUI wants to send a message to the service - Gui(String), - /// The connlib instance in the service wants to send a message to the GUI - Connlib(IpcServerMsg), +/// A type alias to abstract over the Windows and Unix IPC primitives +pub(crate) type IpcStream = named_pipe::NamedPipeClient; + +/// Connect to the IPC service +/// +/// This is async on Linux +#[allow(clippy::unused_async)] +pub(crate) async fn connect_to_service() -> Result { + let path = firezone_headless_client::windows::pipe_path(); + let stream = named_pipe::ClientOptions::new() + .open(path) + .with_context(|| "Couldn't connect to named pipe server at `{path}`")?; + Ok(stream) }