From ffcb269c8b1560a88a03c49ef0ffaa33f3a159cc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 15 Jul 2025 06:58:06 -0700 Subject: [PATCH] chore(connlib): add "wake reason" to `poll_timeout` (#9876) In order to debug timer interactions, it is useful to know when and why connlib wants to be woken to perform tasks. --- rust/connlib/snownet/src/allocation.rs | 24 ++++----- rust/connlib/snownet/src/node.rs | 54 ++++++++++++++------- rust/connlib/tunnel/src/client.rs | 22 +++++++-- rust/connlib/tunnel/src/gateway.rs | 14 ++++-- rust/connlib/tunnel/src/io.rs | 20 ++++++-- rust/connlib/tunnel/src/lib.rs | 8 +-- rust/connlib/tunnel/src/tests/sim_client.rs | 2 +- rust/connlib/tunnel/src/tests/sim_net.rs | 25 ++++++---- rust/connlib/tunnel/src/tests/sut.rs | 29 ++++------- rust/connlib/tunnel/src/utils.rs | 11 +---- 10 files changed, 125 insertions(+), 84 deletions(-) diff --git a/rust/connlib/snownet/src/allocation.rs b/rust/connlib/snownet/src/allocation.rs index 1f802e8a5..6612d8baf 100644 --- a/rust/connlib/snownet/src/allocation.rs +++ b/rust/connlib/snownet/src/allocation.rs @@ -761,9 +761,10 @@ impl Allocation { self.buffered_transmits.pop_front() } - pub fn poll_timeout(&self) -> Option { + pub fn poll_timeout(&self) -> Option<(Instant, &'static str)> { let next_refresh = if !self.refresh_in_flight() { self.refresh_allocation_at() + .map(|refresh_at| (refresh_at, "refresh allocation")) } else { None }; @@ -771,10 +772,11 @@ impl Allocation { let next_timeout = self .sent_requests .values() - .map(|(_, _, b)| b.next_trigger()); + .map(|(_, _, b)| (b.next_trigger(), "resend TURN message")); let next_keepalive = if self.has_allocation() { - self.active_socket.map(|a| a.next_binding) + self.active_socket + .map(|a| (a.next_binding, "TURN keep-alive")) } else { None }; @@ -783,7 +785,7 @@ impl Allocation { .chain(next_refresh) .chain(next_keepalive) .chain(next_timeout) - .min() + .min_by_key(|(instant, _)| *instant) } #[tracing::instrument(level = "debug", skip(self, now), fields(active_socket = ?self.active_socket))] @@ -1967,7 +1969,7 @@ mod tests { let mut expected_backoffs = VecDeque::from(backoff::steps(start)); loop { - let Some(timeout) = allocation.poll_timeout() else { + let Some((timeout, _)) = allocation.poll_timeout() else { break; }; @@ -2286,11 +2288,11 @@ mod tests { Instant::now(), ); - let refresh_at = allocation.poll_timeout().unwrap(); + let (refresh_at, _) = allocation.poll_timeout().unwrap(); allocation.handle_timeout(refresh_at); - assert!(allocation.poll_timeout().unwrap() > refresh_at); + assert!(allocation.poll_timeout().unwrap().0 > refresh_at); } #[test] @@ -2477,7 +2479,7 @@ mod tests { // Test that we send binding requests it. { - now = allocation.poll_timeout().unwrap(); + now = allocation.poll_timeout().unwrap().0; allocation.handle_timeout(now); let binding = allocation.next_message().unwrap(); @@ -2486,7 +2488,7 @@ mod tests { // Simulate bindings timing out for _ in backoff::steps(now) { - allocation.handle_timeout(allocation.poll_timeout().unwrap()); + allocation.handle_timeout(allocation.poll_timeout().unwrap().0); } assert_eq!( @@ -2660,7 +2662,7 @@ mod tests { ); loop { - let Some(timeout) = allocation.poll_timeout() else { + let Some((timeout, _)) = allocation.poll_timeout() else { break; }; @@ -2680,7 +2682,7 @@ mod tests { let mut allocation = Allocation::for_test_dual(Instant::now()); loop { - let Some(timeout) = allocation.poll_timeout() else { + let Some((timeout, _)) = allocation.poll_timeout() else { break; }; allocation.handle_timeout(timeout); diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 11b23435e..98e9952f9 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -541,7 +541,7 @@ where /// This function only takes `&mut self` because it caches certain computations internally. /// The returned timestamp will **not** change unless other state is modified. #[must_use] - pub fn poll_timeout(&mut self) -> Option { + pub fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { iter::empty() .chain(self.connections.poll_timeout()) .chain( @@ -549,8 +549,11 @@ where .values_mut() .filter_map(|a| a.poll_timeout()), ) - .chain(self.next_rate_limiter_reset) - .min() + .chain( + self.next_rate_limiter_reset + .map(|instant| (instant, "rate limiter reset")), + ) + .min_by_key(|(instant, _)| *instant) } /// Advances time within the [`Node`]. @@ -1438,7 +1441,7 @@ where self.established.values().all(|c| c.is_idle()) } - fn poll_timeout(&mut self) -> Option { + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { iter::empty() .chain(self.initial.values_mut().filter_map(|c| c.poll_timeout())) .chain( @@ -1446,7 +1449,7 @@ where .values_mut() .filter_map(|c| c.poll_timeout()), ) - .min() + .min_by_key(|(instant, _)| *instant) } } @@ -1644,11 +1647,18 @@ impl InitialConnection { } } - fn poll_timeout(&mut self) -> Option { + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { iter::empty() - .chain(self.agent.poll_timeout()) - .chain(Some(self.no_answer_received_timeout())) - .min() + .chain( + self.agent + .poll_timeout() + .map(|timeout| (timeout, "ICE agent")), + ) + .chain(Some(( + self.no_answer_received_timeout(), + "connection handshake timeout", + ))) + .min_by_key(|(instant, _)| *instant) } fn no_answer_received_timeout(&self) -> Instant { @@ -1719,13 +1729,13 @@ impl ConnectionState where RId: Copy, { - fn poll_timeout(&self) -> Option { + fn poll_timeout(&self) -> Option<(Instant, &'static str)> { match self { ConnectionState::Connected { last_incoming, last_outgoing, .. - } => Some(idle_at(*last_incoming, *last_outgoing)), + } => Some((idle_at(*last_incoming, *last_outgoing), "idle transition")), ConnectionState::Connecting { .. } | ConnectionState::Idle { .. } | ConnectionState::Failed => None, @@ -1865,14 +1875,24 @@ where } #[must_use] - fn poll_timeout(&mut self) -> Option { + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { iter::empty() - .chain(self.agent.poll_timeout()) - .chain(Some(self.next_wg_timer_update)) - .chain(self.candidate_timeout()) - .chain(self.disconnect_timeout()) + .chain( + self.agent + .poll_timeout() + .map(|instant| (instant, "ICE agent")), + ) + .chain(Some((self.next_wg_timer_update, "boringtun tunnel"))) + .chain( + self.candidate_timeout() + .map(|instant| (instant, "candidate timeout")), + ) + .chain( + self.disconnect_timeout() + .map(|instant| (instant, "disconnect timeout")), + ) .chain(self.state.poll_timeout()) - .min() + .min_by_key(|(instant, _)| *instant) } fn candidate_timeout(&self) -> Option { diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index d16fe7d7a..7e86db337 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -999,13 +999,25 @@ impl ClientState { .or_else(|| self.tcp_dns_server.poll_outbound()) } - pub fn poll_timeout(&mut self) -> Option { + pub fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { iter::empty() - .chain(self.udp_dns_sockets_by_upstream_and_query_id.poll_timeout()) - .chain(self.tcp_dns_client.poll_timeout()) - .chain(self.tcp_dns_server.poll_timeout()) + .chain( + self.udp_dns_sockets_by_upstream_and_query_id + .poll_timeout() + .map(|instant| (instant, "DNS socket timeout")), + ) + .chain( + self.tcp_dns_client + .poll_timeout() + .map(|instant| (instant, "TCP DNS client")), + ) + .chain( + self.tcp_dns_server + .poll_timeout() + .map(|instant| (instant, "TCP DNS server")), + ) .chain(self.node.poll_timeout()) - .min() + .min_by_key(|(instant, _)| *instant) } pub fn handle_timeout(&mut self, now: Instant) { diff --git a/rust/connlib/tunnel/src/gateway.rs b/rust/connlib/tunnel/src/gateway.rs index 575595239..d1c5ddefb 100644 --- a/rust/connlib/tunnel/src/gateway.rs +++ b/rust/connlib/tunnel/src/gateway.rs @@ -1,7 +1,6 @@ use crate::messages::gateway::ResourceDescription; use crate::messages::{Answer, IceCredentials, ResolveRequest, SecretKey}; use crate::peer::TranslateOutboundResult; -use crate::utils::earliest; use crate::{GatewayEvent, IpConfig, p2p_control}; use crate::{peer::ClientOnGateway, peer_store::PeerStore}; use anyhow::{Context, Result}; @@ -13,6 +12,7 @@ use ip_packet::{FzP2pControlSlice, IpPacket}; use secrecy::{ExposeSecret as _, Secret}; use snownet::{Credentials, NoTurnServers, RelaySocket, ServerNode, Transmit}; use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::iter; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -339,9 +339,15 @@ impl GatewayState { Ok(()) } - pub fn poll_timeout(&mut self) -> Option { - // TODO: This should check when the next resource actually expires instead of doing it at a fixed interval. - earliest(self.next_expiry_resources_check, self.node.poll_timeout()) + pub fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { + iter::empty() + // TODO: This should check when the next resource actually expires instead of doing it at a fixed interval. + .chain( + self.next_expiry_resources_check + .map(|instant| (instant, "resource expiry")), + ) + .chain(self.node.poll_timeout()) + .min_by_key(|(instant, _)| *instant) } pub fn handle_timeout(&mut self, now: Instant, utc_now: DateTime) { diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index 7a4cfb486..2a4e32cbc 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -21,6 +21,7 @@ use std::{ task::{Context, Poll, ready}, time::{Duration, Instant}, }; +use tracing::Level; use tun::Tun; /// How many IP packets we will at most read from the MPSC-channel connected to our TUN device thread. @@ -326,15 +327,26 @@ impl Io { self.nameservers.evaluate(); } - pub fn reset_timeout(&mut self, timeout: Instant) { + pub fn reset_timeout(&mut self, timeout: Instant, reason: &'static str) { + let wakeup_in = tracing::event_enabled!(Level::TRACE) + .then(|| timeout.duration_since(Instant::now())) + .map(tracing::field::debug); let timeout = tokio::time::Instant::from_std(timeout); match self.timeout.as_mut() { Some(existing_timeout) if existing_timeout.deadline() != timeout => { + tracing::trace!(wakeup_in, %reason); + existing_timeout.as_mut().reset(timeout) } Some(_) => {} - None => self.timeout = Some(Box::pin(tokio::time::sleep_until(timeout))), + None => { + self.timeout = { + tracing::trace!(?wakeup_in, %reason); + + Some(Box::pin(tokio::time::sleep_until(timeout))) + } + } } } @@ -430,7 +442,7 @@ mod tests { let mut io = Io::for_test(); let deadline = Instant::now() + Duration::from_secs(1); - io.reset_timeout(deadline); + io.reset_timeout(deadline, ""); let Input::Timeout(timeout) = io.next().await else { panic!("Unexpected result"); @@ -449,7 +461,7 @@ mod tests { let now = Instant::now(); let mut io = Io::for_test(); - io.reset_timeout(now - Duration::from_secs(10)); + io.reset_timeout(now - Duration::from_secs(10), ""); let Input::Timeout(timeout) = io.next().await else { panic!("Unexpected result"); diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index c58ab602b..2b0149c83 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -161,8 +161,8 @@ impl ClientTunnel { continue; } - if let Some(timeout) = self.role_state.poll_timeout() { - self.io.reset_timeout(timeout); + if let Some((timeout, reason)) = self.role_state.poll_timeout() { + self.io.reset_timeout(timeout, reason); } match self.io.poll(cx, &mut self.buffers)? { @@ -275,8 +275,8 @@ impl GatewayTunnel { continue; } - if let Some(timeout) = self.role_state.poll_timeout() { - self.io.reset_timeout(timeout); + if let Some((timeout, reason)) = self.role_state.poll_timeout() { + self.io.reset_timeout(timeout, reason); } match self.io.poll(cx, &mut self.buffers)? { diff --git a/rust/connlib/tunnel/src/tests/sim_client.rs b/rust/connlib/tunnel/src/tests/sim_client.rs index 39ea8e0a0..b06ebec25 100644 --- a/rust/connlib/tunnel/src/tests/sim_client.rs +++ b/rust/connlib/tunnel/src/tests/sim_client.rs @@ -198,7 +198,7 @@ impl SimClient { self.tcp_dns_client.handle_timeout(now); self.tcp_client.handle_timeout(now); - if self.sut.poll_timeout().is_some_and(|t| t <= now) { + if self.sut.poll_timeout().is_some_and(|(t, _)| t <= now) { self.sut.handle_timeout(now) } } diff --git a/rust/connlib/tunnel/src/tests/sim_net.rs b/rust/connlib/tunnel/src/tests/sim_net.rs index 4320b95ad..e79029563 100644 --- a/rust/connlib/tunnel/src/tests/sim_net.rs +++ b/rust/connlib/tunnel/src/tests/sim_net.rs @@ -1,5 +1,5 @@ +use crate::tests::buffered_transmits::BufferedTransmits; use crate::tests::strategies::documentation_ip6s; -use crate::{tests::buffered_transmits::BufferedTransmits, utils::earliest}; use connlib_model::{ClientId, GatewayId, RelayId}; use firezone_relay::{AddressFamily, IpStack}; use ip_network::IpNetwork; @@ -8,7 +8,7 @@ use proptest::prelude::*; use snownet::Transmit; use std::{ collections::HashSet, - fmt, + fmt, iter, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, time::{Duration, Instant}, }; @@ -111,30 +111,37 @@ impl Host where T: PollTimeout, { - pub(crate) fn poll_timeout(&mut self) -> Option { - earliest(self.inner.poll_timeout(), self.inbox.next_transmit()) + pub(crate) fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { + iter::empty() + .chain(self.inner.poll_timeout()) + .chain( + self.inbox + .next_transmit() + .map(|instant| (instant, "inbox transmit")), + ) + .min_by_key(|(instant, _)| *instant) } } pub(crate) trait PollTimeout { - fn poll_timeout(&mut self) -> Option; + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)>; } impl PollTimeout for SimClient { - fn poll_timeout(&mut self) -> Option { + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { self.sut.poll_timeout() } } impl PollTimeout for SimGateway { - fn poll_timeout(&mut self) -> Option { + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { self.sut.poll_timeout() } } impl PollTimeout for SimRelay { - fn poll_timeout(&mut self) -> Option { - self.sut.poll_timeout() + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { + self.sut.poll_timeout().map(|instant| (instant, "")) } } diff --git a/rust/connlib/tunnel/src/tests/sut.rs b/rust/connlib/tunnel/src/tests/sut.rs index 25873acfe..ec4c76cb6 100644 --- a/rust/connlib/tunnel/src/tests/sut.rs +++ b/rust/connlib/tunnel/src/tests/sut.rs @@ -14,7 +14,6 @@ use crate::messages::{IceCredentials, Key, SecretKey}; use crate::tests::assertions::*; use crate::tests::flux_capacitor::FluxCapacitor; use crate::tests::transition::Transition; -use crate::utils::earliest; use crate::{ClientEvent, GatewayEvent, dns, messages::Interface}; use bufferpool::BufferPool; use connlib_model::{ClientId, GatewayId, PublicKey, RelayId}; @@ -393,9 +392,9 @@ impl TunnelTest { now, ); - if let Some(next) = self.poll_timeout() { + if let Some((next, reason)) = self.poll_timeout() { if next < now { - tracing::error!(?next, ?now, "State machine requested time in the past"); + tracing::error!(?next, ?now, %reason, "State machine requested time in the past"); } } @@ -519,7 +518,7 @@ impl TunnelTest { continue; } - let Some(time_to_next_action) = self.poll_timeout() else { + let Some((time_to_next_action, _)) = self.poll_timeout() else { break; // Nothing to do. }; @@ -589,7 +588,7 @@ impl TunnelTest { } gateway.exec_mut(|g| { - if g.sut.poll_timeout().is_some_and(|t| t <= now) { + if g.sut.poll_timeout().is_some_and(|(t, _)| t <= now) { g.sut.handle_timeout(now, self.flux_capacitor.now()) } }); @@ -613,20 +612,12 @@ impl TunnelTest { } } - fn poll_timeout(&mut self) -> Option { - let client = self.client.poll_timeout(); - let gateway = self - .gateways - .values_mut() - .flat_map(|g| g.poll_timeout()) - .min(); - let relay = self - .relays - .values_mut() - .flat_map(|r| r.poll_timeout()) - .min(); - - earliest(client, earliest(gateway, relay)) + fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { + iter::empty() + .chain(self.client.poll_timeout()) + .chain(self.gateways.values_mut().flat_map(|g| g.poll_timeout())) + .chain(self.relays.values_mut().flat_map(|r| r.poll_timeout())) + .min_by_key(|(instant, _)| *instant) } /// Dispatches a [`Transmit`] to the correct host. diff --git a/rust/connlib/tunnel/src/utils.rs b/rust/connlib/tunnel/src/utils.rs index 990a5696f..bc6ab3dc2 100644 --- a/rust/connlib/tunnel/src/utils.rs +++ b/rust/connlib/tunnel/src/utils.rs @@ -3,7 +3,7 @@ use connlib_model::RelayId; use ip_network::{IpNetwork, Ipv4Network, Ipv6Network}; use itertools::Itertools as _; use snownet::RelaySocket; -use std::{collections::BTreeSet, net::SocketAddr, time::Instant}; +use std::{collections::BTreeSet, net::SocketAddr}; pub fn turn(relays: &[Relay]) -> BTreeSet<(RelayId, RelaySocket, String, String, String)> { relays @@ -57,15 +57,6 @@ pub fn turn(relays: &[Relay]) -> BTreeSet<(RelayId, RelaySocket, String, String, .collect() } -pub fn earliest(left: Option, right: Option) -> Option { - match (left, right) { - (None, None) => None, - (Some(left), Some(right)) => Some(std::cmp::min(left, right)), - (Some(left), None) => Some(left), - (None, Some(right)) => Some(right), - } -} - pub(crate) fn network_contains_network(ip_a: IpNetwork, ip_b: IpNetwork) -> bool { ip_a.contains(ip_b.network_address()) && ip_a.netmask() <= ip_b.netmask() }