From a2bd667c6902dc8ed186eb964c7f504ebebca5f8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 May 2025 15:47:29 +1000 Subject: [PATCH] refactor(gui-client): use existing IPC framework for deeplinks (#9047) We already have a pretty powerful IPC framework in place to communicate between the GUI and the service process. The deeplink implemenation uses the same IPC mechanisms (UDS / pipes), yet it is effectively a re-implementation of what we already have, just with less functionality. In order to provide a more sophisticated handling of the case where Firezone is launched again while it is already running, we refactor the deeplink module to reuse the existing IPC framework. This makes it quite easy to then reuse this in order to ping the already running Firezone process that a new instance was launched. For now, this doesn't do anything other than writing a log entry. This however lays enough ground-work for us to then implement a more sophisticated handling of that case in the future, e.g. open new windows etc. One caveat here is that we are now trying to connect to an existing IPC socket on every startup, even the first one. Our IPC code has a retry loop of 10 iterations to be more resilient on Windows when connecting to pipes. Without any further changes, this would now delay the start of Firezone always by 1s because we would try to connect to the socket 10x before concluding that we are the first instance. To fix this, we make the number of attempts configurable and set it to 1 when attempting to the GUI IPC socket to avoid unnecessary delays in starting up the Client. Related: #5143. --- .../src-tauri/src/bin/firezone-gui-client.rs | 4 +- rust/gui-client/src-tauri/src/controller.rs | 199 ++++++++++----- rust/gui-client/src-tauri/src/deep_link.rs | 68 +++--- .../src-tauri/src/deep_link/linux.rs | 92 ------- .../src-tauri/src/deep_link/macos.rs | 24 +- .../src-tauri/src/deep_link/windows.rs | 102 +------- rust/gui-client/src-tauri/src/gui.rs | 124 +++++++--- rust/gui-client/src-tauri/src/ipc.rs | 227 ++++++------------ rust/gui-client/src-tauri/src/ipc/linux.rs | 29 ++- rust/gui-client/src-tauri/src/ipc/macos.rs | 6 +- rust/gui-client/src-tauri/src/ipc/windows.rs | 32 ++- rust/gui-client/src-tauri/src/service.rs | 134 ++++++++--- scripts/tests/linux-group.sh | 2 +- 13 files changed, 480 insertions(+), 563 deletions(-) diff --git a/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs b/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs index 947422567..f2a3bf46b 100644 --- a/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs +++ b/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs @@ -73,7 +73,7 @@ fn main() -> anyhow::Result<()> { firezone_gui_client::logging::setup_stdout()?; let rt = tokio::runtime::Runtime::new()?; - if let Err(error) = rt.block_on(deep_link::open(&deep_link.url)) { + if let Err(error) = rt.block_on(deep_link::open(deep_link.url)) { tracing::error!("Error in `OpenDeepLink`: {error:#}"); } Ok(()) @@ -144,7 +144,7 @@ fn run_gui(config: RunConfig) -> Result<()> { return Err(anyhow); } - if anyhow.root_cause().is::() { + if anyhow.root_cause().is::() { show_error_dialog( "Firezone is already running. If it's not responding, force-stop it.", )?; diff --git a/rust/gui-client/src-tauri/src/controller.rs b/rust/gui-client/src-tauri/src/controller.rs index b06499001..505f0e39f 100644 --- a/rust/gui-client/src-tauri/src/controller.rs +++ b/rust/gui-client/src-tauri/src/controller.rs @@ -1,7 +1,8 @@ use crate::{ auth, deep_link, - gui::system_tray, - ipc, logging, + gui::{self, system_tray}, + ipc::{self, SocketId}, + logging, service, settings::{self, AdvancedSettings}, updates, uptime, }; @@ -11,7 +12,7 @@ use firezone_bin_shared::DnsControlMethod; use firezone_logging::FilterReloadHandle; use firezone_telemetry::Telemetry; use futures::{ - Stream, StreamExt, + SinkExt, Stream, StreamExt, stream::{self, BoxStream}, }; use secrecy::{ExposeSecret as _, SecretString}; @@ -33,8 +34,8 @@ pub struct Controller { auth: auth::Auth, clear_logs_callback: Option>>, ctlr_tx: CtlrTx, - ipc_client: ipc::Client, - ipc_rx: ipc::ClientRead, + ipc_client: ipc::ClientWrite, + ipc_rx: ipc::ClientRead, integration: I, log_filter_reloader: FilterReloadHandle, /// A release that's ready to download @@ -44,6 +45,14 @@ pub struct Controller { updates_rx: ReceiverStream>, uptime: uptime::Tracker, + gui_ipc_clients: BoxStream< + 'static, + Result<( + ipc::ServerRead, + ipc::ServerWrite, + )>, + >, + dns_notifier: BoxStream<'static, Result<()>>, network_notifier: BoxStream<'static, Result<()>>, } @@ -75,7 +84,6 @@ pub enum ControllerRequest { }, Fail(Failure), GetAdvancedSettings(oneshot::Sender), - SchemeRequest(SecretString), SignIn, SystemTrayMenu(system_tray::Event), UpdateNotificationClicked(Url), @@ -167,23 +175,33 @@ impl Status { enum EventloopTick { NetworkChanged(Result<()>), DnsChanged(Result<()>), - IpcMsg(Option>), + IpcMsg(Option>), ControllerRequest(Option), UpdateNotification(Option>), + NewInstanceLaunched( + Option< + Result<( + ipc::ServerRead, + ipc::ServerWrite, + )>, + >, + ), } impl Controller { - pub async fn start( + pub(crate) async fn start( ctlr_tx: CtlrTx, integration: I, rx: mpsc::Receiver, advanced_settings: AdvancedSettings, log_filter_reloader: FilterReloadHandle, updates_rx: mpsc::Receiver>, + gui_ipc: ipc::Server, ) -> Result<()> { tracing::debug!("Starting new instance of `Controller`"); - let (ipc_client, ipc_rx) = ipc::Client::new().await?; + let (ipc_rx, ipc_client) = + ipc::connect(SocketId::Tunnel, ipc::ConnectOptions::default()).await?; let dns_notifier = new_dns_notifier().await?.boxed(); let network_notifier = new_network_notifier().await?.boxed(); @@ -204,6 +222,12 @@ impl Controller { uptime: Default::default(), dns_notifier, network_notifier, + gui_ipc_clients: stream::unfold(gui_ipc, |mut gui_ipc| async move { + let result = gui_ipc.next_client_split().await; + + Some((result, gui_ipc)) + }) + .boxed(), }; controller.main_loop().await?; @@ -234,7 +258,7 @@ impl Controller { EventloopTick::NetworkChanged(Ok(())) => { if self.status.needs_network_changes() { tracing::debug!("Internet up/down changed, calling `Session::reset`"); - self.ipc_client.reset().await? + self.send_ipc(&service::ClientMsg::Reset).await? } self.try_retry_connection().await? @@ -246,7 +270,8 @@ impl Controller { ?resolvers, "New DNS resolvers, calling `Session::set_dns`" ); - self.ipc_client.set_dns(resolvers).await?; + self.send_ipc(&service::ClientMsg::SetDns(resolvers)) + .await?; } self.try_retry_connection().await? @@ -260,7 +285,7 @@ impl Controller { .context("IPC closed")? .context("Failed to read from IPC")?; - match self.handle_ipc_msg(msg).await? { + match self.handle_service_ipc_msg(msg).await? { ControlFlow::Break(()) => break, ControlFlow::Continue(()) => continue, }; @@ -277,12 +302,29 @@ impl Controller { EventloopTick::UpdateNotification(None) => { return Err(anyhow!("Update checker task stopped")); } + EventloopTick::NewInstanceLaunched(None) => { + return Err(anyhow!("GUI IPC socket closed")); + } + EventloopTick::NewInstanceLaunched(Some(Err(e))) => { + tracing::warn!("Failed to accept IPC connection from new GUI instance: {e:#}"); + } + EventloopTick::NewInstanceLaunched(Some(Ok((mut read, mut write)))) => { + let client_msg = read.next().await; + + if let Err(e) = self.handle_gui_ipc_msg(client_msg).await { + tracing::debug!("Failed to handle IPC message from new GUI instance: {e:#}") + } + + if let Err(e) = write.send(&gui::ServerMsg::Ack).await { + tracing::debug!("Failed to ack IPC message from new GUI instance: {e:#}") + } + } } } tracing::debug!("Closing..."); - if let Err(error) = self.ipc_client.disconnect_from_ipc().await { + if let Err(error) = self.ipc_client.close().await { tracing::error!("ipc_client: {error:#}"); } @@ -313,6 +355,10 @@ impl Controller { return Poll::Ready(Some(EventloopTick::UpdateNotification(notification))); } + if let Poll::Ready(new_instance) = self.gui_ipc_clients.poll_next_unpin(cx) { + return Poll::Ready(Some(EventloopTick::NewInstanceLaunched(new_instance))); + } + Poll::Pending }) .await @@ -335,9 +381,12 @@ impl Controller { // Count the start instant from before we connect let start_instant = Instant::now(); - self.ipc_client - .connect_to_firezone(api_url.as_str(), token.expose_secret().clone().into()) - .await?; + self.send_ipc(&service::ClientMsg::Connect { + api_url: api_url.to_string(), + token: token.expose_secret().clone(), + }) + .await?; + // Change the status after we begin connecting self.status = Status::WaitingForPortal { start_instant, @@ -355,18 +404,17 @@ impl Controller { Telemetry::set_account_slug(account_slug); } - self.ipc_client - .send_msg(&ipc::ClientMsg::StartTelemetry { - environment, - release: crate::RELEASE.to_string(), - account_slug, - }) - .await?; + self.send_ipc(&service::ClientMsg::StartTelemetry { + environment, + release: crate::RELEASE.to_string(), + account_slug, + }) + .await?; Ok(()) } - async fn handle_deep_link(&mut self, url: &SecretString) -> Result<()> { + async fn handle_deep_link(&mut self, url: &Url) -> Result<()> { let auth_response = deep_link::parse_auth_callback(url).context("Couldn't parse scheme request")?; @@ -393,11 +441,10 @@ impl Controller { self.advanced_settings = *settings; - self.ipc_client - .send_msg(&ipc::ClientMsg::ApplyLogFilter { - directives: self.advanced_settings.log_filter.clone(), - }) - .await?; + self.send_ipc(&service::ClientMsg::ApplyLogFilter { + directives: self.advanced_settings.log_filter.clone(), + }) + .await?; tracing::debug!("Applied new settings. Log level will take effect immediately."); @@ -413,7 +460,7 @@ impl Controller { if let Err(error) = logging::clear_gui_logs().await { tracing::error!("Failed to clear GUI logs: {error:#}"); } - self.ipc_client.send_msg(&ipc::ClientMsg::ClearLogs).await?; + self.send_ipc(&service::ClientMsg::ClearLogs).await?; self.clear_logs_callback = Some(completion_tx); } Req::ExportLogs { path, stem } => logging::export_logs_to(path, stem) @@ -429,20 +476,6 @@ impl Controller { Req::GetAdvancedSettings(tx) => { tx.send(self.advanced_settings.clone()).ok(); } - Req::SchemeRequest(url) => match self.handle_deep_link(&url).await { - Ok(()) => {} - Err(error) - if error - .root_cause() - .downcast_ref::() - .is_some_and(|e| matches!(e, auth::Error::NoInflightRequest)) => - { - tracing::debug!("Ignoring deep-link; no local state"); - } - Err(error) => { - tracing::error!("`handle_deep_link` failed: {error:#}"); - } - }, Req::SignIn | Req::SystemTrayMenu(system_tray::Event::SignIn) => { let req = self .auth @@ -528,9 +561,7 @@ impl Controller { Req::SystemTrayMenu(system_tray::Event::Quit) => { tracing::info!("User clicked Quit in the menu"); self.status = Status::Quitting; - self.ipc_client - .send_msg(&ipc::ClientMsg::Disconnect) - .await?; + self.send_ipc(&service::ClientMsg::Disconnect).await?; self.refresh_system_tray_menu(); } Req::UpdateNotificationClicked(download_url) => { @@ -543,9 +574,9 @@ impl Controller { Ok(()) } - async fn handle_ipc_msg(&mut self, msg: ipc::ServerMsg) -> Result> { + async fn handle_service_ipc_msg(&mut self, msg: service::ServerMsg) -> Result> { match msg { - ipc::ServerMsg::ClearedLogs(result) => { + service::ServerMsg::ClearedLogs(result) => { let Some(tx) = self.clear_logs_callback.take() else { return Err(anyhow!( "Can't handle `IpcClearedLogs` when there's no callback waiting for a `ClearLogs` result" @@ -554,15 +585,15 @@ impl Controller { tx.send(result) .map_err(|_| anyhow!("Couldn't send `ClearLogs` result to Tauri task"))?; } - ipc::ServerMsg::ConnectResult(result) => { + service::ServerMsg::ConnectResult(result) => { self.handle_connect_result(result).await?; } - ipc::ServerMsg::DisconnectedGracefully => { + service::ServerMsg::DisconnectedGracefully => { if let Status::Quitting = self.status { return Ok(ControlFlow::Break(())); } } - ipc::ServerMsg::OnDisconnect { + service::ServerMsg::OnDisconnect { error_msg, is_authentication_error, } => { @@ -583,7 +614,7 @@ impl Controller { .context("Couldn't show Disconnected alert")?; } } - ipc::ServerMsg::OnUpdateResources(resources) => { + service::ServerMsg::OnUpdateResources(resources) => { if !self.status.needs_resource_updates() { return Ok(ControlFlow::Continue(())); } @@ -593,7 +624,7 @@ impl Controller { self.update_disabled_resources().await?; } - ipc::ServerMsg::TerminatingGracefully => { + service::ServerMsg::TerminatingGracefully => { tracing::info!("IPC service exited gracefully"); self.integration .set_tray_icon(system_tray::icon_terminating()); @@ -604,7 +635,7 @@ impl Controller { return Ok(ControlFlow::Break(())); } - ipc::ServerMsg::TunnelReady => { + service::ServerMsg::TunnelReady => { let Status::WaitingForTunnel { start_instant } = self.status else { // If we are not waiting for a tunnel, continue. return Ok(ControlFlow::Continue(())); @@ -622,7 +653,41 @@ impl Controller { Ok(ControlFlow::Continue(())) } - async fn handle_connect_result(&mut self, result: Result<(), ipc::Error>) -> Result<()> { + async fn handle_gui_ipc_msg( + &mut self, + maybe_msg: Option>, + ) -> Result<()> { + let client_msg = maybe_msg + .context("No message received")? + .context("Failed to read message")?; + + match client_msg { + gui::ClientMsg::Deeplink(url) => match self.handle_deep_link(&url).await { + Ok(()) => {} + Err(error) + if error + .root_cause() + .downcast_ref::() + .is_some_and(|e| matches!(e, auth::Error::NoInflightRequest)) => + { + tracing::debug!("Ignoring deep-link; no local state"); + } + Err(error) => { + tracing::error!("`handle_deep_link` failed: {error:#}"); + } + }, + gui::ClientMsg::NewInstance => { + tracing::debug!("A new instance of Firezone has been launched") + } + } + + Ok(()) + } + + async fn handle_connect_result( + &mut self, + result: Result<(), service::ConnectError>, + ) -> Result<()> { let Status::WaitingForPortal { start_instant, token, @@ -642,7 +707,7 @@ impl Controller { self.refresh_system_tray_menu(); Ok(()) } - Err(ipc::Error::Io(error)) => { + Err(service::ConnectError::Io(error)) => { // This is typically something like, we don't have Internet access so we can't // open the PhoenixChannel's WebSocket. tracing::info!( @@ -655,7 +720,7 @@ impl Controller { self.refresh_system_tray_menu(); Ok(()) } - Err(ipc::Error::Other(error)) => { + Err(service::ConnectError::Other(error)) => { // We log this here directly instead of forwarding it because errors hard-abort the event-loop and we still want to be able to export logs and stuff. // See . tracing::error!("Failed to connect to Firezone: {error}"); @@ -709,9 +774,10 @@ impl Controller { disabled_resources.insert(internet_resource.id()); } - self.ipc_client - .send_msg(&ipc::ClientMsg::SetDisabledResources(disabled_resources)) - .await?; + self.send_ipc(&service::ClientMsg::SetDisabledResources( + disabled_resources, + )) + .await?; self.refresh_system_tray_menu(); Ok(()) @@ -791,12 +857,17 @@ impl Controller { tracing::debug!("disconnecting connlib"); // This is redundant if the token is expired, in that case // connlib already disconnected itself. - self.ipc_client - .send_msg(&ipc::ClientMsg::Disconnect) - .await?; + self.send_ipc(&service::ClientMsg::Disconnect).await?; self.refresh_system_tray_menu(); Ok(()) } + + async fn send_ipc(&mut self, msg: &service::ClientMsg) -> Result<()> { + self.ipc_client + .send(msg) + .await + .context("Failed to send IPC message") + } } async fn new_dns_notifier() -> Result>> { diff --git a/rust/gui-client/src-tauri/src/deep_link.rs b/rust/gui-client/src-tauri/src/deep_link.rs index 4f9bcc9a4..a36ecfeb5 100644 --- a/rust/gui-client/src-tauri/src/deep_link.rs +++ b/rust/gui-client/src-tauri/src/deep_link.rs @@ -3,9 +3,15 @@ // The IPC parts use the same primitives as the IPC service, UDS on Linux // and named pipes on Windows, so TODO de-dupe the IPC code -use crate::auth; +use crate::{ + auth, + gui::{self, ServerMsg}, + ipc::SocketId, +}; use anyhow::{Context as _, Result, bail}; -use secrecy::{ExposeSecret, SecretString}; +use futures::SinkExt as _; +use secrecy::SecretString; +use tokio_stream::StreamExt as _; use url::Url; #[cfg(any(target_os = "linux", target_os = "windows"))] @@ -24,17 +30,35 @@ mod imp; #[path = "deep_link/windows.rs"] mod imp; -#[derive(thiserror::Error, Debug)] -#[error("named pipe server couldn't start listening, we are probably the second instance")] -pub struct CantListen; +pub use imp::register; -pub use imp::{Server, open, register}; +pub async fn open(url: url::Url) -> Result<()> { + let (mut read, mut write) = crate::ipc::connect::( + SocketId::Gui, + crate::ipc::ConnectOptions::default(), + ) + .await?; + + write + .send(&gui::ClientMsg::Deeplink(url)) + .await + .context("Failed to send deep-link")?; + + let response = read + .next() + .await + .context("No response received")? + .context("Failed to receive response")?; + + anyhow::ensure!(response == ServerMsg::Ack); + + Ok(()) +} /// Parses a deep-link URL into a struct. /// /// e.g. `firezone-fd0020211111://handle_client_sign_in_callback/?state=secret&fragment=secret&account_name=Firezone&account_slug=firezone&actor_name=Jane+Doe&identity_provider_identifier=secret` -pub(crate) fn parse_auth_callback(url_secret: &SecretString) -> Result { - let url = Url::parse(url_secret.expose_secret())?; +pub(crate) fn parse_auth_callback(url: &Url) -> Result { if Some(url::Host::Domain("handle_client_sign_in_callback")) != url.host() { bail!("URL host should be `handle_client_sign_in_callback`"); } @@ -92,8 +116,8 @@ pub(crate) fn parse_auth_callback(url_secret: &SecretString) -> Result Result<()> { @@ -144,28 +168,6 @@ mod tests { } fn parse_callback_wrapper(s: &str) -> Result { - super::parse_auth_callback(&SecretString::new(s.to_owned())) - } - - /// Tests the named pipe or Unix domain socket, doesn't test the URI scheme itself - /// - /// Will fail if any other Firezone Client instance is running - /// Will fail with permission error if Firezone already ran as sudo - #[tokio::test] - async fn socket_smoke_test() -> Result<()> { - let server = Server::new().await.context("Couldn't start Server")?; - let server_task = tokio::spawn(async move { - let bytes = server.accept().await?; - Ok::<_, anyhow::Error>(bytes) - }); - let id = uuid::Uuid::new_v4().to_string(); - let expected_url = url::Url::parse(&format!("bogus-test-schema://{id}"))?; - super::open(&expected_url).await?; - - let bytes = server_task.await??.unwrap(); - let s = std::str::from_utf8(bytes.expose_secret())?; - let url = url::Url::parse(s)?; - assert_eq!(url, expected_url); - Ok(()) + super::parse_auth_callback(&s.parse()?) } } diff --git a/rust/gui-client/src-tauri/src/deep_link/linux.rs b/rust/gui-client/src-tauri/src/deep_link/linux.rs index 33623e1c4..fa8a1eff1 100644 --- a/rust/gui-client/src-tauri/src/deep_link/linux.rs +++ b/rust/gui-client/src-tauri/src/deep_link/linux.rs @@ -1,97 +1,5 @@ use anyhow::{Context, Result, bail}; -use firezone_bin_shared::known_dirs; -use secrecy::{ExposeSecret, Secret}; use std::{io::ErrorKind, path::PathBuf, process::Command}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::{UnixListener, UnixStream}, -}; - -use super::CantListen; - -const SOCK_NAME: &str = "deep_link.sock"; - -pub struct Server { - listener: UnixListener, -} - -fn sock_path() -> Result { - Ok(known_dirs::runtime() - .context("Couldn't find runtime dir")? - .join(SOCK_NAME)) -} - -impl Server { - /// Create a new deep link server to make sure we're the only instance - /// - /// Still uses `thiserror` so we can catch the deep_link `CantListen` error - /// On Windows this uses async because of #5143 and #5566. - #[expect(clippy::unused_async)] - pub async fn new() -> Result { - let path = sock_path()?; - let dir = path - .parent() - .context("Impossible, socket path should always have a parent")?; - - // Try to `connect` to the socket as a client. - // If it succeeds, that means there is already a Firezone instance listening - // as a server on that socket, and we should exit. - // If it fails, it means nobody is listening on the socket, or the - // socket does not exist, in which case we are the only instance - // and should proceed. - if std::os::unix::net::UnixStream::connect(&path).is_ok() { - return Err(anyhow::Error::new(CantListen)); - } - std::fs::remove_file(&path).ok(); - std::fs::create_dir_all(dir).context("Can't create dir for deep link socket")?; - - // TODO: TOCTOU error here. - // It's possible for 2 processes to see the `connect` call fail, then one - // binds the socket, and the other deletes the socket and binds a different - // socket at the same path, resulting in 2 instances with confusing behavior. - // The `bind` call should probably go first, but without more testing and more - // thought, I don't want to re-arrange it yet. - - let listener = UnixListener::bind(&path).context("Couldn't bind listener Unix socket")?; - - Ok(Self { listener }) - } - - /// Await one incoming deep link - /// - /// To match the Windows API, this consumes the `Server`. - pub async fn accept(self) -> Result>>> { - tracing::debug!("deep_link::accept"); - let (mut stream, _) = self.listener.accept().await?; - tracing::debug!("Accepted Unix domain socket connection"); - - // TODO: Limit reads to 4,096 bytes. Partial reads will probably never happen - // since it's a local socket transferring very small data. - let mut bytes = vec![]; - stream - .read_to_end(&mut bytes) - .await - .context("failed to read incoming deep link over Unix socket stream")?; - if bytes.is_empty() { - return Ok(None); - } - let bytes = Secret::new(bytes); - tracing::debug!( - len = bytes.expose_secret().len(), - "Got data from Unix domain socket" - ); - Ok(Some(bytes)) - } -} - -pub async fn open(url: &url::Url) -> Result<()> { - let path = sock_path()?; - let mut stream = UnixStream::connect(&path).await?; - - stream.write_all(url.to_string().as_bytes()).await?; - - Ok(()) -} /// Register a URI scheme so that browser can deep link into our app for auth /// diff --git a/rust/gui-client/src-tauri/src/deep_link/macos.rs b/rust/gui-client/src-tauri/src/deep_link/macos.rs index e4c81b8d4..bc07fd726 100644 --- a/rust/gui-client/src-tauri/src/deep_link/macos.rs +++ b/rust/gui-client/src-tauri/src/deep_link/macos.rs @@ -1,27 +1,5 @@ -use std::path::PathBuf; - use anyhow::{Result, bail}; -use secrecy::Secret; - -pub struct Server {} - -impl Server { - #[expect( - clippy::unused_async, - reason = "Signture must match other operating systems" - )] - pub async fn new() -> Result { - bail!("not implemented") - } - - pub async fn accept(self) -> Result>>> { - futures::future::pending().await - } -} - -pub async fn open(_url: &url::Url) -> Result<()> { - bail!("not implemented") -} +use std::path::PathBuf; pub fn register(_path: PathBuf) -> Result<()> { bail!("not implemented") diff --git a/rust/gui-client/src-tauri/src/deep_link/windows.rs b/rust/gui-client/src-tauri/src/deep_link/windows.rs index 207e1ee3b..71d458973 100644 --- a/rust/gui-client/src-tauri/src/deep_link/windows.rs +++ b/rust/gui-client/src-tauri/src/deep_link/windows.rs @@ -1,111 +1,13 @@ -//! A module for registering, catching, and parsing deep links that are sent over to the app's already-running instance +//! A module for registering deep links that are sent over to the app's already-running instance //! Based on reading some of the Windows code from , which is licensed "MIT OR Apache-2.0" -use super::{CantListen, FZ_SCHEME}; +use super::FZ_SCHEME; use anyhow::{Context, Result}; use firezone_bin_shared::BUNDLE_ID; -use firezone_logging::err_with_src; -use secrecy::Secret; use std::{ io, path::{Path, PathBuf}, - time::Duration, }; -use tokio::{io::AsyncReadExt, io::AsyncWriteExt, net::windows::named_pipe}; - -/// A server for a named pipe, so we can receive deep links from other instances -/// of the client launched by web browsers -pub struct Server { - inner: named_pipe::NamedPipeServer, -} - -impl Server { - /// Construct a server, but don't await client connections yet - /// - /// Panics if there is no Tokio runtime - /// Still uses `thiserror` so we can catch the deep_link `CantListen` error - pub async fn new() -> Result { - // This isn't air-tight - We recreate the whole server on each loop, - // rather than binding 1 socket and accepting many streams like a normal socket API. - // Tokio appears to be following Windows' underlying API here, so not - // much we can do until Unix domain sockets have wide support in Windows. - let server = bind_to_pipe(&pipe_path()).await?; - - tracing::debug!("server is bound"); - Ok(Server { inner: server }) - } - - /// Await one incoming deep link from a named pipe client - /// Tokio's API is strange, so this consumes the server. - /// I assume this is based on the underlying Windows API. - /// I tried re-using the server and it acted strange. The official Tokio - /// examples are not clear on this. - pub async fn accept(mut self) -> Result>>> { - self.inner - .connect() - .await - .context("Couldn't accept connection from named pipe client")?; - tracing::debug!("server got connection"); - - // TODO: Limit the read size here. Our typical callback is 350 bytes, so 4,096 bytes should be more than enough. - // Also, I think `read_to_end` can do partial reads because this is a named pipe, - // not a file. We might need a length-prefixed or newline-terminated format for IPC. - let mut bytes = vec![]; - self.inner - .read_to_end(&mut bytes) - .await - .context("Couldn't read bytes from named pipe client")?; - let bytes = Secret::new(bytes); - - self.inner.disconnect().ok(); - Ok(Some(bytes)) - } -} - -async fn bind_to_pipe(pipe_path: &str) -> Result { - const NUM_ITERS: usize = 10; - // Relating to #5143 and #5566, sometimes re-creating a named pipe server - // in a loop fails. This is copied from `firezone_headless_client::ipc_service::ipc::windows`. - for i in 0..NUM_ITERS { - match create_pipe_server(pipe_path) { - Ok(server) => return Ok(server), - Err(e) => { - tracing::debug!( - "`create_pipe_server` failed {}; sleeping... (attempt {i}/{NUM_ITERS})", - err_with_src(&e) - ); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - Err(anyhow::Error::new(CantListen)) -} - -fn create_pipe_server(pipe_path: &str) -> io::Result { - let mut server_options = named_pipe::ServerOptions::new(); - server_options.first_pipe_instance(true); - - let server = server_options.create(pipe_path)?; - - Ok(server) -} - -/// Open a deep link by sending it to the already-running instance of the app -pub async fn open(url: &url::Url) -> Result<()> { - let path = pipe_path(); - let mut client = named_pipe::ClientOptions::new() - .open(&path) - .with_context(|| format!("Couldn't connect to named pipe server at `{path}`"))?; - client - .write_all(url.as_str().as_bytes()) - .await - .with_context(|| format!("Couldn't write bytes to named pipe server at `{path}`"))?; - Ok(()) -} - -fn pipe_path() -> String { - crate::ipc::platform::named_pipe_path(&format!("{BUNDLE_ID}.deep_link")) -} /// Registers the current exe as the handler for our deep link scheme. /// diff --git a/rust/gui-client/src-tauri/src/gui.rs b/rust/gui-client/src-tauri/src/gui.rs index 0033df034..4d68092db 100644 --- a/rust/gui-client/src-tauri/src/gui.rs +++ b/rust/gui-client/src-tauri/src/gui.rs @@ -6,17 +6,20 @@ use crate::{ about, controller::{Controller, ControllerRequest, CtlrTx, Failure, GuiIntegration}, - deep_link, logging, + deep_link, + ipc::{self, ClientRead, ClientWrite, SocketId}, + logging, settings::{self, AdvancedSettings}, updates, }; use anyhow::{Context, Result, bail}; use firezone_logging::err_with_src; use firezone_telemetry as telemetry; -use secrecy::{ExposeSecret as _, SecretString}; -use std::{str::FromStr, time::Duration}; +use futures::SinkExt as _; +use std::time::Duration; use tauri::Manager; use tokio::sync::mpsc; +use tokio_stream::StreamExt; use tracing::instrument; pub mod system_tray; @@ -123,6 +126,23 @@ pub struct RunConfig { pub fail_with: Option, } +/// IPC Messages that a newly launched instance (i.e. a client) may send to an already running instance of Firezone. +#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum ClientMsg { + Deeplink(url::Url), + NewInstance, +} + +/// IPC Messages that an already running instance of Firezone may send to a newly launched instance. +#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum ServerMsg { + Ack, +} + +#[derive(Debug, thiserror::Error)] +#[error("Another instance of Firezone is already running")] +pub struct AlreadyRunning; + /// Runs the Tauri GUI and returns on exit or unrecoverable error #[instrument(skip_all)] pub fn run( @@ -137,10 +157,14 @@ pub fn run( let _guard = rt.enter(); - // Make sure we're single-instance - // We register our deep links to call the `open-deep-link` subcommand, - // so if we're at this point, we know we've been launched manually - let deep_link_server = rt.block_on(deep_link::Server::new())?; + let gui_ipc = match rt.block_on(create_gui_ipc_server()) { + Ok(gui_ipc) => gui_ipc, + Err(e) => { + tracing::debug!("{e:#}"); + + return Err(anyhow::Error::new(AlreadyRunning)); + } + }; let (ctlr_tx, ctlr_rx) = mpsc::channel(5); let (ready_tx, mut ready_rx) = mpsc::channel::(1); @@ -218,7 +242,6 @@ pub fn run( // to handle deep links let exe = tauri_utils::platform::current_exe().context("Can't find our own exe path")?; deep_link::register(exe).context("Failed to register deep link handler")?; - tokio::spawn(accept_deep_links(deep_link_server, ctlr_tx.clone())); } if let Some(failure) = config.fail_with { @@ -275,6 +298,7 @@ pub fn run( advanced_settings, reloader, updates_rx, + gui_ipc )); anyhow::Ok(ctrl_task) @@ -396,36 +420,6 @@ async fn smoke_test(ctlr_tx: CtlrTx) -> Result<()> { Ok::<_, anyhow::Error>(()) } -/// Worker task to accept deep links from a named pipe forever -/// -/// * `server` An initial named pipe server to consume before making new servers. This lets us also use the named pipe to enforce single-instance -async fn accept_deep_links(mut server: deep_link::Server, ctlr_tx: CtlrTx) -> Result<()> { - loop { - match server.accept().await { - Ok(Some(bytes)) => { - let url = SecretString::from_str( - std::str::from_utf8(bytes.expose_secret()) - .context("Incoming deep link was not valid UTF-8")?, - ) - .context("Impossible: can't wrap String into SecretString")?; - // Ignore errors from this, it would only happen if the app is shutting down, otherwise we would wait - ctlr_tx - .send(ControllerRequest::SchemeRequest(url)) - .await - .ok(); - } - Ok(None) => { - tracing::debug!("Accepted deep-link but read 0 bytes, trying again ..."); - } - Err(error) => { - tracing::warn!("Failed to accept deep link: {error:#}") - } - } - // We re-create the named pipe server every time we get a link, because of an oddity in the Windows API. - server = deep_link::Server::new().await?; - } -} - fn handle_system_tray_event(app: &tauri::AppHandle, event: system_tray::Event) -> Result<()> { app.try_state::() .context("can't get Managed struct from Tauri")? @@ -433,3 +427,57 @@ fn handle_system_tray_event(app: &tauri::AppHandle, event: system_tray::Event) - .blocking_send(ControllerRequest::SystemTrayMenu(event))?; Ok(()) } + +/// Create a new instance of the GUI IPC server. +/// +/// Every time Firezone gets launched, we attempt to connect to this server. +/// If we can successfully connect and handshake a message, then we know that there is a functioning instance of Firezone already running. +/// +/// If we hit an IO errors during the connection, we assume that there isn't already an instance of Firezone and we consider us to be the first instance. +/// +/// There is a third, somewhat gnarly case: +/// +/// An instance of Firezone may already be running but it is not responding. +/// Launching another instance on top of it would likely create more problems that it solves, so we also need to fail for that case. +async fn create_gui_ipc_server() -> Result { + let (read, write) = match ipc::connect::( + SocketId::Gui, + ipc::ConnectOptions { num_attempts: 1 }, + ) + .await + { + Ok(pair) => pair, + Err(e) => { + // If we can't connect to the socket, we must be the first instance. + tracing::debug!( + "We appear to be the first instance of the GUI client; connecting to socket yielded: {e:#}" + ); + + return ipc::Server::new(SocketId::Gui).context("Failed to create GUI IPC socket"); + } + }; + + tokio::time::timeout(Duration::from_secs(5), new_instance_handshake(read, write)) + .await + .context("Failed to handshake with existing instance in 5s")??; + + // If we managed to send the IPC message then another instance of Firezone is already running. + + bail!("Successfully handshaked with existing instance of Firezone GUI") +} + +async fn new_instance_handshake( + mut read: ClientRead, + mut write: ClientWrite, +) -> Result<()> { + write.send(&ClientMsg::NewInstance).await?; + let response = read + .next() + .await + .context("No response received")? + .context("Failed to receive response")?; + + anyhow::ensure!(response == ServerMsg::Ack); + + Ok(()) +} diff --git a/rust/gui-client/src-tauri/src/ipc.rs b/rust/gui-client/src-tauri/src/ipc.rs index 664b1e42b..c654557c7 100644 --- a/rust/gui-client/src-tauri/src/ipc.rs +++ b/rust/gui-client/src-tauri/src/ipc.rs @@ -1,9 +1,8 @@ +//! Defines a reusable, bi-directional, cross-platform IPC framework that uses JSON for message serialisation. + use anyhow::{Context as _, Result}; -use connlib_model::{ResourceId, ResourceView}; -use futures::SinkExt; use platform::{ClientStream, ServerStream}; -use secrecy::{ExposeSecret, SecretString}; -use std::{collections::BTreeSet, io, net::IpAddr}; +use serde::{Serialize, de::DeserializeOwned}; use tokio::io::{ReadHalf, WriteHalf}; use tokio_util::{ bytes::BytesMut, @@ -12,10 +11,10 @@ use tokio_util::{ pub(crate) use platform::Server; -pub type ClientRead = FramedRead, Decoder>; -pub type ClientWrite = FramedWrite, Encoder>; -pub(crate) type ServerRead = FramedRead, Decoder>; -pub(crate) type ServerWrite = FramedWrite, Encoder>; +pub type ClientRead = FramedRead, Decoder>; +pub type ClientWrite = FramedWrite, Encoder>; +pub(crate) type ServerRead = FramedRead, Decoder>; +pub(crate) type ServerWrite = FramedWrite, Encoder>; #[cfg(target_os = "linux")] #[path = "ipc/linux.rs"] @@ -29,62 +28,6 @@ pub(crate) mod platform; #[path = "ipc/macos.rs"] pub(crate) mod platform; -pub struct Client { - // Needed temporarily to avoid a big refactor. We can remove this in the future. - tx: ClientWrite, -} - -impl Client { - pub async fn new() -> Result<(Self, ClientRead)> { - tracing::debug!( - client_pid = std::process::id(), - "Connecting to IPC service..." - ); - let (rx, tx) = connect_to_service(ServiceId::Prod).await?; - - Ok((Self { tx }, rx)) - } - - pub async fn disconnect_from_ipc(mut self) -> Result<()> { - self.tx.close().await?; - Ok(()) - } - - pub async fn send_msg(&mut self, msg: &ClientMsg) -> Result<()> { - self.tx - .send(msg) - .await - .context("Couldn't send IPC message")?; - Ok(()) - } - - pub async fn connect_to_firezone(&mut self, api_url: &str, token: SecretString) -> Result<()> { - let token = token.expose_secret().clone(); - self.send_msg(&ClientMsg::Connect { - api_url: api_url.to_string(), - token, - }) - .await - .context("Couldn't send Connect message")?; - Ok(()) - } - - pub async fn reset(&mut self) -> Result<()> { - self.send_msg(&ClientMsg::Reset) - .await - .context("Couldn't send Reset")?; - Ok(()) - } - - /// Tell connlib about the system's default resolvers - pub async fn set_dns(&mut self, dns: Vec) -> Result<()> { - self.send_msg(&ClientMsg::SetDns(dns)) - .await - .context("Couldn't send SetDns")?; - Ok(()) - } -} - #[derive(Debug, thiserror::Error)] #[error("Couldn't find IPC service `{0}`")] pub struct NotFound(String); @@ -108,15 +51,19 @@ pub struct NotFound(String); /// on all platforms. /// /// Because the paths are so different (and Windows actually uses a `String`), -/// we have this `ServiceId` abstraction instead of just a `PathBuf`. -#[derive(Clone, Copy)] -pub enum ServiceId { - /// The IPC service used by Firezone GUI Client in production +/// we have this [`SocketId`] abstraction instead of just a `PathBuf`. +#[derive(Clone, Copy, Debug)] +pub enum SocketId { + /// The IPC socket used by Firezone GUI Client in production to connect to the tunnel service. /// /// This must go in `/run/dev.firezone.client` on Linux, which requires /// root permission - Prod, - /// An IPC service used for unit tests. + Tunnel, + /// The IPC socket used by the Firezone GUI Client in production to connect to an already running instance. + /// + /// This is used for deeplinks and duplicate launch handling. + Gui, + /// An IPC socket used for unit tests. /// /// This must go in `/run/user/$UID/dev.firezone.client` on Linux so /// the unit tests won't need root. @@ -181,16 +128,37 @@ impl tokio_util::codec::Encoder<&E> for Encoder { } } -/// Connect to the IPC service -/// -/// Public because the GUI Client will need it -pub async fn connect_to_service(id: ServiceId) -> Result<(ClientRead, ClientWrite)> { +pub struct ConnectOptions { + pub num_attempts: usize, +} + +impl Default for ConnectOptions { + fn default() -> Self { + Self { num_attempts: 10 } + } +} + +/// Attempt to connect to an IPC socket. +pub async fn connect( + id: SocketId, + options: ConnectOptions, +) -> Result<(ClientRead, ClientWrite)> +where + R: DeserializeOwned, + W: Serialize, +{ + tracing::debug!( + ?id, + client_pid = std::process::id(), + "Connecting to IPC socket" + ); + // This is how ChatGPT recommended, and I couldn't think of any more clever // way before I asked it. let mut last_err = None; - for _ in 0..10 { - match platform::connect_to_service(id).await { + for _ in 0..options.num_attempts { + match platform::connect_to_socket(id).await { Ok(stream) => { let (rx, tx) = tokio::io::split(stream); let rx = FramedRead::new(rx, Decoder::default()); @@ -198,7 +166,7 @@ pub async fn connect_to_service(id: ServiceId) -> Result<(ClientRead, ClientWrit return Ok((rx, tx)); } Err(error) => { - tracing::debug!("Couldn't connect to IPC service: {error}"); + tracing::debug!("Couldn't connect to IPC socket: {error}"); last_err = Some(error); // This won't come up much for humans but it helps the automated @@ -211,7 +179,13 @@ pub async fn connect_to_service(id: ServiceId) -> Result<(ClientRead, ClientWrit } impl platform::Server { - pub(crate) async fn next_client_split(&mut self) -> Result<(ServerRead, ServerWrite)> { + pub(crate) async fn next_client_split( + &mut self, + ) -> Result<(ServerRead, ServerWrite)> + where + R: DeserializeOwned, + W: Serialize, + { let (rx, tx) = tokio::io::split(self.next_client().await?); let rx = FramedRead::new(rx, Decoder::default()); let tx = FramedWrite::new(tx, Encoder::default()); @@ -219,70 +193,6 @@ impl platform::Server { } } -#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] -pub enum ClientMsg { - ClearLogs, - Connect { - api_url: String, - token: String, - }, - Disconnect, - ApplyLogFilter { - directives: String, - }, - Reset, - SetDns(Vec), - SetDisabledResources(BTreeSet), - StartTelemetry { - environment: String, - release: String, - account_slug: Option, - }, -} - -/// Messages that end up in the GUI, either forwarded from connlib or from the IPC service. -#[derive(Debug, serde::Deserialize, serde::Serialize, strum::Display)] -pub enum ServerMsg { - /// The IPC service finished clearing its log dir. - ClearedLogs(Result<(), String>), - ConnectResult(Result<(), Error>), - DisconnectedGracefully, - OnDisconnect { - error_msg: String, - is_authentication_error: bool, - }, - OnUpdateResources(Vec), - /// The IPC service is terminating, maybe due to a software update - /// - /// This is a hint that the Client should exit with a message like, - /// "Firezone is updating, please restart the GUI" instead of an error like, - /// "IPC connection closed". - TerminatingGracefully, - /// The interface and tunnel are ready for traffic. - TunnelReady, -} - -// All variants are `String` because almost no error type implements `Serialize` -#[derive(Debug, serde::Deserialize, serde::Serialize, thiserror::Error)] -pub enum Error { - #[error("IO error: {0}")] - Io(String), - #[error("{0}")] - Other(String), -} - -impl From for Error { - fn from(v: io::Error) -> Self { - Self::Io(v.to_string()) - } -} - -impl From for Error { - fn from(v: anyhow::Error) -> Self { - Self::Other(format!("{v:#}")) - } -} - #[cfg(test)] mod tests { use super::{platform::Server, *}; @@ -294,33 +204,46 @@ mod tests { #[tokio::test] async fn no_such_service() -> Result<()> { let _guard = firezone_logging::test("trace"); - const ID: ServiceId = ServiceId::Test("H56FRXVH"); + const ID: SocketId = SocketId::Test("H56FRXVH"); - if super::connect_to_service(ID).await.is_ok() { + if super::connect::<(), ()>(ID, super::ConnectOptions::default()) + .await + .is_ok() + { bail!("`connect_to_service` should have failed for a non-existent service"); } Ok(()) } + #[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] + enum ClientMsg { + Foo, + } + + #[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] + enum ServerMsg { + Bar, + } + /// Make sure the IPC client and server can exchange messages #[tokio::test] async fn smoke() -> Result<()> { let _guard = firezone_logging::test("trace"); let loops = 10; - const ID: ServiceId = ServiceId::Test("OB5SZCGN"); + const ID: SocketId = SocketId::Test("OB5SZCGN"); let mut server = Server::new(ID).expect("Error while starting IPC server"); let server_task: tokio::task::JoinHandle> = tokio::spawn(async move { for _ in 0..loops { let (mut rx, mut tx) = server - .next_client_split() + .next_client_split::() .await .expect("Error while waiting for next IPC client"); while let Some(req) = rx.next().await { let req = req.expect("Error while reading from IPC client"); - ensure!(req == ClientMsg::Reset); - tx.send(&ServerMsg::OnUpdateResources(vec![])) + ensure!(req == ClientMsg::Foo); + tx.send(&ServerMsg::Bar) .await .expect("Error while writing to IPC client"); } @@ -331,11 +254,11 @@ mod tests { let client_task: JoinHandle> = tokio::spawn(async move { for _ in 0..loops { - let (mut rx, mut tx) = super::connect_to_service(ID) + let (mut rx, mut tx) = super::connect(ID, super::ConnectOptions::default()) .await .context("Error while connecting to IPC server")?; - let req = ClientMsg::Reset; + let req = ClientMsg::Foo; for _ in 0..10 { tx.send(&req) .await @@ -345,7 +268,7 @@ mod tests { .await .expect("Should have gotten a reply from the IPC server") .expect("Error while reading from IPC server"); - ensure!(matches!(resp, ServerMsg::OnUpdateResources(_))); + ensure!(matches!(resp, ServerMsg::Bar)); } } Ok(()) @@ -388,7 +311,7 @@ mod tests { async fn loop_to_next_client() -> Result<()> { let _guard = firezone_logging::test("trace"); - let mut server = Server::new(ServiceId::Test("H6L73DG5"))?; + let mut server = Server::new(SocketId::Test("H6L73DG5"))?; for i in 0..5 { if let Ok(Err(err)) = timeout(Duration::from_secs(1), server.next_client()).await { Err(err).with_context(|| { diff --git a/rust/gui-client/src-tauri/src/ipc/linux.rs b/rust/gui-client/src-tauri/src/ipc/linux.rs index a6707ac00..9dd89d76d 100644 --- a/rust/gui-client/src-tauri/src/ipc/linux.rs +++ b/rust/gui-client/src-tauri/src/ipc/linux.rs @@ -1,4 +1,4 @@ -use super::{NotFound, ServiceId}; +use super::{NotFound, SocketId}; use anyhow::{Context as _, Result}; use firezone_bin_shared::BUNDLE_ID; use std::{io::ErrorKind, os::unix::fs::PermissionsExt, path::PathBuf}; @@ -6,6 +6,17 @@ use tokio::net::{UnixListener, UnixStream}; pub(crate) struct Server { listener: UnixListener, + id: SocketId, +} + +impl Drop for Server { + fn drop(&mut self) { + let path = ipc_path(self.id); + + if let Err(e) = std::fs::remove_file(&path) { + tracing::debug!(path = %path.display(), "Failed to delete IPC socket: {e}"); + } + } } /// Alias for the client's half of a platform-specific IPC stream @@ -18,7 +29,7 @@ pub(crate) type ServerStream = UnixStream; /// Connect to the IPC service #[expect(clippy::wildcard_enum_match_arm)] -pub async fn connect_to_service(id: ServiceId) -> Result { +pub async fn connect_to_socket(id: SocketId) -> Result { let path = ipc_path(id); let stream = UnixStream::connect(&path) .await @@ -41,7 +52,7 @@ pub async fn connect_to_service(id: ServiceId) -> Result { impl Server { /// Platform-specific setup - pub(crate) fn new(id: ServiceId) -> Result { + pub(crate) fn new(id: SocketId) -> Result { let sock_path = ipc_path(id); tracing::debug!(socket = %sock_path.display(), "Creating new IPC server"); @@ -61,11 +72,10 @@ impl Server { // TODO: Change this to `notify_service_controller` and put it in // the same place in the IPC service's main loop as in the Headless Client. sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?; - Ok(Self { listener }) + Ok(Self { listener, id }) } pub(crate) async fn next_client(&mut self) -> Result { - tracing::info!("Listening for GUI to connect over IPC..."); let (stream, _) = self.listener.accept().await?; let cred = stream.peer_cred()?; tracing::info!( @@ -87,11 +97,14 @@ impl Server { /// Also systemd can create this dir with the `RuntimeDir=` directive which is nice. /// /// Test sockets live in e.g. `/run/user/1000/dev.firezone.client/data/` -fn ipc_path(id: ServiceId) -> PathBuf { +fn ipc_path(id: SocketId) -> PathBuf { match id { - ServiceId::Prod => PathBuf::from("/run").join(BUNDLE_ID).join("ipc.sock"), + SocketId::Tunnel => PathBuf::from("/run").join(BUNDLE_ID).join("tunnel.sock"), + SocketId::Gui => firezone_bin_shared::known_dirs::runtime() + .expect("`known_dirs::runtime()` should always work") + .join("gui.sock"), #[cfg(test)] - ServiceId::Test(id) => firezone_bin_shared::known_dirs::runtime() + SocketId::Test(id) => firezone_bin_shared::known_dirs::runtime() .expect("`known_dirs::runtime()` should always work") .join(format!("ipc_test_{id}.sock")), } diff --git a/rust/gui-client/src-tauri/src/ipc/macos.rs b/rust/gui-client/src-tauri/src/ipc/macos.rs index 809472e7e..e748e8ca9 100644 --- a/rust/gui-client/src-tauri/src/ipc/macos.rs +++ b/rust/gui-client/src-tauri/src/ipc/macos.rs @@ -1,4 +1,4 @@ -use super::ServiceId; +use super::SocketId; use anyhow::{Result, bail}; use tokio::net::UnixStream; @@ -11,12 +11,12 @@ pub(crate) type ServerStream = UnixStream; clippy::unused_async, reason = "Signture must match other operating systems" )] -pub async fn connect_to_service(_id: ServiceId) -> Result { +pub async fn connect_to_socket(_id: SocketId) -> Result { bail!("not implemented") } impl Server { - pub(crate) fn new(_id: ServiceId) -> Result { + pub(crate) fn new(_id: SocketId) -> Result { bail!("not implemented") } diff --git a/rust/gui-client/src-tauri/src/ipc/windows.rs b/rust/gui-client/src-tauri/src/ipc/windows.rs index be5d0e4a9..e96770dd9 100644 --- a/rust/gui-client/src-tauri/src/ipc/windows.rs +++ b/rust/gui-client/src-tauri/src/ipc/windows.rs @@ -1,4 +1,4 @@ -use super::{NotFound, ServiceId}; +use super::{NotFound, SocketId}; use anyhow::{Context as _, Result, bail}; use firezone_bin_shared::BUNDLE_ID; use std::{ffi::c_void, io::ErrorKind, os::windows::io::AsRawHandle, time::Duration}; @@ -19,12 +19,12 @@ pub type ClientStream = named_pipe::NamedPipeClient; /// Alias for the server's half of a platform-specific IPC stream pub(crate) type ServerStream = named_pipe::NamedPipeServer; -/// Connect to the IPC service +/// Connect to an IPC socket. /// /// This is async on Linux #[expect(clippy::unused_async)] #[expect(clippy::wildcard_enum_match_arm)] -pub(crate) async fn connect_to_service(id: ServiceId) -> Result { +pub(crate) async fn connect_to_socket(id: SocketId) -> Result { let path = ipc_path(id); let stream = named_pipe::ClientOptions::new() .open(&path) @@ -47,7 +47,7 @@ pub(crate) async fn connect_to_service(id: ServiceId) -> Result { impl Server { /// Platform-specific setup #[expect(clippy::unnecessary_wraps, reason = "Linux impl is fallible")] - pub(crate) fn new(id: ServiceId) -> Result { + pub(crate) fn new(id: SocketId) -> Result { let pipe_path = ipc_path(id); Ok(Self { pipe_path }) } @@ -64,10 +64,6 @@ impl Server { .bind_to_pipe() .await .context("Couldn't bind to named pipe")?; - tracing::debug!( - server_pid = std::process::id(), - "Listening for GUI to connect over IPC..." - ); // Note that Tokio has no `poll_connect` server .connect() @@ -153,12 +149,13 @@ fn create_pipe_server(pipe_path: &str) -> Result String { +/// Named pipe for an IPC connection +fn ipc_path(id: SocketId) -> String { let name = match id { - ServiceId::Prod => format!("{BUNDLE_ID}.ipc_service"), + SocketId::Tunnel => format!("{BUNDLE_ID}_tunnel.ipc"), + SocketId::Gui => format!("{BUNDLE_ID}_gui.ipc"), #[cfg(test)] - ServiceId::Test(id) => format!("{BUNDLE_ID}_test_{id}.ipc_service"), + SocketId::Test(id) => format!("{BUNDLE_ID}_test_{id}.ipc"), }; named_pipe_path(&name) } @@ -177,7 +174,7 @@ pub fn named_pipe_path(id: &str) -> String { #[cfg(test)] mod tests { - use super::{Server, ServiceId}; + use super::{Server, SocketId}; use anyhow::Context as _; use futures::StreamExt; @@ -191,23 +188,24 @@ mod tests { #[test] fn ipc_path() { - assert!(super::ipc_path(ServiceId::Prod).starts_with(r"\\.\pipe\")); + assert!(super::ipc_path(SocketId::Tunnel).starts_with(r"\\.\pipe\")); } #[tokio::test] async fn single_instance() -> anyhow::Result<()> { let _guard = firezone_logging::test("trace"); - const ID: ServiceId = ServiceId::Test("2GOCMPBG"); + const ID: SocketId = SocketId::Test("2GOCMPBG"); let mut server_1 = Server::new(ID)?; let pipe_path = server_1.pipe_path.clone(); tokio::spawn(async move { - let (mut rx, _tx) = server_1.next_client_split().await?; + let (mut rx, _tx) = server_1.next_client_split::<(), ()>().await?; rx.next().await; Ok::<_, anyhow::Error>(()) }); - let (_rx, _tx) = crate::ipc::connect_to_service(ID).await?; + let (_rx, _tx) = + crate::ipc::connect::<(), ()>(ID, crate::ipc::ConnectOptions::default()).await?; match super::create_pipe_server(&pipe_path) { Err(super::PipeError::AccessDenied) => {} diff --git a/rust/gui-client/src-tauri/src/service.rs b/rust/gui-client/src-tauri/src/service.rs index 2a5cbd2fb..d886e9f5c 100644 --- a/rust/gui-client/src-tauri/src/service.rs +++ b/rust/gui-client/src-tauri/src/service.rs @@ -1,7 +1,9 @@ +use crate::ipc::{self, SocketId}; use anyhow::{Context as _, Result, bail}; use atomicwrites::{AtomicFile, OverwriteBehavior}; use backoff::ExponentialBackoffBuilder; use client_shared::ConnlibMsg; +use connlib_model::{ResourceId, ResourceView}; use firezone_bin_shared::{ DnsControlMethod, DnsController, TunDeviceManager, device_id, device_info, known_dirs, platform::{tcp_socket_factory, udp_socket_factory}, @@ -16,7 +18,14 @@ use futures::{ }; use phoenix_channel::{DeviceInfo, LoginUrl, PhoenixChannel, get_user_agent}; use secrecy::{Secret, SecretString}; -use std::{io::Write, pin::pin, sync::Arc, time::Duration}; +use std::{ + collections::BTreeSet, + io::{self, Write}, + net::IpAddr, + pin::pin, + sync::Arc, + time::Duration, +}; use tokio::{sync::mpsc, time::Instant}; use url::Url; @@ -34,7 +43,69 @@ mod platform; pub use platform::{elevation_check, install, run}; -use crate::ipc::{self, ServiceId}; +#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum ClientMsg { + ClearLogs, + Connect { + api_url: String, + token: String, + }, + Disconnect, + ApplyLogFilter { + directives: String, + }, + Reset, + SetDns(Vec), + SetDisabledResources(BTreeSet), + StartTelemetry { + environment: String, + release: String, + account_slug: Option, + }, +} + +/// Messages that end up in the GUI, either forwarded from connlib or from the IPC service. +#[derive(Debug, serde::Deserialize, serde::Serialize, strum::Display)] +pub enum ServerMsg { + /// The IPC service finished clearing its log dir. + ClearedLogs(Result<(), String>), + ConnectResult(Result<(), ConnectError>), + DisconnectedGracefully, + OnDisconnect { + error_msg: String, + is_authentication_error: bool, + }, + OnUpdateResources(Vec), + /// The IPC service is terminating, maybe due to a software update + /// + /// This is a hint that the Client should exit with a message like, + /// "Firezone is updating, please restart the GUI" instead of an error like, + /// "IPC connection closed". + TerminatingGracefully, + /// The interface and tunnel are ready for traffic. + TunnelReady, +} + +// All variants are `String` because almost no error type implements `Serialize` +#[derive(Debug, serde::Deserialize, serde::Serialize, thiserror::Error)] +pub enum ConnectError { + #[error("IO error: {0}")] + Io(String), + #[error("{0}")] + Other(String), +} + +impl From for ConnectError { + fn from(v: io::Error) -> Self { + Self::Io(v.to_string()) + } +} + +impl From for ConnectError { + fn from(v: anyhow::Error) -> Self { + Self::Other(format!("{v:#}")) + } +} /// Run the IPC service and terminate gracefully if we catch a terminate signal /// @@ -54,7 +125,7 @@ async fn ipc_listen( Telemetry::set_firezone_id(firezone_id); - let mut server = ipc::Server::new(ServiceId::Prod)?; + let mut server = ipc::Server::new(SocketId::Tunnel)?; let mut dns_controller = DnsController { dns_control_method }; loop { let mut handler_fut = pin!(Handler::new( @@ -90,8 +161,8 @@ async fn ipc_listen( /// Handles one IPC client struct Handler<'a> { dns_controller: &'a mut DnsController, - ipc_rx: ipc::ServerRead, - ipc_tx: ipc::ServerWrite, + ipc_rx: ipc::ServerRead, + ipc_tx: ipc::ServerWrite, last_connlib_start_instant: Option, log_filter_reloader: &'a FilterReloadHandle, session: Option, @@ -107,7 +178,7 @@ struct Session { enum Event { Callback(ConnlibMsg), CallbackChannelClosed, - Ipc(ipc::ClientMsg), + Ipc(ClientMsg), IpcDisconnected, IpcError(anyhow::Error), Terminate, @@ -129,6 +200,12 @@ impl<'a> Handler<'a> { telemetry: &'a mut Telemetry, ) -> Result { dns_controller.deactivate()?; + + tracing::info!( + server_pid = std::process::id(), + "Listening for GUI to connect over IPC..." + ); + let (ipc_rx, ipc_tx) = server .next_client_split() .await @@ -189,7 +266,7 @@ impl<'a> Handler<'a> { "Caught SIGINT / SIGTERM / Ctrl+C while an IPC client is connected" ); // Ignore the result here because we're terminating anyway. - let _ = self.send_ipc(ipc::ServerMsg::TerminatingGracefully).await; + let _ = self.send_ipc(ServerMsg::TerminatingGracefully).await; break HandlerOk::ServiceTerminating; } } @@ -233,7 +310,7 @@ impl<'a> Handler<'a> { } => { let _ = self.session.take(); self.dns_controller.deactivate()?; - self.send_ipc(ipc::ServerMsg::OnDisconnect { + self.send_ipc(ServerMsg::OnDisconnect { error_msg, is_authentication_error, }) @@ -255,48 +332,45 @@ impl<'a> Handler<'a> { self.tun_device.set_routes(ipv4_routes, ipv6_routes).await?; self.dns_controller.flush()?; - self.send_ipc(ipc::ServerMsg::TunnelReady).await?; + self.send_ipc(ServerMsg::TunnelReady).await?; } ConnlibMsg::OnUpdateResources(resources) => { // On every resources update, flush DNS to mitigate self.dns_controller.flush()?; - self.send_ipc(ipc::ServerMsg::OnUpdateResources(resources)) + self.send_ipc(ServerMsg::OnUpdateResources(resources)) .await?; } } Ok(()) } - async fn handle_ipc_msg(&mut self, msg: ipc::ClientMsg) -> Result<()> { + async fn handle_ipc_msg(&mut self, msg: ClientMsg) -> Result<()> { match msg { - ipc::ClientMsg::ClearLogs => { + ClientMsg::ClearLogs => { let result = crate::clear_logs( &firezone_bin_shared::known_dirs::ipc_service_logs() .context("Can't compute logs dir")?, ) .await; - self.send_ipc(ipc::ServerMsg::ClearedLogs( - result.map_err(|e| e.to_string()), - )) - .await? + self.send_ipc(ServerMsg::ClearedLogs(result.map_err(|e| e.to_string()))) + .await? } - ipc::ClientMsg::Connect { api_url, token } => { + ClientMsg::Connect { api_url, token } => { // Warning: Connection errors don't bubble to callers of `handle_ipc_msg`. let token = secrecy::SecretString::from(token); let result = self.connect_to_firezone(&api_url, token); - self.send_ipc(ipc::ServerMsg::ConnectResult(result)).await? + self.send_ipc(ServerMsg::ConnectResult(result)).await? } - ipc::ClientMsg::Disconnect => { + ClientMsg::Disconnect => { if self.session.take().is_some() { self.dns_controller.deactivate()?; } // Always send `DisconnectedGracefully` even if we weren't connected, // so this will be idempotent. - self.send_ipc(ipc::ServerMsg::DisconnectedGracefully) - .await?; + self.send_ipc(ServerMsg::DisconnectedGracefully).await?; } - ipc::ClientMsg::ApplyLogFilter { directives } => { + ClientMsg::ApplyLogFilter { directives } => { self.log_filter_reloader.reload(&directives)?; let path = known_dirs::ipc_log_filter()?; @@ -307,7 +381,7 @@ impl<'a> Handler<'a> { tracing::warn!(path = %path.display(), %directives, "Failed to write new log directives: {}", err_with_src(&e)); } } - ipc::ClientMsg::Reset => { + ClientMsg::Reset => { if self.last_connlib_start_instant.is_some() { tracing::debug!("Ignoring reset since we're still signing in"); return Ok(()); @@ -319,7 +393,7 @@ impl<'a> Handler<'a> { session.connlib.reset(); } - ipc::ClientMsg::SetDns(resolvers) => { + ClientMsg::SetDns(resolvers) => { let Some(session) = self.session.as_ref() else { tracing::debug!("Cannot set DNS resolvers if we're signed out"); return Ok(()); @@ -328,7 +402,7 @@ impl<'a> Handler<'a> { tracing::debug!(?resolvers); session.connlib.set_dns(resolvers); } - ipc::ClientMsg::SetDisabledResources(disabled_resources) => { + ClientMsg::SetDisabledResources(disabled_resources) => { let Some(session) = self.session.as_ref() else { // At this point, the GUI has already saved the disabled Resources to disk, so it'll be correct on the next sign-in anyway. tracing::debug!("Cannot set disabled resources if we're signed out"); @@ -337,7 +411,7 @@ impl<'a> Handler<'a> { session.connlib.set_disabled_resources(disabled_resources); } - ipc::ClientMsg::StartTelemetry { + ClientMsg::StartTelemetry { environment, release, account_slug, @@ -362,7 +436,7 @@ impl<'a> Handler<'a> { &mut self, api_url: &str, token: SecretString, - ) -> Result<(), ipc::Error> { + ) -> Result<(), ConnectError> { let _connect_span = telemetry_span!("connect_to_firezone").entered(); assert!(self.session.is_none()); @@ -430,7 +504,7 @@ impl<'a> Handler<'a> { Ok(()) } - async fn send_ipc(&mut self, msg: ipc::ServerMsg) -> Result<()> { + async fn send_ipc(&mut self, msg: ServerMsg) -> Result<()> { self.ipc_tx .send(&msg) .await @@ -476,7 +550,7 @@ pub fn run_debug(dns_control: DnsControlMethod) -> Result<()> { /// This makes the timing neater in case the GUI starts up slowly. #[cfg(debug_assertions)] pub fn run_smoke_test() -> Result<()> { - use crate::ipc::{self, ServiceId}; + use crate::ipc::{self, SocketId}; use anyhow::{Context as _, bail}; use firezone_bin_shared::{DnsController, device_id}; @@ -500,7 +574,7 @@ pub fn run_smoke_test() -> Result<()> { // Couldn't get the loop to work here yet, so SIGHUP is not implemented rt.block_on(async { device_id::get_or_create().context("Failed to read / create device ID")?; - let mut server = ipc::Server::new(ServiceId::Prod)?; + let mut server = ipc::Server::new(SocketId::Tunnel)?; let _ = Handler::new( &mut server, &mut dns_controller, diff --git a/scripts/tests/linux-group.sh b/scripts/tests/linux-group.sh index 6f73754fb..0f6653e4a 100755 --- a/scripts/tests/linux-group.sh +++ b/scripts/tests/linux-group.sh @@ -8,7 +8,7 @@ source "./scripts/tests/lib.sh" BINARY_NAME=firezone-client-ipc FZ_GROUP="firezone-client" SERVICE_NAME=firezone-client-ipc -SOCKET=/run/dev.firezone.client/ipc.sock +SOCKET=/run/dev.firezone.client/tunnel.sock export RUST_LOG=info cd rust || exit 1