From 55304b3d2a1b18fbf47ab8bd67c49603b33db6ea Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 28 Jul 2025 21:38:39 +0000 Subject: [PATCH] refactor(snownet): learn host candidates from TURN traffic (#9998) Presently, for each UDP packet that we process in `snownet`, we check if we have already seen this local address of ours and if not, add it to our list of host candidates. This is a safe way for ensuring that we consider all addresses that we receive data on as ones that we tell our peers that they should try and contact us on. Performance profiling has shown that hashing the socket address of each packet that is coming in is quite wasteful. We spend about 4-5% of our main thread time doing this. For comparison, decrypting packets is only about 30%. Most of the time, we will already know about this address and therefore, spending all this CPU time is completely pointless. At the same time though, we need to be sure that we do discover our local address correctly. Inspired by STUN, we therefore move this responsibility to the `allocation` module. The `allocation` module is responsible for interacting with our TURN servers and will yield server-reflexive and relay candidates as a result. It also knows, what the local address is that it received traffic on so we simply extend that to yield host candidates as well in addition to server-reflexive and relay candidates. On my local machine, this bumps us across the 3.5 Gbits/sec mark: ``` Connecting to host 172.20.0.110, port 5201 [ 5] local 100.93.174.92 port 57890 connected to 172.20.0.110 port 5201 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 319 MBytes 2.67 Gbits/sec 18 548 KBytes [ 5] 1.00-2.00 sec 413 MBytes 3.46 Gbits/sec 4 884 KBytes [ 5] 2.00-3.00 sec 417 MBytes 3.50 Gbits/sec 4 1.10 MBytes [ 5] 3.00-4.00 sec 425 MBytes 3.56 Gbits/sec 415 785 KBytes [ 5] 4.00-5.00 sec 430 MBytes 3.60 Gbits/sec 154 820 KBytes [ 5] 5.00-6.00 sec 434 MBytes 3.64 Gbits/sec 251 793 KBytes [ 5] 6.00-7.00 sec 436 MBytes 3.66 Gbits/sec 123 811 KBytes [ 5] 7.00-8.00 sec 435 MBytes 3.65 Gbits/sec 2 788 KBytes [ 5] 8.00-9.00 sec 423 MBytes 3.55 Gbits/sec 0 1.06 MBytes [ 5] 9.00-10.00 sec 433 MBytes 3.63 Gbits/sec 8 1017 KBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-20.00 sec 8.21 GBytes 3.53 Gbits/sec 1728 sender [ 5] 0.00-20.00 sec 8.21 GBytes 3.53 Gbits/sec receiver iperf Done. ``` --- rust/connlib/snownet/src/allocation.rs | 41 ++++- rust/connlib/snownet/src/candidate_set.rs | 129 --------------- rust/connlib/snownet/src/lib.rs | 1 - rust/connlib/snownet/src/node.rs | 150 ++++++------------ .../tunnel/proptest-regressions/tests.txt | 1 + 5 files changed, 88 insertions(+), 234 deletions(-) delete mode 100644 rust/connlib/snownet/src/candidate_set.rs diff --git a/rust/connlib/snownet/src/allocation.rs b/rust/connlib/snownet/src/allocation.rs index 181926623..b4948015c 100644 --- a/rust/connlib/snownet/src/allocation.rs +++ b/rust/connlib/snownet/src/allocation.rs @@ -67,6 +67,10 @@ pub struct Allocation { software: Software, + /// If present, the IPv4 address we received traffic on. + ip4_host_candidate: Option, + /// If present, the IPv6 address we received traffic on. + ip6_host_candidate: Option, /// If present, the IPv4 address the relay observed for us. ip4_srflx_candidate: Option, /// If present, the IPv6 address the relay observed for us. @@ -221,6 +225,8 @@ impl Allocation { let mut allocation = Self { server, active_socket: None, + ip4_host_candidate: Default::default(), + ip6_host_candidate: Default::default(), ip4_srflx_candidate: Default::default(), ip6_srflx_candidate: Default::default(), ip4_allocation: Default::default(), @@ -248,6 +254,17 @@ impl Allocation { allocation } + pub fn host_and_server_reflexive_candidates(&self) -> impl Iterator + use<> { + [ + self.ip4_host_candidate.clone(), + self.ip6_host_candidate.clone(), + self.ip4_srflx_candidate.clone(), + self.ip6_srflx_candidate.clone(), + ] + .into_iter() + .flatten() + } + pub fn current_relay_candidates(&self) -> impl Iterator + use<> { [self.ip4_allocation.clone(), self.ip6_allocation.clone()] .into_iter() @@ -504,7 +521,18 @@ impl Allocation { match message.method() { BINDING => { - // First, process the binding request itself. + // First, see if we need to update our host candidate. + let current_host_candidate = match local { + SocketAddr::V4(_) => &mut self.ip4_host_candidate, + SocketAddr::V6(_) => &mut self.ip6_host_candidate, + }; + + let maybe_candidate = Candidate::host(local, Protocol::Udp).ok(); + if update_candidate(maybe_candidate, current_host_candidate, &mut self.events) { + self.log_update(now); + } + + // Second, process the binding request itself. let current_srflx_candidate = match original_dst { SocketAddr::V4(_) => &mut self.ip4_srflx_candidate, SocketAddr::V6(_) => &mut self.ip6_srflx_candidate, @@ -515,7 +543,7 @@ impl Allocation { self.log_update(now); } - // Second, check if we have already determined which socket to use for this relay. + // Third, check if we have already determined which socket to use for this relay. // We send 2 BINDING requests to start with (one for each IP version) and the first one coming back wins. // Thus, if we already have a socket set, we are done with processing this binding request. @@ -905,6 +933,8 @@ impl Allocation { fn log_update(&self, now: Instant) { tracing::info!( + host_ip4 = ?self.ip4_host_candidate.as_ref().map(|c| c.addr()), + host_ip6 = ?self.ip6_host_candidate.as_ref().map(|c| c.addr()), srflx_ip4 = ?self.ip4_srflx_candidate.as_ref().map(|c| c.addr()), srflx_ip6 = ?self.ip6_srflx_candidate.as_ref().map(|c| c.addr()), relay_ip4 = ?self.ip4_allocation.as_ref().map(|c| c.addr()), @@ -2113,6 +2143,11 @@ mod tests { Instant::now(), ); + let next_event = allocation.poll_event(); + assert_eq!( + next_event, + Some(Event::New(Candidate::host(PEER1, Protocol::Udp).unwrap())) + ); let next_event = allocation.poll_event(); assert_eq!( next_event, @@ -2626,9 +2661,11 @@ mod tests { assert_eq!( events, vec![ + Event::New(Candidate::host(PEER2_IP4, Protocol::Udp).unwrap()), Event::New( Candidate::server_reflexive(PEER2_IP4, PEER2_IP4, Protocol::Udp).unwrap() ), + Event::New(Candidate::host(PEER2_IP6, Protocol::Udp).unwrap()), Event::New( Candidate::server_reflexive(PEER2_IP6, PEER2_IP6, Protocol::Udp).unwrap() ) diff --git a/rust/connlib/snownet/src/candidate_set.rs b/rust/connlib/snownet/src/candidate_set.rs deleted file mode 100644 index 1b57aed86..000000000 --- a/rust/connlib/snownet/src/candidate_set.rs +++ /dev/null @@ -1,129 +0,0 @@ -use std::collections::HashSet; - -use itertools::Itertools; -use str0m::{Candidate, CandidateKind}; - -/// Custom "set" implementation for [`Candidate`]s based on a [`HashSet`] with an enforced ordering when iterating. -/// -/// The set only allows host and server-reflexive candidates as only those need to be de-duplicated in order to avoid -/// spamming the remote with duplicate candidates. -#[derive(Debug, Default)] -pub struct CandidateSet { - host: HashSet, - server_reflexive: HashSet, -} - -impl CandidateSet { - #[expect( - clippy::disallowed_methods, - reason = "We don't care about the ordering." - )] - pub fn insert(&mut self, new: Candidate) -> bool { - match new.kind() { - CandidateKind::PeerReflexive | CandidateKind::Relayed => { - debug_assert!(false); - tracing::warn!( - "CandidateSet is not meant to be used with candidates of kind {}", - new.kind() - ); - - false - } - CandidateKind::Host => self.host.insert(new), - CandidateKind::ServerReflexive => { - // Hashing a `Candidate` takes longer than checking a handful of entries using their `PartialEq` implementation. - // This function is in the hot-path so it needs to be fast ... - if self.server_reflexive.iter().any(|c| c == &new) { - return false; - } - - self.server_reflexive.retain(|current| { - let is_ip_version_different = current.addr().is_ipv4() != new.addr().is_ipv4(); - - if !is_ip_version_different { - tracing::debug!(%current, %new, "Replacing server-reflexive candidate"); - } - - // Candidates of different IP version are also kept. - is_ip_version_different - }); - - self.server_reflexive.insert(new) - } - } - } - - pub fn clear(&mut self) { - self.host.clear(); - self.server_reflexive.clear(); - } - - #[expect( - clippy::disallowed_methods, - reason = "We are guaranteeing a stable ordering" - )] - pub fn iter(&self) -> impl Iterator { - std::iter::empty() - .chain(self.host.iter()) - .chain(self.server_reflexive.iter()) - .sorted_by(|l, r| l.prio().cmp(&r.prio()).then(l.addr().cmp(&r.addr()))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use str0m::net::Protocol; - - const SOCK_ADDR_IP4_BASE: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10); - const SOCK_ADDR1: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234); - const SOCK_ADDR2: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5678); - - const SOCK_ADDR_IP6_BASE: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 10); - const SOCK_ADDR3: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 1234); - const SOCK_ADDR4: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5678); - - #[test] - fn only_allows_one_server_reflexive_candidate_per_ip_family() { - let mut set = CandidateSet::default(); - - let host1 = Candidate::host(SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap(); - let host2 = Candidate::host(SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap(); - - assert!(set.insert(host1.clone())); - assert!(set.insert(host2.clone())); - - let c1 = - Candidate::server_reflexive(SOCK_ADDR1, SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap(); - let c2 = - Candidate::server_reflexive(SOCK_ADDR2, SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap(); - let c3 = - Candidate::server_reflexive(SOCK_ADDR3, SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap(); - let c4 = - Candidate::server_reflexive(SOCK_ADDR4, SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap(); - - assert!(set.insert(c1)); - assert!(set.insert(c2.clone())); - assert!(set.insert(c3)); - assert!(set.insert(c4.clone())); - - assert_eq!( - set.iter().cloned().collect::>(), - vec![c2, c4, host1, host2] - ); - } - - #[test] - fn allows_multiple_host_candidates_of_same_ip_base() { - let mut set = CandidateSet::default(); - - let host1 = Candidate::host(SOCK_ADDR1, Protocol::Udp).unwrap(); - let host2 = Candidate::host(SOCK_ADDR2, Protocol::Udp).unwrap(); - - assert!(set.insert(host1.clone())); - assert!(set.insert(host2.clone())); - - assert_eq!(set.iter().cloned().collect::>(), vec![host1, host2]); - } -} diff --git a/rust/connlib/snownet/src/lib.rs b/rust/connlib/snownet/src/lib.rs index 0257c75bc..b8a7bbc7e 100644 --- a/rust/connlib/snownet/src/lib.rs +++ b/rust/connlib/snownet/src/lib.rs @@ -5,7 +5,6 @@ mod allocation; mod backoff; -mod candidate_set; mod channel_data; mod index; mod node; diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 734c03636..6c73078a0 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -1,5 +1,4 @@ use crate::allocation::{self, Allocation, RelaySocket, Socket}; -use crate::candidate_set::CandidateSet; use crate::index::IndexLfsr; use crate::stats::{ConnectionStats, NodeStats}; use crate::utils::channel_data_packet_buffer; @@ -121,8 +120,7 @@ pub struct Node { index: IndexLfsr, rate_limiter: Arc, - /// Host and server-reflexive candidates that are shared between all connections. - shared_candidates: CandidateSet, + buffered_transmits: VecDeque, next_rate_limiter_reset: Option, @@ -164,7 +162,6 @@ where mode: T::new(), index, rate_limiter: Arc::new(RateLimiter::new_at(public_key, HANDSHAKE_RATE_LIMIT, now)), - shared_candidates: Default::default(), buffered_transmits: VecDeque::default(), next_rate_limiter_reset: None, pending_events: VecDeque::default(), @@ -202,7 +199,6 @@ where self.pending_events.extend(closed_connections); - self.shared_candidates.clear(); self.connections.clear(); self.buffered_transmits.clear(); @@ -273,18 +269,13 @@ where c.state.on_upsert(cid, &mut c.agent, now); - for candidate in c.agent.local_candidates() { - signal_candidate_to_remote(cid, candidate, &mut self.pending_events); - } - - // Server-reflexive candidates are not in the local candidates of the ICE agent so those need special handling. - for candidate in self - .shared_candidates - .iter() - .filter(|c| c.kind() == CandidateKind::ServerReflexive) - { - signal_candidate_to_remote(cid, candidate, &mut self.pending_events); - } + seed_agent_with_local_candidates( + cid, + c.relay, + &mut c.agent, + &self.allocations, + &mut self.pending_events, + ); return Ok(()); } @@ -305,7 +296,13 @@ where agent.set_local_credentials(local_creds); agent.set_remote_credentials(remote_creds); - self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent); + seed_agent_with_local_candidates( + cid, + selected_relay, + &mut agent, + &self.allocations, + &mut self.pending_events, + ); let connection = self.init_connection(cid, agent, remote, preshared_key, selected_relay, now, now); @@ -330,35 +327,6 @@ where (self.stats, self.connections.stats()) } - /// Add an address as a `host` candidate. - /// - /// For most network topologies, [`snownet`](crate) will automatically discover host candidates via the traffic to the configured STUN and TURN servers. - /// However, in topologies like the one below, we cannot discover that there is a more optimal link between BACKEND and DB. - /// For those situations, users need to manually add the address of the direct link in order for [`snownet`](crate) to establish a connection. - /// - /// ```text - /// ┌──────┐ ┌──────┐ - /// │ STUN ├─┐ ┌─┤ TURN │ - /// └──────┘ │ │ └──────┘ - /// │ │ - /// ┌─┴──────┴─┐ - /// ┌────────┤ WAN ├───────┐ - /// │ └──────────┘ │ - /// ┌────┴─────┐ ┌──┴───┐ - /// │ FW │ │ FW │ - /// └────┬─────┘ └──┬───┘ - /// │ ┌──┐ │ - /// ┌───┴─────┐ │ │ ┌─┴──┐ - /// │ BACKEND ├──────┤FW├─────────┤ DB │ - /// └─────────┘ │ │ └────┘ - /// └──┘ - /// ``` - pub fn add_local_host_candidate(&mut self, address: SocketAddr) -> Result<()> { - self.add_local_as_host_candidate(address)?; - - Ok(()) - } - #[tracing::instrument(level = "info", skip_all, fields(%cid))] pub fn add_remote_candidate(&mut self, cid: TId, candidate: String, now: Instant) { let candidate = match Candidate::from_sdp_string(&candidate) { @@ -438,8 +406,6 @@ where packet: &[u8], now: Instant, ) -> Result> { - self.add_local_as_host_candidate(local)?; - let (from, packet, relayed) = match self.allocations_try_handle(from, local, packet, now) { ControlFlow::Continue(c) => c, ControlFlow::Break(()) => return Ok(None), @@ -773,25 +739,6 @@ where } } - /// Attempt to add the `local` address as a host candidate. - /// - /// Receiving traffic on a certain interface means we at least have a connection to a relay via this interface. - /// Thus, it is also a viable interface to attempt a connection to a gateway. - fn add_local_as_host_candidate(&mut self, local: SocketAddr) -> Result<()> { - let host_candidate = - Candidate::host(local, Protocol::Udp).context("Failed to parse host candidate")?; - - if !self.shared_candidates.insert(host_candidate.clone()) { - return Ok(()); - } - - for (cid, agent) in self.connections.agents_mut() { - add_local_candidate(cid, agent, host_candidate.clone(), &mut self.pending_events); - } - - Ok(()) - } - /// Tries to handle the packet using one of our [`Allocation`]s. /// /// This function is in the hot-path of packet processing and thus must be as efficient as possible. @@ -981,17 +928,6 @@ where tracing::trace!(%rid, ?event); match event { - allocation::Event::New(candidate) - if candidate.kind() == CandidateKind::ServerReflexive => - { - if !self.shared_candidates.insert(candidate.clone()) { - continue; - } - - for (cid, agent) in self.connections.agents_by_relay_mut(rid) { - add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) - } - } allocation::Event::New(candidate) => { for (cid, agent) in self.connections.agents_by_relay_mut(rid) { add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) @@ -1098,7 +1034,13 @@ where let selected_relay = initial.relay; - self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent); + seed_agent_with_local_candidates( + cid, + selected_relay, + &mut agent, + &self.allocations, + &mut self.pending_events, + ); let connection = self.init_connection( cid, @@ -1163,7 +1105,13 @@ where }; let selected_relay = self.sample_relay()?; - self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent); + seed_agent_with_local_candidates( + cid, + selected_relay, + &mut agent, + &self.allocations, + &mut self.pending_events, + ); let connection = self.init_connection( cid, @@ -1184,29 +1132,27 @@ where } } -impl Node -where - TId: Eq + Hash + Copy + fmt::Display, - RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display, +fn seed_agent_with_local_candidates( + connection: TId, + selected_relay: RId, + agent: &mut IceAgent, + allocations: &BTreeMap, + pending_events: &mut VecDeque>, +) where + RId: Ord, + TId: fmt::Display + Copy, { - fn seed_agent_with_local_candidates( - &mut self, - connection: TId, - selected_relay: RId, - agent: &mut IceAgent, - ) { - for candidate in self.shared_candidates.iter().cloned() { - add_local_candidate(connection, agent, candidate, &mut self.pending_events); - } + let shared_candidates = allocations + .values() + .flat_map(|allocation| allocation.host_and_server_reflexive_candidates()) + .unique(); + let relay_candidates = allocations + .get(&selected_relay) + .into_iter() + .flat_map(|allocation| allocation.current_relay_candidates()); - let Some(allocation) = self.allocations.get(&selected_relay) else { - tracing::debug!(%selected_relay, "Cannot seed relay candidates: Unknown relay"); - return; - }; - - for candidate in allocation.current_relay_candidates() { - add_local_candidate(connection, agent, candidate, &mut self.pending_events); - } + for candidate in shared_candidates.chain(relay_candidates) { + add_local_candidate(connection, agent, candidate, pending_events); } } diff --git a/rust/connlib/tunnel/proptest-regressions/tests.txt b/rust/connlib/tunnel/proptest-regressions/tests.txt index 3d17b9ac7..f5eadd8ad 100644 --- a/rust/connlib/tunnel/proptest-regressions/tests.txt +++ b/rust/connlib/tunnel/proptest-regressions/tests.txt @@ -193,3 +193,4 @@ cc 122b860cc7c9ab8e500c057d8b3183b1b0586d0a38d5e8e7c61a77458d06a716 cc 613cba6cf1b34d88c6733b1272aae83d4f71abfdbe66d2109f890ff7d3bf6692 cc d88ebc9e70446d9f8b602032d1d68e72175630d1073ec9410810bccac31d49cf cc 0f7aea8d78b70ac939b68b97d93206c077744f677a473336d05b1217b6e042c4 +cc a065d273b793538b61647caf61edce567957b435c50b781dc5710f0a60f22efe