feat(clients): gracefully close connections on shutdown (#10400)

In #10076, connlib gained the ability to gracefully close connections
between peers. The Gateway already uses this when it is being
gracefully shut down, such as during an upgrade. This allows Clients to
immediately fail over to a different Gateway instead of waiting for an
ICE timeout.

When a Client signs out, we currently just drop all the state,
resulting in an ICE timeout on the Gateway ~15 seconds later. This
makes it difficult for us to analyze whether an ICE timeout in the logs
represents an actual problem where a network connection got cut, or
whether the Client simply signed out.

Whilst not watertight, attempting to gracefully close our connections
when the Client signs out is better than nothing, so we implement this
here.

All Clients use the `Session` abstraction from `client-shared`, which
spawns the event-loop into a dedicated task.
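
For illustration, a minimal model of that pattern could look like the
sketch below. This is not the actual `client-shared` code and the names
are hypothetical; the real types carry far more state. The important
property is that the spawned event-loop task owns the sender end of the
event channel, so the `EventStream` only ends once the event-loop task
is gone.

```rust
use tokio::sync::mpsc;

// Minimal model only: the event-loop task owns `event_tx`, so the
// `EventStream` terminates exactly when the event-loop is dropped.
pub enum Command {
    Stop,
}

pub enum Event {
    Disconnected,
}

pub struct Session {
    cmd_tx: mpsc::UnboundedSender<Command>,
}

pub struct EventStream {
    event_rx: mpsc::Receiver<Event>,
}

pub fn connect(handle: &tokio::runtime::Handle) -> (Session, EventStream) {
    let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel();
    let (event_tx, event_rx) = mpsc::channel(16);

    // Spawn the event-loop into a dedicated task on the given runtime.
    handle.spawn(async move {
        while let Some(cmd) = cmd_rx.recv().await {
            match cmd {
                // A real event-loop would gracefully close connections here.
                Command::Stop => break,
            }
        }

        let _ = event_tx.send(Event::Disconnected).await;
        // `event_tx` is dropped here, which terminates the `EventStream`.
    });

    (Session { cmd_tx }, EventStream { event_rx })
}

impl Session {
    pub fn stop(&self) {
        let _ = self.cmd_tx.send(Command::Stop);
    }
}

impl EventStream {
    pub async fn next(&mut self) -> Option<Event> {
        self.event_rx.recv().await
    }
}
```

With this shape, `stop` is fire-and-forget while the `EventStream`
doubles as a completion signal for the event-loop task.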

- For the Linux and Windows GUI client, the already present tokio
runtime instance of the tunnel service is used for this.
- For Android and Apple, we create a dedicated, single-threaded runtime
instance for connlib (see the sketch after this list).
- For the headless client, we also reuse the already existing tokio
runtime instance of the binary.
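
For the Android and Apple case, such a dedicated runtime could be built
roughly as follows; the helper name and the exact builder options are
assumptions for illustration, not necessarily what the clients
configure:

```rust
use tokio::runtime::{Builder, Runtime};

// Hypothetical helper: a dedicated, single-threaded tokio runtime for connlib.
fn build_connlib_runtime() -> std::io::Result<Runtime> {
    Builder::new_current_thread()
        .enable_all() // IO + time drivers, needed for sockets and timeouts.
        .build()
}
```

A current-thread runtime avoids spinning up a whole thread pool just
for connlib's single event-loop task.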

In the case of Android, Apple and the headless client, this means we
need to ensure the tokio runtime instance stays alive long enough to
actually complete the graceful shutdown task. We achieve this by
draining the `EventStream` returned from `Session`. The `EventStream`
is a wrapper around a channel connected to the event-loop. This stream
only finishes once the event-loop, which holds the sender end of the
channel, is dropped entirely and has therefore completed the graceful
shutdown.
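
Put together, the shutdown sequence on those platforms looks roughly
like the sketch below. `stop()` and `drain()` are the `client-shared`
calls used in the diffs; the free-standing helper itself is
hypothetical and only mirrors the `Drop` implementations further down,
assuming `Session` and `EventStream` are in scope.

```rust
use std::time::Duration;

// Hypothetical helper mirroring the Drop implementations in the diffs below.
fn shut_down(runtime: tokio::runtime::Runtime, session: Session, mut events: EventStream) {
    session.stop(); // Instruct the event-loop to gracefully close connections and exit.

    runtime.block_on(async {
        // `drain()` only returns once the event-loop dropped the sender end of
        // the event channel, i.e. once the graceful shutdown has completed.
        let _ = tokio::time::timeout(Duration::from_secs(1), events.drain()).await;
    });

    // Bound the time we wait for any remaining tasks in the blocking pool.
    runtime.shutdown_timeout(Duration::from_secs(1));
}
```

The 1-second timeout mirrors the bound used in the diffs, so a stuck
event-loop can never block sign-out indefinitely.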

In the case of the Linux and Windows GUI clients, the runtime outlives
the `Session` because it is scoped to the entire tunnel process.
Therefore, no additional measures are necessary there to ensure the
graceful shutdown task completes.
Author: Thomas Eizinger
Date: 2025-09-23 03:40:52 +00:00
Committed by: GitHub
Parent: 1581042d10
Commit: 0310bafbcd
7 changed files with 165 additions and 51 deletions


@@ -29,6 +29,7 @@ use std::{
     time::Duration,
 };
 use tokio::runtime::Runtime;
+use tokio::task::JoinHandle;
 use tracing_subscriber::prelude::*;
 use tun::Tun;
@@ -119,6 +120,7 @@ mod ffi {
 pub struct WrappedSession {
     inner: Session,
     runtime: Option<Runtime>,
+    event_stream_handler: Option<JoinHandle<()>>,
     telemetry: Telemetry,
 }
@@ -305,7 +307,7 @@ impl WrappedSession {
         analytics::new_session(device_id, api_url.to_string());
-        runtime.spawn(async move {
+        let event_stream_handler = runtime.spawn(async move {
             let callback_handler = CallbackHandler {
                 inner: callback_handler,
             };
@@ -335,6 +337,7 @@ impl WrappedSession {
         Ok(Self {
             inner: session,
             runtime: Some(runtime),
+            event_stream_handler: Some(event_stream_handler),
             telemetry,
         })
     }
@@ -380,7 +383,19 @@ impl Drop for WrappedSession {
             return;
         };
-        runtime.block_on(self.telemetry.stop());
+        self.inner.stop(); // Instruct the event-loop to shutdown.
+        runtime.block_on(async {
+            self.telemetry.stop().await;
+            // The `event_stream_handler` task will exit once the stream is drained.
+            // That only happens once the event-loop has fully shut down.
+            // Hence, waiting for this task here allows us to wait for the graceful shutdown to complete.
+            let Some(event_stream_handler) = self.event_stream_handler.take() else {
+                return;
+            };
+            let _ = tokio::time::timeout(Duration::from_secs(1), event_stream_handler).await;
+        });
         runtime.shutdown_timeout(Duration::from_secs(1)); // Ensure we don't block forever on a task in the blocking pool.
     }
 }


@@ -208,7 +208,16 @@ impl Drop for Session {
             return;
         };
-        runtime.block_on(async { self.telemetry.lock().await.stop_on_crash().await });
+        self.inner.stop(); // Instruct the event-loop to shutdown.
+        runtime.block_on(async {
+            self.telemetry.lock().await.stop_on_crash().await;
+            // Draining the event-stream allows us to wait for the event-loop to finish its graceful shutdown.
+            let drain = async { self.events.lock().await.drain().await };
+            let _ = tokio::time::timeout(Duration::from_secs(1), drain).await;
+        });
         runtime.shutdown_timeout(Duration::from_secs(1)); // Ensure we don't block forever on a task in the blocking pool.
     }
 }


@@ -47,7 +47,7 @@ use tun::Tun;
 static DNS_RESOURCE_RECORDS_CACHE: Mutex<BTreeSet<DnsResourceRecord>> = Mutex::new(BTreeSet::new());
 pub struct Eventloop {
-    tunnel: ClientTunnel,
+    tunnel: Option<ClientTunnel>,
     cmd_rx: mpsc::UnboundedReceiver<Command>,
     resource_list_sender: watch::Sender<Vec<ResourceView>>,
@@ -121,7 +121,7 @@ impl Eventloop {
         ));
         Self {
-            tunnel,
+            tunnel: Some(tunnel),
             cmd_rx,
             logged_permission_denied: false,
             portal_event_rx,
@@ -141,45 +141,83 @@ enum CombinedEvent {
 impl Eventloop {
     pub async fn run(mut self) -> Result<(), DisconnectError> {
         loop {
-            match future::poll_fn(|cx| self.next_event(cx)).await {
-                CombinedEvent::Command(None) => return Ok(()),
-                CombinedEvent::Command(Some(cmd)) => {
-                    match self.handle_eventloop_command(cmd).await? {
-                        ControlFlow::Continue(()) => {}
-                        ControlFlow::Break(()) => return Ok(()),
-                    }
-                }
-                CombinedEvent::Tunnel(event) => self.handle_tunnel_event(event).await?,
-                CombinedEvent::Portal(Some(event)) => {
-                    let msg = event.context("Connection to portal failed")?;
-                    self.handle_portal_message(msg).await?;
-                }
-                CombinedEvent::Portal(None) => {
-                    return Err(DisconnectError(anyhow::Error::msg(
-                        "portal task exited unexpectedly",
-                    )));
-                }
-            }
+            match self.tick().await {
+                Ok(ControlFlow::Continue(())) => continue,
+                Ok(ControlFlow::Break(())) => {
+                    self.shutdown_tunnel().await?;
+                    return Ok(());
+                }
+                Err(e) => {
+                    // Ignore error from shutdown to not obscure the original error.
+                    let _ = self.shutdown_tunnel().await;
+                    return Err(e);
+                }
+            }
         }
     }
+    async fn tick(&mut self) -> Result<ControlFlow<(), ()>, DisconnectError> {
+        match future::poll_fn(|cx| self.next_event(cx)).await {
+            CombinedEvent::Command(None) => Ok(ControlFlow::Break(())),
+            CombinedEvent::Command(Some(cmd)) => {
+                let cf = self.handle_eventloop_command(cmd).await?;
+                Ok(cf)
+            }
+            CombinedEvent::Tunnel(event) => {
+                self.handle_tunnel_event(event).await?;
+                Ok(ControlFlow::Continue(()))
+            }
+            CombinedEvent::Portal(Some(event)) => {
+                let msg = event.context("Connection to portal failed")?;
+                self.handle_portal_message(msg).await?;
+                Ok(ControlFlow::Continue(()))
+            }
+            CombinedEvent::Portal(None) => Err(DisconnectError(anyhow::Error::msg(
+                "portal task exited unexpectedly",
+            ))),
+        }
+    }
     async fn handle_eventloop_command(&mut self, command: Command) -> Result<ControlFlow<(), ()>> {
         match command {
             Command::Stop => return Ok(ControlFlow::Break(())),
-            Command::SetDns(dns) => self.tunnel.state_mut().update_system_resolvers(dns),
-            Command::SetDisabledResources(resources) => self
-                .tunnel
-                .state_mut()
-                .set_disabled_resources(resources, Instant::now()),
+            Command::SetDns(dns) => {
+                let Some(tunnel) = self.tunnel.as_mut() else {
+                    return Ok(ControlFlow::Continue(()));
+                };
+                tunnel.state_mut().update_system_resolvers(dns);
+            }
+            Command::SetDisabledResources(resources) => {
+                let Some(tunnel) = self.tunnel.as_mut() else {
+                    return Ok(ControlFlow::Continue(()));
+                };
+                tunnel
+                    .state_mut()
+                    .set_disabled_resources(resources, Instant::now())
+            }
             Command::SetTun(tun) => {
-                self.tunnel.set_tun(tun);
+                let Some(tunnel) = self.tunnel.as_mut() else {
+                    return Ok(ControlFlow::Continue(()));
+                };
+                tunnel.set_tun(tun);
             }
             Command::Reset(reason) => {
-                self.tunnel.reset(&reason);
+                let Some(tunnel) = self.tunnel.as_mut() else {
+                    return Ok(ControlFlow::Continue(()));
+                };
+                tunnel.reset(&reason);
                 self.portal_cmd_tx
                     .send(PortalCommand::Connect(PublicKeyParam(
-                        self.tunnel.public_key().to_bytes(),
+                        tunnel.public_key().to_bytes(),
                     )))
                     .await
                     .context("Failed to connect phoenix-channel")?;
@@ -299,17 +337,20 @@ impl Eventloop {
     }
     async fn handle_portal_message(&mut self, msg: IngressMessages) -> Result<()> {
+        let Some(tunnel) = self.tunnel.as_mut() else {
+            return Ok(());
+        };
         match msg {
-            IngressMessages::ConfigChanged(config) => self
-                .tunnel
-                .state_mut()
-                .update_interface_config(config.interface),
+            IngressMessages::ConfigChanged(config) => {
+                tunnel.state_mut().update_interface_config(config.interface)
+            }
             IngressMessages::IceCandidates(GatewayIceCandidates {
                 gateway_id,
                 candidates,
             }) => {
                 for candidate in candidates {
-                    self.tunnel
+                    tunnel
                         .state_mut()
                         .add_ice_candidate(gateway_id, candidate, Instant::now())
                 }
@@ -319,7 +360,7 @@ impl Eventloop {
                 resources,
                 relays,
             }) => {
-                let state = self.tunnel.state_mut();
+                let state = tunnel.state_mut();
                 state.update_interface_config(interface);
                 state.set_resources(resources, Instant::now());
@@ -330,19 +371,15 @@ impl Eventloop {
                 );
             }
             IngressMessages::ResourceCreatedOrUpdated(resource) => {
-                self.tunnel
-                    .state_mut()
-                    .add_resource(resource, Instant::now());
+                tunnel.state_mut().add_resource(resource, Instant::now());
             }
             IngressMessages::ResourceDeleted(resource) => {
-                self.tunnel
-                    .state_mut()
-                    .remove_resource(resource, Instant::now());
+                tunnel.state_mut().remove_resource(resource, Instant::now());
             }
             IngressMessages::RelaysPresence(RelaysPresence {
                 disconnected_ids,
                 connected,
-            }) => self.tunnel.state_mut().update_relays(
+            }) => tunnel.state_mut().update_relays(
                 BTreeSet::from_iter(disconnected_ids),
                 firezone_tunnel::turn(&connected),
                 Instant::now(),
@@ -352,11 +389,9 @@ impl Eventloop {
                 candidates,
             }) => {
                 for candidate in candidates {
-                    self.tunnel.state_mut().remove_ice_candidate(
-                        gateway_id,
-                        candidate,
-                        Instant::now(),
-                    )
+                    tunnel
+                        .state_mut()
+                        .remove_ice_candidate(gateway_id, candidate, Instant::now())
                 }
             }
             IngressMessages::FlowCreated(FlowCreated {
@@ -370,7 +405,7 @@ impl Eventloop {
                 client_ice_credentials,
                 gateway_ice_credentials,
             }) => {
-                match self.tunnel.state_mut().handle_flow_created(
+                match tunnel.state_mut().handle_flow_created(
                     resource_id,
                     gateway_id,
                     PublicKey::from(gateway_public_key.0),
@@ -393,7 +428,7 @@ impl Eventloop {
                         // Re-connecting to the portal means we will receive another `init` and thus new TURN servers.
                         self.portal_cmd_tx
                             .send(PortalCommand::Connect(PublicKeyParam(
-                                self.tunnel.public_key().to_bytes(),
+                                tunnel.public_key().to_bytes(),
                             )))
                             .await
                             .context("Failed to connect phoenix-channel")?;
@@ -408,7 +443,7 @@ impl Eventloop {
                 reason: FailReason::Offline,
                 ..
             }) => {
-                self.tunnel.state_mut().set_resource_offline(resource_id);
+                tunnel.state_mut().set_resource_offline(resource_id);
             }
             IngressMessages::FlowCreationFailed(FlowCreationFailed { reason, .. }) => {
                 tracing::debug!("Failed to create flow: {reason:?}")
@@ -427,12 +462,26 @@ impl Eventloop {
             return Poll::Ready(CombinedEvent::Portal(event));
         }
-        if let Poll::Ready(event) = self.tunnel.poll_next_event(cx) {
+        if let Some(Poll::Ready(event)) = self.tunnel.as_mut().map(|t| t.poll_next_event(cx)) {
             return Poll::Ready(CombinedEvent::Tunnel(event));
         }
         Poll::Pending
     }
+    async fn shutdown_tunnel(&mut self) -> Result<()> {
+        let Some(tunnel) = self.tunnel.take() else {
+            tracing::debug!("Tunnel has already been shut down");
+            return Ok(());
+        };
+        tunnel
+            .shutdown()
+            .await
+            .context("Failed to shutdown tunnel")?;
+        Ok(())
+    }
 }
 async fn phoenix_channel_event_loop(


@@ -166,6 +166,12 @@ impl EventStream {
     pub async fn next(&mut self) -> Option<Event> {
         future::poll_fn(|cx| self.poll_next(cx)).await
     }
+    pub async fn drain(&mut self) -> Vec<Event> {
+        futures::stream::poll_fn(|cx| self.poll_next(cx))
+            .collect()
+            .await
+    }
 }
 impl Drop for Session {


@@ -280,6 +280,13 @@ impl ClientState {
         self.node.public_key()
     }
+    pub fn shutdown(&mut self, now: Instant) {
+        tracing::info!("Initiating graceful shutdown");
+        self.peers.clear();
+        self.node.close_all(p2p_control::goodbye(), now);
+    }
     /// Updates the NAT for all domains resolved by the stub resolver on the corresponding gateway.
     ///
     /// In order to route traffic for DNS resources, the designated gateway needs to set up NAT from


@@ -140,6 +140,31 @@ impl ClientTunnel {
         self.io.reset();
     }
+    /// Shutdown the Client tunnel.
+    pub fn shutdown(mut self) -> BoxFuture<'static, Result<()>> {
+        // Initiate shutdown.
+        self.role_state.shutdown(Instant::now());
+        // Drain all UDP packets that need to be sent.
+        while let Some(trans) = self.role_state.poll_transmit() {
+            self.io
+                .send_network(trans.src, trans.dst, &trans.payload, Ecn::NonEct);
+        }
+        // Return a future that "owns" our IO, polling it until all packets have been flushed.
+        async move {
+            tokio::time::timeout(
+                Duration::from_secs(1),
+                future::poll_fn(move |cx| self.io.flush(cx)),
+            )
+            .await
+            .context("Failed to flush within 1s")??;
+            Ok(())
+        }
+        .boxed()
+    }
     pub fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<ClientEvent> {
         for _ in 0..MAX_EVENTLOOP_ITERS {
             let mut ready = false;


@@ -420,6 +420,9 @@ fn try_main() -> Result<()> {
             drop(session);
+            // Drain the event-stream to allow the event-loop to gracefully shutdown.
+            let _ = tokio::time::timeout(Duration::from_secs(1), event_stream.drain()).await;
             result
         })?;