diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 250114b88..22f6f1da9 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -278,12 +278,19 @@ where c.state.on_upsert(cid, &mut c.agent, now); - seed_agent_with_local_candidates( - cid, - c.relay.id, - &mut c.agent, - &self.allocations, - &mut self.pending_events, + // Take all current candidates. + let current_candidates = c.agent.local_candidates().to_vec(); + + // Re-seed connection with all candidates. + let new_candidates = + seed_agent_with_local_candidates(c.relay.id, &mut c.agent, &self.allocations); + + // Tell the remote about all of them. + self.pending_events.extend( + std::iter::empty() + .chain(current_candidates) + .chain(new_candidates) + .map(|candidate| new_ice_candidate_event(cid, candidate)), ); // Initiate a new WG session. @@ -317,12 +324,13 @@ where agent.set_local_credentials(local_creds); agent.set_remote_credentials(remote_creds); - seed_agent_with_local_candidates( - cid, - selected_relay, - &mut agent, - &self.allocations, - &mut self.pending_events, + self.pending_events.extend( + self.allocations + .candidates_for_relay(&selected_relay) + .filter_map(|candidate| { + add_local_candidate(&mut agent, candidate) + .map(|c| new_ice_candidate_event(cid, c)) + }), ); let connection = self.init_connection( @@ -1001,12 +1009,11 @@ where for (cid, agent, maybe_state) in self.connections.agents_and_state_by_relay_mut(rid) { - add_local_candidate( - cid, - agent, - candidate.clone(), - &mut self.pending_events, - ); + if let Some(candidate) = add_local_candidate(agent, candidate.clone()) { + self.pending_events + .push_back(new_ice_candidate_event(cid, candidate)); + } + if let Some(state) = maybe_state { state.on_candidate(cid, agent, now); } @@ -1110,13 +1117,12 @@ where let selected_relay = initial.relay; - seed_agent_with_local_candidates( - cid, - selected_relay, - &mut agent, - &self.allocations, - &mut self.pending_events, - ); + for candidate in + seed_agent_with_local_candidates(selected_relay, &mut agent, &self.allocations) + { + self.pending_events + .push_back(new_ice_candidate_event(cid, candidate)); + } let index = self.index.next(); let connection = self.init_connection( @@ -1182,13 +1188,13 @@ where }; let selected_relay = self.sample_relay()?; - seed_agent_with_local_candidates( - cid, - selected_relay, - &mut agent, - &self.allocations, - &mut self.pending_events, - ); + + for candidate in + seed_agent_with_local_candidates(selected_relay, &mut agent, &self.allocations) + { + self.pending_events + .push_back(new_ice_candidate_event(cid, candidate)); + } let index = self.index.next(); let connection = self.init_connection( @@ -1211,30 +1217,18 @@ where } } -fn seed_agent_with_local_candidates( - connection: TId, +/// Seeds the agent with all local candidates, returning an iterator of all candidates that should be signalled to the remote. +fn seed_agent_with_local_candidates<'a, RId>( selected_relay: RId, - agent: &mut IceAgent, + agent: &'a mut IceAgent, allocations: &Allocations, - pending_events: &mut VecDeque>, -) where +) -> impl Iterator + use<'a, RId> +where RId: Ord + fmt::Display + Copy, - TId: fmt::Display + Copy, { - let shared_candidates = allocations.shared_candidates(); - let relay_candidates = allocations - .get_by_id(&selected_relay) - .into_iter() - .flat_map(|allocation| allocation.current_relay_candidates()); - - // Candidates with a higher priority are better, therefore: Reverse the ordering by priority. - for candidate in shared_candidates - .chain(relay_candidates) - .sorted_by_key(|c| c.prio()) - .rev() - { - add_local_candidate(connection, agent, candidate, pending_events); - } + allocations + .candidates_for_relay(&selected_relay) + .filter_map(move |c| add_local_candidate(agent, c)) } /// Generate optimistic candidates based on the ones we have already received. @@ -1299,38 +1293,25 @@ fn generate_optimistic_candidates(agent: &mut IceAgent) { } } -fn add_local_candidate( - id: TId, - agent: &mut IceAgent, - candidate: Candidate, - pending_events: &mut VecDeque>, -) where - TId: fmt::Display, -{ +/// Attempts to add the candidate to the agent, returning back the candidate if it should be signalled to the remote. +fn add_local_candidate(agent: &mut IceAgent, candidate: Candidate) -> Option { // srflx candidates don't need to be added to the local agent because we always send from the `base` anyway. if candidate.kind() == CandidateKind::ServerReflexive { - signal_candidate_to_remote(id, &candidate, pending_events); - return; + return Some(candidate); } - let Some(candidate) = agent.add_local_candidate(candidate) else { - return; - }; + let candidate = agent.add_local_candidate(candidate)?; - signal_candidate_to_remote(id, candidate, pending_events); + Some(candidate.clone()) } -fn signal_candidate_to_remote( - id: TId, - candidate: &Candidate, - pending_events: &mut VecDeque>, -) { +fn new_ice_candidate_event(id: TId, candidate: Candidate) -> Event { tracing::debug!(?candidate, "Signalling candidate to remote"); - pending_events.push_back(Event::NewIceCandidate { + Event::NewIceCandidate { connection: id, - candidate: candidate.clone(), - }) + candidate, + } } fn invalidate_allocation_candidates( diff --git a/rust/connlib/snownet/src/node/allocations.rs b/rust/connlib/snownet/src/node/allocations.rs index 0704f39e6..90e98c7f6 100644 --- a/rust/connlib/snownet/src/node/allocations.rs +++ b/rust/connlib/snownet/src/node/allocations.rs @@ -74,6 +74,23 @@ where .unwrap_or(MutAllocationRef::Unknown) } + pub(crate) fn candidates_for_relay( + &self, + id: &RId, + ) -> impl Iterator + use { + let shared_candidates = self.shared_candidates(); + let relay_candidates = self + .get_by_id(id) + .into_iter() + .flat_map(|allocation| allocation.current_relay_candidates()); + + // Candidates with a higher priority are better, therefore: Reverse the ordering by priority. + shared_candidates + .chain(relay_candidates) + .sorted_by_key(|c| c.prio()) + .rev() + } + pub(crate) fn iter_mut(&mut self) -> impl Iterator { self.inner.iter_mut() } @@ -144,13 +161,6 @@ where Some((*id, a)) } - pub(crate) fn shared_candidates(&self) -> impl Iterator { - self.inner - .values() - .flat_map(|allocation| allocation.host_and_server_reflexive_candidates()) - .unique() - } - pub(crate) fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { self.inner .values_mut() @@ -192,6 +202,13 @@ where None => true, }); } + + fn shared_candidates(&self) -> impl Iterator { + self.inner + .values() + .flat_map(|allocation| allocation.host_and_server_reflexive_candidates()) + .unique() + } } pub(crate) enum MutAllocationRef<'a, RId> { diff --git a/rust/connlib/snownet/src/node/connections.rs b/rust/connlib/snownet/src/node/connections.rs index f0b446df2..6204f4016 100644 --- a/rust/connlib/snownet/src/node/connections.rs +++ b/rust/connlib/snownet/src/node/connections.rs @@ -15,7 +15,7 @@ use crate::{ ConnectionStats, Event, node::{ Connection, ConnectionState, InitialConnection, add_local_candidate, - allocations::Allocations, + allocations::Allocations, new_ice_candidate_event, }, }; @@ -138,10 +138,14 @@ where c.relay.id = rid; - for candidate in allocation.current_relay_candidates() { - add_local_candidate(cid, &mut c.agent, candidate, pending_events); - c.state.on_candidate(cid, &mut c.agent, now); + for candidate in allocation + .current_relay_candidates() + .filter_map(|candidate| add_local_candidate(&mut c.agent, candidate)) + { + pending_events.push_back(new_ice_candidate_event(cid, candidate)); } + + c.state.on_candidate(cid, &mut c.agent, now); } } diff --git a/rust/connlib/tunnel/proptest-regressions/tests.txt b/rust/connlib/tunnel/proptest-regressions/tests.txt index f971c371f..4094c361b 100644 --- a/rust/connlib/tunnel/proptest-regressions/tests.txt +++ b/rust/connlib/tunnel/proptest-regressions/tests.txt @@ -251,3 +251,5 @@ cc cd931decdb174867cc8067e66287a7fd6fe85f0490c25a408431f405987ec5e4 cc 208cbcc2338fd8f325abc5be770c51b97296729bf09bd4472be5734789c1d37b cc 6c01d07f2820e879d267944e063a55d0f4f07777a925a532f9da312b9b398db1 cc 9fac3117f02193b4b1dfbbfd02fa6062c8f62eb5c94d36d9bd9c694dcda5ddab +cc 659d967c73dae7e617a63ff86978636ef8aac193b70236adb823354067523629 +cc 1c531f3260ae1f80cf5ab4fb8c0cef5dd1d468ed4da1ff7170a12d203b5546a5 diff --git a/rust/connlib/tunnel/src/tests/sim_client.rs b/rust/connlib/tunnel/src/tests/sim_client.rs index 5aaf0a8c6..c0e1a8b67 100644 --- a/rust/connlib/tunnel/src/tests/sim_client.rs +++ b/rust/connlib/tunnel/src/tests/sim_client.rs @@ -643,17 +643,6 @@ impl RefClient { for status in self.site_status.values_mut() { *status = ResourceStatus::Unknown; } - - // TCP connections will automatically re-create connections to Gateways. - for ((_, dst, _, _), r) in self - .expected_tcp_connections - .clone() - .into_iter() - .collect::>() - { - self.connect_to_resource(r, dst); - self.set_resource_online(r); - } } pub(crate) fn add_internet_resource(&mut self, resource: InternetResource) {