diff --git a/rust/gui-client/src-tauri/deb_files/firezone-client-ipc.service b/rust/gui-client/src-tauri/deb_files/firezone-client-ipc.service index 6cdec430a..4de72c9ea 100644 --- a/rust/gui-client/src-tauri/deb_files/firezone-client-ipc.service +++ b/rust/gui-client/src-tauri/deb_files/firezone-client-ipc.service @@ -42,7 +42,7 @@ Environment="LOG_DIR=/var/log/dev.firezone.client" Environment="RUST_LOG=info" EnvironmentFile="/etc/default/firezone-client-ipc" -ExecStart=firezone-client-ipc +ExecStart=firezone-client-ipc ipc-service Type=notify # Unfortunately we may need root to control DNS User=root diff --git a/rust/headless-client/Cargo.toml b/rust/headless-client/Cargo.toml index db5560ba7..7e62c2fd9 100644 --- a/rust/headless-client/Cargo.toml +++ b/rust/headless-client/Cargo.toml @@ -9,7 +9,7 @@ authors = ["Firezone, Inc."] [dependencies] anyhow = { version = "1.0" } -clap = { version = "4.5", features = ["derive", "env"] } +clap = { version = "4.5", features = ["derive", "env", "string"] } connlib-client-shared = { workspace = true } connlib-shared = { workspace = true } firezone-cli-utils = { workspace = true } diff --git a/rust/headless-client/src/lib.rs b/rust/headless-client/src/lib.rs index ea64a0b84..20f02e4fa 100644 --- a/rust/headless-client/src/lib.rs +++ b/rust/headless-client/src/lib.rs @@ -8,23 +8,29 @@ //! Tauri deb bundler to pick it up easily. //! Otherwise we would just make it a normal binary crate. -use anyhow::{Context, Result}; +use anyhow::{anyhow, bail, Context as _, Result}; use clap::Parser; use connlib_client_shared::{file_logger, keypair, Callbacks, LoginUrl, Session, Sockets}; use connlib_shared::callbacks; use firezone_cli_utils::setup_global_subscriber; +use futures::{future, SinkExt, StreamExt}; use secrecy::SecretString; use std::{ - future, net::{IpAddr, Ipv4Addr, Ipv6Addr}, - path::PathBuf, - task::Poll, + path::{Path, PathBuf}, + pin::pin, }; use tokio::sync::mpsc; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tracing::subscriber::set_global_default; use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer as _, Registry}; +use url::Url; use platform::default_token_path; +/// SIGINT and, on Linux, SIGHUP. +/// +/// Must be constructed inside a Tokio runtime context. +use platform::Signals; pub mod known_dirs; @@ -56,6 +62,7 @@ pub const GIT_VERSION: &str = git_version::git_version!( const TOKEN_ENV_KEY: &str = "FIREZONE_TOKEN"; +/// Command-line args for the headless Client #[derive(clap::Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -64,6 +71,9 @@ struct Cli { #[command(subcommand)] _command: Option, + #[command(flatten)] + common: CliCommon, + #[arg( short = 'u', long, @@ -80,6 +90,17 @@ struct Cli { #[arg(long)] check: bool, + /// Friendly name for this client to display in the UI. + #[arg(long, env = "FIREZONE_NAME")] + firezone_name: Option, + + /// Identifier used by the portal to identify and display the device. + + // AKA `device_id` in the Windows and Linux GUI clients + // Generated automatically if not provided + #[arg(short = 'i', long, env = "FIREZONE_ID")] + pub firezone_id: Option, + /// Token generated by the portal to authorize websocket connection. // systemd recommends against passing secrets through env vars: // @@ -92,20 +113,36 @@ struct Cli { // until anyone asks for it, env vars are okay and files on disk are slightly better. // (Since we run as root and the env var on a headless system is probably stored // on disk somewhere anyway.) - #[arg(default_value_t = default_token_path().display().to_string(), env = "FIREZONE_TOKEN_PATH", long)] - token_path: String, + #[arg(default_value = default_token_path().display().to_string(), env = "FIREZONE_TOKEN_PATH", long)] + token_path: PathBuf, +} - /// Friendly name for this client to display in the UI. - #[arg(long, env = "FIREZONE_NAME")] - firezone_name: Option, +#[derive(clap::Parser)] +#[command(author, version, about, long_about = None)] +struct CliIpcService { + #[command(subcommand)] + command: CmdIpc, - /// Identifier used by the portal to identify and display the device. + #[command(flatten)] + common: CliCommon, +} - // AKA `device_id` in the Windows and Linux GUI clients - // Generated automatically if not provided - #[arg(short = 'i', long, env = "FIREZONE_ID")] - pub firezone_id: Option, +#[derive(clap::Subcommand, Debug, PartialEq, Eq)] +enum CmdIpc { + #[command(hide = true)] + DebugIpcService, + IpcService, +} +impl Default for CmdIpc { + fn default() -> Self { + Self::IpcService + } +} + +/// CLI args common to both the IPC service and the headless Client +#[derive(clap::Args)] +struct CliCommon { /// File logging directory. Should be a path that's writeable by the current user. #[arg(short, long, env = "LOG_DIR")] log_dir: Option, @@ -144,7 +181,7 @@ pub enum IpcServerMsg { } pub fn run_only_headless_client() -> Result<()> { - let mut cli = Cli::parse(); + let mut cli = Cli::try_parse()?; // Modifying the environment of a running process is unsafe. If any other // thread is reading or writing the environment, something bad can happen. @@ -165,7 +202,12 @@ pub fn run_only_headless_client() -> Result<()> { } assert!(std::env::var(TOKEN_ENV_KEY).is_err()); - let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip(); + let (layer, _handle) = cli + .common + .log_dir + .as_deref() + .map(file_logger::layer) + .unzip(); setup_global_subscriber(layer); tracing::info!(git_version = crate::GIT_VERSION); @@ -174,16 +216,16 @@ pub fn run_only_headless_client() -> Result<()> { .enable_all() .build()?; - let token = get_token(token_env_var, &cli)?.with_context(|| { + let token = get_token(token_env_var, &cli.token_path)?.with_context(|| { format!( "Can't find the Firezone token in ${TOKEN_ENV_KEY} or in `{}`", - cli.token_path + cli.token_path.display() ) })?; tracing::info!("Running in headless / standalone mode"); let _guard = rt.enter(); // TODO: Should this default to 30 days? - let max_partition_time = cli.max_partition_time.map(|d| d.into()); + let max_partition_time = cli.common.max_partition_time.map(|d| d.into()); // AKA "Device ID", not the Firezone slug let firezone_id = match cli.firezone_id { @@ -221,30 +263,27 @@ pub fn run_only_headless_client() -> Result<()> { // TODO: this should be added dynamically session.set_dns(platform::system_resolvers().unwrap_or_default()); - let mut signals = platform::Signals::new()?; - let result = rt.block_on(async { - future::poll_fn(|cx| loop { - match on_disconnect_rx.poll_recv(cx) { - Poll::Ready(Some(error)) => return Poll::Ready(Err(anyhow::anyhow!(error))), - Poll::Ready(None) => { - return Poll::Ready(Err(anyhow::anyhow!( - "on_disconnect_rx unexpectedly ran empty" - ))) - } - Poll::Pending => {} - } + let mut signals = Signals::new()?; - match signals.poll(cx) { - Poll::Ready(SignalKind::Hangup) => { + loop { + match future::select(pin!(signals.recv()), pin!(on_disconnect_rx.recv())).await { + future::Either::Left((SignalKind::Hangup, _)) => { + tracing::info!("Caught Hangup signal"); session.reconnect(); - continue; } - Poll::Ready(SignalKind::Interrupt) => return Poll::Ready(Ok(())), - Poll::Pending => return Poll::Pending, + future::Either::Left((SignalKind::Interrupt, _)) => { + tracing::info!("Caught Interrupt signal"); + return Ok(()); + } + future::Either::Right((None, _)) => { + return Err(anyhow::anyhow!("on_disconnect_rx unexpectedly ran empty")); + } + future::Either::Right((Some(error), _)) => { + return Err(anyhow!(error).context("Firezone disconnected")) + } } - }) - .await + } }); session.disconnect(); @@ -252,6 +291,7 @@ pub fn run_only_headless_client() -> Result<()> { result } +/// Only called from the GUI Client's build of the IPC service pub fn run_only_ipc_service() -> Result<()> { // Docs indicate that `remove_var` should actually be marked unsafe // SAFETY: We haven't spawned any other threads, this code should be the first @@ -263,13 +303,151 @@ pub fn run_only_ipc_service() -> Result<()> { std::env::remove_var(TOKEN_ENV_KEY); } assert!(std::env::var(TOKEN_ENV_KEY).is_err()); - platform::run_only_ipc_service() + let cli = CliIpcService::try_parse()?; + match cli.command { + CmdIpc::DebugIpcService => run_debug_ipc_service(), + CmdIpc::IpcService => platform::run_ipc_service(cli.common), + } +} + +pub(crate) fn run_debug_ipc_service() -> Result<()> { + debug_command_setup()?; + let rt = tokio::runtime::Runtime::new()?; + let _guard = rt.enter(); + let mut signals = Signals::new()?; + + // Couldn't get the loop to work here yet, so SIGHUP is not implemented + rt.block_on(async { + let ipc_service = pin!(ipc_listen()); + + match future::select(pin!(signals.recv()), ipc_service).await { + future::Either::Left((SignalKind::Hangup, _)) => { + bail!("Exiting, SIGHUP not implemented for the IPC service"); + } + future::Either::Left((SignalKind::Interrupt, _)) => { + tracing::info!("Caught Interrupt signal"); + Ok(()) + } + future::Either::Right((Ok(()), _)) => { + bail!("Impossible, ipc_listen can't return Ok"); + } + future::Either::Right((Err(error), _)) => Err(error).context("ipc_listen failed"), + } + }) +} + +#[derive(Clone)] +struct CallbackHandlerIpc { + cb_tx: mpsc::Sender, +} + +impl Callbacks for CallbackHandlerIpc { + fn on_disconnect(&self, error: &connlib_client_shared::Error) { + tracing::error!(?error, "Got `on_disconnect` from connlib"); + self.cb_tx + .try_send(IpcServerMsg::OnDisconnect) + .expect("should be able to send OnDisconnect"); + } + + fn on_set_interface_config( + &self, + ipv4: Ipv4Addr, + ipv6: Ipv6Addr, + dns: Vec, + ) -> Option { + tracing::info!("TunnelReady (on_set_interface_config)"); + self.cb_tx + .try_send(IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns }) + .expect("Should be able to send TunnelReady"); + None + } + + fn on_update_resources(&self, resources: Vec) { + tracing::debug!(len = resources.len(), "New resource list"); + self.cb_tx + .try_send(IpcServerMsg::OnUpdateResources(resources)) + .expect("Should be able to send OnUpdateResources"); + } +} + +async fn ipc_listen() -> Result<()> { + let mut server = platform::IpcServer::new().await?; + loop { + connlib_shared::deactivate_dns_control()?; + let stream = server.next_client().await?; + if let Err(error) = handle_ipc_client(stream).await { + tracing::error!(?error, "Error while handling IPC client"); + } + } +} + +async fn handle_ipc_client(stream: platform::IpcStream) -> Result<()> { + let (rx, tx) = tokio::io::split(stream); + let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + let (cb_tx, mut cb_rx) = mpsc::channel(100); + + let send_task = tokio::spawn(async move { + while let Some(msg) = cb_rx.recv().await { + tx.send(serde_json::to_string(&msg)?.into()).await?; + } + Ok::<_, anyhow::Error>(()) + }); + + let mut connlib = None; + let callback_handler = CallbackHandlerIpc { cb_tx }; + while let Some(msg) = rx.next().await { + let msg = msg?; + let msg: IpcClientMsg = serde_json::from_slice(&msg)?; + + match msg { + IpcClientMsg::Connect { api_url, token } => { + let token = secrecy::SecretString::from(token); + assert!(connlib.is_none()); + let device_id = connlib_shared::device_id::get() + .context("Failed to read / create device ID")?; + let (private_key, public_key) = keypair(); + + let login = LoginUrl::client( + Url::parse(&api_url)?, + &token, + device_id.id, + None, + public_key.to_bytes(), + )?; + + connlib = Some(connlib_client_shared::Session::connect( + login, + Sockets::new(), + private_key, + None, + callback_handler.clone(), + Some(std::time::Duration::from_secs(60 * 60 * 24 * 30)), + tokio::runtime::Handle::try_current()?, + )); + } + IpcClientMsg::Disconnect => { + if let Some(connlib) = connlib.take() { + connlib.disconnect(); + } + } + IpcClientMsg::Reconnect => connlib.as_mut().context("No connlib session")?.reconnect(), + IpcClientMsg::SetDns(v) => connlib.as_mut().context("No connlib session")?.set_dns(v), + } + } + + send_task.abort(); + + Ok(()) } -// Allow dead code because Windows doesn't have an obvious SIGHUP equivalent #[allow(dead_code)] enum SignalKind { + /// SIGHUP + /// + /// Not caught on Windows Hangup, + /// SIGINT Interrupt, } @@ -301,20 +479,21 @@ impl Callbacks for CallbackHandler { /// - `Ok(None)` if there is no token to be found /// - `Ok(Some(_))` if we found the token /// - `Err(_)` if we found the token on disk but failed to read it -fn get_token(token_env_var: Option, cli: &Cli) -> Result> { +fn get_token( + token_env_var: Option, + token_path: &Path, +) -> Result> { // This is very simple but I don't want to write it twice if let Some(token) = token_env_var { return Ok(Some(token)); } - read_token_file(cli) + read_token_file(token_path) } /// Try to retrieve the token from disk /// /// Sync because we do blocking file I/O -fn read_token_file(cli: &Cli) -> Result> { - let path = PathBuf::from(&cli.token_path); - +fn read_token_file(path: &Path) -> Result> { if let Ok(token) = std::env::var(TOKEN_ENV_KEY) { std::env::remove_var(TOKEN_ENV_KEY); @@ -328,12 +507,12 @@ fn read_token_file(cli: &Cli) -> Result> { return Ok(Some(token)); } - if std::fs::metadata(&path).is_err() { + if std::fs::metadata(path).is_err() { return Ok(None); } - platform::check_token_permissions(&path)?; + platform::check_token_permissions(path)?; - let Ok(bytes) = std::fs::read(&path) else { + let Ok(bytes) = std::fs::read(path) else { // We got the metadata a second ago, but can't read the file itself. // Pretty strange, would have to be a disk fault or TOCTOU. tracing::info!(?path, "Token file existed but now is unreadable"); @@ -356,3 +535,43 @@ pub fn debug_command_setup() -> Result<()> { set_global_default(subscriber)?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::{Cli, CliIpcService, CmdIpc}; + use clap::Parser; + use std::path::PathBuf; + use url::Url; + + // Can't remember how Clap works sometimes + // Also these are examples + #[test] + fn cli() -> anyhow::Result<()> { + let exe_name = "firezone-headless-client"; + + let actual = Cli::parse_from([exe_name]); + assert_eq!(actual.api_url, Url::parse("wss://api.firezone.dev")?); + assert!(!actual.check); + + let actual = Cli::parse_from([exe_name, "--api-url", "wss://api.firez.one"]); + assert_eq!(actual.api_url, Url::parse("wss://api.firez.one")?); + + let actual = Cli::parse_from([exe_name, "--check", "--log-dir", "bogus_log_dir"]); + assert!(actual.check); + assert_eq!(actual.common.log_dir, Some(PathBuf::from("bogus_log_dir"))); + + let actual = CliIpcService::parse_from([ + exe_name, + "--log-dir", + "bogus_log_dir", + "debug-ipc-service", + ]); + assert_eq!(actual.command, CmdIpc::DebugIpcService); + assert_eq!(actual.common.log_dir, Some(PathBuf::from("bogus_log_dir"))); + + let actual = CliIpcService::parse_from([exe_name, "ipc-service"]); + assert_eq!(actual.command, CmdIpc::IpcService); + + Ok(()) + } +} diff --git a/rust/headless-client/src/linux.rs b/rust/headless-client/src/linux.rs index b412a36f6..d50109aa1 100644 --- a/rust/headless-client/src/linux.rs +++ b/rust/headless-client/src/linux.rs @@ -1,30 +1,22 @@ //! Implementation, Linux-specific -use super::{Cli, IpcClientMsg, IpcServerMsg, FIREZONE_GROUP, TOKEN_ENV_KEY}; +use super::{CliCommon, SignalKind, FIREZONE_GROUP, TOKEN_ENV_KEY}; use anyhow::{bail, Context as _, Result}; -use clap::Parser; -use connlib_client_shared::{file_logger, Callbacks, Sockets}; -use connlib_shared::{ - callbacks, keypair, - linux::{etc_resolv_conf, get_dns_control_from_env, DnsControlMethod}, - LoginUrl, -}; +use connlib_client_shared::file_logger; +use connlib_shared::linux::{etc_resolv_conf, get_dns_control_from_env, DnsControlMethod}; use firezone_cli_utils::setup_global_subscriber; -use futures::{SinkExt, StreamExt}; +use futures::future::{select, Either}; use std::{ - net::{IpAddr, Ipv4Addr, Ipv6Addr}, + net::IpAddr, os::unix::fs::PermissionsExt, path::{Path, PathBuf}, + pin::pin, str::FromStr, - task::{Context, Poll}, }; use tokio::{ net::{UnixListener, UnixStream}, - signal::unix::SignalKind as TokioSignalKind, - sync::mpsc, + signal::unix::{signal, Signal, SignalKind as TokioSignalKind}, }; -use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; -use url::Url; // The Client currently must run as root to control DNS // Root group and user are used to check file ownership on the token @@ -32,28 +24,23 @@ const ROOT_GROUP: u32 = 0; const ROOT_USER: u32 = 0; pub(crate) struct Signals { - sighup: tokio::signal::unix::Signal, - sigint: tokio::signal::unix::Signal, + sighup: Signal, + sigint: Signal, } impl Signals { pub(crate) fn new() -> Result { - let sighup = tokio::signal::unix::signal(TokioSignalKind::hangup())?; - let sigint = tokio::signal::unix::signal(TokioSignalKind::interrupt())?; + let sighup = signal(TokioSignalKind::hangup())?; + let sigint = signal(TokioSignalKind::interrupt())?; Ok(Self { sighup, sigint }) } - pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll { - if self.sigint.poll_recv(cx).is_ready() { - return Poll::Ready(super::SignalKind::Interrupt); + pub(crate) async fn recv(&mut self) -> SignalKind { + match select(pin!(self.sighup.recv()), pin!(self.sigint.recv())).await { + Either::Left((_, _)) => SignalKind::Hangup, + Either::Right((_, _)) => SignalKind::Interrupt, } - - if self.sighup.poll_recv(cx).is_ready() { - return Poll::Ready(super::SignalKind::Hangup); - } - - Poll::Pending } } @@ -63,24 +50,6 @@ pub fn default_token_path() -> PathBuf { .join("token") } -/// Only called from the GUI Client's build of the IPC service -/// -/// On Linux this is the same as running with `ipc-service` -pub(crate) fn run_only_ipc_service() -> Result<()> { - let cli = Cli::parse(); - // systemd supplies this but maybe we should hard-code a better default - let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip(); - setup_global_subscriber(layer); - tracing::info!(git_version = crate::GIT_VERSION); - - if !nix::unistd::getuid().is_root() { - anyhow::bail!("This is the IPC service binary, it's not meant to run interactively."); - } - let rt = tokio::runtime::Runtime::new()?; - let (_shutdown_tx, shutdown_rx) = mpsc::channel(1); - run_ipc_service(cli, rt, shutdown_rx) -} - pub(crate) fn check_token_permissions(path: &Path) -> Result<()> { let Ok(stat) = nix::sys::stat::fstatat(None, path, nix::fcntl::AtFlags::empty()) else { // File doesn't exist or can't be read @@ -184,13 +153,21 @@ pub fn sock_path() -> PathBuf { .join("ipc.sock") } -pub(crate) fn run_ipc_service( - cli: Cli, - rt: tokio::runtime::Runtime, - _shutdown_rx: mpsc::Receiver<()>, -) -> Result<()> { +/// Cross-platform entry point for systemd / Windows services +/// +/// Linux uses the CLI args from here, Windows does not +pub(crate) fn run_ipc_service(cli: CliCommon) -> Result<()> { tracing::info!("run_ipc_service"); - rt.block_on(async { ipc_listen(cli).await }) + // systemd supplies this but maybe we should hard-code a better default + let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip(); + setup_global_subscriber(layer); + tracing::info!(git_version = crate::GIT_VERSION); + + if !nix::unistd::getuid().is_root() { + anyhow::bail!("This is the IPC service binary, it's not meant to run interactively."); + } + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async { crate::ipc_listen().await }) } pub fn firezone_group() -> Result { @@ -200,132 +177,43 @@ pub fn firezone_group() -> Result { Ok(group) } -async fn ipc_listen(cli: Cli) -> Result<()> { - // Remove the socket if a previous run left it there - let sock_path = sock_path(); - tokio::fs::remove_file(&sock_path).await.ok(); - let listener = UnixListener::bind(&sock_path).context("Couldn't bind UDS")?; - let perms = std::fs::Permissions::from_mode(0o660); - std::fs::set_permissions(sock_path, perms)?; - sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?; +pub(crate) struct IpcServer { + listener: UnixListener, +} - loop { - connlib_shared::deactivate_dns_control()?; +/// Opaque wrapper around platform-specific IPC stream +pub(crate) type IpcStream = UnixStream; + +impl IpcServer { + /// Platform-specific setup + pub(crate) async fn new() -> Result { + // Remove the socket if a previous run left it there + let sock_path = sock_path(); + tokio::fs::remove_file(&sock_path).await.ok(); + let listener = UnixListener::bind(&sock_path).context("Couldn't bind UDS")?; + let perms = std::fs::Permissions::from_mode(0o660); + std::fs::set_permissions(sock_path, perms)?; + sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?; + Ok(Self { listener }) + } + + pub(crate) async fn next_client(&mut self) -> Result { tracing::info!("Listening for GUI to connect over IPC..."); - let (stream, _) = listener.accept().await?; + let (stream, _) = self.listener.accept().await?; let cred = stream.peer_cred()?; + // I'm not sure if we can enforce group membership here - Docker + // might just be enforcing it with filesystem permissions. + // Checking the secondary groups of another user looks complicated. tracing::info!( uid = cred.uid(), gid = cred.gid(), pid = cred.pid(), "Got an IPC connection" ); - - // I'm not sure if we can enforce group membership here - Docker - // might just be enforcing it with filesystem permissions. - // Checking the secondary groups of another user looks complicated. - if let Err(error) = handle_ipc_client(&cli, stream).await { - tracing::error!(?error, "Error while handling IPC client"); - } + Ok(stream) } } -#[derive(Clone)] -struct CallbackHandlerIpc { - cb_tx: mpsc::Sender, -} - -impl Callbacks for CallbackHandlerIpc { - fn on_disconnect(&self, error: &connlib_client_shared::Error) { - tracing::error!(?error, "Got `on_disconnect` from connlib"); - self.cb_tx - .try_send(IpcServerMsg::OnDisconnect) - .expect("should be able to send OnDisconnect"); - } - - fn on_set_interface_config( - &self, - ipv4: Ipv4Addr, - ipv6: Ipv6Addr, - dns: Vec, - ) -> Option { - tracing::info!("TunnelReady (on_set_interface_config)"); - self.cb_tx - .try_send(IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns }) - .expect("Should be able to send TunnelReady"); - None - } - - fn on_update_resources(&self, resources: Vec) { - tracing::debug!(len = resources.len(), "New resource list"); - self.cb_tx - .try_send(IpcServerMsg::OnUpdateResources(resources)) - .expect("Should be able to send OnUpdateResources"); - } -} - -async fn handle_ipc_client(cli: &Cli, stream: UnixStream) -> Result<()> { - let (rx, tx) = stream.into_split(); - let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new()); - let mut tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - let (cb_tx, mut cb_rx) = mpsc::channel(100); - - let send_task = tokio::spawn(async move { - while let Some(msg) = cb_rx.recv().await { - tx.send(serde_json::to_string(&msg)?.into()).await?; - } - Ok::<_, anyhow::Error>(()) - }); - - let mut connlib = None; - let callback_handler = CallbackHandlerIpc { cb_tx }; - while let Some(msg) = rx.next().await { - let msg = msg?; - let msg: super::IpcClientMsg = serde_json::from_slice(&msg)?; - - match msg { - IpcClientMsg::Connect { api_url, token } => { - let token = secrecy::SecretString::from(token); - assert!(connlib.is_none()); - let device_id = connlib_shared::device_id::get() - .context("Failed to read / create device ID")?; - let (private_key, public_key) = keypair(); - - let login = LoginUrl::client( - Url::parse(&api_url)?, - &token, - device_id.id, - None, - public_key.to_bytes(), - )?; - - connlib = Some(connlib_client_shared::Session::connect( - login, - Sockets::new(), - private_key, - None, - callback_handler.clone(), - cli.max_partition_time - .map(|t| t.into()) - .or(Some(std::time::Duration::from_secs(60 * 60 * 24 * 30))), - tokio::runtime::Handle::try_current()?, - )); - } - IpcClientMsg::Disconnect => { - if let Some(connlib) = connlib.take() { - connlib.disconnect(); - } - } - IpcClientMsg::Reconnect => connlib.as_mut().context("No connlib session")?.reconnect(), - IpcClientMsg::SetDns(v) => connlib.as_mut().context("No connlib session")?.set_dns(v), - } - } - - send_task.abort(); - - Ok(()) -} - /// Platform-specific setup needed for connlib /// /// On Linux this does nothing diff --git a/rust/headless-client/src/windows.rs b/rust/headless-client/src/windows.rs index 921f61ce1..00b40f2bb 100644 --- a/rust/headless-client/src/windows.rs +++ b/rust/headless-client/src/windows.rs @@ -4,27 +4,23 @@ //! service to be stopped even if its only process ends, for some reason. //! We must tell Windows explicitly when our service is stopping. -use crate::{IpcClientMsg, IpcServerMsg, SignalKind}; +use crate::{CliCommon, SignalKind}; use anyhow::{anyhow, Context as _, Result}; -use clap::Parser; -use connlib_client_shared::{callbacks, file_logger, keypair, Callbacks, LoginUrl, Sockets}; +use connlib_client_shared::file_logger; use connlib_shared::BUNDLE_ID; -use futures::{SinkExt, Stream}; use std::{ ffi::{c_void, OsString}, - future::{poll_fn, Future}, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, + future::Future, + net::IpAddr, path::{Path, PathBuf}, pin::pin, str::FromStr, - task::{Context, Poll}, + task::Poll, time::Duration, }; use tokio::{net::windows::named_pipe, sync::mpsc}; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::subscriber::set_global_default; use tracing_subscriber::{layer::SubscriberExt as _, EnvFilter, Layer, Registry}; -use url::Url; use windows::Win32::Security as WinSec; use windows_service::{ service::{ @@ -45,6 +41,8 @@ const SERVICE_RUST_LOG: &str = "str0m=warn,info"; const SERVICE_NAME: &str = "firezone_client_ipc"; const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS; +// This looks like a pointless wrapper around `CtrlC`, because it must match +// the Linux signatures pub(crate) struct Signals { sigint: tokio::signal::windows::CtrlC, } @@ -55,31 +53,9 @@ impl Signals { Ok(Self { sigint }) } - pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll { - if self.sigint.poll_recv(cx).is_ready() { - return Poll::Ready(SignalKind::Interrupt); - } - Poll::Pending - } -} - -#[derive(clap::Parser, Default)] -#[command(author, version, about, long_about = None)] -struct CliIpcService { - #[command(subcommand)] - command: CmdIpc, -} - -#[derive(clap::Subcommand)] -enum CmdIpc { - #[command(hide = true)] - DebugIpcService, - IpcService, -} - -impl Default for CmdIpc { - fn default() -> Self { - Self::IpcService + pub(crate) async fn recv(&mut self) -> SignalKind { + self.sigint.recv().await; + SignalKind::Interrupt } } @@ -95,67 +71,26 @@ pub(crate) fn default_token_path() -> std::path::PathBuf { PathBuf::from("token.txt") } -/// Only called from the GUI Client's build of the IPC service +/// Cross-platform entry point for systemd / Windows services /// -/// On Windows, this is wrapped specially so that Windows' service controller -/// can launch it. -pub(crate) fn run_only_ipc_service() -> Result<()> { - let cli = CliIpcService::parse(); - match cli.command { - CmdIpc::DebugIpcService => run_debug_ipc_service(cli), - CmdIpc::IpcService => windows_service::service_dispatcher::start(SERVICE_NAME, ffi_service_run).context("windows_service::service_dispatcher failed. This isn't running in an interactive terminal, right?"), - } -} - -fn run_debug_ipc_service(cli: CliIpcService) -> Result<()> { - crate::debug_command_setup()?; - let rt = tokio::runtime::Runtime::new()?; - let mut ipc_service = pin!(ipc_listen(cli)); - let mut signals = Signals::new()?; - rt.block_on(async { - std::future::poll_fn(|cx| { - match signals.poll(cx) { - Poll::Ready(SignalKind::Hangup) => { - return Poll::Ready(Err(anyhow::anyhow!( - "Impossible, we don't catch Hangup on Windows" - ))); - } - Poll::Ready(SignalKind::Interrupt) => { - tracing::info!("Caught Interrupt signal"); - return Poll::Ready(Ok(())); - } - Poll::Pending => {} - } - - match ipc_service.as_mut().poll(cx) { - Poll::Ready(Ok(())) => { - return Poll::Ready(Err(anyhow::anyhow!( - "Impossible, ipc_listen can't return Ok" - ))); - } - Poll::Ready(Err(error)) => { - return Poll::Ready(Err(error).context("ipc_listen failed")); - } - Poll::Pending => {} - } - - Poll::Pending - }) - .await - }) +/// Linux uses the CLI args from here, Windows does not +pub(crate) fn run_ipc_service(_cli: CliCommon) -> Result<()> { + windows_service::service_dispatcher::start(SERVICE_NAME, ffi_service_run).context("windows_service::service_dispatcher failed. This isn't running in an interactive terminal, right?") } // Generates `ffi_service_run` from `service_run` windows_service::define_windows_service!(ffi_service_run, windows_service_run); -fn windows_service_run(_arguments: Vec) { - if let Err(error) = fallible_windows_service_run() { +fn windows_service_run(arguments: Vec) { + if let Err(error) = fallible_windows_service_run(arguments) { tracing::error!(?error, "fallible_windows_service_run returned an error"); } } // Most of the Windows-specific service stuff should go here -fn fallible_windows_service_run() -> Result<()> { +// +// The arguments don't seem to match the ones passed to the main thread at all. +fn fallible_windows_service_run(arguments: Vec) -> Result<()> { let log_path = crate::known_dirs::ipc_service_logs().context("Can't compute IPC service logs dir")?; std::fs::create_dir_all(&log_path)?; @@ -164,6 +99,7 @@ fn fallible_windows_service_run() -> Result<()> { let subscriber = Registry::default().with(layer.with_filter(filter)); set_global_default(subscriber)?; tracing::info!(git_version = crate::GIT_VERSION); + tracing::info!(?arguments, "fallible_windows_service_run"); let rt = tokio::runtime::Runtime::new()?; let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); @@ -213,7 +149,7 @@ fn fallible_windows_service_run() -> Result<()> { process_id: None, })?; - let mut ipc_service = pin!(ipc_listen(CliIpcService::default())); + let mut ipc_service = pin!(super::ipc_listen()); let result = rt.block_on(async { std::future::poll_fn(|cx| { match shutdown_rx.poll_recv(cx) { @@ -257,21 +193,31 @@ fn fallible_windows_service_run() -> Result<()> { result } -async fn ipc_listen(_cli: CliIpcService) -> Result<()> { - setup_before_connlib()?; - loop { - // This is redundant on the first loop. After that it clears the rules - // between GUI instances. - connlib_shared::deactivate_dns_control()?; +pub(crate) struct IpcServer { + // On Linux this has some fields +} + +/// Opaque wrapper around platform-specific IPC stream +pub(crate) type IpcStream = named_pipe::NamedPipeServer; + +impl IpcServer { + /// Platform-specific setup + /// + /// This is async on Linux + #[allow(clippy::unused_async)] + pub(crate) async fn new() -> Result { + setup_before_connlib()?; + Ok(Self {}) + } + + pub(crate) async fn next_client(&mut self) -> Result { let server = create_pipe_server()?; tracing::info!("Listening for GUI to connect over IPC..."); server .connect() .await .context("Couldn't accept IPC connection from GUI")?; - if let Err(error) = handle_ipc_client(server).await { - tracing::error!(?error, "Error while handling IPC client"); - } + Ok(server) } } @@ -315,130 +261,6 @@ pub fn pipe_path() -> String { named_pipe_path(&format!("{BUNDLE_ID}.ipc_service")) } -enum IpcEvent { - /// A message that the client sent us - Client(IpcClientMsg), - /// A message that connlib wants to send - Connlib(IpcServerMsg), - /// The IPC client disconnected - IpcDisconnect, -} - -#[derive(Clone)] -struct CallbackHandlerIpc { - cb_tx: mpsc::Sender, -} - -impl Callbacks for CallbackHandlerIpc { - fn on_disconnect(&self, _error: &connlib_client_shared::Error) { - self.cb_tx - .try_send(IpcServerMsg::OnDisconnect) - .expect("should be able to send OnDisconnect"); - } - - fn on_set_interface_config( - &self, - ipv4: Ipv4Addr, - ipv6: Ipv6Addr, - dns: Vec, - ) -> Option { - tracing::info!("TunnelReady"); - self.cb_tx - .try_send(IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns }) - .expect("Should be able to send TunnelReady"); - None - } - - fn on_update_resources(&self, resources: Vec) { - tracing::info!(len = resources.len(), "New resource list"); - self.cb_tx - .try_send(IpcServerMsg::OnUpdateResources(resources)) - .expect("Should be able to send OnUpdateResources"); - } -} - -async fn handle_ipc_client(server: named_pipe::NamedPipeServer) -> Result<()> { - let framed = Framed::new(server, LengthDelimitedCodec::new()); - let mut framed = pin!(framed); - let (cb_tx, mut cb_rx) = mpsc::channel(100); - - let mut connlib = None; - let callback_handler = CallbackHandlerIpc { cb_tx }; - loop { - let ev = poll_fn(|cx| { - match cb_rx.poll_recv(cx) { - Poll::Ready(Some(msg)) => return Poll::Ready(Ok(IpcEvent::Connlib(msg))), - Poll::Ready(None) => { - return Poll::Ready(Err(anyhow!( - "Impossible - MPSC channel from connlib closed" - ))) - } - Poll::Pending => {} - } - - match framed.as_mut().poll_next(cx) { - Poll::Ready(Some(msg)) => { - let msg = serde_json::from_slice(&msg?)?; - return Poll::Ready(Ok(IpcEvent::Client(msg))); - } - Poll::Ready(None) => return Poll::Ready(Ok(IpcEvent::IpcDisconnect)), - Poll::Pending => {} - } - - Poll::Pending - }) - .await; - - match ev { - Ok(IpcEvent::Client(msg)) => match msg { - IpcClientMsg::Connect { api_url, token } => { - let token = secrecy::SecretString::from(token); - assert!(connlib.is_none()); - let device_id = connlib_shared::device_id::get() - .context("Failed to read / create device ID")?; - let (private_key, public_key) = keypair(); - - let login = LoginUrl::client( - Url::parse(&api_url)?, - &token, - device_id.id, - None, - public_key.to_bytes(), - )?; - - connlib = Some(connlib_client_shared::Session::connect( - login, - Sockets::new(), - private_key, - None, - callback_handler.clone(), - Some(std::time::Duration::from_secs(60 * 60 * 24 * 30)), - tokio::runtime::Handle::try_current()?, - )); - } - IpcClientMsg::Disconnect => { - if let Some(connlib) = connlib.take() { - connlib.disconnect(); - } - } - IpcClientMsg::Reconnect => { - connlib.as_mut().context("No connlib session")?.reconnect() - } - IpcClientMsg::SetDns(v) => { - connlib.as_mut().context("No connlib session")?.set_dns(v) - } - }, - Ok(IpcEvent::Connlib(msg)) => framed.send(serde_json::to_string(&msg)?.into()).await?, - Ok(IpcEvent::IpcDisconnect) => { - tracing::info!("IPC client disconnected"); - break; - } - Err(e) => Err(e)?, - } - } - Ok(()) -} - pub fn system_resolvers() -> Result> { let resolvers = ipconfig::get_adapters()? .iter() @@ -464,10 +286,6 @@ pub fn named_pipe_path(id: &str) -> String { format!(r"\\.\pipe\{}", id) } -/// Platform-specific setup needed for connlib -/// -/// On Windows this installs wintun.dll -#[allow(clippy::unnecessary_wraps)] pub(crate) fn setup_before_connlib() -> Result<()> { wintun_install::ensure_dll()?; Ok(())