diff --git a/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs b/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs index 947422567..f2a3bf46b 100644 --- a/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs +++ b/rust/gui-client/src-tauri/src/bin/firezone-gui-client.rs @@ -73,7 +73,7 @@ fn main() -> anyhow::Result<()> { firezone_gui_client::logging::setup_stdout()?; let rt = tokio::runtime::Runtime::new()?; - if let Err(error) = rt.block_on(deep_link::open(&deep_link.url)) { + if let Err(error) = rt.block_on(deep_link::open(deep_link.url)) { tracing::error!("Error in `OpenDeepLink`: {error:#}"); } Ok(()) @@ -144,7 +144,7 @@ fn run_gui(config: RunConfig) -> Result<()> { return Err(anyhow); } - if anyhow.root_cause().is::() { + if anyhow.root_cause().is::() { show_error_dialog( "Firezone is already running. If it's not responding, force-stop it.", )?; diff --git a/rust/gui-client/src-tauri/src/controller.rs b/rust/gui-client/src-tauri/src/controller.rs index b06499001..505f0e39f 100644 --- a/rust/gui-client/src-tauri/src/controller.rs +++ b/rust/gui-client/src-tauri/src/controller.rs @@ -1,7 +1,8 @@ use crate::{ auth, deep_link, - gui::system_tray, - ipc, logging, + gui::{self, system_tray}, + ipc::{self, SocketId}, + logging, service, settings::{self, AdvancedSettings}, updates, uptime, }; @@ -11,7 +12,7 @@ use firezone_bin_shared::DnsControlMethod; use firezone_logging::FilterReloadHandle; use firezone_telemetry::Telemetry; use futures::{ - Stream, StreamExt, + SinkExt, Stream, StreamExt, stream::{self, BoxStream}, }; use secrecy::{ExposeSecret as _, SecretString}; @@ -33,8 +34,8 @@ pub struct Controller { auth: auth::Auth, clear_logs_callback: Option>>, ctlr_tx: CtlrTx, - ipc_client: ipc::Client, - ipc_rx: ipc::ClientRead, + ipc_client: ipc::ClientWrite, + ipc_rx: ipc::ClientRead, integration: I, log_filter_reloader: FilterReloadHandle, /// A release that's ready to download @@ -44,6 +45,14 @@ pub struct Controller { updates_rx: ReceiverStream>, uptime: uptime::Tracker, + gui_ipc_clients: BoxStream< + 'static, + Result<( + ipc::ServerRead, + ipc::ServerWrite, + )>, + >, + dns_notifier: BoxStream<'static, Result<()>>, network_notifier: BoxStream<'static, Result<()>>, } @@ -75,7 +84,6 @@ pub enum ControllerRequest { }, Fail(Failure), GetAdvancedSettings(oneshot::Sender), - SchemeRequest(SecretString), SignIn, SystemTrayMenu(system_tray::Event), UpdateNotificationClicked(Url), @@ -167,23 +175,33 @@ impl Status { enum EventloopTick { NetworkChanged(Result<()>), DnsChanged(Result<()>), - IpcMsg(Option>), + IpcMsg(Option>), ControllerRequest(Option), UpdateNotification(Option>), + NewInstanceLaunched( + Option< + Result<( + ipc::ServerRead, + ipc::ServerWrite, + )>, + >, + ), } impl Controller { - pub async fn start( + pub(crate) async fn start( ctlr_tx: CtlrTx, integration: I, rx: mpsc::Receiver, advanced_settings: AdvancedSettings, log_filter_reloader: FilterReloadHandle, updates_rx: mpsc::Receiver>, + gui_ipc: ipc::Server, ) -> Result<()> { tracing::debug!("Starting new instance of `Controller`"); - let (ipc_client, ipc_rx) = ipc::Client::new().await?; + let (ipc_rx, ipc_client) = + ipc::connect(SocketId::Tunnel, ipc::ConnectOptions::default()).await?; let dns_notifier = new_dns_notifier().await?.boxed(); let network_notifier = new_network_notifier().await?.boxed(); @@ -204,6 +222,12 @@ impl Controller { uptime: Default::default(), dns_notifier, network_notifier, + gui_ipc_clients: stream::unfold(gui_ipc, |mut gui_ipc| async move { + let result = gui_ipc.next_client_split().await; + + Some((result, gui_ipc)) + }) + .boxed(), }; controller.main_loop().await?; @@ -234,7 +258,7 @@ impl Controller { EventloopTick::NetworkChanged(Ok(())) => { if self.status.needs_network_changes() { tracing::debug!("Internet up/down changed, calling `Session::reset`"); - self.ipc_client.reset().await? + self.send_ipc(&service::ClientMsg::Reset).await? } self.try_retry_connection().await? @@ -246,7 +270,8 @@ impl Controller { ?resolvers, "New DNS resolvers, calling `Session::set_dns`" ); - self.ipc_client.set_dns(resolvers).await?; + self.send_ipc(&service::ClientMsg::SetDns(resolvers)) + .await?; } self.try_retry_connection().await? @@ -260,7 +285,7 @@ impl Controller { .context("IPC closed")? .context("Failed to read from IPC")?; - match self.handle_ipc_msg(msg).await? { + match self.handle_service_ipc_msg(msg).await? { ControlFlow::Break(()) => break, ControlFlow::Continue(()) => continue, }; @@ -277,12 +302,29 @@ impl Controller { EventloopTick::UpdateNotification(None) => { return Err(anyhow!("Update checker task stopped")); } + EventloopTick::NewInstanceLaunched(None) => { + return Err(anyhow!("GUI IPC socket closed")); + } + EventloopTick::NewInstanceLaunched(Some(Err(e))) => { + tracing::warn!("Failed to accept IPC connection from new GUI instance: {e:#}"); + } + EventloopTick::NewInstanceLaunched(Some(Ok((mut read, mut write)))) => { + let client_msg = read.next().await; + + if let Err(e) = self.handle_gui_ipc_msg(client_msg).await { + tracing::debug!("Failed to handle IPC message from new GUI instance: {e:#}") + } + + if let Err(e) = write.send(&gui::ServerMsg::Ack).await { + tracing::debug!("Failed to ack IPC message from new GUI instance: {e:#}") + } + } } } tracing::debug!("Closing..."); - if let Err(error) = self.ipc_client.disconnect_from_ipc().await { + if let Err(error) = self.ipc_client.close().await { tracing::error!("ipc_client: {error:#}"); } @@ -313,6 +355,10 @@ impl Controller { return Poll::Ready(Some(EventloopTick::UpdateNotification(notification))); } + if let Poll::Ready(new_instance) = self.gui_ipc_clients.poll_next_unpin(cx) { + return Poll::Ready(Some(EventloopTick::NewInstanceLaunched(new_instance))); + } + Poll::Pending }) .await @@ -335,9 +381,12 @@ impl Controller { // Count the start instant from before we connect let start_instant = Instant::now(); - self.ipc_client - .connect_to_firezone(api_url.as_str(), token.expose_secret().clone().into()) - .await?; + self.send_ipc(&service::ClientMsg::Connect { + api_url: api_url.to_string(), + token: token.expose_secret().clone(), + }) + .await?; + // Change the status after we begin connecting self.status = Status::WaitingForPortal { start_instant, @@ -355,18 +404,17 @@ impl Controller { Telemetry::set_account_slug(account_slug); } - self.ipc_client - .send_msg(&ipc::ClientMsg::StartTelemetry { - environment, - release: crate::RELEASE.to_string(), - account_slug, - }) - .await?; + self.send_ipc(&service::ClientMsg::StartTelemetry { + environment, + release: crate::RELEASE.to_string(), + account_slug, + }) + .await?; Ok(()) } - async fn handle_deep_link(&mut self, url: &SecretString) -> Result<()> { + async fn handle_deep_link(&mut self, url: &Url) -> Result<()> { let auth_response = deep_link::parse_auth_callback(url).context("Couldn't parse scheme request")?; @@ -393,11 +441,10 @@ impl Controller { self.advanced_settings = *settings; - self.ipc_client - .send_msg(&ipc::ClientMsg::ApplyLogFilter { - directives: self.advanced_settings.log_filter.clone(), - }) - .await?; + self.send_ipc(&service::ClientMsg::ApplyLogFilter { + directives: self.advanced_settings.log_filter.clone(), + }) + .await?; tracing::debug!("Applied new settings. Log level will take effect immediately."); @@ -413,7 +460,7 @@ impl Controller { if let Err(error) = logging::clear_gui_logs().await { tracing::error!("Failed to clear GUI logs: {error:#}"); } - self.ipc_client.send_msg(&ipc::ClientMsg::ClearLogs).await?; + self.send_ipc(&service::ClientMsg::ClearLogs).await?; self.clear_logs_callback = Some(completion_tx); } Req::ExportLogs { path, stem } => logging::export_logs_to(path, stem) @@ -429,20 +476,6 @@ impl Controller { Req::GetAdvancedSettings(tx) => { tx.send(self.advanced_settings.clone()).ok(); } - Req::SchemeRequest(url) => match self.handle_deep_link(&url).await { - Ok(()) => {} - Err(error) - if error - .root_cause() - .downcast_ref::() - .is_some_and(|e| matches!(e, auth::Error::NoInflightRequest)) => - { - tracing::debug!("Ignoring deep-link; no local state"); - } - Err(error) => { - tracing::error!("`handle_deep_link` failed: {error:#}"); - } - }, Req::SignIn | Req::SystemTrayMenu(system_tray::Event::SignIn) => { let req = self .auth @@ -528,9 +561,7 @@ impl Controller { Req::SystemTrayMenu(system_tray::Event::Quit) => { tracing::info!("User clicked Quit in the menu"); self.status = Status::Quitting; - self.ipc_client - .send_msg(&ipc::ClientMsg::Disconnect) - .await?; + self.send_ipc(&service::ClientMsg::Disconnect).await?; self.refresh_system_tray_menu(); } Req::UpdateNotificationClicked(download_url) => { @@ -543,9 +574,9 @@ impl Controller { Ok(()) } - async fn handle_ipc_msg(&mut self, msg: ipc::ServerMsg) -> Result> { + async fn handle_service_ipc_msg(&mut self, msg: service::ServerMsg) -> Result> { match msg { - ipc::ServerMsg::ClearedLogs(result) => { + service::ServerMsg::ClearedLogs(result) => { let Some(tx) = self.clear_logs_callback.take() else { return Err(anyhow!( "Can't handle `IpcClearedLogs` when there's no callback waiting for a `ClearLogs` result" @@ -554,15 +585,15 @@ impl Controller { tx.send(result) .map_err(|_| anyhow!("Couldn't send `ClearLogs` result to Tauri task"))?; } - ipc::ServerMsg::ConnectResult(result) => { + service::ServerMsg::ConnectResult(result) => { self.handle_connect_result(result).await?; } - ipc::ServerMsg::DisconnectedGracefully => { + service::ServerMsg::DisconnectedGracefully => { if let Status::Quitting = self.status { return Ok(ControlFlow::Break(())); } } - ipc::ServerMsg::OnDisconnect { + service::ServerMsg::OnDisconnect { error_msg, is_authentication_error, } => { @@ -583,7 +614,7 @@ impl Controller { .context("Couldn't show Disconnected alert")?; } } - ipc::ServerMsg::OnUpdateResources(resources) => { + service::ServerMsg::OnUpdateResources(resources) => { if !self.status.needs_resource_updates() { return Ok(ControlFlow::Continue(())); } @@ -593,7 +624,7 @@ impl Controller { self.update_disabled_resources().await?; } - ipc::ServerMsg::TerminatingGracefully => { + service::ServerMsg::TerminatingGracefully => { tracing::info!("IPC service exited gracefully"); self.integration .set_tray_icon(system_tray::icon_terminating()); @@ -604,7 +635,7 @@ impl Controller { return Ok(ControlFlow::Break(())); } - ipc::ServerMsg::TunnelReady => { + service::ServerMsg::TunnelReady => { let Status::WaitingForTunnel { start_instant } = self.status else { // If we are not waiting for a tunnel, continue. return Ok(ControlFlow::Continue(())); @@ -622,7 +653,41 @@ impl Controller { Ok(ControlFlow::Continue(())) } - async fn handle_connect_result(&mut self, result: Result<(), ipc::Error>) -> Result<()> { + async fn handle_gui_ipc_msg( + &mut self, + maybe_msg: Option>, + ) -> Result<()> { + let client_msg = maybe_msg + .context("No message received")? + .context("Failed to read message")?; + + match client_msg { + gui::ClientMsg::Deeplink(url) => match self.handle_deep_link(&url).await { + Ok(()) => {} + Err(error) + if error + .root_cause() + .downcast_ref::() + .is_some_and(|e| matches!(e, auth::Error::NoInflightRequest)) => + { + tracing::debug!("Ignoring deep-link; no local state"); + } + Err(error) => { + tracing::error!("`handle_deep_link` failed: {error:#}"); + } + }, + gui::ClientMsg::NewInstance => { + tracing::debug!("A new instance of Firezone has been launched") + } + } + + Ok(()) + } + + async fn handle_connect_result( + &mut self, + result: Result<(), service::ConnectError>, + ) -> Result<()> { let Status::WaitingForPortal { start_instant, token, @@ -642,7 +707,7 @@ impl Controller { self.refresh_system_tray_menu(); Ok(()) } - Err(ipc::Error::Io(error)) => { + Err(service::ConnectError::Io(error)) => { // This is typically something like, we don't have Internet access so we can't // open the PhoenixChannel's WebSocket. tracing::info!( @@ -655,7 +720,7 @@ impl Controller { self.refresh_system_tray_menu(); Ok(()) } - Err(ipc::Error::Other(error)) => { + Err(service::ConnectError::Other(error)) => { // We log this here directly instead of forwarding it because errors hard-abort the event-loop and we still want to be able to export logs and stuff. // See . tracing::error!("Failed to connect to Firezone: {error}"); @@ -709,9 +774,10 @@ impl Controller { disabled_resources.insert(internet_resource.id()); } - self.ipc_client - .send_msg(&ipc::ClientMsg::SetDisabledResources(disabled_resources)) - .await?; + self.send_ipc(&service::ClientMsg::SetDisabledResources( + disabled_resources, + )) + .await?; self.refresh_system_tray_menu(); Ok(()) @@ -791,12 +857,17 @@ impl Controller { tracing::debug!("disconnecting connlib"); // This is redundant if the token is expired, in that case // connlib already disconnected itself. - self.ipc_client - .send_msg(&ipc::ClientMsg::Disconnect) - .await?; + self.send_ipc(&service::ClientMsg::Disconnect).await?; self.refresh_system_tray_menu(); Ok(()) } + + async fn send_ipc(&mut self, msg: &service::ClientMsg) -> Result<()> { + self.ipc_client + .send(msg) + .await + .context("Failed to send IPC message") + } } async fn new_dns_notifier() -> Result>> { diff --git a/rust/gui-client/src-tauri/src/deep_link.rs b/rust/gui-client/src-tauri/src/deep_link.rs index 4f9bcc9a4..a36ecfeb5 100644 --- a/rust/gui-client/src-tauri/src/deep_link.rs +++ b/rust/gui-client/src-tauri/src/deep_link.rs @@ -3,9 +3,15 @@ // The IPC parts use the same primitives as the IPC service, UDS on Linux // and named pipes on Windows, so TODO de-dupe the IPC code -use crate::auth; +use crate::{ + auth, + gui::{self, ServerMsg}, + ipc::SocketId, +}; use anyhow::{Context as _, Result, bail}; -use secrecy::{ExposeSecret, SecretString}; +use futures::SinkExt as _; +use secrecy::SecretString; +use tokio_stream::StreamExt as _; use url::Url; #[cfg(any(target_os = "linux", target_os = "windows"))] @@ -24,17 +30,35 @@ mod imp; #[path = "deep_link/windows.rs"] mod imp; -#[derive(thiserror::Error, Debug)] -#[error("named pipe server couldn't start listening, we are probably the second instance")] -pub struct CantListen; +pub use imp::register; -pub use imp::{Server, open, register}; +pub async fn open(url: url::Url) -> Result<()> { + let (mut read, mut write) = crate::ipc::connect::( + SocketId::Gui, + crate::ipc::ConnectOptions::default(), + ) + .await?; + + write + .send(&gui::ClientMsg::Deeplink(url)) + .await + .context("Failed to send deep-link")?; + + let response = read + .next() + .await + .context("No response received")? + .context("Failed to receive response")?; + + anyhow::ensure!(response == ServerMsg::Ack); + + Ok(()) +} /// Parses a deep-link URL into a struct. /// /// e.g. `firezone-fd0020211111://handle_client_sign_in_callback/?state=secret&fragment=secret&account_name=Firezone&account_slug=firezone&actor_name=Jane+Doe&identity_provider_identifier=secret` -pub(crate) fn parse_auth_callback(url_secret: &SecretString) -> Result { - let url = Url::parse(url_secret.expose_secret())?; +pub(crate) fn parse_auth_callback(url: &Url) -> Result { if Some(url::Host::Domain("handle_client_sign_in_callback")) != url.host() { bail!("URL host should be `handle_client_sign_in_callback`"); } @@ -92,8 +116,8 @@ pub(crate) fn parse_auth_callback(url_secret: &SecretString) -> Result Result<()> { @@ -144,28 +168,6 @@ mod tests { } fn parse_callback_wrapper(s: &str) -> Result { - super::parse_auth_callback(&SecretString::new(s.to_owned())) - } - - /// Tests the named pipe or Unix domain socket, doesn't test the URI scheme itself - /// - /// Will fail if any other Firezone Client instance is running - /// Will fail with permission error if Firezone already ran as sudo - #[tokio::test] - async fn socket_smoke_test() -> Result<()> { - let server = Server::new().await.context("Couldn't start Server")?; - let server_task = tokio::spawn(async move { - let bytes = server.accept().await?; - Ok::<_, anyhow::Error>(bytes) - }); - let id = uuid::Uuid::new_v4().to_string(); - let expected_url = url::Url::parse(&format!("bogus-test-schema://{id}"))?; - super::open(&expected_url).await?; - - let bytes = server_task.await??.unwrap(); - let s = std::str::from_utf8(bytes.expose_secret())?; - let url = url::Url::parse(s)?; - assert_eq!(url, expected_url); - Ok(()) + super::parse_auth_callback(&s.parse()?) } } diff --git a/rust/gui-client/src-tauri/src/deep_link/linux.rs b/rust/gui-client/src-tauri/src/deep_link/linux.rs index 33623e1c4..fa8a1eff1 100644 --- a/rust/gui-client/src-tauri/src/deep_link/linux.rs +++ b/rust/gui-client/src-tauri/src/deep_link/linux.rs @@ -1,97 +1,5 @@ use anyhow::{Context, Result, bail}; -use firezone_bin_shared::known_dirs; -use secrecy::{ExposeSecret, Secret}; use std::{io::ErrorKind, path::PathBuf, process::Command}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::{UnixListener, UnixStream}, -}; - -use super::CantListen; - -const SOCK_NAME: &str = "deep_link.sock"; - -pub struct Server { - listener: UnixListener, -} - -fn sock_path() -> Result { - Ok(known_dirs::runtime() - .context("Couldn't find runtime dir")? - .join(SOCK_NAME)) -} - -impl Server { - /// Create a new deep link server to make sure we're the only instance - /// - /// Still uses `thiserror` so we can catch the deep_link `CantListen` error - /// On Windows this uses async because of #5143 and #5566. - #[expect(clippy::unused_async)] - pub async fn new() -> Result { - let path = sock_path()?; - let dir = path - .parent() - .context("Impossible, socket path should always have a parent")?; - - // Try to `connect` to the socket as a client. - // If it succeeds, that means there is already a Firezone instance listening - // as a server on that socket, and we should exit. - // If it fails, it means nobody is listening on the socket, or the - // socket does not exist, in which case we are the only instance - // and should proceed. - if std::os::unix::net::UnixStream::connect(&path).is_ok() { - return Err(anyhow::Error::new(CantListen)); - } - std::fs::remove_file(&path).ok(); - std::fs::create_dir_all(dir).context("Can't create dir for deep link socket")?; - - // TODO: TOCTOU error here. - // It's possible for 2 processes to see the `connect` call fail, then one - // binds the socket, and the other deletes the socket and binds a different - // socket at the same path, resulting in 2 instances with confusing behavior. - // The `bind` call should probably go first, but without more testing and more - // thought, I don't want to re-arrange it yet. - - let listener = UnixListener::bind(&path).context("Couldn't bind listener Unix socket")?; - - Ok(Self { listener }) - } - - /// Await one incoming deep link - /// - /// To match the Windows API, this consumes the `Server`. - pub async fn accept(self) -> Result>>> { - tracing::debug!("deep_link::accept"); - let (mut stream, _) = self.listener.accept().await?; - tracing::debug!("Accepted Unix domain socket connection"); - - // TODO: Limit reads to 4,096 bytes. Partial reads will probably never happen - // since it's a local socket transferring very small data. - let mut bytes = vec![]; - stream - .read_to_end(&mut bytes) - .await - .context("failed to read incoming deep link over Unix socket stream")?; - if bytes.is_empty() { - return Ok(None); - } - let bytes = Secret::new(bytes); - tracing::debug!( - len = bytes.expose_secret().len(), - "Got data from Unix domain socket" - ); - Ok(Some(bytes)) - } -} - -pub async fn open(url: &url::Url) -> Result<()> { - let path = sock_path()?; - let mut stream = UnixStream::connect(&path).await?; - - stream.write_all(url.to_string().as_bytes()).await?; - - Ok(()) -} /// Register a URI scheme so that browser can deep link into our app for auth /// diff --git a/rust/gui-client/src-tauri/src/deep_link/macos.rs b/rust/gui-client/src-tauri/src/deep_link/macos.rs index e4c81b8d4..bc07fd726 100644 --- a/rust/gui-client/src-tauri/src/deep_link/macos.rs +++ b/rust/gui-client/src-tauri/src/deep_link/macos.rs @@ -1,27 +1,5 @@ -use std::path::PathBuf; - use anyhow::{Result, bail}; -use secrecy::Secret; - -pub struct Server {} - -impl Server { - #[expect( - clippy::unused_async, - reason = "Signture must match other operating systems" - )] - pub async fn new() -> Result { - bail!("not implemented") - } - - pub async fn accept(self) -> Result>>> { - futures::future::pending().await - } -} - -pub async fn open(_url: &url::Url) -> Result<()> { - bail!("not implemented") -} +use std::path::PathBuf; pub fn register(_path: PathBuf) -> Result<()> { bail!("not implemented") diff --git a/rust/gui-client/src-tauri/src/deep_link/windows.rs b/rust/gui-client/src-tauri/src/deep_link/windows.rs index 207e1ee3b..71d458973 100644 --- a/rust/gui-client/src-tauri/src/deep_link/windows.rs +++ b/rust/gui-client/src-tauri/src/deep_link/windows.rs @@ -1,111 +1,13 @@ -//! A module for registering, catching, and parsing deep links that are sent over to the app's already-running instance +//! A module for registering deep links that are sent over to the app's already-running instance //! Based on reading some of the Windows code from , which is licensed "MIT OR Apache-2.0" -use super::{CantListen, FZ_SCHEME}; +use super::FZ_SCHEME; use anyhow::{Context, Result}; use firezone_bin_shared::BUNDLE_ID; -use firezone_logging::err_with_src; -use secrecy::Secret; use std::{ io, path::{Path, PathBuf}, - time::Duration, }; -use tokio::{io::AsyncReadExt, io::AsyncWriteExt, net::windows::named_pipe}; - -/// A server for a named pipe, so we can receive deep links from other instances -/// of the client launched by web browsers -pub struct Server { - inner: named_pipe::NamedPipeServer, -} - -impl Server { - /// Construct a server, but don't await client connections yet - /// - /// Panics if there is no Tokio runtime - /// Still uses `thiserror` so we can catch the deep_link `CantListen` error - pub async fn new() -> Result { - // This isn't air-tight - We recreate the whole server on each loop, - // rather than binding 1 socket and accepting many streams like a normal socket API. - // Tokio appears to be following Windows' underlying API here, so not - // much we can do until Unix domain sockets have wide support in Windows. - let server = bind_to_pipe(&pipe_path()).await?; - - tracing::debug!("server is bound"); - Ok(Server { inner: server }) - } - - /// Await one incoming deep link from a named pipe client - /// Tokio's API is strange, so this consumes the server. - /// I assume this is based on the underlying Windows API. - /// I tried re-using the server and it acted strange. The official Tokio - /// examples are not clear on this. - pub async fn accept(mut self) -> Result>>> { - self.inner - .connect() - .await - .context("Couldn't accept connection from named pipe client")?; - tracing::debug!("server got connection"); - - // TODO: Limit the read size here. Our typical callback is 350 bytes, so 4,096 bytes should be more than enough. - // Also, I think `read_to_end` can do partial reads because this is a named pipe, - // not a file. We might need a length-prefixed or newline-terminated format for IPC. - let mut bytes = vec![]; - self.inner - .read_to_end(&mut bytes) - .await - .context("Couldn't read bytes from named pipe client")?; - let bytes = Secret::new(bytes); - - self.inner.disconnect().ok(); - Ok(Some(bytes)) - } -} - -async fn bind_to_pipe(pipe_path: &str) -> Result { - const NUM_ITERS: usize = 10; - // Relating to #5143 and #5566, sometimes re-creating a named pipe server - // in a loop fails. This is copied from `firezone_headless_client::ipc_service::ipc::windows`. - for i in 0..NUM_ITERS { - match create_pipe_server(pipe_path) { - Ok(server) => return Ok(server), - Err(e) => { - tracing::debug!( - "`create_pipe_server` failed {}; sleeping... (attempt {i}/{NUM_ITERS})", - err_with_src(&e) - ); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - Err(anyhow::Error::new(CantListen)) -} - -fn create_pipe_server(pipe_path: &str) -> io::Result { - let mut server_options = named_pipe::ServerOptions::new(); - server_options.first_pipe_instance(true); - - let server = server_options.create(pipe_path)?; - - Ok(server) -} - -/// Open a deep link by sending it to the already-running instance of the app -pub async fn open(url: &url::Url) -> Result<()> { - let path = pipe_path(); - let mut client = named_pipe::ClientOptions::new() - .open(&path) - .with_context(|| format!("Couldn't connect to named pipe server at `{path}`"))?; - client - .write_all(url.as_str().as_bytes()) - .await - .with_context(|| format!("Couldn't write bytes to named pipe server at `{path}`"))?; - Ok(()) -} - -fn pipe_path() -> String { - crate::ipc::platform::named_pipe_path(&format!("{BUNDLE_ID}.deep_link")) -} /// Registers the current exe as the handler for our deep link scheme. /// diff --git a/rust/gui-client/src-tauri/src/gui.rs b/rust/gui-client/src-tauri/src/gui.rs index 0033df034..4d68092db 100644 --- a/rust/gui-client/src-tauri/src/gui.rs +++ b/rust/gui-client/src-tauri/src/gui.rs @@ -6,17 +6,20 @@ use crate::{ about, controller::{Controller, ControllerRequest, CtlrTx, Failure, GuiIntegration}, - deep_link, logging, + deep_link, + ipc::{self, ClientRead, ClientWrite, SocketId}, + logging, settings::{self, AdvancedSettings}, updates, }; use anyhow::{Context, Result, bail}; use firezone_logging::err_with_src; use firezone_telemetry as telemetry; -use secrecy::{ExposeSecret as _, SecretString}; -use std::{str::FromStr, time::Duration}; +use futures::SinkExt as _; +use std::time::Duration; use tauri::Manager; use tokio::sync::mpsc; +use tokio_stream::StreamExt; use tracing::instrument; pub mod system_tray; @@ -123,6 +126,23 @@ pub struct RunConfig { pub fail_with: Option, } +/// IPC Messages that a newly launched instance (i.e. a client) may send to an already running instance of Firezone. +#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum ClientMsg { + Deeplink(url::Url), + NewInstance, +} + +/// IPC Messages that an already running instance of Firezone may send to a newly launched instance. +#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum ServerMsg { + Ack, +} + +#[derive(Debug, thiserror::Error)] +#[error("Another instance of Firezone is already running")] +pub struct AlreadyRunning; + /// Runs the Tauri GUI and returns on exit or unrecoverable error #[instrument(skip_all)] pub fn run( @@ -137,10 +157,14 @@ pub fn run( let _guard = rt.enter(); - // Make sure we're single-instance - // We register our deep links to call the `open-deep-link` subcommand, - // so if we're at this point, we know we've been launched manually - let deep_link_server = rt.block_on(deep_link::Server::new())?; + let gui_ipc = match rt.block_on(create_gui_ipc_server()) { + Ok(gui_ipc) => gui_ipc, + Err(e) => { + tracing::debug!("{e:#}"); + + return Err(anyhow::Error::new(AlreadyRunning)); + } + }; let (ctlr_tx, ctlr_rx) = mpsc::channel(5); let (ready_tx, mut ready_rx) = mpsc::channel::(1); @@ -218,7 +242,6 @@ pub fn run( // to handle deep links let exe = tauri_utils::platform::current_exe().context("Can't find our own exe path")?; deep_link::register(exe).context("Failed to register deep link handler")?; - tokio::spawn(accept_deep_links(deep_link_server, ctlr_tx.clone())); } if let Some(failure) = config.fail_with { @@ -275,6 +298,7 @@ pub fn run( advanced_settings, reloader, updates_rx, + gui_ipc )); anyhow::Ok(ctrl_task) @@ -396,36 +420,6 @@ async fn smoke_test(ctlr_tx: CtlrTx) -> Result<()> { Ok::<_, anyhow::Error>(()) } -/// Worker task to accept deep links from a named pipe forever -/// -/// * `server` An initial named pipe server to consume before making new servers. This lets us also use the named pipe to enforce single-instance -async fn accept_deep_links(mut server: deep_link::Server, ctlr_tx: CtlrTx) -> Result<()> { - loop { - match server.accept().await { - Ok(Some(bytes)) => { - let url = SecretString::from_str( - std::str::from_utf8(bytes.expose_secret()) - .context("Incoming deep link was not valid UTF-8")?, - ) - .context("Impossible: can't wrap String into SecretString")?; - // Ignore errors from this, it would only happen if the app is shutting down, otherwise we would wait - ctlr_tx - .send(ControllerRequest::SchemeRequest(url)) - .await - .ok(); - } - Ok(None) => { - tracing::debug!("Accepted deep-link but read 0 bytes, trying again ..."); - } - Err(error) => { - tracing::warn!("Failed to accept deep link: {error:#}") - } - } - // We re-create the named pipe server every time we get a link, because of an oddity in the Windows API. - server = deep_link::Server::new().await?; - } -} - fn handle_system_tray_event(app: &tauri::AppHandle, event: system_tray::Event) -> Result<()> { app.try_state::() .context("can't get Managed struct from Tauri")? @@ -433,3 +427,57 @@ fn handle_system_tray_event(app: &tauri::AppHandle, event: system_tray::Event) - .blocking_send(ControllerRequest::SystemTrayMenu(event))?; Ok(()) } + +/// Create a new instance of the GUI IPC server. +/// +/// Every time Firezone gets launched, we attempt to connect to this server. +/// If we can successfully connect and handshake a message, then we know that there is a functioning instance of Firezone already running. +/// +/// If we hit an IO errors during the connection, we assume that there isn't already an instance of Firezone and we consider us to be the first instance. +/// +/// There is a third, somewhat gnarly case: +/// +/// An instance of Firezone may already be running but it is not responding. +/// Launching another instance on top of it would likely create more problems that it solves, so we also need to fail for that case. +async fn create_gui_ipc_server() -> Result { + let (read, write) = match ipc::connect::( + SocketId::Gui, + ipc::ConnectOptions { num_attempts: 1 }, + ) + .await + { + Ok(pair) => pair, + Err(e) => { + // If we can't connect to the socket, we must be the first instance. + tracing::debug!( + "We appear to be the first instance of the GUI client; connecting to socket yielded: {e:#}" + ); + + return ipc::Server::new(SocketId::Gui).context("Failed to create GUI IPC socket"); + } + }; + + tokio::time::timeout(Duration::from_secs(5), new_instance_handshake(read, write)) + .await + .context("Failed to handshake with existing instance in 5s")??; + + // If we managed to send the IPC message then another instance of Firezone is already running. + + bail!("Successfully handshaked with existing instance of Firezone GUI") +} + +async fn new_instance_handshake( + mut read: ClientRead, + mut write: ClientWrite, +) -> Result<()> { + write.send(&ClientMsg::NewInstance).await?; + let response = read + .next() + .await + .context("No response received")? + .context("Failed to receive response")?; + + anyhow::ensure!(response == ServerMsg::Ack); + + Ok(()) +} diff --git a/rust/gui-client/src-tauri/src/ipc.rs b/rust/gui-client/src-tauri/src/ipc.rs index 664b1e42b..c654557c7 100644 --- a/rust/gui-client/src-tauri/src/ipc.rs +++ b/rust/gui-client/src-tauri/src/ipc.rs @@ -1,9 +1,8 @@ +//! Defines a reusable, bi-directional, cross-platform IPC framework that uses JSON for message serialisation. + use anyhow::{Context as _, Result}; -use connlib_model::{ResourceId, ResourceView}; -use futures::SinkExt; use platform::{ClientStream, ServerStream}; -use secrecy::{ExposeSecret, SecretString}; -use std::{collections::BTreeSet, io, net::IpAddr}; +use serde::{Serialize, de::DeserializeOwned}; use tokio::io::{ReadHalf, WriteHalf}; use tokio_util::{ bytes::BytesMut, @@ -12,10 +11,10 @@ use tokio_util::{ pub(crate) use platform::Server; -pub type ClientRead = FramedRead, Decoder>; -pub type ClientWrite = FramedWrite, Encoder>; -pub(crate) type ServerRead = FramedRead, Decoder>; -pub(crate) type ServerWrite = FramedWrite, Encoder>; +pub type ClientRead = FramedRead, Decoder>; +pub type ClientWrite = FramedWrite, Encoder>; +pub(crate) type ServerRead = FramedRead, Decoder>; +pub(crate) type ServerWrite = FramedWrite, Encoder>; #[cfg(target_os = "linux")] #[path = "ipc/linux.rs"] @@ -29,62 +28,6 @@ pub(crate) mod platform; #[path = "ipc/macos.rs"] pub(crate) mod platform; -pub struct Client { - // Needed temporarily to avoid a big refactor. We can remove this in the future. - tx: ClientWrite, -} - -impl Client { - pub async fn new() -> Result<(Self, ClientRead)> { - tracing::debug!( - client_pid = std::process::id(), - "Connecting to IPC service..." - ); - let (rx, tx) = connect_to_service(ServiceId::Prod).await?; - - Ok((Self { tx }, rx)) - } - - pub async fn disconnect_from_ipc(mut self) -> Result<()> { - self.tx.close().await?; - Ok(()) - } - - pub async fn send_msg(&mut self, msg: &ClientMsg) -> Result<()> { - self.tx - .send(msg) - .await - .context("Couldn't send IPC message")?; - Ok(()) - } - - pub async fn connect_to_firezone(&mut self, api_url: &str, token: SecretString) -> Result<()> { - let token = token.expose_secret().clone(); - self.send_msg(&ClientMsg::Connect { - api_url: api_url.to_string(), - token, - }) - .await - .context("Couldn't send Connect message")?; - Ok(()) - } - - pub async fn reset(&mut self) -> Result<()> { - self.send_msg(&ClientMsg::Reset) - .await - .context("Couldn't send Reset")?; - Ok(()) - } - - /// Tell connlib about the system's default resolvers - pub async fn set_dns(&mut self, dns: Vec) -> Result<()> { - self.send_msg(&ClientMsg::SetDns(dns)) - .await - .context("Couldn't send SetDns")?; - Ok(()) - } -} - #[derive(Debug, thiserror::Error)] #[error("Couldn't find IPC service `{0}`")] pub struct NotFound(String); @@ -108,15 +51,19 @@ pub struct NotFound(String); /// on all platforms. /// /// Because the paths are so different (and Windows actually uses a `String`), -/// we have this `ServiceId` abstraction instead of just a `PathBuf`. -#[derive(Clone, Copy)] -pub enum ServiceId { - /// The IPC service used by Firezone GUI Client in production +/// we have this [`SocketId`] abstraction instead of just a `PathBuf`. +#[derive(Clone, Copy, Debug)] +pub enum SocketId { + /// The IPC socket used by Firezone GUI Client in production to connect to the tunnel service. /// /// This must go in `/run/dev.firezone.client` on Linux, which requires /// root permission - Prod, - /// An IPC service used for unit tests. + Tunnel, + /// The IPC socket used by the Firezone GUI Client in production to connect to an already running instance. + /// + /// This is used for deeplinks and duplicate launch handling. + Gui, + /// An IPC socket used for unit tests. /// /// This must go in `/run/user/$UID/dev.firezone.client` on Linux so /// the unit tests won't need root. @@ -181,16 +128,37 @@ impl tokio_util::codec::Encoder<&E> for Encoder { } } -/// Connect to the IPC service -/// -/// Public because the GUI Client will need it -pub async fn connect_to_service(id: ServiceId) -> Result<(ClientRead, ClientWrite)> { +pub struct ConnectOptions { + pub num_attempts: usize, +} + +impl Default for ConnectOptions { + fn default() -> Self { + Self { num_attempts: 10 } + } +} + +/// Attempt to connect to an IPC socket. +pub async fn connect( + id: SocketId, + options: ConnectOptions, +) -> Result<(ClientRead, ClientWrite)> +where + R: DeserializeOwned, + W: Serialize, +{ + tracing::debug!( + ?id, + client_pid = std::process::id(), + "Connecting to IPC socket" + ); + // This is how ChatGPT recommended, and I couldn't think of any more clever // way before I asked it. let mut last_err = None; - for _ in 0..10 { - match platform::connect_to_service(id).await { + for _ in 0..options.num_attempts { + match platform::connect_to_socket(id).await { Ok(stream) => { let (rx, tx) = tokio::io::split(stream); let rx = FramedRead::new(rx, Decoder::default()); @@ -198,7 +166,7 @@ pub async fn connect_to_service(id: ServiceId) -> Result<(ClientRead, ClientWrit return Ok((rx, tx)); } Err(error) => { - tracing::debug!("Couldn't connect to IPC service: {error}"); + tracing::debug!("Couldn't connect to IPC socket: {error}"); last_err = Some(error); // This won't come up much for humans but it helps the automated @@ -211,7 +179,13 @@ pub async fn connect_to_service(id: ServiceId) -> Result<(ClientRead, ClientWrit } impl platform::Server { - pub(crate) async fn next_client_split(&mut self) -> Result<(ServerRead, ServerWrite)> { + pub(crate) async fn next_client_split( + &mut self, + ) -> Result<(ServerRead, ServerWrite)> + where + R: DeserializeOwned, + W: Serialize, + { let (rx, tx) = tokio::io::split(self.next_client().await?); let rx = FramedRead::new(rx, Decoder::default()); let tx = FramedWrite::new(tx, Encoder::default()); @@ -219,70 +193,6 @@ impl platform::Server { } } -#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] -pub enum ClientMsg { - ClearLogs, - Connect { - api_url: String, - token: String, - }, - Disconnect, - ApplyLogFilter { - directives: String, - }, - Reset, - SetDns(Vec), - SetDisabledResources(BTreeSet), - StartTelemetry { - environment: String, - release: String, - account_slug: Option, - }, -} - -/// Messages that end up in the GUI, either forwarded from connlib or from the IPC service. -#[derive(Debug, serde::Deserialize, serde::Serialize, strum::Display)] -pub enum ServerMsg { - /// The IPC service finished clearing its log dir. - ClearedLogs(Result<(), String>), - ConnectResult(Result<(), Error>), - DisconnectedGracefully, - OnDisconnect { - error_msg: String, - is_authentication_error: bool, - }, - OnUpdateResources(Vec), - /// The IPC service is terminating, maybe due to a software update - /// - /// This is a hint that the Client should exit with a message like, - /// "Firezone is updating, please restart the GUI" instead of an error like, - /// "IPC connection closed". - TerminatingGracefully, - /// The interface and tunnel are ready for traffic. - TunnelReady, -} - -// All variants are `String` because almost no error type implements `Serialize` -#[derive(Debug, serde::Deserialize, serde::Serialize, thiserror::Error)] -pub enum Error { - #[error("IO error: {0}")] - Io(String), - #[error("{0}")] - Other(String), -} - -impl From for Error { - fn from(v: io::Error) -> Self { - Self::Io(v.to_string()) - } -} - -impl From for Error { - fn from(v: anyhow::Error) -> Self { - Self::Other(format!("{v:#}")) - } -} - #[cfg(test)] mod tests { use super::{platform::Server, *}; @@ -294,33 +204,46 @@ mod tests { #[tokio::test] async fn no_such_service() -> Result<()> { let _guard = firezone_logging::test("trace"); - const ID: ServiceId = ServiceId::Test("H56FRXVH"); + const ID: SocketId = SocketId::Test("H56FRXVH"); - if super::connect_to_service(ID).await.is_ok() { + if super::connect::<(), ()>(ID, super::ConnectOptions::default()) + .await + .is_ok() + { bail!("`connect_to_service` should have failed for a non-existent service"); } Ok(()) } + #[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] + enum ClientMsg { + Foo, + } + + #[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] + enum ServerMsg { + Bar, + } + /// Make sure the IPC client and server can exchange messages #[tokio::test] async fn smoke() -> Result<()> { let _guard = firezone_logging::test("trace"); let loops = 10; - const ID: ServiceId = ServiceId::Test("OB5SZCGN"); + const ID: SocketId = SocketId::Test("OB5SZCGN"); let mut server = Server::new(ID).expect("Error while starting IPC server"); let server_task: tokio::task::JoinHandle> = tokio::spawn(async move { for _ in 0..loops { let (mut rx, mut tx) = server - .next_client_split() + .next_client_split::() .await .expect("Error while waiting for next IPC client"); while let Some(req) = rx.next().await { let req = req.expect("Error while reading from IPC client"); - ensure!(req == ClientMsg::Reset); - tx.send(&ServerMsg::OnUpdateResources(vec![])) + ensure!(req == ClientMsg::Foo); + tx.send(&ServerMsg::Bar) .await .expect("Error while writing to IPC client"); } @@ -331,11 +254,11 @@ mod tests { let client_task: JoinHandle> = tokio::spawn(async move { for _ in 0..loops { - let (mut rx, mut tx) = super::connect_to_service(ID) + let (mut rx, mut tx) = super::connect(ID, super::ConnectOptions::default()) .await .context("Error while connecting to IPC server")?; - let req = ClientMsg::Reset; + let req = ClientMsg::Foo; for _ in 0..10 { tx.send(&req) .await @@ -345,7 +268,7 @@ mod tests { .await .expect("Should have gotten a reply from the IPC server") .expect("Error while reading from IPC server"); - ensure!(matches!(resp, ServerMsg::OnUpdateResources(_))); + ensure!(matches!(resp, ServerMsg::Bar)); } } Ok(()) @@ -388,7 +311,7 @@ mod tests { async fn loop_to_next_client() -> Result<()> { let _guard = firezone_logging::test("trace"); - let mut server = Server::new(ServiceId::Test("H6L73DG5"))?; + let mut server = Server::new(SocketId::Test("H6L73DG5"))?; for i in 0..5 { if let Ok(Err(err)) = timeout(Duration::from_secs(1), server.next_client()).await { Err(err).with_context(|| { diff --git a/rust/gui-client/src-tauri/src/ipc/linux.rs b/rust/gui-client/src-tauri/src/ipc/linux.rs index a6707ac00..9dd89d76d 100644 --- a/rust/gui-client/src-tauri/src/ipc/linux.rs +++ b/rust/gui-client/src-tauri/src/ipc/linux.rs @@ -1,4 +1,4 @@ -use super::{NotFound, ServiceId}; +use super::{NotFound, SocketId}; use anyhow::{Context as _, Result}; use firezone_bin_shared::BUNDLE_ID; use std::{io::ErrorKind, os::unix::fs::PermissionsExt, path::PathBuf}; @@ -6,6 +6,17 @@ use tokio::net::{UnixListener, UnixStream}; pub(crate) struct Server { listener: UnixListener, + id: SocketId, +} + +impl Drop for Server { + fn drop(&mut self) { + let path = ipc_path(self.id); + + if let Err(e) = std::fs::remove_file(&path) { + tracing::debug!(path = %path.display(), "Failed to delete IPC socket: {e}"); + } + } } /// Alias for the client's half of a platform-specific IPC stream @@ -18,7 +29,7 @@ pub(crate) type ServerStream = UnixStream; /// Connect to the IPC service #[expect(clippy::wildcard_enum_match_arm)] -pub async fn connect_to_service(id: ServiceId) -> Result { +pub async fn connect_to_socket(id: SocketId) -> Result { let path = ipc_path(id); let stream = UnixStream::connect(&path) .await @@ -41,7 +52,7 @@ pub async fn connect_to_service(id: ServiceId) -> Result { impl Server { /// Platform-specific setup - pub(crate) fn new(id: ServiceId) -> Result { + pub(crate) fn new(id: SocketId) -> Result { let sock_path = ipc_path(id); tracing::debug!(socket = %sock_path.display(), "Creating new IPC server"); @@ -61,11 +72,10 @@ impl Server { // TODO: Change this to `notify_service_controller` and put it in // the same place in the IPC service's main loop as in the Headless Client. sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?; - Ok(Self { listener }) + Ok(Self { listener, id }) } pub(crate) async fn next_client(&mut self) -> Result { - tracing::info!("Listening for GUI to connect over IPC..."); let (stream, _) = self.listener.accept().await?; let cred = stream.peer_cred()?; tracing::info!( @@ -87,11 +97,14 @@ impl Server { /// Also systemd can create this dir with the `RuntimeDir=` directive which is nice. /// /// Test sockets live in e.g. `/run/user/1000/dev.firezone.client/data/` -fn ipc_path(id: ServiceId) -> PathBuf { +fn ipc_path(id: SocketId) -> PathBuf { match id { - ServiceId::Prod => PathBuf::from("/run").join(BUNDLE_ID).join("ipc.sock"), + SocketId::Tunnel => PathBuf::from("/run").join(BUNDLE_ID).join("tunnel.sock"), + SocketId::Gui => firezone_bin_shared::known_dirs::runtime() + .expect("`known_dirs::runtime()` should always work") + .join("gui.sock"), #[cfg(test)] - ServiceId::Test(id) => firezone_bin_shared::known_dirs::runtime() + SocketId::Test(id) => firezone_bin_shared::known_dirs::runtime() .expect("`known_dirs::runtime()` should always work") .join(format!("ipc_test_{id}.sock")), } diff --git a/rust/gui-client/src-tauri/src/ipc/macos.rs b/rust/gui-client/src-tauri/src/ipc/macos.rs index 809472e7e..e748e8ca9 100644 --- a/rust/gui-client/src-tauri/src/ipc/macos.rs +++ b/rust/gui-client/src-tauri/src/ipc/macos.rs @@ -1,4 +1,4 @@ -use super::ServiceId; +use super::SocketId; use anyhow::{Result, bail}; use tokio::net::UnixStream; @@ -11,12 +11,12 @@ pub(crate) type ServerStream = UnixStream; clippy::unused_async, reason = "Signture must match other operating systems" )] -pub async fn connect_to_service(_id: ServiceId) -> Result { +pub async fn connect_to_socket(_id: SocketId) -> Result { bail!("not implemented") } impl Server { - pub(crate) fn new(_id: ServiceId) -> Result { + pub(crate) fn new(_id: SocketId) -> Result { bail!("not implemented") } diff --git a/rust/gui-client/src-tauri/src/ipc/windows.rs b/rust/gui-client/src-tauri/src/ipc/windows.rs index be5d0e4a9..e96770dd9 100644 --- a/rust/gui-client/src-tauri/src/ipc/windows.rs +++ b/rust/gui-client/src-tauri/src/ipc/windows.rs @@ -1,4 +1,4 @@ -use super::{NotFound, ServiceId}; +use super::{NotFound, SocketId}; use anyhow::{Context as _, Result, bail}; use firezone_bin_shared::BUNDLE_ID; use std::{ffi::c_void, io::ErrorKind, os::windows::io::AsRawHandle, time::Duration}; @@ -19,12 +19,12 @@ pub type ClientStream = named_pipe::NamedPipeClient; /// Alias for the server's half of a platform-specific IPC stream pub(crate) type ServerStream = named_pipe::NamedPipeServer; -/// Connect to the IPC service +/// Connect to an IPC socket. /// /// This is async on Linux #[expect(clippy::unused_async)] #[expect(clippy::wildcard_enum_match_arm)] -pub(crate) async fn connect_to_service(id: ServiceId) -> Result { +pub(crate) async fn connect_to_socket(id: SocketId) -> Result { let path = ipc_path(id); let stream = named_pipe::ClientOptions::new() .open(&path) @@ -47,7 +47,7 @@ pub(crate) async fn connect_to_service(id: ServiceId) -> Result { impl Server { /// Platform-specific setup #[expect(clippy::unnecessary_wraps, reason = "Linux impl is fallible")] - pub(crate) fn new(id: ServiceId) -> Result { + pub(crate) fn new(id: SocketId) -> Result { let pipe_path = ipc_path(id); Ok(Self { pipe_path }) } @@ -64,10 +64,6 @@ impl Server { .bind_to_pipe() .await .context("Couldn't bind to named pipe")?; - tracing::debug!( - server_pid = std::process::id(), - "Listening for GUI to connect over IPC..." - ); // Note that Tokio has no `poll_connect` server .connect() @@ -153,12 +149,13 @@ fn create_pipe_server(pipe_path: &str) -> Result String { +/// Named pipe for an IPC connection +fn ipc_path(id: SocketId) -> String { let name = match id { - ServiceId::Prod => format!("{BUNDLE_ID}.ipc_service"), + SocketId::Tunnel => format!("{BUNDLE_ID}_tunnel.ipc"), + SocketId::Gui => format!("{BUNDLE_ID}_gui.ipc"), #[cfg(test)] - ServiceId::Test(id) => format!("{BUNDLE_ID}_test_{id}.ipc_service"), + SocketId::Test(id) => format!("{BUNDLE_ID}_test_{id}.ipc"), }; named_pipe_path(&name) } @@ -177,7 +174,7 @@ pub fn named_pipe_path(id: &str) -> String { #[cfg(test)] mod tests { - use super::{Server, ServiceId}; + use super::{Server, SocketId}; use anyhow::Context as _; use futures::StreamExt; @@ -191,23 +188,24 @@ mod tests { #[test] fn ipc_path() { - assert!(super::ipc_path(ServiceId::Prod).starts_with(r"\\.\pipe\")); + assert!(super::ipc_path(SocketId::Tunnel).starts_with(r"\\.\pipe\")); } #[tokio::test] async fn single_instance() -> anyhow::Result<()> { let _guard = firezone_logging::test("trace"); - const ID: ServiceId = ServiceId::Test("2GOCMPBG"); + const ID: SocketId = SocketId::Test("2GOCMPBG"); let mut server_1 = Server::new(ID)?; let pipe_path = server_1.pipe_path.clone(); tokio::spawn(async move { - let (mut rx, _tx) = server_1.next_client_split().await?; + let (mut rx, _tx) = server_1.next_client_split::<(), ()>().await?; rx.next().await; Ok::<_, anyhow::Error>(()) }); - let (_rx, _tx) = crate::ipc::connect_to_service(ID).await?; + let (_rx, _tx) = + crate::ipc::connect::<(), ()>(ID, crate::ipc::ConnectOptions::default()).await?; match super::create_pipe_server(&pipe_path) { Err(super::PipeError::AccessDenied) => {} diff --git a/rust/gui-client/src-tauri/src/service.rs b/rust/gui-client/src-tauri/src/service.rs index 2a5cbd2fb..d886e9f5c 100644 --- a/rust/gui-client/src-tauri/src/service.rs +++ b/rust/gui-client/src-tauri/src/service.rs @@ -1,7 +1,9 @@ +use crate::ipc::{self, SocketId}; use anyhow::{Context as _, Result, bail}; use atomicwrites::{AtomicFile, OverwriteBehavior}; use backoff::ExponentialBackoffBuilder; use client_shared::ConnlibMsg; +use connlib_model::{ResourceId, ResourceView}; use firezone_bin_shared::{ DnsControlMethod, DnsController, TunDeviceManager, device_id, device_info, known_dirs, platform::{tcp_socket_factory, udp_socket_factory}, @@ -16,7 +18,14 @@ use futures::{ }; use phoenix_channel::{DeviceInfo, LoginUrl, PhoenixChannel, get_user_agent}; use secrecy::{Secret, SecretString}; -use std::{io::Write, pin::pin, sync::Arc, time::Duration}; +use std::{ + collections::BTreeSet, + io::{self, Write}, + net::IpAddr, + pin::pin, + sync::Arc, + time::Duration, +}; use tokio::{sync::mpsc, time::Instant}; use url::Url; @@ -34,7 +43,69 @@ mod platform; pub use platform::{elevation_check, install, run}; -use crate::ipc::{self, ServiceId}; +#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum ClientMsg { + ClearLogs, + Connect { + api_url: String, + token: String, + }, + Disconnect, + ApplyLogFilter { + directives: String, + }, + Reset, + SetDns(Vec), + SetDisabledResources(BTreeSet), + StartTelemetry { + environment: String, + release: String, + account_slug: Option, + }, +} + +/// Messages that end up in the GUI, either forwarded from connlib or from the IPC service. +#[derive(Debug, serde::Deserialize, serde::Serialize, strum::Display)] +pub enum ServerMsg { + /// The IPC service finished clearing its log dir. + ClearedLogs(Result<(), String>), + ConnectResult(Result<(), ConnectError>), + DisconnectedGracefully, + OnDisconnect { + error_msg: String, + is_authentication_error: bool, + }, + OnUpdateResources(Vec), + /// The IPC service is terminating, maybe due to a software update + /// + /// This is a hint that the Client should exit with a message like, + /// "Firezone is updating, please restart the GUI" instead of an error like, + /// "IPC connection closed". + TerminatingGracefully, + /// The interface and tunnel are ready for traffic. + TunnelReady, +} + +// All variants are `String` because almost no error type implements `Serialize` +#[derive(Debug, serde::Deserialize, serde::Serialize, thiserror::Error)] +pub enum ConnectError { + #[error("IO error: {0}")] + Io(String), + #[error("{0}")] + Other(String), +} + +impl From for ConnectError { + fn from(v: io::Error) -> Self { + Self::Io(v.to_string()) + } +} + +impl From for ConnectError { + fn from(v: anyhow::Error) -> Self { + Self::Other(format!("{v:#}")) + } +} /// Run the IPC service and terminate gracefully if we catch a terminate signal /// @@ -54,7 +125,7 @@ async fn ipc_listen( Telemetry::set_firezone_id(firezone_id); - let mut server = ipc::Server::new(ServiceId::Prod)?; + let mut server = ipc::Server::new(SocketId::Tunnel)?; let mut dns_controller = DnsController { dns_control_method }; loop { let mut handler_fut = pin!(Handler::new( @@ -90,8 +161,8 @@ async fn ipc_listen( /// Handles one IPC client struct Handler<'a> { dns_controller: &'a mut DnsController, - ipc_rx: ipc::ServerRead, - ipc_tx: ipc::ServerWrite, + ipc_rx: ipc::ServerRead, + ipc_tx: ipc::ServerWrite, last_connlib_start_instant: Option, log_filter_reloader: &'a FilterReloadHandle, session: Option, @@ -107,7 +178,7 @@ struct Session { enum Event { Callback(ConnlibMsg), CallbackChannelClosed, - Ipc(ipc::ClientMsg), + Ipc(ClientMsg), IpcDisconnected, IpcError(anyhow::Error), Terminate, @@ -129,6 +200,12 @@ impl<'a> Handler<'a> { telemetry: &'a mut Telemetry, ) -> Result { dns_controller.deactivate()?; + + tracing::info!( + server_pid = std::process::id(), + "Listening for GUI to connect over IPC..." + ); + let (ipc_rx, ipc_tx) = server .next_client_split() .await @@ -189,7 +266,7 @@ impl<'a> Handler<'a> { "Caught SIGINT / SIGTERM / Ctrl+C while an IPC client is connected" ); // Ignore the result here because we're terminating anyway. - let _ = self.send_ipc(ipc::ServerMsg::TerminatingGracefully).await; + let _ = self.send_ipc(ServerMsg::TerminatingGracefully).await; break HandlerOk::ServiceTerminating; } } @@ -233,7 +310,7 @@ impl<'a> Handler<'a> { } => { let _ = self.session.take(); self.dns_controller.deactivate()?; - self.send_ipc(ipc::ServerMsg::OnDisconnect { + self.send_ipc(ServerMsg::OnDisconnect { error_msg, is_authentication_error, }) @@ -255,48 +332,45 @@ impl<'a> Handler<'a> { self.tun_device.set_routes(ipv4_routes, ipv6_routes).await?; self.dns_controller.flush()?; - self.send_ipc(ipc::ServerMsg::TunnelReady).await?; + self.send_ipc(ServerMsg::TunnelReady).await?; } ConnlibMsg::OnUpdateResources(resources) => { // On every resources update, flush DNS to mitigate self.dns_controller.flush()?; - self.send_ipc(ipc::ServerMsg::OnUpdateResources(resources)) + self.send_ipc(ServerMsg::OnUpdateResources(resources)) .await?; } } Ok(()) } - async fn handle_ipc_msg(&mut self, msg: ipc::ClientMsg) -> Result<()> { + async fn handle_ipc_msg(&mut self, msg: ClientMsg) -> Result<()> { match msg { - ipc::ClientMsg::ClearLogs => { + ClientMsg::ClearLogs => { let result = crate::clear_logs( &firezone_bin_shared::known_dirs::ipc_service_logs() .context("Can't compute logs dir")?, ) .await; - self.send_ipc(ipc::ServerMsg::ClearedLogs( - result.map_err(|e| e.to_string()), - )) - .await? + self.send_ipc(ServerMsg::ClearedLogs(result.map_err(|e| e.to_string()))) + .await? } - ipc::ClientMsg::Connect { api_url, token } => { + ClientMsg::Connect { api_url, token } => { // Warning: Connection errors don't bubble to callers of `handle_ipc_msg`. let token = secrecy::SecretString::from(token); let result = self.connect_to_firezone(&api_url, token); - self.send_ipc(ipc::ServerMsg::ConnectResult(result)).await? + self.send_ipc(ServerMsg::ConnectResult(result)).await? } - ipc::ClientMsg::Disconnect => { + ClientMsg::Disconnect => { if self.session.take().is_some() { self.dns_controller.deactivate()?; } // Always send `DisconnectedGracefully` even if we weren't connected, // so this will be idempotent. - self.send_ipc(ipc::ServerMsg::DisconnectedGracefully) - .await?; + self.send_ipc(ServerMsg::DisconnectedGracefully).await?; } - ipc::ClientMsg::ApplyLogFilter { directives } => { + ClientMsg::ApplyLogFilter { directives } => { self.log_filter_reloader.reload(&directives)?; let path = known_dirs::ipc_log_filter()?; @@ -307,7 +381,7 @@ impl<'a> Handler<'a> { tracing::warn!(path = %path.display(), %directives, "Failed to write new log directives: {}", err_with_src(&e)); } } - ipc::ClientMsg::Reset => { + ClientMsg::Reset => { if self.last_connlib_start_instant.is_some() { tracing::debug!("Ignoring reset since we're still signing in"); return Ok(()); @@ -319,7 +393,7 @@ impl<'a> Handler<'a> { session.connlib.reset(); } - ipc::ClientMsg::SetDns(resolvers) => { + ClientMsg::SetDns(resolvers) => { let Some(session) = self.session.as_ref() else { tracing::debug!("Cannot set DNS resolvers if we're signed out"); return Ok(()); @@ -328,7 +402,7 @@ impl<'a> Handler<'a> { tracing::debug!(?resolvers); session.connlib.set_dns(resolvers); } - ipc::ClientMsg::SetDisabledResources(disabled_resources) => { + ClientMsg::SetDisabledResources(disabled_resources) => { let Some(session) = self.session.as_ref() else { // At this point, the GUI has already saved the disabled Resources to disk, so it'll be correct on the next sign-in anyway. tracing::debug!("Cannot set disabled resources if we're signed out"); @@ -337,7 +411,7 @@ impl<'a> Handler<'a> { session.connlib.set_disabled_resources(disabled_resources); } - ipc::ClientMsg::StartTelemetry { + ClientMsg::StartTelemetry { environment, release, account_slug, @@ -362,7 +436,7 @@ impl<'a> Handler<'a> { &mut self, api_url: &str, token: SecretString, - ) -> Result<(), ipc::Error> { + ) -> Result<(), ConnectError> { let _connect_span = telemetry_span!("connect_to_firezone").entered(); assert!(self.session.is_none()); @@ -430,7 +504,7 @@ impl<'a> Handler<'a> { Ok(()) } - async fn send_ipc(&mut self, msg: ipc::ServerMsg) -> Result<()> { + async fn send_ipc(&mut self, msg: ServerMsg) -> Result<()> { self.ipc_tx .send(&msg) .await @@ -476,7 +550,7 @@ pub fn run_debug(dns_control: DnsControlMethod) -> Result<()> { /// This makes the timing neater in case the GUI starts up slowly. #[cfg(debug_assertions)] pub fn run_smoke_test() -> Result<()> { - use crate::ipc::{self, ServiceId}; + use crate::ipc::{self, SocketId}; use anyhow::{Context as _, bail}; use firezone_bin_shared::{DnsController, device_id}; @@ -500,7 +574,7 @@ pub fn run_smoke_test() -> Result<()> { // Couldn't get the loop to work here yet, so SIGHUP is not implemented rt.block_on(async { device_id::get_or_create().context("Failed to read / create device ID")?; - let mut server = ipc::Server::new(ServiceId::Prod)?; + let mut server = ipc::Server::new(SocketId::Tunnel)?; let _ = Handler::new( &mut server, &mut dns_controller, diff --git a/scripts/tests/linux-group.sh b/scripts/tests/linux-group.sh index 6f73754fb..0f6653e4a 100755 --- a/scripts/tests/linux-group.sh +++ b/scripts/tests/linux-group.sh @@ -8,7 +8,7 @@ source "./scripts/tests/lib.sh" BINARY_NAME=firezone-client-ipc FZ_GROUP="firezone-client" SERVICE_NAME=firezone-client-ipc -SOCKET=/run/dev.firezone.client/ipc.sock +SOCKET=/run/dev.firezone.client/tunnel.sock export RUST_LOG=info cd rust || exit 1