diff --git a/rust/connlib/clients/shared/src/eventloop.rs b/rust/connlib/clients/shared/src/eventloop.rs index 7e5e4ccd4..32b12fd64 100644 --- a/rust/connlib/clients/shared/src/eventloop.rs +++ b/rust/connlib/clients/shared/src/eventloop.rs @@ -99,31 +99,31 @@ where fn handle_tunnel_event(&mut self, event: firezone_tunnel::ClientEvent) { match event { - firezone_tunnel::ClientEvent::NewIceCandidate { + firezone_tunnel::ClientEvent::AddedIceCandidates { conn_id: gateway, - candidate, + candidates, } => { - tracing::debug!(%gateway, %candidate, "Sending new ICE candidate to gateway"); + tracing::debug!(%gateway, ?candidates, "Sending new ICE candidates to gateway"); self.portal.send( PHOENIX_TOPIC, EgressMessages::BroadcastIceCandidates(GatewaysIceCandidates { gateway_ids: vec![gateway], - candidates: vec![candidate], + candidates, }), ); } - firezone_tunnel::ClientEvent::InvalidatedIceCandidate { + firezone_tunnel::ClientEvent::RemovedIceCandidates { conn_id: gateway, - candidate, + candidates, } => { - tracing::debug!(%gateway, %candidate, "Sending invalidated ICE candidate to gateway"); + tracing::debug!(%gateway, ?candidates, "Sending invalidated ICE candidates to gateway"); self.portal.send( PHOENIX_TOPIC, EgressMessages::BroadcastInvalidatedIceCandidates(GatewaysIceCandidates { gateway_ids: vec![gateway], - candidates: vec![candidate], + candidates, }), ); } diff --git a/rust/connlib/clients/shared/src/messages.rs b/rust/connlib/clients/shared/src/messages.rs index 6e74e2acd..9b2bc9270 100644 --- a/rust/connlib/clients/shared/src/messages.rs +++ b/rust/connlib/clients/shared/src/messages.rs @@ -62,7 +62,7 @@ pub struct GatewaysIceCandidates { /// The list of gateway IDs these candidates will be broadcast to. pub gateway_ids: Vec, /// Actual RTC ice candidates - pub candidates: Vec, + pub candidates: HashSet, } #[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] @@ -118,10 +118,10 @@ mod test { "client", EgressMessages::BroadcastIceCandidates(GatewaysIceCandidates { gateway_ids: vec!["b3d34a15-55ab-40df-994b-a838e75d65d7".parse().unwrap()], - candidates: vec![ + candidates: HashSet::from([ "candidate:7031633958891736544 1 udp 50331391 35.244.108.190 53909 typ relay" .to_owned(), - ], + ]), }), Some(OutboundRequestId::for_test(6)), ); diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index a47f500e5..c53860db3 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -813,6 +813,8 @@ impl ClientState { self.mangled_dns_queries.retain(|_, exp| now < *exp); let mut resources_changed = false; // Track this separately to batch together `ResourcesChanged` events. + let mut added_ice_candidates = HashMap::>::default(); + let mut removed_ice_candidates = HashMap::>::default(); while let Some(event) = self.node.poll_event() { match event { @@ -823,21 +825,21 @@ impl ClientState { snownet::Event::NewIceCandidate { connection, candidate, - } => self - .buffered_events - .push_back(ClientEvent::NewIceCandidate { - conn_id: connection, - candidate, - }), + } => { + added_ice_candidates + .entry(connection) + .or_default() + .insert(candidate); + } snownet::Event::InvalidateIceCandidate { connection, candidate, - } => self - .buffered_events - .push_back(ClientEvent::InvalidatedIceCandidate { - conn_id: connection, - candidate, - }), + } => { + removed_ice_candidates + .entry(connection) + .or_default() + .insert(candidate); + } snownet::Event::ConnectionEstablished(id) => { self.update_site_status_by_gateway(&id, Status::Online); resources_changed = true; @@ -851,6 +853,22 @@ impl ClientState { resources: self.resources(), }); } + + for (conn_id, candidates) in added_ice_candidates.drain() { + self.buffered_events + .push_back(ClientEvent::AddedIceCandidates { + conn_id, + candidates, + }) + } + + for (conn_id, candidates) in removed_ice_candidates.drain() { + self.buffered_events + .push_back(ClientEvent::RemovedIceCandidates { + conn_id, + candidates, + }) + } } fn update_site_status_by_gateway(&mut self, gateway_id: &GatewayId, status: Status) { diff --git a/rust/connlib/tunnel/src/gateway.rs b/rust/connlib/tunnel/src/gateway.rs index 6f7415f19..146eec381 100644 --- a/rust/connlib/tunnel/src/gateway.rs +++ b/rust/connlib/tunnel/src/gateway.rs @@ -12,7 +12,7 @@ use connlib_shared::{Callbacks, DomainName, Error, Result, StaticSecret}; use ip_packet::{IpPacket, MutableIpPacket}; use secrecy::{ExposeSecret as _, Secret}; use snownet::{RelaySocket, ServerNode}; -use std::collections::{HashSet, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::time::{Duration, Instant}; @@ -351,6 +351,9 @@ impl GatewayState { Some(_) => {} } + let mut added_ice_candidates = HashMap::>::default(); + let mut removed_ice_candidates = HashMap::>::default(); + while let Some(event) = self.node.poll_event() { match event { snownet::Event::ConnectionFailed(id) | snownet::Event::ConnectionClosed(id) => { @@ -360,25 +363,39 @@ impl GatewayState { connection, candidate, } => { - self.buffered_events - .push_back(GatewayEvent::NewIceCandidate { - conn_id: connection, - candidate, - }); + added_ice_candidates + .entry(connection) + .or_default() + .insert(candidate); } snownet::Event::InvalidateIceCandidate { connection, candidate, } => { - self.buffered_events - .push_back(GatewayEvent::InvalidIceCandidate { - conn_id: connection, - candidate, - }); + removed_ice_candidates + .entry(connection) + .or_default() + .insert(candidate); } snownet::Event::ConnectionEstablished(_) => {} } } + + for (conn_id, candidates) in added_ice_candidates.drain() { + self.buffered_events + .push_back(GatewayEvent::AddedIceCandidates { + conn_id, + candidates, + }) + } + + for (conn_id, candidates) in removed_ice_candidates.drain() { + self.buffered_events + .push_back(GatewayEvent::RemovedIceCandidates { + conn_id, + candidates, + }) + } } pub(crate) fn poll_transmit(&mut self) -> Option> { diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 89fa768fd..1418d3cf1 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -264,13 +264,13 @@ where #[derive(Clone, Debug, PartialEq, Eq)] pub enum ClientEvent { - NewIceCandidate { + AddedIceCandidates { conn_id: GatewayId, - candidate: String, + candidates: HashSet, }, - InvalidatedIceCandidate { + RemovedIceCandidates { conn_id: GatewayId, - candidate: String, + candidates: HashSet, }, ConnectionIntent { resource: ResourceId, @@ -296,13 +296,13 @@ pub enum ClientEvent { #[derive(Debug, Clone)] pub enum GatewayEvent { - NewIceCandidate { + AddedIceCandidates { conn_id: ClientId, - candidate: String, + candidates: HashSet, }, - InvalidIceCandidate { + RemovedIceCandidates { conn_id: ClientId, - candidate: String, + candidates: HashSet, }, RefreshDns { name: DomainName, diff --git a/rust/connlib/tunnel/src/tests/sut.rs b/rust/connlib/tunnel/src/tests/sut.rs index f1dd66c26..6eb8f71e4 100644 --- a/rust/connlib/tunnel/src/tests/sut.rs +++ b/rust/connlib/tunnel/src/tests/sut.rs @@ -591,15 +591,22 @@ impl TunnelTest { global_dns_records: &BTreeMap>, ) { match event { - ClientEvent::NewIceCandidate { candidate, .. } => self.gateway.span.in_scope(|| { - self.gateway - .state - .add_ice_candidate(src, candidate, self.now) - }), - ClientEvent::InvalidatedIceCandidate { candidate, .. } => self - .gateway - .span - .in_scope(|| self.gateway.state.remove_ice_candidate(src, candidate)), + ClientEvent::AddedIceCandidates { candidates, .. } => { + self.gateway.span.in_scope(|| { + for candidate in candidates { + self.gateway + .state + .add_ice_candidate(src, candidate, self.now) + } + }) + } + ClientEvent::RemovedIceCandidates { candidates, .. } => { + self.gateway.span.in_scope(|| { + for candidate in candidates { + self.gateway.state.remove_ice_candidate(src, candidate) + } + }) + } ClientEvent::ConnectionIntent { resource, connected_gateway_ids, @@ -761,15 +768,22 @@ impl TunnelTest { fn on_gateway_event(&mut self, src: GatewayId, event: GatewayEvent) { match event { - GatewayEvent::NewIceCandidate { candidate, .. } => self.client.span.in_scope(|| { - self.client - .state - .add_ice_candidate(src, candidate, self.now) - }), - GatewayEvent::InvalidIceCandidate { candidate, .. } => self - .client - .span - .in_scope(|| self.client.state.remove_ice_candidate(src, candidate)), + GatewayEvent::AddedIceCandidates { candidates, .. } => { + self.client.span.in_scope(|| { + for candidate in candidates { + self.client + .state + .add_ice_candidate(src, candidate, self.now) + } + }) + } + GatewayEvent::RemovedIceCandidates { candidates, .. } => { + self.client.span.in_scope(|| { + for candidate in candidates { + self.client.state.remove_ice_candidate(src, candidate) + } + }) + } GatewayEvent::RefreshDns { .. } => todo!(), } } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 5911ea182..738f898a0 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -107,27 +107,27 @@ impl Eventloop { fn handle_tunnel_event(&mut self, event: firezone_tunnel::GatewayEvent) { match event { - firezone_tunnel::GatewayEvent::NewIceCandidate { + firezone_tunnel::GatewayEvent::AddedIceCandidates { conn_id: client, - candidate, + candidates, } => { self.portal.send( PHOENIX_TOPIC, EgressMessages::BroadcastIceCandidates(ClientsIceCandidates { client_ids: vec![client], - candidates: vec![candidate], + candidates, }), ); } - firezone_tunnel::GatewayEvent::InvalidIceCandidate { + firezone_tunnel::GatewayEvent::RemovedIceCandidates { conn_id: client, - candidate, + candidates, } => { self.portal.send( PHOENIX_TOPIC, EgressMessages::BroadcastInvalidatedIceCandidates(ClientsIceCandidates { client_ids: vec![client], - candidates: vec![candidate], + candidates, }), ); } diff --git a/rust/gateway/src/messages.rs b/rust/gateway/src/messages.rs index 179d41e71..e18c5dae6 100644 --- a/rust/gateway/src/messages.rs +++ b/rust/gateway/src/messages.rs @@ -1,4 +1,4 @@ -use std::net::IpAddr; +use std::{collections::HashSet, net::IpAddr}; use chrono::{serde::ts_seconds_option, DateTime, Utc}; use connlib_shared::{ @@ -122,7 +122,7 @@ pub struct ClientsIceCandidates { /// Client's id the ice candidates are meant for pub client_ids: Vec, /// Actual RTC ice candidates - pub candidates: Vec, + pub candidates: HashSet, } /// A client's ice candidate message.