refactor(gui-client): de-dupe IPC client code (#5091)

Same as in #5074, almost all of the IPC code turned out to be
platform-independent.
Yak shaving towards #5022
This commit is contained in:
Reactor Scram
2024-05-28 10:07:44 -05:00
committed by GitHub
parent bfffcedf47
commit 9d1b14debe
3 changed files with 103 additions and 199 deletions

View File

@@ -1,16 +1,16 @@
use crate::client::gui::{ControllerRequest, CtlrTx};
use anyhow::{Context as _, Result};
use arc_swap::ArcSwap;
use connlib_client_shared::callbacks::ResourceDescription;
use firezone_headless_client::IpcClientMsg;
use connlib_client_shared::{callbacks::ResourceDescription, Callbacks};
use firezone_headless_client::{IpcClientMsg, IpcServerMsg};
use futures::{SinkExt, StreamExt};
use secrecy::{ExposeSecret, SecretString};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use tokio::sync::Notify;
pub(crate) use platform::Client;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
#[cfg(target_os = "linux")]
#[path = "ipc/linux.rs"]
@@ -58,7 +58,75 @@ impl connlib_client_shared::Callbacks for CallbackHandler {
}
}
pub(crate) struct Client {
task: tokio::task::JoinHandle<Result<()>>,
// Needed temporarily to avoid a big refactor. We can remove this in the future.
tx: FramedWrite<tokio::io::WriteHalf<platform::IpcStream>, LengthDelimitedCodec>,
}
impl Client {
pub(crate) async fn disconnect(mut self) -> Result<()> {
self.send_msg(&IpcClientMsg::Disconnect)
.await
.context("Couldn't send Disconnect")?;
self.tx.close().await?;
self.task.abort();
Ok(())
}
pub(crate) async fn send_msg(&mut self, msg: &IpcClientMsg) -> Result<()> {
self.tx
.send(
serde_json::to_string(msg)
.context("Couldn't encode IPC message as JSON")?
.into(),
)
.await
.context("Couldn't send IPC message")?;
Ok(())
}
pub(crate) async fn connect(
api_url: &str,
token: SecretString,
callback_handler: CallbackHandler,
tokio_handle: tokio::runtime::Handle,
) -> Result<Self> {
tracing::info!(pid = std::process::id(), "Connecting to IPC service...");
let stream = platform::connect_to_service().await?;
let (rx, tx) = tokio::io::split(stream);
// Receives messages from the IPC service
let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
let task = tokio_handle.spawn(async move {
while let Some(msg) = rx.next().await.transpose()? {
match serde_json::from_slice::<IpcServerMsg>(&msg)? {
IpcServerMsg::Ok => {}
IpcServerMsg::OnDisconnect => callback_handler.on_disconnect(
&connlib_client_shared::Error::Other("errors can't be serialized"),
),
IpcServerMsg::OnUpdateResources(v) => callback_handler.on_update_resources(v),
IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns } => {
callback_handler.on_set_interface_config(ipv4, ipv6, dns);
}
}
}
Ok(())
});
let mut client = Self { task, tx };
let token = token.expose_secret().clone();
client
.send_msg(&IpcClientMsg::Connect {
api_url: api_url.to_string(),
token,
})
.await
.context("Couldn't send Connect message")?;
Ok(client)
}
pub(crate) async fn reconnect(&mut self) -> Result<()> {
self.send_msg(&IpcClientMsg::Reconnect)
.await

View File

@@ -1,82 +1,18 @@
use anyhow::{Context as _, Result};
use connlib_client_shared::Callbacks;
use firezone_headless_client::{platform::sock_path, IpcClientMsg, IpcServerMsg};
use futures::{SinkExt, StreamExt};
use secrecy::{ExposeSecret, SecretString};
use tokio::net::{unix::OwnedWriteHalf, UnixStream};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use firezone_headless_client::platform::sock_path;
use tokio::net::UnixStream;
/// Forwards events to and from connlib
pub(crate) struct Client {
recv_task: tokio::task::JoinHandle<Result<()>>,
tx: FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
}
impl Client {
pub(crate) async fn disconnect(mut self) -> Result<()> {
self.send_msg(&IpcClientMsg::Disconnect)
.await
.context("Couldn't send Disconnect")?;
self.tx.close().await?;
self.recv_task.abort();
Ok(())
}
pub(crate) async fn send_msg(&mut self, msg: &IpcClientMsg) -> Result<()> {
self.tx
.send(
serde_json::to_string(msg)
.context("Couldn't encode IPC message as JSON")?
.into(),
)
.await
.context("Couldn't send IPC message")?;
Ok(())
}
pub(crate) async fn connect(
api_url: &str,
token: SecretString,
callback_handler: super::CallbackHandler,
tokio_handle: tokio::runtime::Handle,
) -> Result<Self> {
tracing::info!(pid = std::process::id(), "Connecting to IPC service...");
let stream = UnixStream::connect(sock_path())
.await
.context("Couldn't connect to UDS")?;
let (rx, tx) = stream.into_split();
// Receives messages from the IPC service
let mut rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
// TODO: Make sure this joins / drops somewhere
let recv_task = tokio_handle.spawn(async move {
while let Some(msg) = rx.next().await.transpose()? {
let msg: IpcServerMsg = serde_json::from_slice(&msg)?;
match msg {
IpcServerMsg::Ok => {}
IpcServerMsg::OnDisconnect => callback_handler.on_disconnect(
&connlib_client_shared::Error::Other("errors can't be serialized"),
),
IpcServerMsg::OnUpdateResources(v) => callback_handler.on_update_resources(v),
IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns } => {
callback_handler.on_set_interface_config(ipv4, ipv6, dns);
}
}
}
Ok(())
});
let mut client = Self { recv_task, tx };
let token = token.expose_secret().clone();
client
.send_msg(&IpcClientMsg::Connect {
api_url: api_url.to_string(),
token,
})
.await
.context("Couldn't send Connect message")?;
Ok(client)
}
/// A type alias to abstract over the Windows and Unix IPC primitives
pub(crate) type IpcStream = UnixStream;
/// Connect to the IPC service
pub(crate) async fn connect_to_service() -> Result<IpcStream> {
let path = sock_path();
let stream = UnixStream::connect(&path).await.with_context(|| {
format!(
"Couldn't connect to Unix domain socket at `{}`",
path.display()
)
})?;
Ok(stream)
}

View File

@@ -1,117 +1,17 @@
use anyhow::{anyhow, Context as _, Result};
use connlib_client_shared::Callbacks;
use firezone_headless_client::{IpcClientMsg, IpcServerMsg};
use futures::{SinkExt, Stream};
use secrecy::{ExposeSecret, SecretString};
use std::{pin::pin, task::Poll};
use tokio::{net::windows::named_pipe, sync::mpsc};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use anyhow::{Context as _, Result};
use tokio::net::windows::named_pipe;
pub(crate) struct Client {
task: tokio::task::JoinHandle<Result<()>>,
// Needed temporarily to avoid a big refactor. We can remove this in the future.
tx: mpsc::Sender<String>,
}
impl Client {
pub(crate) async fn disconnect(mut self) -> Result<()> {
self.send_msg(&IpcClientMsg::Disconnect)
.await
.context("Couldn't send Disconnect")?;
self.task.abort();
Ok(())
}
#[allow(clippy::unused_async)]
pub(crate) async fn send_msg(&mut self, msg: &IpcClientMsg) -> Result<()> {
self.tx
.send(serde_json::to_string(msg).context("Couldn't encode IPC message as JSON")?)
.await
.context("Couldn't send IPC message")?;
Ok(())
}
pub(crate) async fn connect(
api_url: &str,
token: SecretString,
callback_handler: super::CallbackHandler,
tokio_handle: tokio::runtime::Handle,
) -> Result<Self> {
tracing::info!(pid = std::process::id(), "Connecting to IPC service...");
let ipc = named_pipe::ClientOptions::new()
.open(firezone_headless_client::windows::pipe_path())
.context("Couldn't connect to named pipe server")?;
let ipc = Framed::new(ipc, LengthDelimitedCodec::new());
// This channel allows us to communicate with the GUI even though NamedPipeClient
// doesn't have `into_split`.
let (tx, mut rx) = mpsc::channel(1);
let task = tokio_handle.spawn(async move {
let mut ipc = pin!(ipc);
loop {
let ev = std::future::poll_fn(|cx| {
match rx.poll_recv(cx) {
Poll::Ready(Some(msg)) => return Poll::Ready(Ok(IpcEvent::Gui(msg))),
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow!("MPSC channel from GUI closed")))
}
Poll::Pending => {}
}
match ipc.as_mut().poll_next(cx) {
Poll::Ready(Some(msg)) => {
let msg = serde_json::from_slice(&msg?)?;
return Poll::Ready(Ok(IpcEvent::Connlib(msg)));
}
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow!("IPC service disconnected from us")))
}
Poll::Pending => {}
}
Poll::Pending
})
.await;
match ev {
Ok(IpcEvent::Gui(msg)) => ipc.send(msg.into()).await?,
Ok(IpcEvent::Connlib(msg)) => match msg {
IpcServerMsg::Ok => {}
IpcServerMsg::OnDisconnect => callback_handler.on_disconnect(
&connlib_client_shared::Error::Other("errors can't be serialized"),
),
IpcServerMsg::OnUpdateResources(v) => {
callback_handler.on_update_resources(v)
}
IpcServerMsg::OnSetInterfaceConfig { ipv4, ipv6, dns } => {
callback_handler.on_set_interface_config(ipv4, ipv6, dns);
}
},
Err(error) => {
tracing::error!(?error, "Error while waiting for IPC tx/rx");
// TODO: Catch that error when the task is joined
Err(error)?
}
}
}
});
let mut client = Self { task, tx };
let token = token.expose_secret().clone();
client
.send_msg(&IpcClientMsg::Connect {
api_url: api_url.to_string(),
token,
})
.await
.context("Couldn't send Connect message")?;
Ok(client)
}
}
enum IpcEvent {
/// The GUI wants to send a message to the service
Gui(String),
/// The connlib instance in the service wants to send a message to the GUI
Connlib(IpcServerMsg),
/// A type alias to abstract over the Windows and Unix IPC primitives
pub(crate) type IpcStream = named_pipe::NamedPipeClient;
/// Connect to the IPC service
///
/// This is async on Linux
#[allow(clippy::unused_async)]
pub(crate) async fn connect_to_service() -> Result<IpcStream> {
let path = firezone_headless_client::windows::pipe_path();
let stream = named_pipe::ClientOptions::new()
.open(path)
.with_context(|| "Couldn't connect to named pipe server at `{path}`")?;
Ok(stream)
}