refactor(headless-client): move IPC servers into their own module (#5360)

Makes #5357 easier by freeing up space in the catch-all `platform` mod
This commit is contained in:
Reactor Scram
2024-06-14 10:40:32 -05:00
committed by GitHub
parent 90ebe193f4
commit 6c83f76a1f
8 changed files with 246 additions and 232 deletions

View File

@@ -79,7 +79,7 @@ pub async fn open(url: &url::Url) -> Result<()> {
}
fn pipe_path() -> String {
firezone_headless_client::windows::named_pipe_path(&format!("{BUNDLE_ID}.deep_link"))
firezone_headless_client::ipc::platform::named_pipe_path(&format!("{BUNDLE_ID}.deep_link"))
}
/// Registers the current exe as the handler for our deep link scheme.

View File

@@ -11,7 +11,7 @@ pub(crate) type IpcStream = named_pipe::NamedPipeClient;
/// This is async on Linux
#[allow(clippy::unused_async)]
pub(crate) async fn connect_to_service() -> Result<IpcStream> {
let path = firezone_headless_client::windows::pipe_path();
let path = firezone_headless_client::ipc::platform::pipe_path();
let stream = named_pipe::ClientOptions::new()
.open(path)
.with_context(|| "Couldn't connect to named pipe server at `{path}`")?;

View File

@@ -0,0 +1,11 @@
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "linux")]
use linux as platform;
#[cfg(target_os = "windows")]
pub mod windows;
#[cfg(target_os = "windows")]
pub use windows as platform;
pub(crate) use platform::{Server, Stream};

View File

@@ -0,0 +1,53 @@
use anyhow::{Context as _, Result};
use std::{os::unix::fs::PermissionsExt, path::Path};
use tokio::net::{UnixListener, UnixStream};
pub(crate) struct Server {
listener: UnixListener,
}
/// Opaque wrapper around platform-specific IPC stream
pub(crate) type Stream = UnixStream;
impl Server {
/// Platform-specific setup
pub(crate) async fn new() -> Result<Self> {
Self::new_with_path(&crate::platform::sock_path()).await
}
/// Uses a test path instead of what prod uses
///
/// The test path doesn't need admin powers and won't conflict with the prod
/// IPC service on a dev machine.
#[cfg(test)]
pub(crate) async fn new_for_test() -> Result<Self> {
let dir = crate::known_dirs::runtime().context("Can't find runtime dir")?;
// On a CI runner, the dir might not exist yet
tokio::fs::create_dir_all(&dir).await?;
let sock_path = dir.join("ipc_test.sock");
Self::new_with_path(&sock_path).await
}
async fn new_with_path(sock_path: &Path) -> Result<Self> {
// Remove the socket if a previous run left it there
tokio::fs::remove_file(sock_path).await.ok();
let listener = UnixListener::bind(sock_path).context("Couldn't bind UDS")?;
let perms = std::fs::Permissions::from_mode(0o660);
tokio::fs::set_permissions(sock_path, perms).await?;
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?;
Ok(Self { listener })
}
pub(crate) async fn next_client(&mut self) -> Result<Stream> {
tracing::info!("Listening for GUI to connect over IPC...");
let (stream, _) = self.listener.accept().await?;
let cred = stream.peer_cred()?;
tracing::info!(
uid = cred.uid(),
gid = cred.gid(),
pid = cred.pid(),
"Accepted an IPC connection"
);
Ok(stream)
}
}

View File

@@ -0,0 +1,171 @@
use anyhow::{bail, Context as _, Result};
use connlib_shared::BUNDLE_ID;
use std::{ffi::c_void, os::windows::io::AsRawHandle, time::Duration};
use tokio::net::windows::named_pipe;
use windows::Win32::{
Foundation::HANDLE, Security as WinSec, System::Pipes::GetNamedPipeClientProcessId,
};
pub(crate) struct Server {
pipe_path: String,
}
/// Opaque wrapper around platform-specific IPC stream
pub(crate) type Stream = named_pipe::NamedPipeServer;
impl Server {
/// Platform-specific setup
///
/// This is async on Linux
#[allow(clippy::unused_async)]
pub(crate) async fn new() -> Result<Self> {
Self::new_with_path(pipe_path())
}
/// Uses a test path instead of what prod uses
///
/// The test path doesn't need admin powers and won't conflict with the prod
/// IPC service on a dev machine.
///
/// This is async on Linux
#[allow(clippy::unused_async)]
#[cfg(test)]
pub(crate) async fn new_for_test() -> Result<Self> {
let pipe_path = named_pipe_path(&format!("{BUNDLE_ID}_test.ipc_service"));
Self::new_with_path(pipe_path)
}
pub(crate) fn new_with_path(pipe_path: String) -> Result<Self> {
crate::platform::setup_before_connlib()?;
Ok(Self { pipe_path })
}
// `&mut self` needed to match the Linux signature
pub(crate) async fn next_client(&mut self) -> Result<Stream> {
// Fixes #5143. In the IPC service, if we close the pipe and immediately re-open
// it, Tokio may not get a chance to clean up the pipe. Yielding seems to fix
// this in tests, but `yield_now` doesn't make any such guarantees, so
// we also do a loop.
tokio::task::yield_now().await;
let server = self
.bind_to_pipe()
.await
.context("Couldn't bind to named pipe")?;
tracing::info!(
server_pid = std::process::id(),
"Listening for GUI to connect over IPC..."
);
server
.connect()
.await
.context("Couldn't accept IPC connection from GUI")?;
let handle = HANDLE(server.as_raw_handle() as isize);
let mut client_pid: u32 = 0;
// SAFETY: Windows doesn't store this pointer or handle, and we just got the handle
// from Tokio, so it should be valid.
unsafe { GetNamedPipeClientProcessId(handle, &mut client_pid) }
.context("Couldn't get PID of named pipe client")?;
tracing::info!(?client_pid, "Accepted IPC connection");
Ok(server)
}
async fn bind_to_pipe(&self) -> Result<Stream> {
const NUM_ITERS: usize = 10;
// This loop is defense-in-depth. The `yield_now` in `next_client` is enough
// to fix #5143, but Tokio doesn't guarantee any behavior when yielding, so
// the loop will catch it even if yielding doesn't.
for i in 0..NUM_ITERS {
match create_pipe_server(&self.pipe_path) {
Ok(server) => return Ok(server),
Err(PipeError::AccessDenied) => {
tracing::warn!("PipeError::AccessDenied, sleeping... (loop {i})");
tokio::time::sleep(Duration::from_secs(1)).await;
}
Err(error) => Err(error)?,
}
}
bail!("Tried {NUM_ITERS} times to bind the pipe and failed");
}
}
#[derive(Debug, thiserror::Error)]
enum PipeError {
#[error("Access denied - Is another process using this pipe path?")]
AccessDenied,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
fn create_pipe_server(pipe_path: &str) -> Result<named_pipe::NamedPipeServer, PipeError> {
let mut server_options = named_pipe::ServerOptions::new();
server_options.first_pipe_instance(true);
// This will allow non-admin clients to connect to us even though we're running with privilege
let mut sd = WinSec::SECURITY_DESCRIPTOR::default();
let psd = WinSec::PSECURITY_DESCRIPTOR(&mut sd as *mut _ as *mut c_void);
// SAFETY: Unsafe needed to call Win32 API. There shouldn't be any threading or lifetime problems, because we only pass pointers to our local vars to Win32, and Win32 shouldn't sae them anywhere.
unsafe {
// ChatGPT pointed me to these functions
WinSec::InitializeSecurityDescriptor(
psd,
windows::Win32::System::SystemServices::SECURITY_DESCRIPTOR_REVISION,
)
.context("InitializeSecurityDescriptor failed")?;
WinSec::SetSecurityDescriptorDacl(psd, true, None, false)
.context("SetSecurityDescriptorDacl failed")?;
}
let mut sa = WinSec::SECURITY_ATTRIBUTES {
nLength: 0,
lpSecurityDescriptor: psd.0,
bInheritHandle: false.into(),
};
sa.nLength = std::mem::size_of_val(&sa)
.try_into()
.context("Size of SECURITY_ATTRIBUTES struct is not right")?;
let sa_ptr = &mut sa as *mut _ as *mut c_void;
// SAFETY: Unsafe needed to call Win32 API. We only pass pointers to local vars, and Win32 shouldn't store them, so there shouldn't be any threading of lifetime problems.
match unsafe { server_options.create_with_security_attributes_raw(pipe_path, sa_ptr) } {
Ok(x) => Ok(x),
Err(err) => {
if err.kind() == std::io::ErrorKind::PermissionDenied {
tracing::warn!(?pipe_path, "Named pipe `PermissionDenied`");
Err(PipeError::AccessDenied)
} else {
Err(anyhow::Error::from(err).into())
}
}
}
}
/// Named pipe for IPC between GUI client and IPC service
pub fn pipe_path() -> String {
named_pipe_path(&format!("{BUNDLE_ID}.ipc_service"))
}
/// Returns a valid name for a Windows named pipe
///
/// # Arguments
///
/// * `id` - BUNDLE_ID, e.g. `dev.firezone.client`
pub fn named_pipe_path(id: &str) -> String {
format!(r"\\.\pipe\{}", id)
}
#[cfg(test)]
mod tests {
#[test]
fn named_pipe_path() {
assert_eq!(
super::named_pipe_path("dev.firezone.client"),
r"\\.\pipe\dev.firezone.client"
);
}
#[test]
fn pipe_path() {
assert!(super::pipe_path().starts_with(r"\\.\pipe\"));
}
}

View File

@@ -38,6 +38,7 @@ use platform::Signals;
pub(crate) mod device_id;
pub mod dns_control;
pub mod heartbeat;
pub mod ipc;
pub mod known_dirs;
#[cfg(target_os = "linux")]
@@ -50,6 +51,8 @@ pub mod windows;
#[cfg(target_os = "windows")]
pub(crate) use windows as platform;
use ipc::{Server as IpcServer, Stream as IpcStream};
/// Only used on Linux
pub const FIREZONE_GROUP: &str = "firezone-client";
@@ -423,7 +426,7 @@ impl Callbacks for CallbackHandler {
}
async fn ipc_listen() -> Result<std::convert::Infallible> {
let mut server = platform::IpcServer::new().await?;
let mut server = IpcServer::new().await?;
loop {
dns_control::deactivate()?;
let stream = server
@@ -436,7 +439,7 @@ async fn ipc_listen() -> Result<std::convert::Infallible> {
}
}
async fn handle_ipc_client(stream: platform::IpcStream) -> Result<()> {
async fn handle_ipc_client(stream: IpcStream) -> Result<()> {
let (rx, tx) = tokio::io::split(stream);
let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
@@ -631,7 +634,7 @@ mod tests {
async fn ipc_server() -> anyhow::Result<()> {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let mut server = crate::platform::IpcServer::new_for_test().await?;
let mut server = crate::IpcServer::new_for_test().await?;
for i in 0..5 {
if let Ok(Err(err)) = timeout(Duration::from_secs(1), server.next_client()).await {
Err(err).with_context(|| {

View File

@@ -6,14 +6,10 @@ use connlib_client_shared::file_logger;
use firezone_cli_utils::setup_global_subscriber;
use futures::future::{select, Either};
use std::{
os::unix::fs::PermissionsExt,
path::{Path, PathBuf},
pin::pin,
};
use tokio::{
net::{UnixListener, UnixStream},
signal::unix::{signal, Signal, SignalKind as TokioSignalKind},
};
use tokio::signal::unix::{signal, Signal, SignalKind as TokioSignalKind};
// The Client currently must run as root to control DNS
// Root group and user are used to check file ownership on the token
@@ -119,56 +115,6 @@ pub fn firezone_group() -> Result<nix::unistd::Group> {
Ok(group)
}
pub(crate) struct IpcServer {
listener: UnixListener,
}
/// Opaque wrapper around platform-specific IPC stream
pub(crate) type IpcStream = UnixStream;
impl IpcServer {
/// Platform-specific setup
pub(crate) async fn new() -> Result<Self> {
Self::new_with_path(&sock_path()).await
}
/// Uses a test path instead of what prod uses
///
/// The test path doesn't need admin powers and won't conflict with the prod
/// IPC service on a dev machine.
#[cfg(test)]
pub(crate) async fn new_for_test() -> Result<Self> {
let dir = crate::known_dirs::runtime().context("Can't find runtime dir")?;
// On a CI runner, the dir might not exist yet
tokio::fs::create_dir_all(&dir).await?;
let sock_path = dir.join("ipc_test.sock");
Self::new_with_path(&sock_path).await
}
async fn new_with_path(sock_path: &Path) -> Result<Self> {
// Remove the socket if a previous run left it there
tokio::fs::remove_file(sock_path).await.ok();
let listener = UnixListener::bind(sock_path).context("Couldn't bind UDS")?;
let perms = std::fs::Permissions::from_mode(0o660);
tokio::fs::set_permissions(sock_path, perms).await?;
sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?;
Ok(Self { listener })
}
pub(crate) async fn next_client(&mut self) -> Result<IpcStream> {
tracing::info!("Listening for GUI to connect over IPC...");
let (stream, _) = self.listener.accept().await?;
let cred = stream.peer_cred()?;
tracing::info!(
uid = cred.uid(),
gid = cred.gid(),
pid = cred.pid(),
"Accepted an IPC connection"
);
Ok(stream)
}
}
pub(crate) fn notify_service_controller() -> Result<()> {
Ok(sd_notify::notify(true, &[sd_notify::NotifyState::Ready])?)
}

View File

@@ -5,22 +5,16 @@
//! We must tell Windows explicitly when our service is stopping.
use crate::{CliCommon, SignalKind};
use anyhow::{bail, Context as _, Result};
use anyhow::{Context as _, Result};
use connlib_client_shared::file_logger;
use connlib_shared::BUNDLE_ID;
use std::{
ffi::{c_void, OsString},
os::windows::io::AsRawHandle,
ffi::OsString,
path::{Path, PathBuf},
str::FromStr,
time::Duration,
};
use tokio::net::windows::named_pipe;
use tracing::subscriber::set_global_default;
use tracing_subscriber::{layer::SubscriberExt as _, EnvFilter, Layer, Registry};
use windows::Win32::{
Foundation::HANDLE, Security as WinSec, System::Pipes::GetNamedPipeClientProcessId,
};
use windows_service::{
service::{
ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState, ServiceStatus,
@@ -199,154 +193,6 @@ fn fallible_windows_service_run(
Ok(())
}
pub(crate) struct IpcServer {
pipe_path: String,
}
/// Opaque wrapper around platform-specific IPC stream
pub(crate) type IpcStream = named_pipe::NamedPipeServer;
impl IpcServer {
/// Platform-specific setup
///
/// This is async on Linux
#[allow(clippy::unused_async)]
pub(crate) async fn new() -> Result<Self> {
Self::new_with_path(pipe_path())
}
/// Uses a test path instead of what prod uses
///
/// The test path doesn't need admin powers and won't conflict with the prod
/// IPC service on a dev machine.
///
/// This is async on Linux
#[allow(clippy::unused_async)]
#[cfg(test)]
pub(crate) async fn new_for_test() -> Result<Self> {
let pipe_path = named_pipe_path(&format!("{BUNDLE_ID}_test.ipc_service"));
Self::new_with_path(pipe_path)
}
pub(crate) fn new_with_path(pipe_path: String) -> Result<Self> {
setup_before_connlib()?;
Ok(Self { pipe_path })
}
// `&mut self` needed to match the Linux signature
pub(crate) async fn next_client(&mut self) -> Result<IpcStream> {
// Fixes #5143. In the IPC service, if we close the pipe and immediately re-open
// it, Tokio may not get a chance to clean up the pipe. Yielding seems to fix
// this in tests, but `yield_now` doesn't make any such guarantees, so
// we also do a loop.
tokio::task::yield_now().await;
let server = self
.bind_to_pipe()
.await
.context("Couldn't bind to named pipe")?;
tracing::info!(
server_pid = std::process::id(),
"Listening for GUI to connect over IPC..."
);
server
.connect()
.await
.context("Couldn't accept IPC connection from GUI")?;
let handle = HANDLE(server.as_raw_handle() as isize);
let mut client_pid: u32 = 0;
// SAFETY: Windows doesn't store this pointer or handle, and we just got the handle
// from Tokio, so it should be valid.
unsafe { GetNamedPipeClientProcessId(handle, &mut client_pid) }
.context("Couldn't get PID of named pipe client")?;
tracing::info!(?client_pid, "Accepted IPC connection");
Ok(server)
}
async fn bind_to_pipe(&self) -> Result<IpcStream> {
const NUM_ITERS: usize = 10;
// This loop is defense-in-depth. The `yield_now` in `next_client` is enough
// to fix #5143, but Tokio doesn't guarantee any behavior when yielding, so
// the loop will catch it even if yielding doesn't.
for i in 0..NUM_ITERS {
match create_pipe_server(&self.pipe_path) {
Ok(server) => return Ok(server),
Err(PipeError::AccessDenied) => {
tracing::warn!("PipeError::AccessDenied, sleeping... (loop {i})");
tokio::time::sleep(Duration::from_secs(1)).await;
}
Err(error) => Err(error)?,
}
}
bail!("Tried {NUM_ITERS} times to bind the pipe and failed");
}
}
#[derive(Debug, thiserror::Error)]
enum PipeError {
#[error("Access denied - Is another process using this pipe path?")]
AccessDenied,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
fn create_pipe_server(pipe_path: &str) -> Result<named_pipe::NamedPipeServer, PipeError> {
let mut server_options = named_pipe::ServerOptions::new();
server_options.first_pipe_instance(true);
// This will allow non-admin clients to connect to us even though we're running with privilege
let mut sd = WinSec::SECURITY_DESCRIPTOR::default();
let psd = WinSec::PSECURITY_DESCRIPTOR(&mut sd as *mut _ as *mut c_void);
// SAFETY: Unsafe needed to call Win32 API. There shouldn't be any threading or lifetime problems, because we only pass pointers to our local vars to Win32, and Win32 shouldn't sae them anywhere.
unsafe {
// ChatGPT pointed me to these functions
WinSec::InitializeSecurityDescriptor(
psd,
windows::Win32::System::SystemServices::SECURITY_DESCRIPTOR_REVISION,
)
.context("InitializeSecurityDescriptor failed")?;
WinSec::SetSecurityDescriptorDacl(psd, true, None, false)
.context("SetSecurityDescriptorDacl failed")?;
}
let mut sa = WinSec::SECURITY_ATTRIBUTES {
nLength: 0,
lpSecurityDescriptor: psd.0,
bInheritHandle: false.into(),
};
sa.nLength = std::mem::size_of_val(&sa)
.try_into()
.context("Size of SECURITY_ATTRIBUTES struct is not right")?;
let sa_ptr = &mut sa as *mut _ as *mut c_void;
// SAFETY: Unsafe needed to call Win32 API. We only pass pointers to local vars, and Win32 shouldn't store them, so there shouldn't be any threading of lifetime problems.
match unsafe { server_options.create_with_security_attributes_raw(pipe_path, sa_ptr) } {
Ok(x) => Ok(x),
Err(err) => {
if err.kind() == std::io::ErrorKind::PermissionDenied {
tracing::warn!(?pipe_path, "Named pipe `PermissionDenied`");
Err(PipeError::AccessDenied)
} else {
Err(anyhow::Error::from(err).into())
}
}
}
}
/// Named pipe for IPC between GUI client and IPC service
pub fn pipe_path() -> String {
named_pipe_path(&format!("{BUNDLE_ID}.ipc_service"))
}
/// Returns a valid name for a Windows named pipe
///
/// # Arguments
///
/// * `id` - BUNDLE_ID, e.g. `dev.firezone.client`
pub fn named_pipe_path(id: &str) -> String {
format!(r"\\.\pipe\{}", id)
}
// Does nothing on Windows. On Linux this notifies systemd that we're ready.
// When we eventually have a system service for the Windows Headless Client,
// this could notify the Windows service controller too.
@@ -359,19 +205,3 @@ pub(crate) fn setup_before_connlib() -> Result<()> {
wintun_install::ensure_dll()?;
Ok(())
}
#[cfg(test)]
mod tests {
#[test]
fn named_pipe_path() {
assert_eq!(
super::named_pipe_path("dev.firezone.client"),
r"\\.\pipe\dev.firezone.client"
);
}
#[test]
fn pipe_path() {
assert!(super::pipe_path().starts_with(r"\\.\pipe\"));
}
}