mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
feat(connlib): decrease connection setup latency (#4022)
`snownet` is built in a SANS-IO way, which means it doesn't have internal timers or IO. It is up to the upper layer to correctly check `poll_timeout` and call `handle_timeout` as soon as that expires. _When_ we want to be called again (i.e. the result of `poll_timeout`) may change every time `snownet`'s internal state changes. This is especially critical during the initial setup of a connection. As we learn about our own candidates and candidates from the other party, we form new pairs. To actually detect whether the pair is a viable network path, we need to send a STUN request. When to send STUN requests is controlled by time. A newly formed pair should send a STUN request as soon as possible to minimize latency. Previously, we did not update the timer upon which we "wake" `snownet` using `handle_timeout`. As such, we waited unnecessarily long before sending STUN requests to newly formed pairs. With this patch, we check `poll_timeout` at the end of the `Tunnel`'s `poll` function and immediately call `handle_timeout` in case we need to. Currently, `str0m` throttles updates to `handle_timeout` in 50ms blocks, which still creates some delay. With that commented out, I observed improvements of ~0.7s for establishing new connections. Most of the time, the 2nd ping already goes through!
This commit is contained in:
@@ -9,7 +9,7 @@ use connlib_shared::{
|
||||
CallbackErrorFacade, Callbacks, Error, Result,
|
||||
};
|
||||
use device_channel::Device;
|
||||
use futures_util::{future::BoxFuture, task::AtomicWaker, FutureExt};
|
||||
use futures_util::{task::AtomicWaker, FutureExt};
|
||||
use peer::PacketTransform;
|
||||
use peer_store::PeerStore;
|
||||
use snownet::{Node, Server};
|
||||
@@ -19,6 +19,7 @@ use std::{
|
||||
fmt,
|
||||
hash::Hash,
|
||||
io,
|
||||
pin::Pin,
|
||||
task::{ready, Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -122,6 +123,11 @@ where
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
// After any state change, check what the new timeout is and reset it if necessary.
|
||||
if self.connections_state.poll_timeout(cx).is_ready() {
|
||||
cx.waker().wake_by_ref()
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
@@ -180,6 +186,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// After any state change, check what the new timeout is and reset it if necessary.
|
||||
if self.connections_state.poll_timeout(cx).is_ready() {
|
||||
cx.waker().wake_by_ref()
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
@@ -229,7 +240,7 @@ where
|
||||
/// Connection-handling state driving the `snownet` node's timers, sockets and stats.
///
/// NOTE(review): this span comes from a rendered diff with +/− markers stripped;
/// it shows BOTH the removed `connection_pool_timeout` field and its replacement
/// `timeout` field. An actual checkout contains only one of the two.
struct ConnectionState<TRole, TId> {
    // The sans-IO `snownet` node; woken via `handle_timeout` whenever the
    // deadline it reports through `poll_timeout` elapses.
    pub node: Node<TRole, TId>,
    // Reusable buffer sized for the largest UDP datagram — presumably scratch
    // space for outgoing packets; TODO confirm against the write path.
    write_buf: Box<[u8; MAX_UDP_SIZE]>,
    // (pre-patch) One-shot future resolving at the node's next deadline;
    // replaced because it was never re-armed when the deadline moved earlier.
    connection_pool_timeout: BoxFuture<'static, std::time::Instant>,
    // (post-patch) Resettable sleep mirroring the node's current deadline;
    // `None` until the node first reports one.
    timeout: Option<Pin<Box<tokio::time::Sleep>>>,
    // Fires every 60 seconds to collect and log node/connection statistics.
    stats_timer: tokio::time::Interval,
    sockets: Sockets,
}
|
||||
@@ -242,9 +253,9 @@ where
|
||||
Ok(ConnectionState {
|
||||
node: Node::new(private_key, std::time::Instant::now()),
|
||||
write_buf: Box::new([0; MAX_UDP_SIZE]),
|
||||
connection_pool_timeout: sleep_until(std::time::Instant::now()).boxed(),
|
||||
sockets: Sockets::new()?,
|
||||
stats_timer: tokio::time::interval(Duration::from_secs(60)),
|
||||
timeout: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -335,17 +346,6 @@ where
|
||||
}
|
||||
|
||||
fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<Event<TId>> {
|
||||
if let Poll::Ready(prev_timeout) = self.connection_pool_timeout.poll_unpin(cx) {
|
||||
self.node.handle_timeout(prev_timeout);
|
||||
if let Some(new_timeout) = self.node.poll_timeout() {
|
||||
debug_assert_ne!(prev_timeout, new_timeout, "Timer busy loop!");
|
||||
|
||||
self.connection_pool_timeout = sleep_until(new_timeout).boxed();
|
||||
}
|
||||
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
|
||||
if self.stats_timer.poll_tick(cx).is_ready() {
|
||||
let (node_stats, conn_stats) = self.node.stats();
|
||||
|
||||
@@ -386,6 +386,33 @@ where
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Mirrors the node's desired wake-up instant onto a resettable tokio sleep
/// and invokes `Node::handle_timeout` as soon as that deadline elapses.
///
/// Returns `Poll::Ready(())` exactly when `handle_timeout` was called,
/// signalling the caller that the node's state (and therefore its next
/// deadline) may have changed and should be polled again.
fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<()> {
    // Ask the node when it next wants to be woken.
    if let Some(timeout) = self.node.poll_timeout() {
        let timeout = tokio::time::Instant::from_std(timeout);

        match self.timeout.as_mut() {
            // Deadline moved (e.g. a freshly formed candidate pair wants a
            // STUN request ASAP): reset the existing sleep in place instead
            // of allocating a new one.
            Some(existing_timeout) if existing_timeout.deadline() != timeout => {
                existing_timeout.as_mut().reset(timeout)
            }
            // Deadline unchanged: leave the armed sleep alone.
            Some(_) => {}
            // First deadline ever reported: lazily create the sleep.
            None => self.timeout = Some(Box::pin(tokio::time::sleep_until(timeout))),
        }
    }

    if let Some(timeout) = self.timeout.as_mut() {
        // Suspend here (registering `cx`'s waker) until the deadline passes.
        ready!(timeout.poll_unpin(cx));
        // Deadline reached: advance the node's internal timers.
        // NOTE(review): the sleep is not disarmed after firing; if the node
        // kept reporting the same deadline the elapsed sleep would stay Ready
        // on every poll. Presumably the node always moves its deadline forward
        // after `handle_timeout` — TODO confirm to rule out a busy loop.
        self.node.handle_timeout(timeout.deadline().into());

        return Poll::Ready(());
    }

    // Technically, we should set a waker here because we don't have a timer.
    // But the only place where we set a timer is a few lines up.
    // That is the same path that will re-poll it so there is no point in using a waker.
    // We might want to consider making a `MaybeSleep` type that encapsulates a waker so we don't need to think about it as hard.
    Poll::Pending
}
|
||||
}
|
||||
|
||||
pub enum Event<TId> {
|
||||
@@ -403,9 +430,3 @@ pub enum Event<TId> {
|
||||
SendPacket(IpPacket<'static>),
|
||||
StopPeer(TId),
|
||||
}
|
||||
|
||||
async fn sleep_until(deadline: Instant) -> Instant {
|
||||
tokio::time::sleep_until(deadline.into()).await;
|
||||
|
||||
deadline
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user