chore(gui-client): proof of concept for process splitting (#4788)

Closes #4270

Refs #3713 
Refs #3782 

It sort-of works, but many features are missing and it needs a refactor.

```[tasklist]
- [ ] Break `imp_linux.rs` into modules
- [ ] Get rid of `try_send` and panics where possible
```

---------

Signed-off-by: Reactor Scram <ReactorScram@users.noreply.github.com>
This commit is contained in:
Reactor Scram
2024-04-26 14:58:12 -05:00
committed by GitHub
parent 52e8138644
commit c8d989a34d
11 changed files with 635 additions and 380 deletions

4
rust/Cargo.lock generated
View File

@@ -1906,7 +1906,6 @@ dependencies = [
"git-version",
"hex",
"hostname 0.4.0",
"ipconfig",
"keyring",
"minidumper",
"native-dialog",
@@ -1927,6 +1926,7 @@ dependencies = [
"tauri-winrt-notification",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"tracing-log",
"tracing-panic",
@@ -1949,11 +1949,11 @@ dependencies = [
"clap",
"connlib-client-shared",
"connlib-shared",
"dirs",
"firezone-cli-utils",
"futures",
"git-version",
"humantime",
"ipconfig",
"nix 0.28.0",
"resolv-conf",
"sd-notify",

View File

@@ -71,7 +71,9 @@ impl Device {
dns_config: Vec<IpAddr>,
callbacks: &impl Callbacks,
) -> Result<(), ConnlibError> {
self.tun = Some(Tun::new(config, dns_config, callbacks)?);
self.tun = Some(Tun::new(config, dns_config.clone(), callbacks)?);
callbacks.on_set_interface_config(config.ipv4, config.ipv6, dns_config);
if let Some(waker) = self.waker.take() {
waker.wake();

View File

@@ -55,6 +55,7 @@ zip = { version = "0.6.6", features = ["deflate", "time"], default-features = fa
dirs = "5.0.1"
# Used for infinite `pending` on not-yet-implemented functions
futures = "0.3.30"
tokio-util = { version = "0.7.10", features = ["codec"] }
[target.'cfg(target_os = "macos")'.dependencies]
dirs = "5.0.1"
@@ -68,7 +69,6 @@ windows-core = "0.56.0"
windows-implement = "0.56.0"
winreg = "0.52.0"
wintun = "0.4.0"
ipconfig = "0.3.2"
[target.'cfg(target_os = "windows")'.dependencies.windows]
version = "0.56.0"

View File

@@ -48,12 +48,20 @@ mod tunnel_wrapper_ipc;
#[cfg(target_os = "linux")]
use tunnel_wrapper_ipc as tunnel_wrapper;
*/
#[cfg(any(target_os = "linux", target_os = "windows"))]
#[cfg(target_os = "windows")]
#[path = "tunnel-wrapper/in_proc.rs"]
mod tunnel_wrapper_in_proc;
#[cfg(any(target_os = "linux", target_os = "windows"))]
#[cfg(target_os = "linux")]
#[path = "tunnel-wrapper/ipc.rs"]
mod tunnel_wrapper_ipc;
#[cfg(target_os = "windows")]
use tunnel_wrapper_in_proc as tunnel_wrapper;
#[cfg(target_os = "linux")]
use tunnel_wrapper_ipc as tunnel_wrapper;
pub(crate) type CtlrTx = mpsc::Sender<ControllerRequest>;
/// All managed state that we might need to access from odd places like Tauri commands.
@@ -513,7 +521,7 @@ struct Session {
impl Controller {
// TODO: Figure out how re-starting sessions automatically will work
/// Pre-req: the auth module must be signed in
fn start_session(&mut self, token: SecretString) -> Result<()> {
async fn start_session(&mut self, token: SecretString) -> Result<()> {
if self.session.is_some() {
bail!("can't start session, we're already in a session");
}
@@ -531,14 +539,17 @@ impl Controller {
"Calling connlib Session::connect"
);
let connlib = tunnel_wrapper::connect(
let mut connlib = tunnel_wrapper::connect(
api_url.as_str(),
token,
callback_handler.clone(),
tokio::runtime::Handle::current(),
)?;
)
.await?;
connlib.set_dns(client::resolvers::get().unwrap_or_default());
connlib
.set_dns(client::resolvers::get().unwrap_or_default())
.await?;
self.session = Some(Session {
callback_handler,
@@ -567,7 +578,7 @@ impl Controller {
Ok(())
}
fn handle_deep_link(&mut self, url: &SecretString) -> Result<()> {
async fn handle_deep_link(&mut self, url: &SecretString) -> Result<()> {
let auth_response =
client::deep_link::parse_auth_callback(url).context("Couldn't parse scheme request")?;
@@ -575,6 +586,7 @@ impl Controller {
// Uses `std::fs`
let token = self.auth.handle_response(auth_response)?;
self.start_session(token)
.await
.context("Couldn't start connlib session")?;
Ok(())
}
@@ -591,7 +603,7 @@ impl Controller {
}
Req::Disconnected => {
tracing::info!("Disconnected by connlib");
self.sign_out()?;
self.sign_out().await?;
os::show_notification(
"Firezone disconnected",
"To access resources, sign in again.",
@@ -606,6 +618,7 @@ impl Controller {
}
Req::SchemeRequest(url) => self
.handle_deep_link(&url)
.await
.context("Couldn't handle deep link")?,
Req::SignIn | Req::SystemTrayMenu(TrayMenuEvent::SignIn) => {
if let Some(req) = self.auth.start_sign_in()? {
@@ -626,11 +639,11 @@ impl Controller {
tracing::warn!(
"Connlib is already raising the tunnel, calling `sign_out` anyway"
);
self.sign_out()?;
self.sign_out().await?;
}
} else {
tracing::info!("Calling `sign_out` to cancel sign-in");
self.sign_out()?;
self.sign_out().await?;
}
}
Req::SystemTrayMenu(TrayMenuEvent::ShowWindow(window)) => {
@@ -650,7 +663,7 @@ impl Controller {
.context("Couldn't copy resource to clipboard")?,
Req::SystemTrayMenu(TrayMenuEvent::SignOut) => {
tracing::info!("User asked to sign out");
self.sign_out()?;
self.sign_out().await?;
}
Req::SystemTrayMenu(TrayMenuEvent::Quit) => {
bail!("Impossible error: `Quit` should be handled before this")
@@ -726,14 +739,14 @@ impl Controller {
}
/// Deletes the auth token, stops connlib, and refreshes the tray menu
fn sign_out(&mut self) -> Result<()> {
async fn sign_out(&mut self) -> Result<()> {
self.auth.sign_out()?;
self.tunnel_ready = false;
if let Some(session) = self.session.take() {
tracing::debug!("disconnecting connlib");
// This is redundant if the token is expired, in that case
// connlib already disconnected itself.
session.connlib.disconnect();
session.connlib.disconnect().await?;
} else {
// Might just be because we got a double sign-out or
// the user canceled the sign-in or something innocent.
@@ -798,6 +811,7 @@ async fn run_controller(
{
controller
.start_session(token)
.await
.context("Failed to restart session during app start")?;
} else {
tracing::info!("No token / actor_name on disk, starting in signed-out state");
@@ -814,16 +828,19 @@ async fn run_controller(
loop {
tokio::select! {
() = controller.notify_controller.notified() => if let Err(error) = controller.refresh_system_tray_menu() {
tracing::error!(?error, "Failed to reload resource list");
},
() = controller.notify_controller.notified() => {
tracing::debug!("Controller notified of new resources");
if let Err(error) = controller.refresh_system_tray_menu() {
tracing::error!(?error, "Failed to reload resource list");
}
}
() = com_worker.notified() => {
let new_have_internet = network_changes::check_internet().context("Failed to check for internet")?;
if new_have_internet != have_internet {
have_internet = new_have_internet;
if let Some(session) = controller.session.as_mut() {
tracing::debug!("Internet up/down changed, calling `Session::reconnect`");
session.connlib.reconnect();
session.connlib.reconnect().await?;
}
}
},
@@ -831,7 +848,7 @@ async fn run_controller(
r?;
if let Some(session) = controller.session.as_mut() {
tracing::debug!("New DNS resolvers, calling `Session::set_dns`");
session.connlib.set_dns(client::resolvers::get().unwrap_or_default());
session.connlib.set_dns(client::resolvers::get().unwrap_or_default()).await?;
}
},
req = rx.recv() => {

View File

@@ -32,18 +32,6 @@ mod imp {
use std::net::IpAddr;
pub fn get() -> Result<Vec<IpAddr>> {
let resolvers = ipconfig::get_adapters()?
.iter()
.flat_map(|adapter| adapter.dns_servers())
.filter(|ip| match ip {
IpAddr::V4(_) => true,
// Filter out bogus DNS resolvers on my dev laptop that start with fec0:
IpAddr::V6(ip) => !ip.octets().starts_with(&[0xfe, 0xc0]),
})
.copied()
.collect();
// This is private, so keep it at `debug` or `trace`
tracing::debug!(?resolvers);
Ok(resolvers)
firezone_headless_client::imp::system_resolvers()
}
}

View File

@@ -48,20 +48,30 @@ pub(crate) struct TunnelWrapper {
}
impl TunnelWrapper {
pub(crate) fn disconnect(self) {
self.session.disconnect()
#[allow(clippy::unused_async)]
pub(crate) async fn disconnect(self) -> Result<()> {
self.session.disconnect();
Ok(())
}
pub(crate) fn reconnect(&self) {
self.session.reconnect()
#[allow(clippy::unused_async)]
pub(crate) async fn reconnect(&mut self) -> Result<()> {
self.session.reconnect();
Ok(())
}
pub(crate) fn set_dns(&self, dns: Vec<IpAddr>) {
self.session.set_dns(dns)
#[allow(clippy::unused_async)]
pub(crate) async fn set_dns(&mut self, dns: Vec<IpAddr>) -> Result<()> {
self.session.set_dns(dns);
Ok(())
}
}
pub fn connect(
/// Starts connlib in-process
///
/// This is `async` because the IPC version is async
#[allow(clippy::unused_async)]
pub async fn connect(
api_url: &str,
token: SecretString,
callback_handler: CallbackHandler,

View File

@@ -0,0 +1,145 @@
use anyhow::{Context, Result};
use arc_swap::ArcSwap;
use connlib_client_shared::{file_logger, Callbacks, ResourceDescription};
use firezone_headless_client::{imp::sock_path, IpcClientMsg, IpcServerMsg};
use futures::{SinkExt, StreamExt};
use secrecy::{ExposeSecret, SecretString};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use tokio::{
net::{unix::OwnedWriteHalf, UnixStream},
sync::Notify,
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use super::ControllerRequest;
use super::CtlrTx;
#[derive(Clone)]
pub(crate) struct CallbackHandler {
pub _logger: file_logger::Handle,
pub notify_controller: Arc<Notify>,
pub ctlr_tx: CtlrTx,
pub resources: Arc<ArcSwap<Vec<ResourceDescription>>>,
}
/// Forwards events to and from connlib
pub(crate) struct TunnelWrapper {
recv_task: tokio::task::JoinHandle<Result<()>>,
tx: FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
}
impl TunnelWrapper {
pub(crate) async fn disconnect(mut self) -> Result<()> {
self.send_msg(&IpcClientMsg::Disconnect)
.await
.context("Couldn't send Disconnect")?;
self.tx.close().await?;
self.recv_task.abort();
Ok(())
}
pub(crate) async fn reconnect(&mut self) -> Result<()> {
self.send_msg(&IpcClientMsg::Reconnect)
.await
.context("Couldn't send Reconnect")?;
Ok(())
}
/// Tell connlib about the system's default resolvers
///
/// `dns` is passed as value because the in-proc impl needs that
pub(crate) async fn set_dns(&mut self, dns: Vec<IpAddr>) -> Result<()> {
self.send_msg(&IpcClientMsg::SetDns(dns))
.await
.context("Couldn't send SetDns")?;
Ok(())
}
async fn send_msg(&mut self, msg: &IpcClientMsg) -> Result<()> {
self.tx
.send(
serde_json::to_string(msg)
.context("Couldn't encode IPC message as JSON")?
.into(),
)
.await
.context("Couldn't send IPC message")?;
Ok(())
}
}
pub async fn connect(
api_url: &str,
token: SecretString,
callback_handler: CallbackHandler,
tokio_handle: tokio::runtime::Handle,
) -> Result<TunnelWrapper> {
tracing::info!(pid = std::process::id(), "Connecting to IPC service...");
let stream = UnixStream::connect(sock_path())
.await
.context("Couldn't connect to UDS")?;
let (rx, tx) = stream.into_split();
let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
// TODO: Make sure this joins / drops somewhere
let recv_task = tokio_handle.spawn(async move {
while let Some(msg) = rx.next().await {
let msg = msg?;
let msg: IpcServerMsg = serde_json::from_slice(&msg)?;
match msg {
IpcServerMsg::Ok => {}
IpcServerMsg::OnDisconnect => callback_handler.on_disconnect(
&connlib_client_shared::Error::Other("errors can't be serialized"),
),
IpcServerMsg::OnUpdateResources(v) => callback_handler.on_update_resources(v),
IpcServerMsg::TunnelReady => callback_handler.on_tunnel_ready(),
}
}
Ok(())
});
let mut client = TunnelWrapper { recv_task, tx };
let token = token.expose_secret().clone();
client
.send_msg(&IpcClientMsg::Connect {
api_url: api_url.to_string(),
token,
})
.await
.context("Couldn't send Connect message")?;
Ok(client)
}
// Callbacks must all be non-blocking
// TODO: DRY
impl connlib_client_shared::Callbacks for CallbackHandler {
fn on_disconnect(&self, error: &connlib_client_shared::Error) {
tracing::debug!("on_disconnect {error:?}");
self.ctlr_tx
.try_send(ControllerRequest::Disconnected)
.expect("controller channel failed");
}
fn on_set_interface_config(&self, _: Ipv4Addr, _: Ipv6Addr, _: Vec<IpAddr>) -> Option<i32> {
unimplemented!()
}
fn on_update_resources(&self, resources: Vec<ResourceDescription>) {
tracing::debug!("on_update_resources");
self.resources.store(resources.into());
self.notify_controller.notify_one();
}
}
impl CallbackHandler {
fn on_tunnel_ready(&self) {
self.ctlr_tx
.try_send(ControllerRequest::TunnelReady)
.expect("controller channel failed");
}
}

View File

@@ -10,27 +10,29 @@ authors = ["Firezone, Inc."]
[dependencies]
anyhow = { version = "1.0" }
clap = { version = "4.5", features = ["derive", "env"] }
connlib-client-shared = { workspace = true }
connlib-shared = { workspace = true }
firezone-cli-utils = { workspace = true }
git-version = "0.3.9"
humantime = "2.1"
secrecy = { workspace = true }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
# This actually relies on many other features in Tokio, so this will probably
# fail to build outside the workspace. <https://github.com/firezone/firezone/pull/4328#discussion_r1540342142>
tokio = { version = "1.36.0", features = ["macros", "signal"] }
tracing = { workspace = true }
url = { version = "2.3.1", default-features = false }
[target.'cfg(target_os = "linux")'.dependencies]
connlib-client-shared = { workspace = true }
connlib-shared = { workspace = true }
dirs = "5.0.1"
firezone-cli-utils = { workspace = true }
futures = "0.3.30"
nix = { version = "0.28.0", features = ["fs", "user"] }
resolv-conf = "0.7.0"
sd-notify = "0.4.1" # This is a pure Rust re-implementation, so it isn't vulnerable to CVE-2024-3094
serde_json = "1.0.116"
secrecy = { workspace = true }
# This actually relies on many other features in Tokio, so this will probably
# fail to build outside the workspace. <https://github.com/firezone/firezone/pull/4328#discussion_r1540342142>
tokio = { version = "1.36.0", features = ["macros", "signal"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
[target.'cfg(target_os = "windows")'.dependencies]
ipconfig = "0.3.2"
[lints]
workspace = true

View File

@@ -1,125 +1,75 @@
//! Implementation, Linux-specific
use super::{Cli, Cmd, TOKEN_ENV_KEY};
use anyhow::{bail, Context, Result};
use clap::Parser;
use connlib_client_shared::{file_logger, Callbacks, Session, Sockets};
use super::{Cli, IpcClientMsg, IpcServerMsg, TOKEN_ENV_KEY};
use anyhow::{bail, Context as _, Result};
use connlib_client_shared::{Callbacks, ResourceDescription, Sockets};
use connlib_shared::{
keypair,
linux::{etc_resolv_conf, get_dns_control_from_env, DnsControlMethod},
LoginUrl,
};
use firezone_cli_utils::setup_global_subscriber;
use futures::{SinkExt, StreamExt};
use secrecy::SecretString;
use std::{future, net::IpAddr, path::PathBuf, str::FromStr, task::Poll};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
os::unix::fs::PermissionsExt,
path::{Path, PathBuf},
str::FromStr,
task::{Context, Poll},
};
use tokio::{
net::{UnixListener, UnixStream},
signal::unix::SignalKind,
signal::unix::SignalKind as TokioSignalKind,
sync::mpsc,
};
use tokio_util::codec::LengthDelimitedCodec;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use url::Url;
// The Client currently must run as root to control DNS
// Root group and user are used to check file ownership on the token
const ROOT_GROUP: u32 = 0;
const ROOT_USER: u32 = 0;
pub(crate) struct Signals {
sighup: tokio::signal::unix::Signal,
sigint: tokio::signal::unix::Signal,
}
impl Signals {
pub(crate) fn new() -> Result<Self> {
let sighup = tokio::signal::unix::signal(TokioSignalKind::hangup())?;
let sigint = tokio::signal::unix::signal(TokioSignalKind::interrupt())?;
Ok(Self { sighup, sigint })
}
pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll<super::SignalKind> {
if self.sigint.poll_recv(cx).is_ready() {
return Poll::Ready(super::SignalKind::Interrupt);
}
if self.sighup.poll_recv(cx).is_ready() {
return Poll::Ready(super::SignalKind::Hangup);
}
Poll::Pending
}
}
pub fn default_token_path() -> PathBuf {
PathBuf::from("/etc")
.join(connlib_shared::BUNDLE_ID)
.join("token")
}
pub fn run() -> Result<()> {
let mut cli = Cli::parse();
// Modifying the environment of a running process is unsafe. If any other
// thread is reading or writing the environment, something bad can happen.
// So `run` must take over as early as possible during startup, and
// take the token env var before any other threads spawn.
let token_env_var = cli.token.take().map(SecretString::from);
let cli = cli;
// Docs indicate that `remove_var` should actually be marked unsafe
// SAFETY: We haven't spawned any other threads, this code should be the first
// thing to run after entering `main`. So nobody else is reading the environment.
#[allow(unused_unsafe)]
unsafe {
// This removes the token from the environment per <https://security.stackexchange.com/a/271285>. We run as root so it may not do anything besides defense-in-depth.
std::env::remove_var(TOKEN_ENV_KEY);
}
assert!(std::env::var(TOKEN_ENV_KEY).is_err());
let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip();
setup_global_subscriber(layer);
tracing::info!(git_version = crate::GIT_VERSION);
match cli.command() {
Cmd::Auto => {
if let Some(token) = get_token(token_env_var, &cli)? {
run_standalone(cli, &token)
} else {
run_ipc_service(cli)
}
}
Cmd::IpcService => run_ipc_service(cli),
Cmd::Standalone => {
let token = get_token(token_env_var, &cli)?.with_context(|| {
format!(
"Can't find the Firezone token in ${TOKEN_ENV_KEY} or in `{}`",
cli.token_path
)
})?;
run_standalone(cli, &token)
}
Cmd::StubIpcClient => run_debug_ipc_client(cli),
}
}
/// Read the token from disk if it was not in the environment
///
/// # Returns
/// - `Ok(None)` if there is no token to be found
/// - `Ok(Some(_))` if we found the token
/// - `Err(_)` if we found the token on disk but failed to read it
fn get_token(token_env_var: Option<SecretString>, cli: &Cli) -> Result<Option<SecretString>> {
// This is very simple but I don't want to write it twice
if let Some(token) = token_env_var {
return Ok(Some(token));
}
read_token_file(cli)
}
/// Try to retrieve the token from disk
///
/// Sync because we do blocking file I/O
fn read_token_file(cli: &Cli) -> Result<Option<SecretString>> {
let path = PathBuf::from(&cli.token_path);
if let Ok(token) = std::env::var(TOKEN_ENV_KEY) {
std::env::remove_var(TOKEN_ENV_KEY);
let token = SecretString::from(token);
// Token was provided in env var
tracing::info!(
?path,
?TOKEN_ENV_KEY,
"Found token in env var, ignoring any token that may be on disk."
);
return Ok(Some(token));
}
let Ok(stat) = nix::sys::stat::fstatat(None, &path, nix::fcntl::AtFlags::empty()) else {
pub(crate) fn check_token_permissions(path: &Path) -> Result<()> {
let Ok(stat) = nix::sys::stat::fstatat(None, path, nix::fcntl::AtFlags::empty()) else {
// File doesn't exist or can't be read
tracing::info!(
?path,
?TOKEN_ENV_KEY,
"No token found in env var or on disk"
);
return Ok(None);
bail!("Token file doesn't exist");
};
if stat.st_uid != ROOT_USER {
bail!(
@@ -139,97 +89,11 @@ fn read_token_file(cli: &Cli) -> Result<Option<SecretString>> {
path.display()
);
}
let Ok(bytes) = std::fs::read(&path) else {
// We got the metadata a second ago, but can't read the file itself.
// Pretty strange, would have to be a disk fault or TOCTOU.
tracing::info!(?path, "Token file existed but now is unreadable");
return Ok(None);
};
let token = String::from_utf8(bytes)?.trim().to_string();
let token = SecretString::from(token);
tracing::info!(?path, "Loaded token from disk");
Ok(Some(token))
Ok(())
}
fn run_standalone(cli: Cli, token: &SecretString) -> Result<()> {
tracing::info!("Running in standalone mode");
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let _guard = rt.enter();
let max_partition_time = cli.max_partition_time.map(|d| d.into());
// AKA "Device ID", not the Firezone slug
let firezone_id = match cli.firezone_id {
Some(id) => id,
None => connlib_shared::device_id::get().context("Could not get `firezone_id` from CLI, could not read it from disk, could not generate it and save it to disk")?.id,
};
let (private_key, public_key) = keypair();
let login = LoginUrl::client(cli.api_url, token, firezone_id, None, public_key.to_bytes())?;
if cli.check {
tracing::info!("Check passed");
return Ok(());
}
let (on_disconnect_tx, mut on_disconnect_rx) = mpsc::channel(1);
let callback_handler = CallbackHandler { on_disconnect_tx };
let session = Session::connect(
login,
Sockets::new(),
private_key,
None,
callback_handler,
max_partition_time,
rt.handle().clone(),
);
// TODO: this should be added dynamically
session.set_dns(system_resolvers(get_dns_control_from_env()).unwrap_or_default());
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sighup = tokio::signal::unix::signal(SignalKind::hangup())?;
let result = rt.block_on(async {
future::poll_fn(|cx| loop {
match on_disconnect_rx.poll_recv(cx) {
Poll::Ready(Some(error)) => return Poll::Ready(Err(anyhow::anyhow!(error))),
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow::anyhow!(
"on_disconnect_rx unexpectedly ran empty"
)))
}
Poll::Pending => {}
}
if sigint.poll_recv(cx).is_ready() {
tracing::debug!("Received SIGINT");
return Poll::Ready(Ok(()));
}
if sighup.poll_recv(cx).is_ready() {
tracing::debug!("Received SIGHUP");
session.reconnect();
continue;
}
return Poll::Pending;
})
.await
});
session.disconnect();
result
}
fn system_resolvers(dns_control_method: Option<DnsControlMethod>) -> Result<Vec<IpAddr>> {
match dns_control_method {
pub(crate) fn system_resolvers() -> Result<Vec<IpAddr>> {
match get_dns_control_from_env() {
None => get_system_default_resolvers_resolv_conf(),
Some(DnsControlMethod::EtcResolvConf) => get_system_default_resolvers_resolv_conf(),
Some(DnsControlMethod::NetworkManager) => get_system_default_resolvers_network_manager(),
@@ -237,29 +101,6 @@ fn system_resolvers(dns_control_method: Option<DnsControlMethod>) -> Result<Vec<
}
}
#[derive(Clone)]
struct CallbackHandler {
/// Channel for an error message if connlib disconnects due to an error
on_disconnect_tx: mpsc::Sender<String>,
}
impl Callbacks for CallbackHandler {
fn on_disconnect(&self, error: &connlib_client_shared::Error) {
// Convert the error to a String since we can't clone it
self.on_disconnect_tx
.try_send(error.to_string())
.expect("should be able to tell the main thread that we disconnected");
}
fn on_update_resources(&self, resources: Vec<connlib_client_shared::ResourceDescription>) {
// See easily with `export RUST_LOG=firezone_headless_client=debug`
tracing::debug!(len = resources.len(), "Printing the resource list one time");
for resource in &resources {
tracing::debug!(?resource);
}
}
}
fn get_system_default_resolvers_resolv_conf() -> Result<Vec<IpAddr>> {
// Assume that `configure_resolv_conf` has run in `tun_linux.rs`
@@ -317,35 +158,19 @@ fn parse_resolvectl_output(s: &str) -> Vec<IpAddr> {
/// on some systems, `/run` should be the newer version.
///
/// Also systemd can create this dir with the `RuntimeDir=` directive which is nice.
fn sock_path() -> PathBuf {
pub fn sock_path() -> PathBuf {
PathBuf::from("/run")
.join(connlib_shared::BUNDLE_ID)
.join("ipc.sock")
}
fn run_debug_ipc_client(_cli: Cli) -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
tracing::info!(pid = std::process::id(), "run_debug_ipc_client");
let sock_path = sock_path();
let stream = UnixStream::connect(&sock_path)
.await
.with_context(|| format!("couldn't connect to UDS at {}", sock_path.display()))?;
let mut stream = IpcStream::new(stream, LengthDelimitedCodec::new());
stream.send(serde_json::to_string("Hello")?.into()).await?;
Ok::<_, anyhow::Error>(())
})?;
Ok(())
}
fn run_ipc_service(_cli: Cli) -> Result<()> {
pub(crate) fn run_ipc_service(cli: Cli) -> Result<()> {
let rt = tokio::runtime::Runtime::new()?;
tracing::info!("run_daemon");
rt.block_on(async { ipc_listen().await })
rt.block_on(async { ipc_listen(cli).await })
}
async fn ipc_listen() -> Result<()> {
async fn ipc_listen(cli: Cli) -> Result<()> {
// Find the `firezone` group
let fz_gid = nix::unistd::Group::from_name("firezone")
.context("can't get group by name")?
@@ -358,6 +183,8 @@ async fn ipc_listen() -> Result<()> {
let listener = UnixListener::bind(&sock_path).context("Couldn't bind UDS")?;
std::os::unix::fs::chown(&sock_path, Some(ROOT_USER), Some(fz_gid.into()))
.context("can't set firezone as the group for the UDS")?;
let perms = std::fs::Permissions::from_mode(0o660);
std::fs::set_permissions(sock_path, perms)?;
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?;
loop {
@@ -374,100 +201,104 @@ async fn ipc_listen() -> Result<()> {
// I'm not sure if we can enforce group membership here - Docker
// might just be enforcing it with filesystem permissions.
// Checking the secondary groups of another user looks complicated.
let stream = IpcStream::new(stream, LengthDelimitedCodec::new());
if let Err(error) = handle_ipc_client(stream).await {
if let Err(error) = handle_ipc_client(&cli, stream).await {
tracing::error!(?error, "Error while handling IPC client");
}
}
}
type IpcStream = tokio_util::codec::Framed<UnixStream, LengthDelimitedCodec>;
#[derive(Clone)]
struct CallbackHandlerIpc {
cb_tx: mpsc::Sender<IpcServerMsg>,
}
async fn handle_ipc_client(mut stream: IpcStream) -> Result<()> {
tracing::info!("Waiting for an IPC message from the GUI...");
impl Callbacks for CallbackHandlerIpc {
fn on_disconnect(&self, _error: &connlib_client_shared::Error) {
self.cb_tx
.try_send(IpcServerMsg::OnDisconnect)
.expect("should be able to send OnDisconnect");
}
let v = stream
.next()
.await
.context("Error while reading IPC message")?
.context("IPC stream empty")?;
let decoded: String = serde_json::from_slice(&v)?;
fn on_set_interface_config(&self, _: Ipv4Addr, _: Ipv6Addr, _: Vec<IpAddr>) -> Option<i32> {
tracing::info!("TunnelReady");
self.cb_tx
.try_send(IpcServerMsg::TunnelReady)
.expect("Should be able to send TunnelReady");
None
}
fn on_update_resources(&self, resources: Vec<ResourceDescription>) {
tracing::info!(len = resources.len(), "New resource list");
self.cb_tx
.try_send(IpcServerMsg::OnUpdateResources(resources))
.expect("Should be able to send OnUpdateResources");
}
}
async fn handle_ipc_client(cli: &Cli, stream: UnixStream) -> Result<()> {
connlib_shared::deactivate_dns_control()?;
let (rx, tx) = stream.into_split();
let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
let (cb_tx, mut cb_rx) = mpsc::channel(100);
let send_task = tokio::spawn(async move {
while let Some(msg) = cb_rx.recv().await {
tx.send(serde_json::to_string(&msg)?.into()).await?;
}
Ok::<_, anyhow::Error>(())
});
let mut connlib = None;
let callback_handler = CallbackHandlerIpc { cb_tx };
while let Some(msg) = rx.next().await {
let msg = msg?;
let msg: super::IpcClientMsg = serde_json::from_slice(&msg)?;
match msg {
IpcClientMsg::Connect { api_url, token } => {
let token = secrecy::SecretString::from(token);
assert!(connlib.is_none());
let device_id = connlib_shared::device_id::get()
.context("Failed to read / create device ID")?;
let (private_key, public_key) = keypair();
let login = LoginUrl::client(
Url::parse(&api_url)?,
&token,
device_id.id,
None,
public_key.to_bytes(),
)?;
connlib = Some(connlib_client_shared::Session::connect(
login,
Sockets::new(),
private_key,
None,
callback_handler.clone(),
cli.max_partition_time.map(|t| t.into()),
tokio::runtime::Handle::try_current()?,
));
}
IpcClientMsg::Disconnect => {
if let Some(connlib) = connlib.take() {
connlib.disconnect();
}
}
IpcClientMsg::Reconnect => connlib.as_mut().context("No connlib session")?.reconnect(),
IpcClientMsg::SetDns(v) => connlib.as_mut().context("No connlib session")?.set_dns(v),
}
}
send_task.abort();
tracing::debug!(?decoded, "Received message");
stream.send("OK".to_string().into()).await?;
tracing::info!("Replied. Connection will close");
Ok(())
}
#[cfg(test)]
mod tests {
use super::IpcStream;
use futures::{SinkExt, StreamExt};
use std::net::IpAddr;
use tokio::net::{UnixListener, UnixStream};
use tokio_util::codec::LengthDelimitedCodec;
const MESSAGE_ONE: &str = "message one";
const MESSAGE_TWO: &str = "message two";
#[tokio::test]
async fn ipc() {
let sock_path = dirs::runtime_dir()
.unwrap()
.join("dev.firezone.client_ipc_test");
// Remove the socket if a previous run left it there
tokio::fs::remove_file(&sock_path).await.ok();
let listener = UnixListener::bind(&sock_path).unwrap();
let ipc_server_task = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let cred = stream.peer_cred().unwrap();
// TODO: Check that the user is in the `firezone` group
// For now, to make it work well in CI where that group isn't created,
// just check if it matches our own UID.
let actual_peer_uid = cred.uid();
let expected_peer_uid = nix::unistd::Uid::current().as_raw();
assert_eq!(actual_peer_uid, expected_peer_uid);
let mut stream = IpcStream::new(stream, LengthDelimitedCodec::new());
let v = stream
.next()
.await
.expect("Error while reading IPC message")
.expect("IPC stream empty");
let decoded: String = serde_json::from_slice(&v).unwrap();
assert_eq!(MESSAGE_ONE, decoded);
let v = stream
.next()
.await
.expect("Error while reading IPC message")
.expect("IPC stream empty");
let decoded: String = serde_json::from_slice(&v).unwrap();
assert_eq!(MESSAGE_TWO, decoded);
});
tracing::info!(pid = std::process::id(), "Connecting to IPC server");
let stream = UnixStream::connect(&sock_path).await.unwrap();
let mut stream = IpcStream::new(stream, LengthDelimitedCodec::new());
stream
.send(serde_json::to_string(MESSAGE_ONE).unwrap().into())
.await
.unwrap();
stream
.send(serde_json::to_string(MESSAGE_TWO).unwrap().into())
.await
.unwrap();
tokio::time::timeout(std::time::Duration::from_millis(2_000), ipc_server_task)
.await
.unwrap()
.unwrap();
}
#[test]
fn parse_resolvectl_output() {

View File

@@ -0,0 +1,58 @@
use crate::Cli;
use anyhow::Result;
use std::{
net::IpAddr,
path::{Path, PathBuf},
task::{Context, Poll},
};
pub(crate) struct Signals {
sigint: tokio::signal::windows::CtrlC,
}
impl Signals {
pub(crate) fn new() -> Result<Self> {
let sigint = tokio::signal::windows::ctrl_c()?;
Ok(Self { sigint })
}
pub(crate) fn poll(&mut self, cx: &mut Context) -> Poll<super::SignalKind> {
if self.sigint.poll_recv(cx).is_ready() {
return Poll::Ready(super::SignalKind::Interrupt);
}
Poll::Pending
}
}
// The return value is useful on Linux
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn check_token_permissions(_path: &Path) -> Result<()> {
// TODO: Make sure the token is only readable by admin / our service user on Windows
Ok(())
}
pub(crate) fn default_token_path() -> std::path::PathBuf {
// TODO: System-wide default token path for Windows
PathBuf::from("token.txt")
}
pub(crate) fn run_ipc_service(_cli: Cli) -> Result<()> {
// TODO: Process split on Windows
todo!()
}
pub fn system_resolvers() -> Result<Vec<IpAddr>> {
let resolvers = ipconfig::get_adapters()?
.iter()
.flat_map(|adapter| adapter.dns_servers())
.filter(|ip| match ip {
IpAddr::V4(_) => true,
// Filter out bogus DNS resolvers on my dev laptop that start with fec0:
IpAddr::V6(ip) => !ip.octets().starts_with(&[0xfe, 0xc0]),
})
.copied()
.collect();
// This is private, so keep it at `debug` or `trace`
tracing::debug!(?resolvers);
Ok(resolvers)
}

View File

@@ -8,35 +8,27 @@
//! Tauri deb bundler to pick it up easily.
//! Otherwise we would just make it a normal binary crate.
use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::Parser;
use connlib_client_shared::{
file_logger, keypair, Callbacks, LoginUrl, ResourceDescription, Session, Sockets,
};
use firezone_cli_utils::setup_global_subscriber;
use secrecy::SecretString;
use std::{future, net::IpAddr, path::PathBuf, task::Poll};
use tokio::sync::mpsc;
pub use imp::{default_token_path, run};
use imp::default_token_path;
#[cfg(target_os = "linux")]
mod imp_linux;
pub mod imp_linux;
#[cfg(target_os = "linux")]
use imp_linux as imp;
pub use imp_linux as imp;
#[cfg(target_os = "windows")]
mod imp_windows {
use clap::Parser;
pub fn default_token_path() -> std::path::PathBuf {
todo!()
}
pub fn run() -> anyhow::Result<()> {
let cli = super::Cli::parse();
let _cmd = cli.command();
tracing::info!(git_version = crate::GIT_VERSION);
// Clippy will complain that the `Result` type is pointless if we can't
// possibly throw an error, because it doesn't see that the Linux impl does
// throw errors
anyhow::bail!("`headless-client` is not implemented for Windows yet");
}
}
pub mod imp_windows;
#[cfg(target_os = "windows")]
use imp_windows as imp;
pub use imp_windows as imp;
/// Output of `git describe` at compile time
/// e.g. `1.0.0-pre.4-20-ged5437c88-modified` where:
@@ -124,7 +116,217 @@ enum Cmd {
IpcService,
/// Act as a CLI-only Client
Standalone,
/// Act as an IPC client for development
#[command(hide = true)]
StubIpcClient,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum IpcClientMsg {
Connect { api_url: String, token: String },
Disconnect,
Reconnect,
SetDns(Vec<IpAddr>),
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum IpcServerMsg {
Ok,
OnDisconnect,
OnUpdateResources(Vec<ResourceDescription>),
TunnelReady,
}
pub fn run() -> Result<()> {
let mut cli = Cli::parse();
// Modifying the environment of a running process is unsafe. If any other
// thread is reading or writing the environment, something bad can happen.
// So `run` must take over as early as possible during startup, and
// take the token env var before any other threads spawn.
let token_env_var = cli.token.take().map(SecretString::from);
let cli = cli;
// Docs indicate that `remove_var` should actually be marked unsafe
// SAFETY: We haven't spawned any other threads, this code should be the first
// thing to run after entering `main`. So nobody else is reading the environment.
#[allow(unused_unsafe)]
unsafe {
// This removes the token from the environment per <https://security.stackexchange.com/a/271285>. We run as root so it may not do anything besides defense-in-depth.
std::env::remove_var(TOKEN_ENV_KEY);
}
assert!(std::env::var(TOKEN_ENV_KEY).is_err());
let (layer, _handle) = cli.log_dir.as_deref().map(file_logger::layer).unzip();
setup_global_subscriber(layer);
tracing::info!(git_version = crate::GIT_VERSION);
match cli.command() {
Cmd::Auto => {
if let Some(token) = get_token(token_env_var, &cli)? {
run_standalone(cli, &token)
} else {
imp::run_ipc_service(cli)
}
}
Cmd::IpcService => imp::run_ipc_service(cli),
Cmd::Standalone => {
let token = get_token(token_env_var, &cli)?.with_context(|| {
format!(
"Can't find the Firezone token in ${TOKEN_ENV_KEY} or in `{}`",
cli.token_path
)
})?;
run_standalone(cli, &token)
}
}
}
// Allow dead code because Windows doesn't have an obvious SIGHUP equivalent
#[allow(dead_code)]
enum SignalKind {
Hangup,
Interrupt,
}
fn run_standalone(cli: Cli, token: &SecretString) -> Result<()> {
tracing::info!("Running in standalone mode");
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let _guard = rt.enter();
// TODO: Should this default to 30 days?
let max_partition_time = cli.max_partition_time.map(|d| d.into());
// AKA "Device ID", not the Firezone slug
let firezone_id = match cli.firezone_id {
Some(id) => id,
None => connlib_shared::device_id::get().context("Could not get `firezone_id` from CLI, could not read it from disk, could not generate it and save it to disk")?.id,
};
let (private_key, public_key) = keypair();
let login = LoginUrl::client(cli.api_url, token, firezone_id, None, public_key.to_bytes())?;
if cli.check {
tracing::info!("Check passed");
return Ok(());
}
let (on_disconnect_tx, mut on_disconnect_rx) = mpsc::channel(1);
let callback_handler = CallbackHandler { on_disconnect_tx };
let session = Session::connect(
login,
Sockets::new(),
private_key,
None,
callback_handler,
max_partition_time,
rt.handle().clone(),
);
// TODO: this should be added dynamically
session.set_dns(imp::system_resolvers().unwrap_or_default());
let mut signals = imp::Signals::new()?;
let result = rt.block_on(async {
future::poll_fn(|cx| loop {
match on_disconnect_rx.poll_recv(cx) {
Poll::Ready(Some(error)) => return Poll::Ready(Err(anyhow::anyhow!(error))),
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow::anyhow!(
"on_disconnect_rx unexpectedly ran empty"
)))
}
Poll::Pending => {}
}
match signals.poll(cx) {
Poll::Ready(SignalKind::Hangup) => {
session.reconnect();
continue;
}
Poll::Ready(SignalKind::Interrupt) => return Poll::Ready(Ok(())),
Poll::Pending => return Poll::Pending,
}
})
.await
});
session.disconnect();
result
}
#[derive(Clone)]
struct CallbackHandler {
/// Channel for an error message if connlib disconnects due to an error
on_disconnect_tx: mpsc::Sender<String>,
}
impl Callbacks for CallbackHandler {
fn on_disconnect(&self, error: &connlib_client_shared::Error) {
// Convert the error to a String since we can't clone it
self.on_disconnect_tx
.try_send(error.to_string())
.expect("should be able to tell the main thread that we disconnected");
}
fn on_update_resources(&self, resources: Vec<connlib_client_shared::ResourceDescription>) {
// See easily with `export RUST_LOG=firezone_headless_client=debug`
tracing::debug!(len = resources.len(), "Printing the resource list one time");
for resource in &resources {
tracing::debug!(?resource);
}
}
}
/// Read the token from disk if it was not in the environment
///
/// # Returns
/// - `Ok(None)` if there is no token to be found
/// - `Ok(Some(_))` if we found the token
/// - `Err(_)` if we found the token on disk but failed to read it
fn get_token(token_env_var: Option<SecretString>, cli: &Cli) -> Result<Option<SecretString>> {
// This is very simple but I don't want to write it twice
if let Some(token) = token_env_var {
return Ok(Some(token));
}
read_token_file(cli)
}
/// Try to retrieve the token from disk
///
/// Sync because we do blocking file I/O
fn read_token_file(cli: &Cli) -> Result<Option<SecretString>> {
let path = PathBuf::from(&cli.token_path);
if let Ok(token) = std::env::var(TOKEN_ENV_KEY) {
std::env::remove_var(TOKEN_ENV_KEY);
let token = SecretString::from(token);
// Token was provided in env var
tracing::info!(
?path,
?TOKEN_ENV_KEY,
"Found token in env var, ignoring any token that may be on disk."
);
return Ok(Some(token));
}
if std::fs::metadata(&path).is_err() {
return Ok(None);
}
imp::check_token_permissions(&path)?;
let Ok(bytes) = std::fs::read(&path) else {
// We got the metadata a second ago, but can't read the file itself.
// Pretty strange, would have to be a disk fault or TOCTOU.
tracing::info!(?path, "Token file existed but now is unreadable");
return Ok(None);
};
let token = String::from_utf8(bytes)?.trim().to_string();
let token = SecretString::from(token);
tracing::info!(?path, "Loaded token from disk");
Ok(Some(token))
}