diff --git a/rust/Cargo.lock b/rust/Cargo.lock index ea5e77b9f..e0dd25593 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -6898,6 +6898,7 @@ dependencies = [ "hex", "hex-display", "ip-packet", + "itertools 0.13.0", "once_cell", "rand 0.8.5", "secrecy", diff --git a/rust/connlib/snownet/Cargo.toml b/rust/connlib/snownet/Cargo.toml index c8181d5c0..733167b8b 100644 --- a/rust/connlib/snownet/Cargo.toml +++ b/rust/connlib/snownet/Cargo.toml @@ -11,6 +11,7 @@ bytes = "1.7.1" hex = "0.4.0" hex-display = "0.3.0" ip-packet = { workspace = true } +itertools = "0.13" once_cell = "1.17.1" rand = "0.8" secrecy = { workspace = true } diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 583c18330..363ccd5f6 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -10,6 +10,7 @@ use boringtun::{noise::rate_limiter::RateLimiter, x25519::StaticSecret}; use core::fmt; use hex_display::HexDisplayExt; use ip_packet::{ConvertibleIpv4Packet, ConvertibleIpv6Packet, IpPacket, IpPacketBuf}; +use itertools::Itertools as _; use rand::rngs::StdRng; use rand::seq::IteratorRandom; use rand::{random, SeedableRng}; @@ -233,8 +234,8 @@ where } }; - let Some(agent) = self.connections.agent_mut(cid) else { - tracing::debug!("Unknown connection"); + let Some((agent, relay)) = self.connections.connecting_agent_mut(cid) else { + tracing::debug!("Unknown connection or socket has already been nominated"); return; }; @@ -251,7 +252,7 @@ where | CandidateKind::PeerReflexive => {} } - let Some(rid) = self.connections.relay(cid) else { + let Some(rid) = relay else { tracing::debug!("No relay selected for connection"); return; }; @@ -572,10 +573,10 @@ where signalling_completed_at: now, remote_pub_key: remote, state: ConnectionState::Connecting { + relay, buffered: RingBuffer::new(10), }, possible_sockets: BTreeSet::default(), - relay, span: info_span!(parent: tracing::Span::none(), "connection", %cid), } } @@ -749,13 +750,18 @@ where for (rid, event) in allocation_events { match event { + CandidateEvent::New(candidate) + if candidate.kind() == CandidateKind::ServerReflexive => + { + for (cid, agent, _span) in self.connections.agents_mut() { + add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) + } + } CandidateEvent::New(candidate) => { - add_local_candidate_to_all( - rid, - candidate, - &mut self.connections, - &mut self.pending_events, - ); + for (cid, agent, _span) in self.connections.connecting_agents_by_relay_mut(rid) + { + add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) + } } CandidateEvent::Invalid(candidate) => { for (cid, agent, _span) in self.connections.agents_mut() { @@ -933,7 +939,7 @@ where impl Node where TId: Eq + Hash + Copy + fmt::Display, - RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display, + RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display, { fn seed_agent_with_local_candidates( &mut self, @@ -945,23 +951,31 @@ where add_local_candidate(connection, agent, candidate, &mut self.pending_events); } + for candidate in self + .allocations + .values() + .flat_map(|a| a.current_candidates()) + .filter(|c| c.kind() == CandidateKind::ServerReflexive) + .unique() + { + add_local_candidate(connection, agent, candidate, &mut self.pending_events); + } + let Some(selected_relay) = selected_relay else { tracing::debug!("Skipping seeding of relay candidates: No relay selected"); return; }; - for candidate in self - .allocations - .iter() - .filter_map(|(rid, allocation)| (*rid == selected_relay).then_some(allocation)) - .flat_map(|allocation| allocation.current_candidates()) + let Some(allocation) = self.allocations.get(&selected_relay) else { + tracing::debug!(%selected_relay, "Cannot seed relay candidates: Unknown relay"); + return; + }; + + for candidate in allocation + .current_candidates() + .filter(|c| c.kind() == CandidateKind::Relayed) { - add_local_candidate( - connection, - agent, - candidate.clone(), - &mut self.pending_events, - ); + add_local_candidate(connection, agent, candidate, &mut self.pending_events); } } } @@ -1016,11 +1030,37 @@ where maybe_initial_connection.or(maybe_established_connection) } - fn relay(&mut self, id: TId) -> Option { - let maybe_initial_connection = self.initial.get_mut(&id).and_then(|i| i.relay); - let maybe_established_connection = self.established.get_mut(&id).and_then(|c| c.relay); + fn connecting_agent_mut(&mut self, id: TId) -> Option<(&mut IceAgent, Option)> { + let maybe_initial_connection = self.initial.get_mut(&id).map(|i| (&mut i.agent, i.relay)); + let maybe_pending_connection = self.established.get_mut(&id).and_then(|c| match c.state { + ConnectionState::Connecting { relay, .. } => Some((&mut c.agent, relay)), + ConnectionState::Failed + | ConnectionState::Idle { .. } + | ConnectionState::Connected { .. } => None, + }); - maybe_initial_connection.or(maybe_established_connection) + maybe_initial_connection.or(maybe_pending_connection) + } + + fn connecting_agents_by_relay_mut( + &mut self, + id: RId, + ) -> impl Iterator)> + '_ { + let initial_connections = self.initial.iter_mut().filter_map(move |(cid, i)| { + (i.relay? == id).then_some((*cid, &mut i.agent, i.span.enter())) + }); + let pending_connections = self.established.iter_mut().filter_map(move |(cid, c)| { + use ConnectionState::*; + + match c.state { + Connecting { + relay: Some(relay), .. + } if relay == id => Some((*cid, &mut c.agent, c.span.enter())), + Failed | Idle { .. } | Connecting { .. } | Connected { .. } => None, + } + }); + + initial_connections.chain(pending_connections) } fn agents_mut( @@ -1099,34 +1139,6 @@ enum EncodeError { NoChannel, } -fn add_local_candidate_to_all( - rid: RId, - candidate: Candidate, - connections: &mut Connections, - pending_events: &mut VecDeque>, -) where - TId: Copy + fmt::Display, - RId: Copy + PartialEq, -{ - let initial_connections = connections - .initial - .iter_mut() - .flat_map(|(id, c)| Some((*id, &mut c.agent, c.relay?))); - let established_connections = connections - .established - .iter_mut() - .flat_map(|(id, c)| Some((*id, &mut c.agent, c.relay?))); - - for (cid, agent, _) in initial_connections - .chain(established_connections) - .filter(|(_, _, selected_relay)| *selected_relay == rid) - { - let _span = info_span!("connection", %cid).entered(); - - add_local_candidate(cid, agent, candidate.clone(), pending_events); - } -} - fn add_local_candidate( id: TId, agent: &mut IceAgent, @@ -1362,11 +1374,6 @@ struct Connection { /// Socket addresses from which we might receive data (even before we are connected). possible_sockets: BTreeSet, - /// The relay we have selected for this connection. - /// - /// `None` if we didn't have any relays available. - relay: Option, - stats: ConnectionStats, intent_sent_at: Instant, signalling_completed_at: Instant, @@ -1379,6 +1386,11 @@ struct Connection { enum ConnectionState { /// We are still running ICE to figure out, which socket to use to send data. Connecting { + /// The relay we have selected for this connection. + /// + /// `None` if we didn't have any relays available. + relay: Option, + /// Packets emitted by wireguard whilst are still running ICE. /// /// This can happen if the remote's WG session initiation arrives at our socket before we nominate it.