refactor(snownet): learn host candidates from TURN traffic (#9998)

Presently, for each UDP packet that we process in `snownet`, we check
whether we have already seen the local address it arrived on and, if
not, add it to our list of host candidates. This is a safe way of
ensuring that every address we receive data on is advertised to our
peers as one they should try to contact us on.
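
For illustration, a minimal standalone sketch of that per-packet
pattern (the types here are simplified stand-ins, not the actual
`snownet` structs):

```rust
use std::collections::HashSet;
use std::net::SocketAddr;

/// Simplified stand-in for the node's shared candidate state.
struct Node {
    known_local_addrs: HashSet<SocketAddr>,
}

impl Node {
    /// Runs for every inbound datagram: `local` is hashed into the set
    /// even though it is almost always already known.
    fn handle_input(&mut self, local: SocketAddr, _packet: &[u8]) {
        if self.known_local_addrs.insert(local) {
            // Newly discovered local address: advertise it to all peers
            // as a host candidate.
        }

        // ... decrypt and route the packet ...
    }
}
```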

Performance profiling has shown that hashing the socket address of
every incoming packet is quite wasteful: it accounts for about 4-5% of
our main-thread time. For comparison, decrypting packets accounts for
only about 30%.

Most of the time, we will already know about this address, so spending
all this CPU time is pointless. At the same time, we still need to be
sure that we discover our local addresses correctly.

Inspired by STUN, we therefore move this responsibility into the
`allocation` module. That module is responsible for interacting with
our TURN servers and already yields server-reflexive and relay
candidates as a result. It also knows which local address it received
traffic on, so we simply extend it to yield host candidates in addition
to the server-reflexive and relay ones.
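
Sketched in isolation, the idea looks roughly like this (again with
simplified stand-in types; the real change is in the diff below):

```rust
use std::net::SocketAddr;

/// Simplified stand-in for the per-relay allocation state.
struct Allocation {
    ip4_host: Option<SocketAddr>,
    ip6_host: Option<SocketAddr>,
}

impl Allocation {
    /// Runs only when a STUN BINDING response from the relay is
    /// processed, i.e. rarely, instead of once per data packet.
    fn learn_host_candidate(&mut self, local: SocketAddr) {
        let slot = match local {
            SocketAddr::V4(_) => &mut self.ip4_host,
            SocketAddr::V6(_) => &mut self.ip6_host,
        };

        if *slot != Some(local) {
            *slot = Some(local);
            // Emit `local` as a new host candidate, alongside the
            // server-reflexive and relay candidates we already emit.
        }
    }
}
```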

On my local machine, this bumps us across the 3.5 Gbits/sec mark:

```
Connecting to host 172.20.0.110, port 5201
[  5] local 100.93.174.92 port 57890 connected to 172.20.0.110 port 5201
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   319 MBytes  2.67 Gbits/sec   18    548 KBytes       
[  5]   1.00-2.00   sec   413 MBytes  3.46 Gbits/sec    4    884 KBytes       
[  5]   2.00-3.00   sec   417 MBytes  3.50 Gbits/sec    4   1.10 MBytes       
[  5]   3.00-4.00   sec   425 MBytes  3.56 Gbits/sec  415    785 KBytes       
[  5]   4.00-5.00   sec   430 MBytes  3.60 Gbits/sec  154    820 KBytes       
[  5]   5.00-6.00   sec   434 MBytes  3.64 Gbits/sec  251    793 KBytes       
[  5]   6.00-7.00   sec   436 MBytes  3.66 Gbits/sec  123    811 KBytes       
[  5]   7.00-8.00   sec   435 MBytes  3.65 Gbits/sec    2    788 KBytes       
[  5]   8.00-9.00   sec   423 MBytes  3.55 Gbits/sec    0   1.06 MBytes       
[  5]   9.00-10.00  sec   433 MBytes  3.63 Gbits/sec    8   1017 KBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-20.00  sec  8.21 GBytes  3.53 Gbits/sec  1728             sender
[  5]   0.00-20.00  sec  8.21 GBytes  3.53 Gbits/sec                  receiver

iperf Done.
```
Author: Thomas Eizinger
Date: 2025-07-28 21:38:39 +00:00
Committed by: GitHub
Parent: 9c71026416
Commit: 55304b3d2a
5 changed files with 88 additions and 234 deletions


@@ -67,6 +67,10 @@ pub struct Allocation {
software: Software,
/// If present, the IPv4 address we received traffic on.
ip4_host_candidate: Option<Candidate>,
/// If present, the IPv6 address we received traffic on.
ip6_host_candidate: Option<Candidate>,
/// If present, the IPv4 address the relay observed for us.
ip4_srflx_candidate: Option<Candidate>,
/// If present, the IPv6 address the relay observed for us.
@@ -221,6 +225,8 @@ impl Allocation {
let mut allocation = Self {
server,
active_socket: None,
ip4_host_candidate: Default::default(),
ip6_host_candidate: Default::default(),
ip4_srflx_candidate: Default::default(),
ip6_srflx_candidate: Default::default(),
ip4_allocation: Default::default(),
@@ -248,6 +254,17 @@ impl Allocation {
allocation
}
pub fn host_and_server_reflexive_candidates(&self) -> impl Iterator<Item = Candidate> + use<> {
[
self.ip4_host_candidate.clone(),
self.ip6_host_candidate.clone(),
self.ip4_srflx_candidate.clone(),
self.ip6_srflx_candidate.clone(),
]
.into_iter()
.flatten()
}
pub fn current_relay_candidates(&self) -> impl Iterator<Item = Candidate> + use<> {
[self.ip4_allocation.clone(), self.ip6_allocation.clone()]
.into_iter()
@@ -504,7 +521,18 @@ impl Allocation {
match message.method() {
BINDING => {
// First, process the binding request itself.
// First, see if we need to update our host candidate.
let current_host_candidate = match local {
SocketAddr::V4(_) => &mut self.ip4_host_candidate,
SocketAddr::V6(_) => &mut self.ip6_host_candidate,
};
let maybe_candidate = Candidate::host(local, Protocol::Udp).ok();
if update_candidate(maybe_candidate, current_host_candidate, &mut self.events) {
self.log_update(now);
}
// Second, process the binding request itself.
let current_srflx_candidate = match original_dst {
SocketAddr::V4(_) => &mut self.ip4_srflx_candidate,
SocketAddr::V6(_) => &mut self.ip6_srflx_candidate,
@@ -515,7 +543,7 @@ impl Allocation {
self.log_update(now);
}
// Second, check if we have already determined which socket to use for this relay.
// Third, check if we have already determined which socket to use for this relay.
// We send 2 BINDING requests to start with (one for each IP version) and the first one coming back wins.
// Thus, if we already have a socket set, we are done with processing this binding request.
@@ -905,6 +933,8 @@ impl Allocation {
fn log_update(&self, now: Instant) {
tracing::info!(
host_ip4 = ?self.ip4_host_candidate.as_ref().map(|c| c.addr()),
host_ip6 = ?self.ip6_host_candidate.as_ref().map(|c| c.addr()),
srflx_ip4 = ?self.ip4_srflx_candidate.as_ref().map(|c| c.addr()),
srflx_ip6 = ?self.ip6_srflx_candidate.as_ref().map(|c| c.addr()),
relay_ip4 = ?self.ip4_allocation.as_ref().map(|c| c.addr()),
@@ -2113,6 +2143,11 @@ mod tests {
Instant::now(),
);
let next_event = allocation.poll_event();
assert_eq!(
next_event,
Some(Event::New(Candidate::host(PEER1, Protocol::Udp).unwrap()))
);
let next_event = allocation.poll_event();
assert_eq!(
next_event,
@@ -2626,9 +2661,11 @@ mod tests {
assert_eq!(
events,
vec![
Event::New(Candidate::host(PEER2_IP4, Protocol::Udp).unwrap()),
Event::New(
Candidate::server_reflexive(PEER2_IP4, PEER2_IP4, Protocol::Udp).unwrap()
),
Event::New(Candidate::host(PEER2_IP6, Protocol::Udp).unwrap()),
Event::New(
Candidate::server_reflexive(PEER2_IP6, PEER2_IP6, Protocol::Udp).unwrap()
)


@@ -1,129 +0,0 @@
use std::collections::HashSet;
use itertools::Itertools;
use str0m::{Candidate, CandidateKind};
/// Custom "set" implementation for [`Candidate`]s based on a [`HashSet`] with an enforced ordering when iterating.
///
/// The set only allows host and server-reflexive candidates as only those need to be de-duplicated in order to avoid
/// spamming the remote with duplicate candidates.
#[derive(Debug, Default)]
pub struct CandidateSet {
host: HashSet<Candidate>,
server_reflexive: HashSet<Candidate>,
}
impl CandidateSet {
#[expect(
clippy::disallowed_methods,
reason = "We don't care about the ordering."
)]
pub fn insert(&mut self, new: Candidate) -> bool {
match new.kind() {
CandidateKind::PeerReflexive | CandidateKind::Relayed => {
debug_assert!(false);
tracing::warn!(
"CandidateSet is not meant to be used with candidates of kind {}",
new.kind()
);
false
}
CandidateKind::Host => self.host.insert(new),
CandidateKind::ServerReflexive => {
// Hashing a `Candidate` takes longer than checking a handful of entries using their `PartialEq` implementation.
// This function is in the hot-path so it needs to be fast ...
if self.server_reflexive.iter().any(|c| c == &new) {
return false;
}
self.server_reflexive.retain(|current| {
let is_ip_version_different = current.addr().is_ipv4() != new.addr().is_ipv4();
if !is_ip_version_different {
tracing::debug!(%current, %new, "Replacing server-reflexive candidate");
}
// Candidates of different IP version are also kept.
is_ip_version_different
});
self.server_reflexive.insert(new)
}
}
}
pub fn clear(&mut self) {
self.host.clear();
self.server_reflexive.clear();
}
#[expect(
clippy::disallowed_methods,
reason = "We are guaranteeing a stable ordering"
)]
pub fn iter(&self) -> impl Iterator<Item = &Candidate> {
std::iter::empty()
.chain(self.host.iter())
.chain(self.server_reflexive.iter())
.sorted_by(|l, r| l.prio().cmp(&r.prio()).then(l.addr().cmp(&r.addr())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use str0m::net::Protocol;
const SOCK_ADDR_IP4_BASE: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10);
const SOCK_ADDR1: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234);
const SOCK_ADDR2: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5678);
const SOCK_ADDR_IP6_BASE: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 10);
const SOCK_ADDR3: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 1234);
const SOCK_ADDR4: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5678);
#[test]
fn only_allows_one_server_reflexive_candidate_per_ip_family() {
let mut set = CandidateSet::default();
let host1 = Candidate::host(SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap();
let host2 = Candidate::host(SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap();
assert!(set.insert(host1.clone()));
assert!(set.insert(host2.clone()));
let c1 =
Candidate::server_reflexive(SOCK_ADDR1, SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap();
let c2 =
Candidate::server_reflexive(SOCK_ADDR2, SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap();
let c3 =
Candidate::server_reflexive(SOCK_ADDR3, SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap();
let c4 =
Candidate::server_reflexive(SOCK_ADDR4, SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap();
assert!(set.insert(c1));
assert!(set.insert(c2.clone()));
assert!(set.insert(c3));
assert!(set.insert(c4.clone()));
assert_eq!(
set.iter().cloned().collect::<Vec<_>>(),
vec![c2, c4, host1, host2]
);
}
#[test]
fn allows_multiple_host_candidates_of_same_ip_base() {
let mut set = CandidateSet::default();
let host1 = Candidate::host(SOCK_ADDR1, Protocol::Udp).unwrap();
let host2 = Candidate::host(SOCK_ADDR2, Protocol::Udp).unwrap();
assert!(set.insert(host1.clone()));
assert!(set.insert(host2.clone()));
assert_eq!(set.iter().cloned().collect::<Vec<_>>(), vec![host1, host2]);
}
}


@@ -5,7 +5,6 @@
mod allocation;
mod backoff;
mod candidate_set;
mod channel_data;
mod index;
mod node;


@@ -1,5 +1,4 @@
use crate::allocation::{self, Allocation, RelaySocket, Socket};
use crate::candidate_set::CandidateSet;
use crate::index::IndexLfsr;
use crate::stats::{ConnectionStats, NodeStats};
use crate::utils::channel_data_packet_buffer;
@@ -121,8 +120,7 @@ pub struct Node<T, TId, RId> {
index: IndexLfsr,
rate_limiter: Arc<RateLimiter>,
/// Host and server-reflexive candidates that are shared between all connections.
shared_candidates: CandidateSet,
buffered_transmits: VecDeque<Transmit>,
next_rate_limiter_reset: Option<Instant>,
@@ -164,7 +162,6 @@ where
mode: T::new(),
index,
rate_limiter: Arc::new(RateLimiter::new_at(public_key, HANDSHAKE_RATE_LIMIT, now)),
shared_candidates: Default::default(),
buffered_transmits: VecDeque::default(),
next_rate_limiter_reset: None,
pending_events: VecDeque::default(),
@@ -202,7 +199,6 @@ where
self.pending_events.extend(closed_connections);
self.shared_candidates.clear();
self.connections.clear();
self.buffered_transmits.clear();
@@ -273,18 +269,13 @@ where
c.state.on_upsert(cid, &mut c.agent, now);
for candidate in c.agent.local_candidates() {
signal_candidate_to_remote(cid, candidate, &mut self.pending_events);
}
// Server-reflexive candidates are not in the local candidates of the ICE agent so those need special handling.
for candidate in self
.shared_candidates
.iter()
.filter(|c| c.kind() == CandidateKind::ServerReflexive)
{
signal_candidate_to_remote(cid, candidate, &mut self.pending_events);
}
seed_agent_with_local_candidates(
cid,
c.relay,
&mut c.agent,
&self.allocations,
&mut self.pending_events,
);
return Ok(());
}
@@ -305,7 +296,13 @@ where
agent.set_local_credentials(local_creds);
agent.set_remote_credentials(remote_creds);
self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent);
seed_agent_with_local_candidates(
cid,
selected_relay,
&mut agent,
&self.allocations,
&mut self.pending_events,
);
let connection =
self.init_connection(cid, agent, remote, preshared_key, selected_relay, now, now);
@@ -330,35 +327,6 @@ where
(self.stats, self.connections.stats())
}
/// Add an address as a `host` candidate.
///
/// For most network topologies, [`snownet`](crate) will automatically discover host candidates via the traffic to the configured STUN and TURN servers.
/// However, in topologies like the one below, we cannot discover that there is a more optimal link between BACKEND and DB.
/// For those situations, users need to manually add the address of the direct link in order for [`snownet`](crate) to establish a connection.
///
/// ```text
/// ┌──────┐ ┌──────┐
/// │ STUN ├─┐ ┌─┤ TURN │
/// └──────┘ │ │ └──────┘
/// │ │
/// ┌─┴──────┴─┐
/// ┌────────┤ WAN ├───────┐
/// │ └──────────┘ │
/// ┌────┴─────┐ ┌──┴───┐
/// │ FW │ │ FW │
/// └────┬─────┘ └──┬───┘
/// │ ┌──┐ │
/// ┌───┴─────┐ │ │ ┌─┴──┐
/// │ BACKEND ├──────┤FW├─────────┤ DB │
/// └─────────┘ │ │ └────┘
/// └──┘
/// ```
pub fn add_local_host_candidate(&mut self, address: SocketAddr) -> Result<()> {
self.add_local_as_host_candidate(address)?;
Ok(())
}
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
pub fn add_remote_candidate(&mut self, cid: TId, candidate: String, now: Instant) {
let candidate = match Candidate::from_sdp_string(&candidate) {
@@ -438,8 +406,6 @@ where
packet: &[u8],
now: Instant,
) -> Result<Option<(TId, IpPacket)>> {
self.add_local_as_host_candidate(local)?;
let (from, packet, relayed) = match self.allocations_try_handle(from, local, packet, now) {
ControlFlow::Continue(c) => c,
ControlFlow::Break(()) => return Ok(None),
@@ -773,25 +739,6 @@ where
}
}
/// Attempt to add the `local` address as a host candidate.
///
/// Receiving traffic on a certain interface means we at least have a connection to a relay via this interface.
/// Thus, it is also a viable interface to attempt a connection to a gateway.
fn add_local_as_host_candidate(&mut self, local: SocketAddr) -> Result<()> {
let host_candidate =
Candidate::host(local, Protocol::Udp).context("Failed to parse host candidate")?;
if !self.shared_candidates.insert(host_candidate.clone()) {
return Ok(());
}
for (cid, agent) in self.connections.agents_mut() {
add_local_candidate(cid, agent, host_candidate.clone(), &mut self.pending_events);
}
Ok(())
}
/// Tries to handle the packet using one of our [`Allocation`]s.
///
/// This function is in the hot-path of packet processing and thus must be as efficient as possible.
@@ -981,17 +928,6 @@ where
tracing::trace!(%rid, ?event);
match event {
allocation::Event::New(candidate)
if candidate.kind() == CandidateKind::ServerReflexive =>
{
if !self.shared_candidates.insert(candidate.clone()) {
continue;
}
for (cid, agent) in self.connections.agents_by_relay_mut(rid) {
add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events)
}
}
allocation::Event::New(candidate) => {
for (cid, agent) in self.connections.agents_by_relay_mut(rid) {
add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events)
@@ -1098,7 +1034,13 @@ where
let selected_relay = initial.relay;
self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent);
seed_agent_with_local_candidates(
cid,
selected_relay,
&mut agent,
&self.allocations,
&mut self.pending_events,
);
let connection = self.init_connection(
cid,
@@ -1163,7 +1105,13 @@ where
};
let selected_relay = self.sample_relay()?;
self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent);
seed_agent_with_local_candidates(
cid,
selected_relay,
&mut agent,
&self.allocations,
&mut self.pending_events,
);
let connection = self.init_connection(
cid,
@@ -1184,29 +1132,27 @@ where
}
}
impl<T, TId, RId> Node<T, TId, RId>
where
TId: Eq + Hash + Copy + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display,
fn seed_agent_with_local_candidates<TId, RId>(
connection: TId,
selected_relay: RId,
agent: &mut IceAgent,
allocations: &BTreeMap<RId, Allocation>,
pending_events: &mut VecDeque<Event<TId>>,
) where
RId: Ord,
TId: fmt::Display + Copy,
{
fn seed_agent_with_local_candidates(
&mut self,
connection: TId,
selected_relay: RId,
agent: &mut IceAgent,
) {
for candidate in self.shared_candidates.iter().cloned() {
add_local_candidate(connection, agent, candidate, &mut self.pending_events);
}
let shared_candidates = allocations
.values()
.flat_map(|allocation| allocation.host_and_server_reflexive_candidates())
.unique();
let relay_candidates = allocations
.get(&selected_relay)
.into_iter()
.flat_map(|allocation| allocation.current_relay_candidates());
let Some(allocation) = self.allocations.get(&selected_relay) else {
tracing::debug!(%selected_relay, "Cannot seed relay candidates: Unknown relay");
return;
};
for candidate in allocation.current_relay_candidates() {
add_local_candidate(connection, agent, candidate, &mut self.pending_events);
}
for candidate in shared_candidates.chain(relay_candidates) {
add_local_candidate(connection, agent, candidate, pending_events);
}
}


@@ -193,3 +193,4 @@ cc 122b860cc7c9ab8e500c057d8b3183b1b0586d0a38d5e8e7c61a77458d06a716
cc 613cba6cf1b34d88c6733b1272aae83d4f71abfdbe66d2109f890ff7d3bf6692
cc d88ebc9e70446d9f8b602032d1d68e72175630d1073ec9410810bccac31d49cf
cc 0f7aea8d78b70ac939b68b97d93206c077744f677a473336d05b1217b6e042c4
cc a065d273b793538b61647caf61edce567957b435c50b781dc5710f0a60f22efe