Mirror of https://github.com/outbackdingo/firezone.git (synced 2026-01-27 18:18:55 +00:00)
chore(connlib): add "wake reason" to poll_timeout (#9876)
In order to debug timer interactions, it is useful to know when and why connlib wants to be woken to perform tasks.
This commit is contained in:
@@ -761,9 +761,10 @@ impl Allocation {
|
||||
self.buffered_transmits.pop_front()
|
||||
}
|
||||
|
||||
pub fn poll_timeout(&self) -> Option<Instant> {
|
||||
pub fn poll_timeout(&self) -> Option<(Instant, &'static str)> {
|
||||
let next_refresh = if !self.refresh_in_flight() {
|
||||
self.refresh_allocation_at()
|
||||
.map(|refresh_at| (refresh_at, "refresh allocation"))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -771,10 +772,11 @@ impl Allocation {
|
||||
let next_timeout = self
|
||||
.sent_requests
|
||||
.values()
|
||||
.map(|(_, _, b)| b.next_trigger());
|
||||
.map(|(_, _, b)| (b.next_trigger(), "resend TURN message"));
|
||||
|
||||
let next_keepalive = if self.has_allocation() {
|
||||
self.active_socket.map(|a| a.next_binding)
|
||||
self.active_socket
|
||||
.map(|a| (a.next_binding, "TURN keep-alive"))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -783,7 +785,7 @@ impl Allocation {
|
||||
.chain(next_refresh)
|
||||
.chain(next_keepalive)
|
||||
.chain(next_timeout)
|
||||
.min()
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, now), fields(active_socket = ?self.active_socket))]
|
||||
@@ -1967,7 +1969,7 @@ mod tests {
|
||||
let mut expected_backoffs = VecDeque::from(backoff::steps(start));
|
||||
|
||||
loop {
|
||||
let Some(timeout) = allocation.poll_timeout() else {
|
||||
let Some((timeout, _)) = allocation.poll_timeout() else {
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -2286,11 +2288,11 @@ mod tests {
|
||||
Instant::now(),
|
||||
);
|
||||
|
||||
let refresh_at = allocation.poll_timeout().unwrap();
|
||||
let (refresh_at, _) = allocation.poll_timeout().unwrap();
|
||||
|
||||
allocation.handle_timeout(refresh_at);
|
||||
|
||||
assert!(allocation.poll_timeout().unwrap() > refresh_at);
|
||||
assert!(allocation.poll_timeout().unwrap().0 > refresh_at);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -2477,7 +2479,7 @@ mod tests {
|
||||
|
||||
// Test that we send binding requests to it.
|
||||
{
|
||||
now = allocation.poll_timeout().unwrap();
|
||||
now = allocation.poll_timeout().unwrap().0;
|
||||
allocation.handle_timeout(now);
|
||||
|
||||
let binding = allocation.next_message().unwrap();
|
||||
@@ -2486,7 +2488,7 @@ mod tests {
|
||||
|
||||
// Simulate bindings timing out
|
||||
for _ in backoff::steps(now) {
|
||||
allocation.handle_timeout(allocation.poll_timeout().unwrap());
|
||||
allocation.handle_timeout(allocation.poll_timeout().unwrap().0);
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
@@ -2660,7 +2662,7 @@ mod tests {
|
||||
);
|
||||
|
||||
loop {
|
||||
let Some(timeout) = allocation.poll_timeout() else {
|
||||
let Some((timeout, _)) = allocation.poll_timeout() else {
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -2680,7 +2682,7 @@ mod tests {
|
||||
let mut allocation = Allocation::for_test_dual(Instant::now());
|
||||
|
||||
loop {
|
||||
let Some(timeout) = allocation.poll_timeout() else {
|
||||
let Some((timeout, _)) = allocation.poll_timeout() else {
|
||||
break;
|
||||
};
|
||||
allocation.handle_timeout(timeout);
|
||||
|
||||
@@ -541,7 +541,7 @@ where
|
||||
/// This function only takes `&mut self` because it caches certain computations internally.
|
||||
/// The returned timestamp will **not** change unless other state is modified.
|
||||
#[must_use]
|
||||
pub fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
pub fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
.chain(self.connections.poll_timeout())
|
||||
.chain(
|
||||
@@ -549,8 +549,11 @@ where
|
||||
.values_mut()
|
||||
.filter_map(|a| a.poll_timeout()),
|
||||
)
|
||||
.chain(self.next_rate_limiter_reset)
|
||||
.min()
|
||||
.chain(
|
||||
self.next_rate_limiter_reset
|
||||
.map(|instant| (instant, "rate limiter reset")),
|
||||
)
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
|
||||
/// Advances time within the [`Node`].
|
||||
@@ -1438,7 +1441,7 @@ where
|
||||
self.established.values().all(|c| c.is_idle())
|
||||
}
|
||||
|
||||
fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
.chain(self.initial.values_mut().filter_map(|c| c.poll_timeout()))
|
||||
.chain(
|
||||
@@ -1446,7 +1449,7 @@ where
|
||||
.values_mut()
|
||||
.filter_map(|c| c.poll_timeout()),
|
||||
)
|
||||
.min()
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1644,11 +1647,18 @@ impl<RId> InitialConnection<RId> {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
.chain(self.agent.poll_timeout())
|
||||
.chain(Some(self.no_answer_received_timeout()))
|
||||
.min()
|
||||
.chain(
|
||||
self.agent
|
||||
.poll_timeout()
|
||||
.map(|timeout| (timeout, "ICE agent")),
|
||||
)
|
||||
.chain(Some((
|
||||
self.no_answer_received_timeout(),
|
||||
"connection handshake timeout",
|
||||
)))
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
|
||||
fn no_answer_received_timeout(&self) -> Instant {
|
||||
@@ -1719,13 +1729,13 @@ impl<RId> ConnectionState<RId>
|
||||
where
|
||||
RId: Copy,
|
||||
{
|
||||
fn poll_timeout(&self) -> Option<Instant> {
|
||||
fn poll_timeout(&self) -> Option<(Instant, &'static str)> {
|
||||
match self {
|
||||
ConnectionState::Connected {
|
||||
last_incoming,
|
||||
last_outgoing,
|
||||
..
|
||||
} => Some(idle_at(*last_incoming, *last_outgoing)),
|
||||
} => Some((idle_at(*last_incoming, *last_outgoing), "idle transition")),
|
||||
ConnectionState::Connecting { .. }
|
||||
| ConnectionState::Idle { .. }
|
||||
| ConnectionState::Failed => None,
|
||||
@@ -1865,14 +1875,24 @@ where
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
.chain(self.agent.poll_timeout())
|
||||
.chain(Some(self.next_wg_timer_update))
|
||||
.chain(self.candidate_timeout())
|
||||
.chain(self.disconnect_timeout())
|
||||
.chain(
|
||||
self.agent
|
||||
.poll_timeout()
|
||||
.map(|instant| (instant, "ICE agent")),
|
||||
)
|
||||
.chain(Some((self.next_wg_timer_update, "boringtun tunnel")))
|
||||
.chain(
|
||||
self.candidate_timeout()
|
||||
.map(|instant| (instant, "candidate timeout")),
|
||||
)
|
||||
.chain(
|
||||
self.disconnect_timeout()
|
||||
.map(|instant| (instant, "disconnect timeout")),
|
||||
)
|
||||
.chain(self.state.poll_timeout())
|
||||
.min()
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
|
||||
fn candidate_timeout(&self) -> Option<Instant> {
|
||||
|
||||
@@ -999,13 +999,25 @@ impl ClientState {
|
||||
.or_else(|| self.tcp_dns_server.poll_outbound())
|
||||
}
|
||||
|
||||
pub fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
pub fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
.chain(self.udp_dns_sockets_by_upstream_and_query_id.poll_timeout())
|
||||
.chain(self.tcp_dns_client.poll_timeout())
|
||||
.chain(self.tcp_dns_server.poll_timeout())
|
||||
.chain(
|
||||
self.udp_dns_sockets_by_upstream_and_query_id
|
||||
.poll_timeout()
|
||||
.map(|instant| (instant, "DNS socket timeout")),
|
||||
)
|
||||
.chain(
|
||||
self.tcp_dns_client
|
||||
.poll_timeout()
|
||||
.map(|instant| (instant, "TCP DNS client")),
|
||||
)
|
||||
.chain(
|
||||
self.tcp_dns_server
|
||||
.poll_timeout()
|
||||
.map(|instant| (instant, "TCP DNS server")),
|
||||
)
|
||||
.chain(self.node.poll_timeout())
|
||||
.min()
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
|
||||
pub fn handle_timeout(&mut self, now: Instant) {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::messages::gateway::ResourceDescription;
|
||||
use crate::messages::{Answer, IceCredentials, ResolveRequest, SecretKey};
|
||||
use crate::peer::TranslateOutboundResult;
|
||||
use crate::utils::earliest;
|
||||
use crate::{GatewayEvent, IpConfig, p2p_control};
|
||||
use crate::{peer::ClientOnGateway, peer_store::PeerStore};
|
||||
use anyhow::{Context, Result};
|
||||
@@ -13,6 +12,7 @@ use ip_packet::{FzP2pControlSlice, IpPacket};
|
||||
use secrecy::{ExposeSecret as _, Secret};
|
||||
use snownet::{Credentials, NoTurnServers, RelaySocket, ServerNode, Transmit};
|
||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||
use std::iter;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -339,9 +339,15 @@ impl GatewayState {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
// TODO: This should check when the next resource actually expires instead of doing it at a fixed interval.
|
||||
earliest(self.next_expiry_resources_check, self.node.poll_timeout())
|
||||
pub fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
// TODO: This should check when the next resource actually expires instead of doing it at a fixed interval.
|
||||
.chain(
|
||||
self.next_expiry_resources_check
|
||||
.map(|instant| (instant, "resource expiry")),
|
||||
)
|
||||
.chain(self.node.poll_timeout())
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
|
||||
pub fn handle_timeout(&mut self, now: Instant, utc_now: DateTime<Utc>) {
|
||||
|
||||
@@ -21,6 +21,7 @@ use std::{
|
||||
task::{Context, Poll, ready},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::Level;
|
||||
use tun::Tun;
|
||||
|
||||
/// How many IP packets we will at most read from the MPSC-channel connected to our TUN device thread.
|
||||
@@ -326,15 +327,26 @@ impl Io {
|
||||
self.nameservers.evaluate();
|
||||
}
|
||||
|
||||
pub fn reset_timeout(&mut self, timeout: Instant) {
|
||||
pub fn reset_timeout(&mut self, timeout: Instant, reason: &'static str) {
|
||||
let wakeup_in = tracing::event_enabled!(Level::TRACE)
|
||||
.then(|| timeout.duration_since(Instant::now()))
|
||||
.map(tracing::field::debug);
|
||||
let timeout = tokio::time::Instant::from_std(timeout);
|
||||
|
||||
match self.timeout.as_mut() {
|
||||
Some(existing_timeout) if existing_timeout.deadline() != timeout => {
|
||||
tracing::trace!(wakeup_in, %reason);
|
||||
|
||||
existing_timeout.as_mut().reset(timeout)
|
||||
}
|
||||
Some(_) => {}
|
||||
None => self.timeout = Some(Box::pin(tokio::time::sleep_until(timeout))),
|
||||
None => {
|
||||
self.timeout = {
|
||||
tracing::trace!(?wakeup_in, %reason);
|
||||
|
||||
Some(Box::pin(tokio::time::sleep_until(timeout)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -430,7 +442,7 @@ mod tests {
|
||||
let mut io = Io::for_test();
|
||||
|
||||
let deadline = Instant::now() + Duration::from_secs(1);
|
||||
io.reset_timeout(deadline);
|
||||
io.reset_timeout(deadline, "");
|
||||
|
||||
let Input::Timeout(timeout) = io.next().await else {
|
||||
panic!("Unexpected result");
|
||||
@@ -449,7 +461,7 @@ mod tests {
|
||||
let now = Instant::now();
|
||||
let mut io = Io::for_test();
|
||||
|
||||
io.reset_timeout(now - Duration::from_secs(10));
|
||||
io.reset_timeout(now - Duration::from_secs(10), "");
|
||||
|
||||
let Input::Timeout(timeout) = io.next().await else {
|
||||
panic!("Unexpected result");
|
||||
|
||||
@@ -161,8 +161,8 @@ impl ClientTunnel {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(timeout) = self.role_state.poll_timeout() {
|
||||
self.io.reset_timeout(timeout);
|
||||
if let Some((timeout, reason)) = self.role_state.poll_timeout() {
|
||||
self.io.reset_timeout(timeout, reason);
|
||||
}
|
||||
|
||||
match self.io.poll(cx, &mut self.buffers)? {
|
||||
@@ -275,8 +275,8 @@ impl GatewayTunnel {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(timeout) = self.role_state.poll_timeout() {
|
||||
self.io.reset_timeout(timeout);
|
||||
if let Some((timeout, reason)) = self.role_state.poll_timeout() {
|
||||
self.io.reset_timeout(timeout, reason);
|
||||
}
|
||||
|
||||
match self.io.poll(cx, &mut self.buffers)? {
|
||||
|
||||
@@ -198,7 +198,7 @@ impl SimClient {
|
||||
self.tcp_dns_client.handle_timeout(now);
|
||||
self.tcp_client.handle_timeout(now);
|
||||
|
||||
if self.sut.poll_timeout().is_some_and(|t| t <= now) {
|
||||
if self.sut.poll_timeout().is_some_and(|(t, _)| t <= now) {
|
||||
self.sut.handle_timeout(now)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::tests::buffered_transmits::BufferedTransmits;
|
||||
use crate::tests::strategies::documentation_ip6s;
|
||||
use crate::{tests::buffered_transmits::BufferedTransmits, utils::earliest};
|
||||
use connlib_model::{ClientId, GatewayId, RelayId};
|
||||
use firezone_relay::{AddressFamily, IpStack};
|
||||
use ip_network::IpNetwork;
|
||||
@@ -8,7 +8,7 @@ use proptest::prelude::*;
|
||||
use snownet::Transmit;
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
fmt,
|
||||
fmt, iter,
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -111,30 +111,37 @@ impl<T> Host<T>
|
||||
where
|
||||
T: PollTimeout,
|
||||
{
|
||||
pub(crate) fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
earliest(self.inner.poll_timeout(), self.inbox.next_transmit())
|
||||
pub(crate) fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
.chain(self.inner.poll_timeout())
|
||||
.chain(
|
||||
self.inbox
|
||||
.next_transmit()
|
||||
.map(|instant| (instant, "inbox transmit")),
|
||||
)
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait PollTimeout {
|
||||
fn poll_timeout(&mut self) -> Option<Instant>;
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)>;
|
||||
}
|
||||
|
||||
impl PollTimeout for SimClient {
|
||||
fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
self.sut.poll_timeout()
|
||||
}
|
||||
}
|
||||
|
||||
impl PollTimeout for SimGateway {
|
||||
fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
self.sut.poll_timeout()
|
||||
}
|
||||
}
|
||||
|
||||
impl PollTimeout for SimRelay {
|
||||
fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
self.sut.poll_timeout()
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
self.sut.poll_timeout().map(|instant| (instant, ""))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ use crate::messages::{IceCredentials, Key, SecretKey};
|
||||
use crate::tests::assertions::*;
|
||||
use crate::tests::flux_capacitor::FluxCapacitor;
|
||||
use crate::tests::transition::Transition;
|
||||
use crate::utils::earliest;
|
||||
use crate::{ClientEvent, GatewayEvent, dns, messages::Interface};
|
||||
use bufferpool::BufferPool;
|
||||
use connlib_model::{ClientId, GatewayId, PublicKey, RelayId};
|
||||
@@ -393,9 +392,9 @@ impl TunnelTest {
|
||||
now,
|
||||
);
|
||||
|
||||
if let Some(next) = self.poll_timeout() {
|
||||
if let Some((next, reason)) = self.poll_timeout() {
|
||||
if next < now {
|
||||
tracing::error!(?next, ?now, "State machine requested time in the past");
|
||||
tracing::error!(?next, ?now, %reason, "State machine requested time in the past");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -519,7 +518,7 @@ impl TunnelTest {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some(time_to_next_action) = self.poll_timeout() else {
|
||||
let Some((time_to_next_action, _)) = self.poll_timeout() else {
|
||||
break; // Nothing to do.
|
||||
};
|
||||
|
||||
@@ -589,7 +588,7 @@ impl TunnelTest {
|
||||
}
|
||||
|
||||
gateway.exec_mut(|g| {
|
||||
if g.sut.poll_timeout().is_some_and(|t| t <= now) {
|
||||
if g.sut.poll_timeout().is_some_and(|(t, _)| t <= now) {
|
||||
g.sut.handle_timeout(now, self.flux_capacitor.now())
|
||||
}
|
||||
});
|
||||
@@ -613,20 +612,12 @@ impl TunnelTest {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_timeout(&mut self) -> Option<Instant> {
|
||||
let client = self.client.poll_timeout();
|
||||
let gateway = self
|
||||
.gateways
|
||||
.values_mut()
|
||||
.flat_map(|g| g.poll_timeout())
|
||||
.min();
|
||||
let relay = self
|
||||
.relays
|
||||
.values_mut()
|
||||
.flat_map(|r| r.poll_timeout())
|
||||
.min();
|
||||
|
||||
earliest(client, earliest(gateway, relay))
|
||||
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
|
||||
iter::empty()
|
||||
.chain(self.client.poll_timeout())
|
||||
.chain(self.gateways.values_mut().flat_map(|g| g.poll_timeout()))
|
||||
.chain(self.relays.values_mut().flat_map(|r| r.poll_timeout()))
|
||||
.min_by_key(|(instant, _)| *instant)
|
||||
}
|
||||
|
||||
/// Dispatches a [`Transmit`] to the correct host.
|
||||
|
||||
@@ -3,7 +3,7 @@ use connlib_model::RelayId;
|
||||
use ip_network::{IpNetwork, Ipv4Network, Ipv6Network};
|
||||
use itertools::Itertools as _;
|
||||
use snownet::RelaySocket;
|
||||
use std::{collections::BTreeSet, net::SocketAddr, time::Instant};
|
||||
use std::{collections::BTreeSet, net::SocketAddr};
|
||||
|
||||
pub fn turn(relays: &[Relay]) -> BTreeSet<(RelayId, RelaySocket, String, String, String)> {
|
||||
relays
|
||||
@@ -57,15 +57,6 @@ pub fn turn(relays: &[Relay]) -> BTreeSet<(RelayId, RelaySocket, String, String,
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn earliest(left: Option<Instant>, right: Option<Instant>) -> Option<Instant> {
|
||||
match (left, right) {
|
||||
(None, None) => None,
|
||||
(Some(left), Some(right)) => Some(std::cmp::min(left, right)),
|
||||
(Some(left), None) => Some(left),
|
||||
(None, Some(right)) => Some(right),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn network_contains_network(ip_a: IpNetwork, ip_b: IpNetwork) -> bool {
|
||||
ip_a.contains(ip_b.network_address()) && ip_a.netmask() <= ip_b.netmask()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user