diff --git a/rust/connlib/snownet/src/allocation.rs b/rust/connlib/snownet/src/allocation.rs index 181926623..b4948015c 100644 --- a/rust/connlib/snownet/src/allocation.rs +++ b/rust/connlib/snownet/src/allocation.rs @@ -67,6 +67,10 @@ pub struct Allocation { software: Software, + /// If present, the IPv4 address we received traffic on. + ip4_host_candidate: Option, + /// If present, the IPv6 address we received traffic on. + ip6_host_candidate: Option, /// If present, the IPv4 address the relay observed for us. ip4_srflx_candidate: Option, /// If present, the IPv6 address the relay observed for us. @@ -221,6 +225,8 @@ impl Allocation { let mut allocation = Self { server, active_socket: None, + ip4_host_candidate: Default::default(), + ip6_host_candidate: Default::default(), ip4_srflx_candidate: Default::default(), ip6_srflx_candidate: Default::default(), ip4_allocation: Default::default(), @@ -248,6 +254,17 @@ impl Allocation { allocation } + pub fn host_and_server_reflexive_candidates(&self) -> impl Iterator + use<> { + [ + self.ip4_host_candidate.clone(), + self.ip6_host_candidate.clone(), + self.ip4_srflx_candidate.clone(), + self.ip6_srflx_candidate.clone(), + ] + .into_iter() + .flatten() + } + pub fn current_relay_candidates(&self) -> impl Iterator + use<> { [self.ip4_allocation.clone(), self.ip6_allocation.clone()] .into_iter() @@ -504,7 +521,18 @@ impl Allocation { match message.method() { BINDING => { - // First, process the binding request itself. + // First, see if we need to update our host candidate. + let current_host_candidate = match local { + SocketAddr::V4(_) => &mut self.ip4_host_candidate, + SocketAddr::V6(_) => &mut self.ip6_host_candidate, + }; + + let maybe_candidate = Candidate::host(local, Protocol::Udp).ok(); + if update_candidate(maybe_candidate, current_host_candidate, &mut self.events) { + self.log_update(now); + } + + // Second, process the binding request itself. let current_srflx_candidate = match original_dst { SocketAddr::V4(_) => &mut self.ip4_srflx_candidate, SocketAddr::V6(_) => &mut self.ip6_srflx_candidate, @@ -515,7 +543,7 @@ impl Allocation { self.log_update(now); } - // Second, check if we have already determined which socket to use for this relay. + // Third, check if we have already determined which socket to use for this relay. // We send 2 BINDING requests to start with (one for each IP version) and the first one coming back wins. // Thus, if we already have a socket set, we are done with processing this binding request. @@ -905,6 +933,8 @@ impl Allocation { fn log_update(&self, now: Instant) { tracing::info!( + host_ip4 = ?self.ip4_host_candidate.as_ref().map(|c| c.addr()), + host_ip6 = ?self.ip6_host_candidate.as_ref().map(|c| c.addr()), srflx_ip4 = ?self.ip4_srflx_candidate.as_ref().map(|c| c.addr()), srflx_ip6 = ?self.ip6_srflx_candidate.as_ref().map(|c| c.addr()), relay_ip4 = ?self.ip4_allocation.as_ref().map(|c| c.addr()), @@ -2113,6 +2143,11 @@ mod tests { Instant::now(), ); + let next_event = allocation.poll_event(); + assert_eq!( + next_event, + Some(Event::New(Candidate::host(PEER1, Protocol::Udp).unwrap())) + ); let next_event = allocation.poll_event(); assert_eq!( next_event, @@ -2626,9 +2661,11 @@ mod tests { assert_eq!( events, vec![ + Event::New(Candidate::host(PEER2_IP4, Protocol::Udp).unwrap()), Event::New( Candidate::server_reflexive(PEER2_IP4, PEER2_IP4, Protocol::Udp).unwrap() ), + Event::New(Candidate::host(PEER2_IP6, Protocol::Udp).unwrap()), Event::New( Candidate::server_reflexive(PEER2_IP6, PEER2_IP6, Protocol::Udp).unwrap() ) diff --git a/rust/connlib/snownet/src/candidate_set.rs b/rust/connlib/snownet/src/candidate_set.rs deleted file mode 100644 index 1b57aed86..000000000 --- a/rust/connlib/snownet/src/candidate_set.rs +++ /dev/null @@ -1,129 +0,0 @@ -use std::collections::HashSet; - -use itertools::Itertools; -use str0m::{Candidate, CandidateKind}; - -/// Custom "set" implementation for [`Candidate`]s based on a [`HashSet`] with an enforced ordering when iterating. -/// -/// The set only allows host and server-reflexive candidates as only those need to be de-duplicated in order to avoid -/// spamming the remote with duplicate candidates. -#[derive(Debug, Default)] -pub struct CandidateSet { - host: HashSet, - server_reflexive: HashSet, -} - -impl CandidateSet { - #[expect( - clippy::disallowed_methods, - reason = "We don't care about the ordering." - )] - pub fn insert(&mut self, new: Candidate) -> bool { - match new.kind() { - CandidateKind::PeerReflexive | CandidateKind::Relayed => { - debug_assert!(false); - tracing::warn!( - "CandidateSet is not meant to be used with candidates of kind {}", - new.kind() - ); - - false - } - CandidateKind::Host => self.host.insert(new), - CandidateKind::ServerReflexive => { - // Hashing a `Candidate` takes longer than checking a handful of entries using their `PartialEq` implementation. - // This function is in the hot-path so it needs to be fast ... - if self.server_reflexive.iter().any(|c| c == &new) { - return false; - } - - self.server_reflexive.retain(|current| { - let is_ip_version_different = current.addr().is_ipv4() != new.addr().is_ipv4(); - - if !is_ip_version_different { - tracing::debug!(%current, %new, "Replacing server-reflexive candidate"); - } - - // Candidates of different IP version are also kept. - is_ip_version_different - }); - - self.server_reflexive.insert(new) - } - } - } - - pub fn clear(&mut self) { - self.host.clear(); - self.server_reflexive.clear(); - } - - #[expect( - clippy::disallowed_methods, - reason = "We are guaranteeing a stable ordering" - )] - pub fn iter(&self) -> impl Iterator { - std::iter::empty() - .chain(self.host.iter()) - .chain(self.server_reflexive.iter()) - .sorted_by(|l, r| l.prio().cmp(&r.prio()).then(l.addr().cmp(&r.addr()))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use str0m::net::Protocol; - - const SOCK_ADDR_IP4_BASE: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10); - const SOCK_ADDR1: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234); - const SOCK_ADDR2: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5678); - - const SOCK_ADDR_IP6_BASE: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 10); - const SOCK_ADDR3: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 1234); - const SOCK_ADDR4: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5678); - - #[test] - fn only_allows_one_server_reflexive_candidate_per_ip_family() { - let mut set = CandidateSet::default(); - - let host1 = Candidate::host(SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap(); - let host2 = Candidate::host(SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap(); - - assert!(set.insert(host1.clone())); - assert!(set.insert(host2.clone())); - - let c1 = - Candidate::server_reflexive(SOCK_ADDR1, SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap(); - let c2 = - Candidate::server_reflexive(SOCK_ADDR2, SOCK_ADDR_IP4_BASE, Protocol::Udp).unwrap(); - let c3 = - Candidate::server_reflexive(SOCK_ADDR3, SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap(); - let c4 = - Candidate::server_reflexive(SOCK_ADDR4, SOCK_ADDR_IP6_BASE, Protocol::Udp).unwrap(); - - assert!(set.insert(c1)); - assert!(set.insert(c2.clone())); - assert!(set.insert(c3)); - assert!(set.insert(c4.clone())); - - assert_eq!( - set.iter().cloned().collect::>(), - vec![c2, c4, host1, host2] - ); - } - - #[test] - fn allows_multiple_host_candidates_of_same_ip_base() { - let mut set = CandidateSet::default(); - - let host1 = Candidate::host(SOCK_ADDR1, Protocol::Udp).unwrap(); - let host2 = Candidate::host(SOCK_ADDR2, Protocol::Udp).unwrap(); - - assert!(set.insert(host1.clone())); - assert!(set.insert(host2.clone())); - - assert_eq!(set.iter().cloned().collect::>(), vec![host1, host2]); - } -} diff --git a/rust/connlib/snownet/src/lib.rs b/rust/connlib/snownet/src/lib.rs index 0257c75bc..b8a7bbc7e 100644 --- a/rust/connlib/snownet/src/lib.rs +++ b/rust/connlib/snownet/src/lib.rs @@ -5,7 +5,6 @@ mod allocation; mod backoff; -mod candidate_set; mod channel_data; mod index; mod node; diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 734c03636..6c73078a0 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -1,5 +1,4 @@ use crate::allocation::{self, Allocation, RelaySocket, Socket}; -use crate::candidate_set::CandidateSet; use crate::index::IndexLfsr; use crate::stats::{ConnectionStats, NodeStats}; use crate::utils::channel_data_packet_buffer; @@ -121,8 +120,7 @@ pub struct Node { index: IndexLfsr, rate_limiter: Arc, - /// Host and server-reflexive candidates that are shared between all connections. - shared_candidates: CandidateSet, + buffered_transmits: VecDeque, next_rate_limiter_reset: Option, @@ -164,7 +162,6 @@ where mode: T::new(), index, rate_limiter: Arc::new(RateLimiter::new_at(public_key, HANDSHAKE_RATE_LIMIT, now)), - shared_candidates: Default::default(), buffered_transmits: VecDeque::default(), next_rate_limiter_reset: None, pending_events: VecDeque::default(), @@ -202,7 +199,6 @@ where self.pending_events.extend(closed_connections); - self.shared_candidates.clear(); self.connections.clear(); self.buffered_transmits.clear(); @@ -273,18 +269,13 @@ where c.state.on_upsert(cid, &mut c.agent, now); - for candidate in c.agent.local_candidates() { - signal_candidate_to_remote(cid, candidate, &mut self.pending_events); - } - - // Server-reflexive candidates are not in the local candidates of the ICE agent so those need special handling. - for candidate in self - .shared_candidates - .iter() - .filter(|c| c.kind() == CandidateKind::ServerReflexive) - { - signal_candidate_to_remote(cid, candidate, &mut self.pending_events); - } + seed_agent_with_local_candidates( + cid, + c.relay, + &mut c.agent, + &self.allocations, + &mut self.pending_events, + ); return Ok(()); } @@ -305,7 +296,13 @@ where agent.set_local_credentials(local_creds); agent.set_remote_credentials(remote_creds); - self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent); + seed_agent_with_local_candidates( + cid, + selected_relay, + &mut agent, + &self.allocations, + &mut self.pending_events, + ); let connection = self.init_connection(cid, agent, remote, preshared_key, selected_relay, now, now); @@ -330,35 +327,6 @@ where (self.stats, self.connections.stats()) } - /// Add an address as a `host` candidate. - /// - /// For most network topologies, [`snownet`](crate) will automatically discover host candidates via the traffic to the configured STUN and TURN servers. - /// However, in topologies like the one below, we cannot discover that there is a more optimal link between BACKEND and DB. - /// For those situations, users need to manually add the address of the direct link in order for [`snownet`](crate) to establish a connection. - /// - /// ```text - /// ┌──────┐ ┌──────┐ - /// │ STUN ├─┐ ┌─┤ TURN │ - /// └──────┘ │ │ └──────┘ - /// │ │ - /// ┌─┴──────┴─┐ - /// ┌────────┤ WAN ├───────┐ - /// │ └──────────┘ │ - /// ┌────┴─────┐ ┌──┴───┐ - /// │ FW │ │ FW │ - /// └────┬─────┘ └──┬───┘ - /// │ ┌──┐ │ - /// ┌───┴─────┐ │ │ ┌─┴──┐ - /// │ BACKEND ├──────┤FW├─────────┤ DB │ - /// └─────────┘ │ │ └────┘ - /// └──┘ - /// ``` - pub fn add_local_host_candidate(&mut self, address: SocketAddr) -> Result<()> { - self.add_local_as_host_candidate(address)?; - - Ok(()) - } - #[tracing::instrument(level = "info", skip_all, fields(%cid))] pub fn add_remote_candidate(&mut self, cid: TId, candidate: String, now: Instant) { let candidate = match Candidate::from_sdp_string(&candidate) { @@ -438,8 +406,6 @@ where packet: &[u8], now: Instant, ) -> Result> { - self.add_local_as_host_candidate(local)?; - let (from, packet, relayed) = match self.allocations_try_handle(from, local, packet, now) { ControlFlow::Continue(c) => c, ControlFlow::Break(()) => return Ok(None), @@ -773,25 +739,6 @@ where } } - /// Attempt to add the `local` address as a host candidate. - /// - /// Receiving traffic on a certain interface means we at least have a connection to a relay via this interface. - /// Thus, it is also a viable interface to attempt a connection to a gateway. - fn add_local_as_host_candidate(&mut self, local: SocketAddr) -> Result<()> { - let host_candidate = - Candidate::host(local, Protocol::Udp).context("Failed to parse host candidate")?; - - if !self.shared_candidates.insert(host_candidate.clone()) { - return Ok(()); - } - - for (cid, agent) in self.connections.agents_mut() { - add_local_candidate(cid, agent, host_candidate.clone(), &mut self.pending_events); - } - - Ok(()) - } - /// Tries to handle the packet using one of our [`Allocation`]s. /// /// This function is in the hot-path of packet processing and thus must be as efficient as possible. @@ -981,17 +928,6 @@ where tracing::trace!(%rid, ?event); match event { - allocation::Event::New(candidate) - if candidate.kind() == CandidateKind::ServerReflexive => - { - if !self.shared_candidates.insert(candidate.clone()) { - continue; - } - - for (cid, agent) in self.connections.agents_by_relay_mut(rid) { - add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) - } - } allocation::Event::New(candidate) => { for (cid, agent) in self.connections.agents_by_relay_mut(rid) { add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) @@ -1098,7 +1034,13 @@ where let selected_relay = initial.relay; - self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent); + seed_agent_with_local_candidates( + cid, + selected_relay, + &mut agent, + &self.allocations, + &mut self.pending_events, + ); let connection = self.init_connection( cid, @@ -1163,7 +1105,13 @@ where }; let selected_relay = self.sample_relay()?; - self.seed_agent_with_local_candidates(cid, selected_relay, &mut agent); + seed_agent_with_local_candidates( + cid, + selected_relay, + &mut agent, + &self.allocations, + &mut self.pending_events, + ); let connection = self.init_connection( cid, @@ -1184,29 +1132,27 @@ where } } -impl Node -where - TId: Eq + Hash + Copy + fmt::Display, - RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display, +fn seed_agent_with_local_candidates( + connection: TId, + selected_relay: RId, + agent: &mut IceAgent, + allocations: &BTreeMap, + pending_events: &mut VecDeque>, +) where + RId: Ord, + TId: fmt::Display + Copy, { - fn seed_agent_with_local_candidates( - &mut self, - connection: TId, - selected_relay: RId, - agent: &mut IceAgent, - ) { - for candidate in self.shared_candidates.iter().cloned() { - add_local_candidate(connection, agent, candidate, &mut self.pending_events); - } + let shared_candidates = allocations + .values() + .flat_map(|allocation| allocation.host_and_server_reflexive_candidates()) + .unique(); + let relay_candidates = allocations + .get(&selected_relay) + .into_iter() + .flat_map(|allocation| allocation.current_relay_candidates()); - let Some(allocation) = self.allocations.get(&selected_relay) else { - tracing::debug!(%selected_relay, "Cannot seed relay candidates: Unknown relay"); - return; - }; - - for candidate in allocation.current_relay_candidates() { - add_local_candidate(connection, agent, candidate, &mut self.pending_events); - } + for candidate in shared_candidates.chain(relay_candidates) { + add_local_candidate(connection, agent, candidate, pending_events); } } diff --git a/rust/connlib/tunnel/proptest-regressions/tests.txt b/rust/connlib/tunnel/proptest-regressions/tests.txt index 3d17b9ac7..f5eadd8ad 100644 --- a/rust/connlib/tunnel/proptest-regressions/tests.txt +++ b/rust/connlib/tunnel/proptest-regressions/tests.txt @@ -193,3 +193,4 @@ cc 122b860cc7c9ab8e500c057d8b3183b1b0586d0a38d5e8e7c61a77458d06a716 cc 613cba6cf1b34d88c6733b1272aae83d4f71abfdbe66d2109f890ff7d3bf6692 cc d88ebc9e70446d9f8b602032d1d68e72175630d1073ec9410810bccac31d49cf cc 0f7aea8d78b70ac939b68b97d93206c077744f677a473336d05b1217b6e042c4 +cc a065d273b793538b61647caf61edce567957b435c50b781dc5710f0a60f22efe