refactor(gui-client): de-dupe IPC server code, enable debug IPC service for Linux (#5074)

Refs #5022

The debug IPC service has been useful on Windows, and since there is
more refactoring to do, I want it on Linux too.

With this you can just do `sudo -E target/debug/firezone-client-ipc
debug-ipc-service` and it will launch an IPC service without messing
with systemd or installing anything. (Assuming the directory for the
socket is created)

```[tasklist]
### Before merging
- [ ] Check for regressions in Windows
- [ ] Check for regressions in Linux
```
This commit is contained in:
Reactor Scram
2024-05-28 09:37:03 -05:00
committed by GitHub
parent 6b570a6dad
commit bfffcedf47
5 changed files with 363 additions and 438 deletions

View File

@@ -42,7 +42,7 @@ Environment="LOG_DIR=/var/log/dev.firezone.client"
Environment="RUST_LOG=info"
EnvironmentFile="/etc/default/firezone-client-ipc"
ExecStart=firezone-client-ipc
ExecStart=firezone-client-ipc ipc-service
Type=notify
# Unfortunately we may need root to control DNS
User=root

View File

@@ -9,7 +9,7 @@ authors = ["Firezone, Inc."]
[dependencies]
anyhow = { version = "1.0" }
clap = { version = "4.5", features = ["derive", "env"] }
clap = { version = "4.5", features = ["derive", "env", "string"] }
connlib-client-shared = { workspace = true }
connlib-shared = { workspace = true }
firezone-cli-utils = { workspace = true }

View File

@@ -8,23 +8,29 @@
//! Tauri deb bundler to pick it up easily.
//! Otherwise we would just make it a normal binary crate.
use anyhow::{Context, Result};
use anyhow::{anyhow, bail, Context as _, Result};
use clap::Parser;
use connlib_client_shared::{file_logger, keypair, Callbacks, LoginUrl, Session, Sockets};
use connlib_shared::callbacks;
use firezone_cli_utils::setup_global_subscriber;
use futures::{future, SinkExt, StreamExt};
use secrecy::SecretString;
use std::{
future,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
path::PathBuf,
task::Poll,
path::{Path, PathBuf},
pin::pin,
};
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::subscriber::set_global_default;
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Layer as _, Registry};
use url::Url;
use platform::default_token_path;
/// SIGINT and, on Linux, SIGHUP.
///
/// Must be constructed inside a Tokio runtime context.
use platform::Signals;
pub mod known_dirs;
@@ -56,6 +62,7 @@ pub const GIT_VERSION: &str = git_version::git_version!(
const TOKEN_ENV_KEY: &str = "FIREZONE_TOKEN";
/// Command-line args for the headless Client
#[derive(clap::Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
@@ -64,6 +71,9 @@ struct Cli {
#[command(subcommand)]
_command: Option<Cmd>,
#[command(flatten)]
common: CliCommon,
#[arg(
short = 'u',
long,
@@ -80,6 +90,17 @@ struct Cli {
#[arg(long)]
check: bool,
/// Friendly name for this client to display in the UI.
#[arg(long, env = "FIREZONE_NAME")]
firezone_name: Option<String>,
/// Identifier used by the portal to identify and display the device.
// AKA `device_id` in the Windows and Linux GUI clients
// Generated automatically if not provided
#[arg(short = 'i', long, env = "FIREZONE_ID")]
pub firezone_id: Option<String>,
/// Token generated by the portal to authorize websocket connection.
// systemd recommends against passing secrets through env vars:
// <https://www.freedesktop.org/software/systemd/man/latest/systemd.exec.html#Environment=>
@@ -92,20 +113,36 @@ struct Cli {
// until anyone asks for it, env vars are okay and files on disk are slightly better.
// (Since we run as root and the env var on a headless system is probably stored
// on disk somewhere anyway.)
#[arg(default_value_t = default_token_path().display().to_string(), env = "FIREZONE_TOKEN_PATH", long)]
token_path: String,
#[arg(default_value = default_token_path().display().to_string(), env = "FIREZONE_TOKEN_PATH", long)]
token_path: PathBuf,
}
/// Friendly name for this client to display in the UI.
#[arg(long, env = "FIREZONE_NAME")]
firezone_name: Option<String>,
#[derive(clap::Parser)]
#[command(author, version, about, long_about = None)]
struct CliIpcService {
#[command(subcommand)]
command: CmdIpc,
/// Identifier used by the portal to identify and display the device.
#[command(flatten)]
common: CliCommon,
}
// AKA `device_id` in the Windows and Linux GUI clients
// Generated automatically if not provided
#[arg(short = 'i', long, env = "FIREZONE_ID")]
pub firezone_id: Option<String>,
#[derive(clap::Subcommand, Debug, PartialEq, Eq)]
enum CmdIpc {
#[command(hide = true)]
DebugIpcService,
IpcService,
}
impl Default for CmdIpc {
fn default() -> Self {
Self::IpcService
}
}
/// CLI args common to both the IPC service and the headless Client
#[derive(clap::Args)]
struct CliCommon {
/// File logging directory. Should be a path that's writeable by the current user.
#[arg(short, long, env = "LOG_DIR")]
log_dir: Option<PathBuf>,
@@ -144,7 +181,7 @@ pub enum IpcServerMsg {
}
pub fn run_only_headless_client() -> Result<()> {
let mut cli = Cli::parse();
let mut cli = Cli::try_parse()?;
// Modifying the environment of a running process is unsafe. If any other
// thread is reading or writing the environment, something bad can happen.
@@ -165,7 +202,12 @@ pub fn run_only_headless_client() -> Result<()> {
}
assert!(std::env::var(TOKEN_ENV_KEY).is_err());
let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip();
let (layer, _handle) = cli
.common
.log_dir
.as_deref()
.map(file_logger::layer)
.unzip();
setup_global_subscriber(layer);
tracing::info!(git_version = crate::GIT_VERSION);
@@ -174,16 +216,16 @@ pub fn run_only_headless_client() -> Result<()> {
.enable_all()
.build()?;
let token = get_token(token_env_var, &cli)?.with_context(|| {
let token = get_token(token_env_var, &cli.token_path)?.with_context(|| {
format!(
"Can't find the Firezone token in ${TOKEN_ENV_KEY} or in `{}`",
cli.token_path
cli.token_path.display()
)
})?;
tracing::info!("Running in headless / standalone mode");
let _guard = rt.enter();
// TODO: Should this default to 30 days?
let max_partition_time = cli.max_partition_time.map(|d| d.into());
let max_partition_time = cli.common.max_partition_time.map(|d| d.into());
// AKA "Device ID", not the Firezone slug
let firezone_id = match cli.firezone_id {
@@ -221,30 +263,27 @@ pub fn run_only_headless_client() -> Result<()> {
// TODO: this should be added dynamically
session.set_dns(platform::system_resolvers().unwrap_or_default());
let mut signals = platform::Signals::new()?;
let result = rt.block_on(async {
future::poll_fn(|cx| loop {
match on_disconnect_rx.poll_recv(cx) {
Poll::Ready(Some(error)) => return Poll::Ready(Err(anyhow::anyhow!(error))),
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow::anyhow!(
"on_disconnect_rx unexpectedly ran empty"
)))
}
Poll::Pending => {}
}
let mut signals = Signals::new()?;
match signals.poll(cx) {
Poll::Ready(SignalKind::Hangup) => {
loop {
match future::select(pin!(signals.recv()), pin!(on_disconnect_rx.recv())).await {
future::Either::Left((SignalKind::Hangup, _)) => {
tracing::info!("Caught Hangup signal");
session.reconnect();
continue;
}
Poll::Ready(SignalKind::Interrupt) => return Poll::Ready(Ok(())),
Poll::Pending => return Poll::Pending,
future::Either::Left((SignalKind::Interrupt, _)) => {
tracing::info!("Caught Interrupt signal");
return Ok(());
}
future::Either::Right((None, _)) => {
return Err(anyhow::anyhow!("on_disconnect_rx unexpectedly ran empty"));
}
future::Either::Right((Some(error), _)) => {
return Err(anyhow!(error).context("Firezone disconnected"))
}
}
})
.await
}
});
session.disconnect();
@@ -252,6 +291,7 @@ pub fn run_only_headless_client() -> Result<()> {
result
}
/// Only called from the GUI Client's build of the IPC service
pub fn run_only_ipc_service() -> Result<()> {
// Docs indicate that `remove_var` should actually be marked unsafe
// SAFETY: We haven't spawned any other threads, this code should be the first
@@ -263,13 +303,151 @@ pub fn run_only_ipc_service() -> Result<()> {
std::env::remove_var(TOKEN_ENV_KEY);
}
assert!(std::env::var(TOKEN_ENV_KEY).is_err());
platform::run_only_ipc_service()
let cli = CliIpcService::try_parse()?;
match cli.command {
CmdIpc::DebugIpcService => run_debug_ipc_service(),
CmdIpc::IpcService => platform::run_ipc_service(cli.common),
}
}
pub(crate) fn run_debug_ipc_service() -> Result<()> {
debug_command_setup()?;
let rt = tokio::runtime::Runtime::new()?;
let _guard = rt.enter();
let mut signals = Signals::new()?;
// Couldn't get the loop to work here yet, so SIGHUP is not implemented
rt.block_on(async {
let ipc_service = pin!(ipc_listen());
match future::select(pin!(signals.recv()), ipc_service).await {
future::Either::Left((SignalKind::Hangup, _)) => {
bail!("Exiting, SIGHUP not implemented for the IPC service");
}
future::Either::Left((SignalKind::Interrupt, _)) => {
tracing::info!("Caught Interrupt signal");
Ok(())
}
future::Either::Right((Ok(()), _)) => {
bail!("Impossible, ipc_listen can't return Ok");
}
future::Either::Right((Err(error), _)) => Err(error).context("ipc_listen failed"),
}
})
}
#[derive(Clone)]
struct CallbackHandlerIpc {
cb_tx: mpsc::Sender<IpcServerMsg>,
}
impl Callbacks for CallbackHandlerIpc {
fn on_disconnect(&self, error: &connlib_client_shared::Error) {
tracing::error!(?error, "Got `on_disconnect` from connlib");
self.cb_tx
.try_send(IpcServerMsg::OnDisconnect)
.expect("should be able to send OnDisconnect");
}
fn on_set_interface_config(
&self,
ipv4: Ipv4Addr,
ipv6: Ipv6Addr,
dns: Vec<IpAddr>,
) -> Option<i32> {
tracing::info!("TunnelReady (on_set_interface_config)");
self.cb_tx
.try_send(IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns })
.expect("Should be able to send TunnelReady");
None
}
fn on_update_resources(&self, resources: Vec<callbacks::ResourceDescription>) {
tracing::debug!(len = resources.len(), "New resource list");
self.cb_tx
.try_send(IpcServerMsg::OnUpdateResources(resources))
.expect("Should be able to send OnUpdateResources");
}
}
async fn ipc_listen() -> Result<()> {
let mut server = platform::IpcServer::new().await?;
loop {
connlib_shared::deactivate_dns_control()?;
let stream = server.next_client().await?;
if let Err(error) = handle_ipc_client(stream).await {
tracing::error!(?error, "Error while handling IPC client");
}
}
}
async fn handle_ipc_client(stream: platform::IpcStream) -> Result<()> {
let (rx, tx) = tokio::io::split(stream);
let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
let (cb_tx, mut cb_rx) = mpsc::channel(100);
let send_task = tokio::spawn(async move {
while let Some(msg) = cb_rx.recv().await {
tx.send(serde_json::to_string(&msg)?.into()).await?;
}
Ok::<_, anyhow::Error>(())
});
let mut connlib = None;
let callback_handler = CallbackHandlerIpc { cb_tx };
while let Some(msg) = rx.next().await {
let msg = msg?;
let msg: IpcClientMsg = serde_json::from_slice(&msg)?;
match msg {
IpcClientMsg::Connect { api_url, token } => {
let token = secrecy::SecretString::from(token);
assert!(connlib.is_none());
let device_id = connlib_shared::device_id::get()
.context("Failed to read / create device ID")?;
let (private_key, public_key) = keypair();
let login = LoginUrl::client(
Url::parse(&api_url)?,
&token,
device_id.id,
None,
public_key.to_bytes(),
)?;
connlib = Some(connlib_client_shared::Session::connect(
login,
Sockets::new(),
private_key,
None,
callback_handler.clone(),
Some(std::time::Duration::from_secs(60 * 60 * 24 * 30)),
tokio::runtime::Handle::try_current()?,
));
}
IpcClientMsg::Disconnect => {
if let Some(connlib) = connlib.take() {
connlib.disconnect();
}
}
IpcClientMsg::Reconnect => connlib.as_mut().context("No connlib session")?.reconnect(),
IpcClientMsg::SetDns(v) => connlib.as_mut().context("No connlib session")?.set_dns(v),
}
}
send_task.abort();
Ok(())
}
// Allow dead code because Windows doesn't have an obvious SIGHUP equivalent
#[allow(dead_code)]
enum SignalKind {
/// SIGHUP
///
/// Not caught on Windows
Hangup,
/// SIGINT
Interrupt,
}
@@ -301,20 +479,21 @@ impl Callbacks for CallbackHandler {
/// - `Ok(None)` if there is no token to be found
/// - `Ok(Some(_))` if we found the token
/// - `Err(_)` if we found the token on disk but failed to read it
fn get_token(token_env_var: Option<SecretString>, cli: &Cli) -> Result<Option<SecretString>> {
fn get_token(
token_env_var: Option<SecretString>,
token_path: &Path,
) -> Result<Option<SecretString>> {
// This is very simple but I don't want to write it twice
if let Some(token) = token_env_var {
return Ok(Some(token));
}
read_token_file(cli)
read_token_file(token_path)
}
/// Try to retrieve the token from disk
///
/// Sync because we do blocking file I/O
fn read_token_file(cli: &Cli) -> Result<Option<SecretString>> {
let path = PathBuf::from(&cli.token_path);
fn read_token_file(path: &Path) -> Result<Option<SecretString>> {
if let Ok(token) = std::env::var(TOKEN_ENV_KEY) {
std::env::remove_var(TOKEN_ENV_KEY);
@@ -328,12 +507,12 @@ fn read_token_file(cli: &Cli) -> Result<Option<SecretString>> {
return Ok(Some(token));
}
if std::fs::metadata(&path).is_err() {
if std::fs::metadata(path).is_err() {
return Ok(None);
}
platform::check_token_permissions(&path)?;
platform::check_token_permissions(path)?;
let Ok(bytes) = std::fs::read(&path) else {
let Ok(bytes) = std::fs::read(path) else {
// We got the metadata a second ago, but can't read the file itself.
// Pretty strange, would have to be a disk fault or TOCTOU.
tracing::info!(?path, "Token file existed but now is unreadable");
@@ -356,3 +535,43 @@ pub fn debug_command_setup() -> Result<()> {
set_global_default(subscriber)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::{Cli, CliIpcService, CmdIpc};
use clap::Parser;
use std::path::PathBuf;
use url::Url;
// Can't remember how Clap works sometimes
// Also these are examples
#[test]
fn cli() -> anyhow::Result<()> {
let exe_name = "firezone-headless-client";
let actual = Cli::parse_from([exe_name]);
assert_eq!(actual.api_url, Url::parse("wss://api.firezone.dev")?);
assert!(!actual.check);
let actual = Cli::parse_from([exe_name, "--api-url", "wss://api.firez.one"]);
assert_eq!(actual.api_url, Url::parse("wss://api.firez.one")?);
let actual = Cli::parse_from([exe_name, "--check", "--log-dir", "bogus_log_dir"]);
assert!(actual.check);
assert_eq!(actual.common.log_dir, Some(PathBuf::from("bogus_log_dir")));
let actual = CliIpcService::parse_from([
exe_name,
"--log-dir",
"bogus_log_dir",
"debug-ipc-service",
]);
assert_eq!(actual.command, CmdIpc::DebugIpcService);
assert_eq!(actual.common.log_dir, Some(PathBuf::from("bogus_log_dir")));
let actual = CliIpcService::parse_from([exe_name, "ipc-service"]);
assert_eq!(actual.command, CmdIpc::IpcService);
Ok(())
}
}

View File

@@ -1,30 +1,22 @@
//! Implementation, Linux-specific
use super::{Cli, IpcClientMsg, IpcServerMsg, FIREZONE_GROUP, TOKEN_ENV_KEY};
use super::{CliCommon, SignalKind, FIREZONE_GROUP, TOKEN_ENV_KEY};
use anyhow::{bail, Context as _, Result};
use clap::Parser;
use connlib_client_shared::{file_logger, Callbacks, Sockets};
use connlib_shared::{
callbacks, keypair,
linux::{etc_resolv_conf, get_dns_control_from_env, DnsControlMethod},
LoginUrl,
};
use connlib_client_shared::file_logger;
use connlib_shared::linux::{etc_resolv_conf, get_dns_control_from_env, DnsControlMethod};
use firezone_cli_utils::setup_global_subscriber;
use futures::{SinkExt, StreamExt};
use futures::future::{select, Either};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
net::IpAddr,
os::unix::fs::PermissionsExt,
path::{Path, PathBuf},
pin::pin,
str::FromStr,
task::{Context, Poll},
};
use tokio::{
net::{UnixListener, UnixStream},
signal::unix::SignalKind as TokioSignalKind,
sync::mpsc,
signal::unix::{signal, Signal, SignalKind as TokioSignalKind},
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use url::Url;
// The Client currently must run as root to control DNS
// Root group and user are used to check file ownership on the token
@@ -32,28 +24,23 @@ const ROOT_GROUP: u32 = 0;
const ROOT_USER: u32 = 0;
pub(crate) struct Signals {
sighup: tokio::signal::unix::Signal,
sigint: tokio::signal::unix::Signal,
sighup: Signal,
sigint: Signal,
}
impl Signals {
pub(crate) fn new() -> Result<Self> {
let sighup = tokio::signal::unix::signal(TokioSignalKind::hangup())?;
let sigint = tokio::signal::unix::signal(TokioSignalKind::interrupt())?;
let sighup = signal(TokioSignalKind::hangup())?;
let sigint = signal(TokioSignalKind::interrupt())?;
Ok(Self { sighup, sigint })
}
pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll<super::SignalKind> {
if self.sigint.poll_recv(cx).is_ready() {
return Poll::Ready(super::SignalKind::Interrupt);
pub(crate) async fn recv(&mut self) -> SignalKind {
match select(pin!(self.sighup.recv()), pin!(self.sigint.recv())).await {
Either::Left((_, _)) => SignalKind::Hangup,
Either::Right((_, _)) => SignalKind::Interrupt,
}
if self.sighup.poll_recv(cx).is_ready() {
return Poll::Ready(super::SignalKind::Hangup);
}
Poll::Pending
}
}
@@ -63,24 +50,6 @@ pub fn default_token_path() -> PathBuf {
.join("token")
}
/// Only called from the GUI Client's build of the IPC service
///
/// On Linux this is the same as running with `ipc-service`
pub(crate) fn run_only_ipc_service() -> Result<()> {
let cli = Cli::parse();
// systemd supplies this but maybe we should hard-code a better default
let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip();
setup_global_subscriber(layer);
tracing::info!(git_version = crate::GIT_VERSION);
if !nix::unistd::getuid().is_root() {
anyhow::bail!("This is the IPC service binary, it's not meant to run interactively.");
}
let rt = tokio::runtime::Runtime::new()?;
let (_shutdown_tx, shutdown_rx) = mpsc::channel(1);
run_ipc_service(cli, rt, shutdown_rx)
}
pub(crate) fn check_token_permissions(path: &Path) -> Result<()> {
let Ok(stat) = nix::sys::stat::fstatat(None, path, nix::fcntl::AtFlags::empty()) else {
// File doesn't exist or can't be read
@@ -184,13 +153,21 @@ pub fn sock_path() -> PathBuf {
.join("ipc.sock")
}
pub(crate) fn run_ipc_service(
cli: Cli,
rt: tokio::runtime::Runtime,
_shutdown_rx: mpsc::Receiver<()>,
) -> Result<()> {
/// Cross-platform entry point for systemd / Windows services
///
/// Linux uses the CLI args from here, Windows does not
pub(crate) fn run_ipc_service(cli: CliCommon) -> Result<()> {
tracing::info!("run_ipc_service");
rt.block_on(async { ipc_listen(cli).await })
// systemd supplies this but maybe we should hard-code a better default
let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip();
setup_global_subscriber(layer);
tracing::info!(git_version = crate::GIT_VERSION);
if !nix::unistd::getuid().is_root() {
anyhow::bail!("This is the IPC service binary, it's not meant to run interactively.");
}
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { crate::ipc_listen().await })
}
pub fn firezone_group() -> Result<nix::unistd::Group> {
@@ -200,132 +177,43 @@ pub fn firezone_group() -> Result<nix::unistd::Group> {
Ok(group)
}
async fn ipc_listen(cli: Cli) -> Result<()> {
// Remove the socket if a previous run left it there
let sock_path = sock_path();
tokio::fs::remove_file(&sock_path).await.ok();
let listener = UnixListener::bind(&sock_path).context("Couldn't bind UDS")?;
let perms = std::fs::Permissions::from_mode(0o660);
std::fs::set_permissions(sock_path, perms)?;
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?;
pub(crate) struct IpcServer {
listener: UnixListener,
}
loop {
connlib_shared::deactivate_dns_control()?;
/// Opaque wrapper around platform-specific IPC stream
pub(crate) type IpcStream = UnixStream;
impl IpcServer {
/// Platform-specific setup
pub(crate) async fn new() -> Result<Self> {
// Remove the socket if a previous run left it there
let sock_path = sock_path();
tokio::fs::remove_file(&sock_path).await.ok();
let listener = UnixListener::bind(&sock_path).context("Couldn't bind UDS")?;
let perms = std::fs::Permissions::from_mode(0o660);
std::fs::set_permissions(sock_path, perms)?;
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?;
Ok(Self { listener })
}
pub(crate) async fn next_client(&mut self) -> Result<IpcStream> {
tracing::info!("Listening for GUI to connect over IPC...");
let (stream, _) = listener.accept().await?;
let (stream, _) = self.listener.accept().await?;
let cred = stream.peer_cred()?;
// I'm not sure if we can enforce group membership here - Docker
// might just be enforcing it with filesystem permissions.
// Checking the secondary groups of another user looks complicated.
tracing::info!(
uid = cred.uid(),
gid = cred.gid(),
pid = cred.pid(),
"Got an IPC connection"
);
// I'm not sure if we can enforce group membership here - Docker
// might just be enforcing it with filesystem permissions.
// Checking the secondary groups of another user looks complicated.
if let Err(error) = handle_ipc_client(&cli, stream).await {
tracing::error!(?error, "Error while handling IPC client");
}
Ok(stream)
}
}
#[derive(Clone)]
struct CallbackHandlerIpc {
cb_tx: mpsc::Sender<IpcServerMsg>,
}
impl Callbacks for CallbackHandlerIpc {
fn on_disconnect(&self, error: &connlib_client_shared::Error) {
tracing::error!(?error, "Got `on_disconnect` from connlib");
self.cb_tx
.try_send(IpcServerMsg::OnDisconnect)
.expect("should be able to send OnDisconnect");
}
fn on_set_interface_config(
&self,
ipv4: Ipv4Addr,
ipv6: Ipv6Addr,
dns: Vec<IpAddr>,
) -> Option<i32> {
tracing::info!("TunnelReady (on_set_interface_config)");
self.cb_tx
.try_send(IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns })
.expect("Should be able to send TunnelReady");
None
}
fn on_update_resources(&self, resources: Vec<callbacks::ResourceDescription>) {
tracing::debug!(len = resources.len(), "New resource list");
self.cb_tx
.try_send(IpcServerMsg::OnUpdateResources(resources))
.expect("Should be able to send OnUpdateResources");
}
}
async fn handle_ipc_client(cli: &Cli, stream: UnixStream) -> Result<()> {
let (rx, tx) = stream.into_split();
let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
let (cb_tx, mut cb_rx) = mpsc::channel(100);
let send_task = tokio::spawn(async move {
while let Some(msg) = cb_rx.recv().await {
tx.send(serde_json::to_string(&msg)?.into()).await?;
}
Ok::<_, anyhow::Error>(())
});
let mut connlib = None;
let callback_handler = CallbackHandlerIpc { cb_tx };
while let Some(msg) = rx.next().await {
let msg = msg?;
let msg: super::IpcClientMsg = serde_json::from_slice(&msg)?;
match msg {
IpcClientMsg::Connect { api_url, token } => {
let token = secrecy::SecretString::from(token);
assert!(connlib.is_none());
let device_id = connlib_shared::device_id::get()
.context("Failed to read / create device ID")?;
let (private_key, public_key) = keypair();
let login = LoginUrl::client(
Url::parse(&api_url)?,
&token,
device_id.id,
None,
public_key.to_bytes(),
)?;
connlib = Some(connlib_client_shared::Session::connect(
login,
Sockets::new(),
private_key,
None,
callback_handler.clone(),
cli.max_partition_time
.map(|t| t.into())
.or(Some(std::time::Duration::from_secs(60 * 60 * 24 * 30))),
tokio::runtime::Handle::try_current()?,
));
}
IpcClientMsg::Disconnect => {
if let Some(connlib) = connlib.take() {
connlib.disconnect();
}
}
IpcClientMsg::Reconnect => connlib.as_mut().context("No connlib session")?.reconnect(),
IpcClientMsg::SetDns(v) => connlib.as_mut().context("No connlib session")?.set_dns(v),
}
}
send_task.abort();
Ok(())
}
/// Platform-specific setup needed for connlib
///
/// On Linux this does nothing

View File

@@ -4,27 +4,23 @@
//! service to be stopped even if its only process ends, for some reason.
//! We must tell Windows explicitly when our service is stopping.
use crate::{IpcClientMsg, IpcServerMsg, SignalKind};
use crate::{CliCommon, SignalKind};
use anyhow::{anyhow, Context as _, Result};
use clap::Parser;
use connlib_client_shared::{callbacks, file_logger, keypair, Callbacks, LoginUrl, Sockets};
use connlib_client_shared::file_logger;
use connlib_shared::BUNDLE_ID;
use futures::{SinkExt, Stream};
use std::{
ffi::{c_void, OsString},
future::{poll_fn, Future},
net::{IpAddr, Ipv4Addr, Ipv6Addr},
future::Future,
net::IpAddr,
path::{Path, PathBuf},
pin::pin,
str::FromStr,
task::{Context, Poll},
task::Poll,
time::Duration,
};
use tokio::{net::windows::named_pipe, sync::mpsc};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::subscriber::set_global_default;
use tracing_subscriber::{layer::SubscriberExt as _, EnvFilter, Layer, Registry};
use url::Url;
use windows::Win32::Security as WinSec;
use windows_service::{
service::{
@@ -45,6 +41,8 @@ const SERVICE_RUST_LOG: &str = "str0m=warn,info";
const SERVICE_NAME: &str = "firezone_client_ipc";
const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS;
// This looks like a pointless wrapper around `CtrlC`, because it must match
// the Linux signatures
pub(crate) struct Signals {
sigint: tokio::signal::windows::CtrlC,
}
@@ -55,31 +53,9 @@ impl Signals {
Ok(Self { sigint })
}
pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll<SignalKind> {
if self.sigint.poll_recv(cx).is_ready() {
return Poll::Ready(SignalKind::Interrupt);
}
Poll::Pending
}
}
#[derive(clap::Parser, Default)]
#[command(author, version, about, long_about = None)]
struct CliIpcService {
#[command(subcommand)]
command: CmdIpc,
}
#[derive(clap::Subcommand)]
enum CmdIpc {
#[command(hide = true)]
DebugIpcService,
IpcService,
}
impl Default for CmdIpc {
fn default() -> Self {
Self::IpcService
pub(crate) async fn recv(&mut self) -> SignalKind {
self.sigint.recv().await;
SignalKind::Interrupt
}
}
@@ -95,67 +71,26 @@ pub(crate) fn default_token_path() -> std::path::PathBuf {
PathBuf::from("token.txt")
}
/// Only called from the GUI Client's build of the IPC service
/// Cross-platform entry point for systemd / Windows services
///
/// On Windows, this is wrapped specially so that Windows' service controller
/// can launch it.
pub(crate) fn run_only_ipc_service() -> Result<()> {
let cli = CliIpcService::parse();
match cli.command {
CmdIpc::DebugIpcService => run_debug_ipc_service(cli),
CmdIpc::IpcService => windows_service::service_dispatcher::start(SERVICE_NAME, ffi_service_run).context("windows_service::service_dispatcher failed. This isn't running in an interactive terminal, right?"),
}
}
fn run_debug_ipc_service(cli: CliIpcService) -> Result<()> {
crate::debug_command_setup()?;
let rt = tokio::runtime::Runtime::new()?;
let mut ipc_service = pin!(ipc_listen(cli));
let mut signals = Signals::new()?;
rt.block_on(async {
std::future::poll_fn(|cx| {
match signals.poll(cx) {
Poll::Ready(SignalKind::Hangup) => {
return Poll::Ready(Err(anyhow::anyhow!(
"Impossible, we don't catch Hangup on Windows"
)));
}
Poll::Ready(SignalKind::Interrupt) => {
tracing::info!("Caught Interrupt signal");
return Poll::Ready(Ok(()));
}
Poll::Pending => {}
}
match ipc_service.as_mut().poll(cx) {
Poll::Ready(Ok(())) => {
return Poll::Ready(Err(anyhow::anyhow!(
"Impossible, ipc_listen can't return Ok"
)));
}
Poll::Ready(Err(error)) => {
return Poll::Ready(Err(error).context("ipc_listen failed"));
}
Poll::Pending => {}
}
Poll::Pending
})
.await
})
/// Linux uses the CLI args from here, Windows does not
pub(crate) fn run_ipc_service(_cli: CliCommon) -> Result<()> {
windows_service::service_dispatcher::start(SERVICE_NAME, ffi_service_run).context("windows_service::service_dispatcher failed. This isn't running in an interactive terminal, right?")
}
// Generates `ffi_service_run` from `service_run`
windows_service::define_windows_service!(ffi_service_run, windows_service_run);
fn windows_service_run(_arguments: Vec<OsString>) {
if let Err(error) = fallible_windows_service_run() {
fn windows_service_run(arguments: Vec<OsString>) {
if let Err(error) = fallible_windows_service_run(arguments) {
tracing::error!(?error, "fallible_windows_service_run returned an error");
}
}
// Most of the Windows-specific service stuff should go here
fn fallible_windows_service_run() -> Result<()> {
//
// The arguments don't seem to match the ones passed to the main thread at all.
fn fallible_windows_service_run(arguments: Vec<OsString>) -> Result<()> {
let log_path =
crate::known_dirs::ipc_service_logs().context("Can't compute IPC service logs dir")?;
std::fs::create_dir_all(&log_path)?;
@@ -164,6 +99,7 @@ fn fallible_windows_service_run() -> Result<()> {
let subscriber = Registry::default().with(layer.with_filter(filter));
set_global_default(subscriber)?;
tracing::info!(git_version = crate::GIT_VERSION);
tracing::info!(?arguments, "fallible_windows_service_run");
let rt = tokio::runtime::Runtime::new()?;
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
@@ -213,7 +149,7 @@ fn fallible_windows_service_run() -> Result<()> {
process_id: None,
})?;
let mut ipc_service = pin!(ipc_listen(CliIpcService::default()));
let mut ipc_service = pin!(super::ipc_listen());
let result = rt.block_on(async {
std::future::poll_fn(|cx| {
match shutdown_rx.poll_recv(cx) {
@@ -257,21 +193,31 @@ fn fallible_windows_service_run() -> Result<()> {
result
}
async fn ipc_listen(_cli: CliIpcService) -> Result<()> {
setup_before_connlib()?;
loop {
// This is redundant on the first loop. After that it clears the rules
// between GUI instances.
connlib_shared::deactivate_dns_control()?;
pub(crate) struct IpcServer {
// On Linux this has some fields
}
/// Opaque wrapper around platform-specific IPC stream
pub(crate) type IpcStream = named_pipe::NamedPipeServer;
impl IpcServer {
/// Platform-specific setup
///
/// This is async on Linux
#[allow(clippy::unused_async)]
pub(crate) async fn new() -> Result<Self> {
setup_before_connlib()?;
Ok(Self {})
}
pub(crate) async fn next_client(&mut self) -> Result<IpcStream> {
let server = create_pipe_server()?;
tracing::info!("Listening for GUI to connect over IPC...");
server
.connect()
.await
.context("Couldn't accept IPC connection from GUI")?;
if let Err(error) = handle_ipc_client(server).await {
tracing::error!(?error, "Error while handling IPC client");
}
Ok(server)
}
}
@@ -315,130 +261,6 @@ pub fn pipe_path() -> String {
named_pipe_path(&format!("{BUNDLE_ID}.ipc_service"))
}
enum IpcEvent {
/// A message that the client sent us
Client(IpcClientMsg),
/// A message that connlib wants to send
Connlib(IpcServerMsg),
/// The IPC client disconnected
IpcDisconnect,
}
#[derive(Clone)]
struct CallbackHandlerIpc {
cb_tx: mpsc::Sender<IpcServerMsg>,
}
impl Callbacks for CallbackHandlerIpc {
fn on_disconnect(&self, _error: &connlib_client_shared::Error) {
self.cb_tx
.try_send(IpcServerMsg::OnDisconnect)
.expect("should be able to send OnDisconnect");
}
fn on_set_interface_config(
&self,
ipv4: Ipv4Addr,
ipv6: Ipv6Addr,
dns: Vec<IpAddr>,
) -> Option<i32> {
tracing::info!("TunnelReady");
self.cb_tx
.try_send(IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns })
.expect("Should be able to send TunnelReady");
None
}
fn on_update_resources(&self, resources: Vec<callbacks::ResourceDescription>) {
tracing::info!(len = resources.len(), "New resource list");
self.cb_tx
.try_send(IpcServerMsg::OnUpdateResources(resources))
.expect("Should be able to send OnUpdateResources");
}
}
async fn handle_ipc_client(server: named_pipe::NamedPipeServer) -> Result<()> {
let framed = Framed::new(server, LengthDelimitedCodec::new());
let mut framed = pin!(framed);
let (cb_tx, mut cb_rx) = mpsc::channel(100);
let mut connlib = None;
let callback_handler = CallbackHandlerIpc { cb_tx };
loop {
let ev = poll_fn(|cx| {
match cb_rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => return Poll::Ready(Ok(IpcEvent::Connlib(msg))),
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow!(
"Impossible - MPSC channel from connlib closed"
)))
}
Poll::Pending => {}
}
match framed.as_mut().poll_next(cx) {
Poll::Ready(Some(msg)) => {
let msg = serde_json::from_slice(&msg?)?;
return Poll::Ready(Ok(IpcEvent::Client(msg)));
}
Poll::Ready(None) => return Poll::Ready(Ok(IpcEvent::IpcDisconnect)),
Poll::Pending => {}
}
Poll::Pending
})
.await;
match ev {
Ok(IpcEvent::Client(msg)) => match msg {
IpcClientMsg::Connect { api_url, token } => {
let token = secrecy::SecretString::from(token);
assert!(connlib.is_none());
let device_id = connlib_shared::device_id::get()
.context("Failed to read / create device ID")?;
let (private_key, public_key) = keypair();
let login = LoginUrl::client(
Url::parse(&api_url)?,
&token,
device_id.id,
None,
public_key.to_bytes(),
)?;
connlib = Some(connlib_client_shared::Session::connect(
login,
Sockets::new(),
private_key,
None,
callback_handler.clone(),
Some(std::time::Duration::from_secs(60 * 60 * 24 * 30)),
tokio::runtime::Handle::try_current()?,
));
}
IpcClientMsg::Disconnect => {
if let Some(connlib) = connlib.take() {
connlib.disconnect();
}
}
IpcClientMsg::Reconnect => {
connlib.as_mut().context("No connlib session")?.reconnect()
}
IpcClientMsg::SetDns(v) => {
connlib.as_mut().context("No connlib session")?.set_dns(v)
}
},
Ok(IpcEvent::Connlib(msg)) => framed.send(serde_json::to_string(&msg)?.into()).await?,
Ok(IpcEvent::IpcDisconnect) => {
tracing::info!("IPC client disconnected");
break;
}
Err(e) => Err(e)?,
}
}
Ok(())
}
pub fn system_resolvers() -> Result<Vec<IpAddr>> {
let resolvers = ipconfig::get_adapters()?
.iter()
@@ -464,10 +286,6 @@ pub fn named_pipe_path(id: &str) -> String {
format!(r"\\.\pipe\{}", id)
}
/// Platform-specific setup needed for connlib
///
/// On Windows this installs wintun.dll
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn setup_before_connlib() -> Result<()> {
wintun_install::ensure_dll()?;
Ok(())