feat(connlib): discard intermediate resource and TUN updates (#10223)

Right now, the Client event-loops use a channel with a capacity of 1000
items to send new resource lists and TUN device updates to the host app.
This is unnecessary: we only ever care about the latest version of these,
and intermediate updates that the host app doesn't process are
effectively irrelevant.

We've previously had an issue where a bug in the portal caused us to
receive many resource updates, which ended up crashing Client apps
because this channel filled up.

To be more resilient on this front, we refactor the Client event loop to
use `watch` channels for these. A watch channel retains only the last
value sent into it.
Author: Thomas Eizinger
Date:   2025-08-21 05:42:54 +00:00 (committed via GitHub)
Parent: 46afa52f78
Commit: a109c1a2ef

13 changed files with 139 additions and 187 deletions

rust/Cargo.lock (generated)

@@ -1360,6 +1360,7 @@ dependencies = [
"thiserror 2.0.15",
"time",
"tokio",
"tokio-stream",
"tracing",
"tun",
"url",
@@ -7996,6 +7997,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]


@@ -142,8 +142,8 @@ impl CallbackHandler {
tunnel_address_v6: Ipv6Addr,
dns_addresses: Vec<IpAddr>,
search_domain: Option<DomainName>,
-route_list_v4: Vec<Ipv4Network>,
-route_list_v6: Vec<Ipv6Network>,
+route_list_v4: impl IntoIterator<Item = Ipv4Network>,
+route_list_v6: impl IntoIterator<Item = Ipv6Network>,
) {
match (
serde_json::to_string(&dns_addresses),
@@ -315,21 +315,14 @@ impl WrappedSession {
while let Some(event) = event_stream.next().await {
match event {
-Event::TunInterfaceUpdated {
-ipv4,
-ipv6,
-dns,
-search_domain,
-ipv4_routes,
-ipv6_routes,
-} => {
+Event::TunInterfaceUpdated(config) => {
callback_handler.on_set_interface_config(
-ipv4,
-ipv6,
-dns,
-search_domain,
-ipv4_routes,
-ipv6_routes,
+config.ip.v4,
+config.ip.v6,
+config.dns_sentinel_ips(),
+config.search_domain,
+config.ipv4_routes,
+config.ipv6_routes,
);
}
Event::ResourcesUpdated(resource_views) => {


@@ -175,8 +175,8 @@ impl TunDeviceManager {
pub async fn set_routes(
&mut self,
-ipv4: Vec<Ipv4Network>,
-ipv6: Vec<Ipv6Network>,
+ipv4: impl IntoIterator<Item = Ipv4Network>,
+ipv6: impl IntoIterator<Item = Ipv6Network>,
) -> Result<()> {
let new_routes: HashSet<IpNetwork> = ipv4
.into_iter()


@@ -29,8 +29,8 @@ impl TunDeviceManager {
)]
pub async fn set_routes(
&mut self,
-_ipv4: Vec<Ipv4Network>,
-_ipv6: Vec<Ipv6Network>,
+_ipv4: impl IntoIterator<Item = Ipv4Network>,
+_ipv6: impl IntoIterator<Item = Ipv6Network>,
) -> Result<()> {
bail!("Not implemented")
}


@@ -99,8 +99,12 @@ impl TunDeviceManager {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
-pub async fn set_routes(&mut self, v4: Vec<Ipv4Network>, v6: Vec<Ipv6Network>) -> Result<()> {
+#[expect(clippy::unused_async, reason = "Must match Linux API")]
+pub async fn set_routes(
+&mut self,
+v4: impl IntoIterator<Item = Ipv4Network>,
+v6: impl IntoIterator<Item = Ipv6Network>,
+) -> Result<()> {
let iface_idx = self
.iface_idx
.context("Cannot set routes without having created TUN device")?;


@@ -169,25 +169,21 @@ impl Session {
pub async fn next_event(&self) -> Result<Option<Event>, Error> {
match self.events.lock().await.next().await {
-Some(client_shared::Event::TunInterfaceUpdated {
-ipv4,
-ipv6,
-dns,
-search_domain,
-ipv4_routes,
-ipv6_routes,
-}) => {
-let dns = serde_json::to_string(&dns).context("Failed to serialize DNS servers")?;
-let ipv4_routes = serde_json::to_string(&V4RouteList::new(ipv4_routes))
+Some(client_shared::Event::TunInterfaceUpdated(config)) => {
+let dns = serde_json::to_string(
+&config.dns_by_sentinel.left_values().collect::<Vec<_>>(),
+)
+.context("Failed to serialize DNS servers")?;
+let ipv4_routes = serde_json::to_string(&V4RouteList::new(config.ipv4_routes))
.context("Failed to serialize IPv4 routes")?;
-let ipv6_routes = serde_json::to_string(&V6RouteList::new(ipv6_routes))
+let ipv6_routes = serde_json::to_string(&V6RouteList::new(config.ipv6_routes))
.context("Failed to serialize IPv6 routes")?;
Ok(Some(Event::TunInterfaceUpdated {
-ipv4: ipv4.to_string(),
-ipv6: ipv6.to_string(),
+ipv4: config.ip.v4.to_string(),
+ipv6: config.ip.v6.to_string(),
dns,
-search_domain: search_domain.map(|d| d.to_string()),
+search_domain: config.search_domain.map(|d| d.to_string()),
ipv4_routes,
ipv6_routes,
}))


@@ -23,6 +23,7 @@ socket-factory = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true, features = ["formatting"] }
tokio = { workspace = true, features = ["rt", "sync"] }
+tokio-stream = { workspace = true, features = ["sync"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tun = { workspace = true }
url = { workspace = true, features = ["serde"] }


@@ -1,16 +1,13 @@
use crate::PHOENIX_TOPIC;
use anyhow::{Context as _, Result};
use connlib_model::{PublicKey, ResourceId, ResourceView};
-use dns_types::DomainName;
use firezone_tunnel::messages::RelaysPresence;
use firezone_tunnel::messages::client::{
EgressMessages, FailReason, FlowCreated, FlowCreationFailed, GatewayIceCandidates,
GatewaysIceCandidates, IngressMessages, InitClient,
};
use firezone_tunnel::{ClientEvent, ClientTunnel, IpConfig, TunConfig};
-use ip_network::{Ipv4Network, Ipv6Network};
use phoenix_channel::{ErrorReply, PhoenixChannel, PublicKeyParam};
-use std::net::{Ipv4Addr, Ipv6Addr};
use std::ops::ControlFlow;
use std::pin::pin;
use std::time::Instant;
@@ -21,16 +18,17 @@ use std::{
task::{Context, Poll},
};
use std::{future, mem};
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
use tun::Tun;
pub struct Eventloop {
tunnel: ClientTunnel,
cmd_rx: mpsc::UnboundedReceiver<Command>,
-event_tx: mpsc::Sender<Event>,
+resource_list_sender: watch::Sender<Vec<ResourceView>>,
+tun_config_sender: watch::Sender<Option<TunConfig>>,
-portal_event_rx: mpsc::Receiver<PortalEvent>,
+portal_event_rx: mpsc::Receiver<Result<IngressMessages, phoenix_channel::Error>>,
portal_cmd_tx: mpsc::Sender<PortalCommand>,
logged_permission_denied: bool,
@@ -45,46 +43,11 @@ pub enum Command {
SetDisabledResources(BTreeSet<ResourceId>),
}
-pub enum Event {
-TunInterfaceUpdated {
-ipv4: Ipv4Addr,
-ipv6: Ipv6Addr,
-dns: Vec<IpAddr>,
-search_domain: Option<DomainName>,
-ipv4_routes: Vec<Ipv4Network>,
-ipv6_routes: Vec<Ipv6Network>,
-},
-ResourcesUpdated(Vec<ResourceView>),
-Disconnected(DisconnectError),
-}
-impl Event {
-fn tun_interface_updated(config: TunConfig) -> Self {
-Self::TunInterfaceUpdated {
-ipv4: config.ip.v4,
-ipv6: config.ip.v6,
-dns: config.dns_by_sentinel.left_values().copied().collect(),
-search_domain: config.search_domain,
-ipv4_routes: Vec::from_iter(config.ipv4_routes),
-ipv6_routes: Vec::from_iter(config.ipv6_routes),
-}
-}
-}
enum PortalCommand {
Connect(PublicKeyParam),
Send(EgressMessages),
}
-#[expect(
-clippy::large_enum_variant,
-reason = "This type is only sent through a channel so the stack-size doesn't matter much."
-)]
-enum PortalEvent {
-Received(IngressMessages),
-Error(phoenix_channel::Error),
-}
/// Unified error type to use across connlib.
#[derive(thiserror::Error, Debug)]
#[error("{0:#}")]
@@ -111,7 +74,8 @@ impl Eventloop {
tunnel: ClientTunnel,
mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
cmd_rx: mpsc::UnboundedReceiver<Command>,
-event_tx: mpsc::Sender<Event>,
+resource_list_sender: watch::Sender<Vec<ResourceView>>,
+tun_config_sender: watch::Sender<Option<TunConfig>>,
) -> Self {
let (portal_event_tx, portal_event_rx) = mpsc::channel(128);
let (portal_cmd_tx, portal_cmd_rx) = mpsc::channel(128);
@@ -127,10 +91,11 @@ impl Eventloop {
Self {
tunnel,
cmd_rx,
-event_tx,
logged_permission_denied: false,
portal_event_rx,
portal_cmd_tx,
+resource_list_sender,
+tun_config_sender,
}
}
}
@@ -138,11 +103,11 @@ impl Eventloop {
enum CombinedEvent {
Command(Option<Command>),
Tunnel(Result<ClientEvent>),
-Portal(Option<PortalEvent>),
+Portal(Option<Result<IngressMessages, phoenix_channel::Error>>),
}
impl Eventloop {
-pub async fn run(mut self) -> Result<()> {
+pub async fn run(mut self) -> Result<(), DisconnectError> {
loop {
match future::poll_fn(|cx| self.next_event(cx)).await {
CombinedEvent::Command(None) => return Ok(()),
@@ -154,14 +119,15 @@ impl Eventloop {
}
CombinedEvent::Tunnel(Ok(event)) => self.handle_tunnel_event(event).await?,
CombinedEvent::Tunnel(Err(e)) => self.handle_tunnel_error(e)?,
-CombinedEvent::Portal(Some(PortalEvent::Received(msg))) => {
+CombinedEvent::Portal(Some(event)) => {
+let msg = event.context("Connection to portal failed")?;
self.handle_portal_message(msg).await?;
}
-CombinedEvent::Portal(Some(PortalEvent::Error(e))) => {
-return Err(e).context("Connection to portal failed");
-}
CombinedEvent::Portal(None) => {
-return Err(anyhow::Error::msg("portal task exited unexpectedly"));
+return Err(DisconnectError(anyhow::Error::msg(
+"portal task exited unexpectedly",
+)));
}
}
}
@@ -238,15 +204,13 @@ impl Eventloop {
.context("Failed to send message to portal")?;
}
ClientEvent::ResourcesChanged { resources } => {
-self.event_tx
-.send(Event::ResourcesUpdated(resources))
-.await
+self.resource_list_sender
+.send(resources)
.context("Failed to emit event")?;
}
ClientEvent::TunInterfaceUpdated(config) => {
-self.event_tx
-.send(Event::tun_interface_updated(config))
-.await
+self.tun_config_sender
+.send(Some(config))
.context("Failed to emit event")?;
}
}
@@ -431,7 +395,7 @@ impl Eventloop {
async fn phoenix_channel_event_loop(
mut portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
-event_tx: mpsc::Sender<PortalEvent>,
+event_tx: mpsc::Sender<Result<IngressMessages, phoenix_channel::Error>>,
mut cmd_rx: mpsc::Receiver<PortalCommand>,
) {
use futures::future::Either;
@@ -441,7 +405,7 @@ async fn phoenix_channel_event_loop(
loop {
match select(poll_fn(|cx| portal.poll(cx)), pin!(cmd_rx.recv())).await {
Either::Left((Ok(phoenix_channel::Event::InboundMessage { msg, .. }), _)) => {
-if event_tx.send(PortalEvent::Received(msg)).await.is_err() {
+if event_tx.send(Ok(msg)).await.is_err() {
tracing::debug!("Event channel closed: exiting phoenix-channel event-loop");
break;
@@ -479,7 +443,7 @@ async fn phoenix_channel_event_loop(
"Hiccup in portal connection: {error:#}"
),
Either::Left((Err(e), _)) => {
-let _ = event_tx.send(PortalEvent::Error(e)).await; // We don't care about the result because we are exiting anyway.
+let _ = event_tx.send(Err(e)).await; // We don't care about the result because we are exiting anyway.
break;
}


@@ -1,21 +1,26 @@
//! Main connlib library for clients.
pub use crate::serde_routelist::{V4RouteList, V6RouteList};
pub use connlib_model::StaticSecret;
-pub use eventloop::{DisconnectError, Event};
+pub use eventloop::DisconnectError;
+pub use firezone_tunnel::TunConfig;
pub use firezone_tunnel::messages::client::{IngressMessages, ResourceDescription};
-use anyhow::{Context as _, Result};
-use connlib_model::ResourceId;
+use anyhow::Result;
+use connlib_model::{ResourceId, ResourceView};
use eventloop::{Command, Eventloop};
use firezone_tunnel::ClientTunnel;
+use futures::{FutureExt, StreamExt};
use phoenix_channel::{PhoenixChannel, PublicKeyParam};
use socket_factory::{SocketFactory, TcpSocket, UdpSocket};
use std::collections::BTreeSet;
+use std::future;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::{Context, Poll};
-use tokio::sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::sync::mpsc;
+use tokio::sync::watch;
use tokio::task::JoinHandle;
+use tokio_stream::wrappers::WatchStream;
use tun::Tun;
mod eventloop;
@@ -29,12 +34,21 @@ const PHOENIX_TOPIC: &str = "client";
/// To stop the session, simply drop this struct.
#[derive(Clone, Debug)]
pub struct Session {
-channel: UnboundedSender<Command>,
+channel: mpsc::UnboundedSender<Command>,
}
#[derive(Debug)]
pub struct EventStream {
-channel: Receiver<Event>,
+eventloop: JoinHandle<Result<(), DisconnectError>>,
+resource_list_receiver: WatchStream<Vec<ResourceView>>,
+tun_config_receiver: WatchStream<Option<TunConfig>>,
}
+#[derive(Debug)]
+pub enum Event {
+TunInterfaceUpdated(TunConfig),
+ResourcesUpdated(Vec<ResourceView>),
+Disconnected(DisconnectError),
+}
impl Session {
@@ -47,19 +61,31 @@ impl Session {
portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
handle: tokio::runtime::Handle,
) -> (Self, EventStream) {
-let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
-let (event_tx, event_rx) = tokio::sync::mpsc::channel(1000);
+let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
-let connect_handle = handle.spawn(connect(
-tcp_socket_factory,
-udp_socket_factory,
-portal,
-cmd_rx,
-event_tx.clone(),
-));
-handle.spawn(connect_supervisor(connect_handle, event_tx));
+// Use `watch` channels for resource list and TUN config because we are only ever interested in the last value and don't care about intermediate updates.
+let (tun_config_sender, tun_config_receiver) = watch::channel(None);
+let (resource_list_sender, resource_list_receiver) = watch::channel(Vec::default());
-(Self { channel: cmd_tx }, EventStream { channel: event_rx })
+let eventloop = handle.spawn(
+Eventloop::new(
+ClientTunnel::new(tcp_socket_factory, udp_socket_factory),
+portal,
+cmd_rx,
+resource_list_sender,
+tun_config_sender,
+)
+.run(),
+);
+(
+Self { channel: cmd_tx },
+EventStream {
+eventloop,
+resource_list_receiver: WatchStream::from_changes(resource_list_receiver),
+tun_config_receiver: WatchStream::from_changes(tun_config_receiver),
+},
+)
}
/// Reset a [`Session`].
@@ -114,11 +140,30 @@ impl Session {
impl EventStream {
pub fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Event>> {
-self.channel.poll_recv(cx)
+match self.eventloop.poll_unpin(cx) {
+Poll::Ready(Ok(Ok(()))) => return Poll::Ready(None),
+Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Event::Disconnected(e))),
+Poll::Ready(Err(e)) => {
+return Poll::Ready(Some(Event::Disconnected(DisconnectError::from(
+anyhow::Error::new(e).context("connlib crashed"),
+))));
+}
+Poll::Pending => {}
+}
+if let Poll::Ready(Some(resources)) = self.resource_list_receiver.poll_next_unpin(cx) {
+return Poll::Ready(Some(Event::ResourcesUpdated(resources)));
+}
+if let Poll::Ready(Some(Some(config))) = self.tun_config_receiver.poll_next_unpin(cx) {
+return Poll::Ready(Some(Event::TunInterfaceUpdated(config)));
+}
+Poll::Pending
}
pub async fn next(&mut self) -> Option<Event> {
-self.channel.recv().await
+future::poll_fn(|cx| self.poll_next(cx)).await
}
}
@@ -127,51 +172,3 @@ impl Drop for Session {
tracing::debug!("`Session` dropped")
}
}
-/// Connects to the portal and starts a tunnel.
-///
-/// When this function exits, the tunnel failed unrecoverably and you need to call it again.
-async fn connect(
-tcp_socket_factory: Arc<dyn SocketFactory<TcpSocket>>,
-udp_socket_factory: Arc<dyn SocketFactory<UdpSocket>>,
-portal: PhoenixChannel<(), IngressMessages, (), PublicKeyParam>,
-cmd_rx: UnboundedReceiver<Command>,
-event_tx: Sender<Event>,
-) -> Result<()> {
-Eventloop::new(
-ClientTunnel::new(tcp_socket_factory, udp_socket_factory),
-portal,
-cmd_rx,
-event_tx,
-)
-.run()
-.await?;
-Ok(())
-}
-/// A supervisor task that handles, when [`connect`] exits.
-async fn connect_supervisor(
-connect_handle: JoinHandle<Result<()>>,
-event_tx: tokio::sync::mpsc::Sender<Event>,
-) {
-let task = async {
-connect_handle.await.context("connlib crashed")??;
-Ok(())
-};
-let error = match task.await {
-Ok(()) => {
-tracing::info!("connlib exited gracefully");
-return;
-}
-Err(e) => e,
-};
-match event_tx.send(Event::Disconnected(error)).await {
-Ok(()) => (),
-Err(_) => tracing::debug!("Event stream closed before we could send disconnected event"),
-}
-}


@@ -13,7 +13,7 @@ struct Cidr<T> {
pub struct V4RouteList(Vec<Cidr<Ipv4Addr>>);
impl V4RouteList {
-pub fn new(route: Vec<Ipv4Network>) -> Self {
+pub fn new(route: impl IntoIterator<Item = Ipv4Network>) -> Self {
Self(
route
.into_iter()
@@ -32,7 +32,7 @@ impl V4RouteList {
pub struct V6RouteList(Vec<Cidr<Ipv6Addr>>);
impl V6RouteList {
-pub fn new(route: Vec<Ipv6Network>) -> Self {
+pub fn new(route: impl IntoIterator<Item = Ipv6Network>) -> Self {
Self(
route
.into_iter()


@@ -446,6 +446,12 @@ pub struct TunConfig {
pub ipv6_routes: BTreeSet<Ipv6Network>,
}
+impl TunConfig {
+pub fn dns_sentinel_ips(&self) -> Vec<IpAddr> {
+self.dns_by_sentinel.left_values().copied().collect()
+}
+}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IpConfig {
pub v4: Ipv4Addr,


@@ -473,19 +473,16 @@ impl<'a> Handler<'a> {
})
.await?
}
-client_shared::Event::TunInterfaceUpdated {
-ipv4,
-ipv6,
-dns,
-search_domain,
-ipv4_routes,
-ipv6_routes,
-} => {
+client_shared::Event::TunInterfaceUpdated(config) => {
self.session.transition_to_connected()?;
-self.tun_device.set_ips(ipv4, ipv6).await?;
-self.dns_controller.set_dns(dns, search_domain).await?;
-self.tun_device.set_routes(ipv4_routes, ipv6_routes).await?;
+self.tun_device.set_ips(config.ip.v4, config.ip.v6).await?;
+self.dns_controller
+.set_dns(config.dns_sentinel_ips(), config.search_domain)
+.await?;
+self.tun_device
+.set_routes(config.ipv4_routes, config.ipv6_routes)
+.await?;
self.dns_controller.flush()?;
}
client_shared::Event::ResourcesUpdated(resources) => {


@@ -328,18 +328,10 @@ fn main() -> Result<()> {
// On every Resources update, flush DNS to mitigate <https://github.com/firezone/firezone/issues/5052>
dns_controller.flush()?;
}
-client_shared::Event::TunInterfaceUpdated {
-ipv4,
-ipv6,
-dns,
-search_domain,
-ipv4_routes,
-ipv6_routes,
-} => {
-tun_device.set_ips(ipv4, ipv6).await?;
-tun_device.set_routes(ipv4_routes, ipv6_routes).await?;
-dns_controller.set_dns(dns, search_domain).await?;
+client_shared::Event::TunInterfaceUpdated(config) => {
+tun_device.set_ips(config.ip.v4, config.ip.v6).await?;
+dns_controller.set_dns(config.dns_sentinel_ips(), config.search_domain).await?;
+tun_device.set_routes(config.ipv4_routes, config.ipv6_routes).await?;
// `on_set_interface_config` is guaranteed to be called when the tunnel is completely ready
// <https://github.com/firezone/firezone/pull/6026#discussion_r1692297438>