refactor(gateway): close connections on error (#10401)

Previously, the Gateway would only proactively close connections to its
peers when it was shutdown gracefully via a SIGTERM or SIGINT signal. By
copying the same design for the event-loop as I've implemented in
#10400, we can now also initiate the graceful shutdown in case the
event-loop exits with an error.
This commit is contained in:
Thomas Eizinger
2025-09-20 20:55:48 +00:00
committed by GitHub
parent 7b2d98263a
commit 8e00870942
2 changed files with 80 additions and 56 deletions

View File

@@ -80,6 +80,8 @@ impl GatewayState {
}
pub fn shutdown(&mut self, now: Instant) {
tracing::info!("Initiating graceful shutdown");
self.peers.clear();
self.node.close_all(p2p_control::goodbye(), now);
}

View File

@@ -16,12 +16,11 @@ use firezone_tunnel::{
DnsResourceNatEntry, GatewayEvent, GatewayTunnel, IPV4_TUNNEL, IPV6_TUNNEL, IpConfig,
ResolveDnsRequest, TunnelError,
};
use futures::FutureExt;
use futures::future::BoxFuture;
use phoenix_channel::{PhoenixChannel, PublicKeyParam};
use std::collections::{BTreeMap, BTreeSet};
use std::future::{self, Future, poll_fn};
use std::net::{IpAddr, SocketAddrV4, SocketAddrV6};
use std::ops::ControlFlow;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -65,7 +64,6 @@ pub struct Eventloop {
dns_cache: moka::future::Cache<DomainName, Vec<IpAddr>>,
sigint: signals::Terminate,
shutdown: Option<BoxFuture<'static, Result<()>>>,
logged_permission_denied: bool,
}
@@ -108,14 +106,12 @@ impl Eventloop {
portal_event_rx,
portal_cmd_tx,
sigint: signals::Terminate::new()?,
shutdown: None,
})
}
}
enum CombinedEvent {
SigIntTerm,
ShutdownComplete(Result<()>),
Tunnel(GatewayEvent),
Portal(Option<Result<IngressMessages, phoenix_channel::Error>>),
DomainResolved((Result<Vec<IpAddr>, Arc<anyhow::Error>>, ResolveTrigger)),
@@ -124,61 +120,76 @@ enum CombinedEvent {
impl Eventloop {
pub async fn run(mut self) -> Result<()> {
loop {
match future::poll_fn(|cx| self.next_event(cx)).await {
CombinedEvent::Tunnel(event) => {
self.handle_tunnel_event(event).await?;
}
CombinedEvent::Portal(Some(Ok(msg))) => {
self.handle_portal_message(msg).await?;
}
CombinedEvent::Portal(None) => {
return Err(anyhow::Error::msg(
"phoenix channel task stopped unexpectedly",
));
}
CombinedEvent::Portal(Some(Err(e))) => {
return Err(e).context("Failed to login to portal");
}
CombinedEvent::DomainResolved((result, ResolveTrigger::RequestConnection(req))) => {
self.accept_connection(result, req).await?;
}
CombinedEvent::DomainResolved((result, ResolveTrigger::AllowAccess(req))) => {
self.allow_access(result, req);
}
CombinedEvent::DomainResolved((result, ResolveTrigger::SetupNat(req))) => {
let Some(tunnel) = self.tunnel.as_mut() else {
tracing::debug!("Ignoring DNS resolution result during shutdown");
match self.tick().await {
Ok(ControlFlow::Continue(())) => continue,
Ok(ControlFlow::Break(())) => {
self.shutdown_tunnel().await?;
continue;
};
if let Err(e) =
tunnel
.state_mut()
.handle_domain_resolved(req, result, Instant::now())
{
tracing::warn!("Failed to set DNS resource NAT: {e:#}");
};
return Ok(());
}
CombinedEvent::SigIntTerm => {
if self.shutdown.is_some() {
tracing::info!("Forcing shutdown on repeated SIGINT/SIGTERM");
Err(e) => {
// Ignore shutdown error here to not obscure the original error.
let _ = self.shutdown_tunnel().await;
return Ok(());
}
tracing::info!("Received SIGINT/SIGTERM, initiating graceful shutdown");
self.portal_cmd_tx.send(PortalCommand::Close).await?;
self.shutdown = self.tunnel.take().map(|t| t.shutdown());
}
CombinedEvent::ShutdownComplete(result) => {
return result.context("Graceful shutdown failed");
return Err(e);
}
}
}
}
pub async fn tick(&mut self) -> Result<ControlFlow<(), ()>> {
match future::poll_fn(|cx| self.next_event(cx)).await {
CombinedEvent::Tunnel(event) => {
self.handle_tunnel_event(event).await?;
Ok(ControlFlow::Continue(()))
}
CombinedEvent::Portal(Some(Ok(msg))) => {
self.handle_portal_message(msg).await?;
Ok(ControlFlow::Continue(()))
}
CombinedEvent::Portal(None) => Err(anyhow::Error::msg(
"phoenix channel task stopped unexpectedly",
)),
CombinedEvent::Portal(Some(Err(e))) => Err(e).context("Failed to login to portal"),
CombinedEvent::DomainResolved((result, ResolveTrigger::RequestConnection(req))) => {
self.accept_connection(result, req).await?;
Ok(ControlFlow::Continue(()))
}
CombinedEvent::DomainResolved((result, ResolveTrigger::AllowAccess(req))) => {
self.allow_access(result, req);
Ok(ControlFlow::Continue(()))
}
CombinedEvent::DomainResolved((result, ResolveTrigger::SetupNat(req))) => {
let Some(tunnel) = self.tunnel.as_mut() else {
tracing::debug!("Ignoring DNS resolution result during shutdown");
return Ok(ControlFlow::Continue(()));
};
if let Err(e) =
tunnel
.state_mut()
.handle_domain_resolved(req, result, Instant::now())
{
tracing::warn!("Failed to set DNS resource NAT: {e:#}");
};
Ok(ControlFlow::Continue(()))
}
CombinedEvent::SigIntTerm => {
tracing::info!("Received SIGINT/SIGTERM");
self.portal_cmd_tx.send(PortalCommand::Close).await?;
Ok(ControlFlow::Break(()))
}
}
}
fn next_event(&mut self, cx: &mut Context<'_>) -> Poll<CombinedEvent> {
if let Poll::Ready(event) = self.portal_event_rx.poll_recv(cx) {
return Poll::Ready(CombinedEvent::Portal(event));
@@ -202,13 +213,24 @@ impl Eventloop {
return Poll::Ready(CombinedEvent::SigIntTerm);
}
if let Some(Poll::Ready(result)) = self.shutdown.as_mut().map(|s| s.poll_unpin(cx)) {
return Poll::Ready(CombinedEvent::ShutdownComplete(result));
}
Poll::Pending
}
async fn shutdown_tunnel(&mut self) -> Result<()> {
let Some(tunnel) = self.tunnel.take() else {
tracing::debug!("Tunnel has already been shut down");
return Ok(());
};
tunnel
.shutdown()
.await
.context("Failed to shutdown tunnel")?;
Ok(())
}
async fn handle_tunnel_event(&mut self, event: firezone_tunnel::GatewayEvent) -> Result<()> {
match event {
firezone_tunnel::GatewayEvent::AddedIceCandidates {