chore(connlib): batch together sending of ICE candidates (#5616)

Currently, we are sending each ICE candidate individually from the
client to the gateway and vice versa. This causes a slight delay as to
when each ICE candidate gets added on the remote ICE agent. As a result,
they all start being tested with a slight offset which causes "endpoint
hopping" whenever a connection expires as they expire just after each
other.

In addition, sending multiple messages to the portal causes unnecessary
load when establishing connections.

Finally, with #5283 we started **not** adding the server-reflexive
candidate to the local ICE agent. Because we talk to multiple relays, we
detect the same server-reflexive candidate multiple times if we are
behind a non-symmetric NAT. Not adding the server-reflexive candidate to
the ICE agent mitigated our de-duplication strategy here which means we
currently send the same candidate multiple times to a peer, causing
additional, unnecessary load.

All of this can be mitigated by batching together all our ICE candidates
together into one message.

Resolves: #3978.
This commit is contained in:
Thomas Eizinger
2024-06-28 12:04:31 +10:00
committed by GitHub
parent 79ff3f830b
commit aadb045b27
8 changed files with 117 additions and 68 deletions

View File

@@ -99,31 +99,31 @@ where
fn handle_tunnel_event(&mut self, event: firezone_tunnel::ClientEvent) {
match event {
firezone_tunnel::ClientEvent::NewIceCandidate {
firezone_tunnel::ClientEvent::AddedIceCandidates {
conn_id: gateway,
candidate,
candidates,
} => {
tracing::debug!(%gateway, %candidate, "Sending new ICE candidate to gateway");
tracing::debug!(%gateway, ?candidates, "Sending new ICE candidates to gateway");
self.portal.send(
PHOENIX_TOPIC,
EgressMessages::BroadcastIceCandidates(GatewaysIceCandidates {
gateway_ids: vec![gateway],
candidates: vec![candidate],
candidates,
}),
);
}
firezone_tunnel::ClientEvent::InvalidatedIceCandidate {
firezone_tunnel::ClientEvent::RemovedIceCandidates {
conn_id: gateway,
candidate,
candidates,
} => {
tracing::debug!(%gateway, %candidate, "Sending invalidated ICE candidate to gateway");
tracing::debug!(%gateway, ?candidates, "Sending invalidated ICE candidates to gateway");
self.portal.send(
PHOENIX_TOPIC,
EgressMessages::BroadcastInvalidatedIceCandidates(GatewaysIceCandidates {
gateway_ids: vec![gateway],
candidates: vec![candidate],
candidates,
}),
);
}

View File

@@ -62,7 +62,7 @@ pub struct GatewaysIceCandidates {
/// The list of gateway IDs these candidates will be broadcast to.
pub gateway_ids: Vec<GatewayId>,
/// Actual RTC ice candidates
pub candidates: Vec<String>,
pub candidates: HashSet<String>,
}
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
@@ -118,10 +118,10 @@ mod test {
"client",
EgressMessages::BroadcastIceCandidates(GatewaysIceCandidates {
gateway_ids: vec!["b3d34a15-55ab-40df-994b-a838e75d65d7".parse().unwrap()],
candidates: vec![
candidates: HashSet::from([
"candidate:7031633958891736544 1 udp 50331391 35.244.108.190 53909 typ relay"
.to_owned(),
],
]),
}),
Some(OutboundRequestId::for_test(6)),
);

View File

@@ -813,6 +813,8 @@ impl ClientState {
self.mangled_dns_queries.retain(|_, exp| now < *exp);
let mut resources_changed = false; // Track this separately to batch together `ResourcesChanged` events.
let mut added_ice_candidates = HashMap::<GatewayId, HashSet<String>>::default();
let mut removed_ice_candidates = HashMap::<GatewayId, HashSet<String>>::default();
while let Some(event) = self.node.poll_event() {
match event {
@@ -823,21 +825,21 @@ impl ClientState {
snownet::Event::NewIceCandidate {
connection,
candidate,
} => self
.buffered_events
.push_back(ClientEvent::NewIceCandidate {
conn_id: connection,
candidate,
}),
} => {
added_ice_candidates
.entry(connection)
.or_default()
.insert(candidate);
}
snownet::Event::InvalidateIceCandidate {
connection,
candidate,
} => self
.buffered_events
.push_back(ClientEvent::InvalidatedIceCandidate {
conn_id: connection,
candidate,
}),
} => {
removed_ice_candidates
.entry(connection)
.or_default()
.insert(candidate);
}
snownet::Event::ConnectionEstablished(id) => {
self.update_site_status_by_gateway(&id, Status::Online);
resources_changed = true;
@@ -851,6 +853,22 @@ impl ClientState {
resources: self.resources(),
});
}
for (conn_id, candidates) in added_ice_candidates.drain() {
self.buffered_events
.push_back(ClientEvent::AddedIceCandidates {
conn_id,
candidates,
})
}
for (conn_id, candidates) in removed_ice_candidates.drain() {
self.buffered_events
.push_back(ClientEvent::RemovedIceCandidates {
conn_id,
candidates,
})
}
}
fn update_site_status_by_gateway(&mut self, gateway_id: &GatewayId, status: Status) {

View File

@@ -12,7 +12,7 @@ use connlib_shared::{Callbacks, DomainName, Error, Result, StaticSecret};
use ip_packet::{IpPacket, MutableIpPacket};
use secrecy::{ExposeSecret as _, Secret};
use snownet::{RelaySocket, ServerNode};
use std::collections::{HashSet, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::{Duration, Instant};
@@ -351,6 +351,9 @@ impl GatewayState {
Some(_) => {}
}
let mut added_ice_candidates = HashMap::<ClientId, HashSet<String>>::default();
let mut removed_ice_candidates = HashMap::<ClientId, HashSet<String>>::default();
while let Some(event) = self.node.poll_event() {
match event {
snownet::Event::ConnectionFailed(id) | snownet::Event::ConnectionClosed(id) => {
@@ -360,25 +363,39 @@ impl GatewayState {
connection,
candidate,
} => {
self.buffered_events
.push_back(GatewayEvent::NewIceCandidate {
conn_id: connection,
candidate,
});
added_ice_candidates
.entry(connection)
.or_default()
.insert(candidate);
}
snownet::Event::InvalidateIceCandidate {
connection,
candidate,
} => {
self.buffered_events
.push_back(GatewayEvent::InvalidIceCandidate {
conn_id: connection,
candidate,
});
removed_ice_candidates
.entry(connection)
.or_default()
.insert(candidate);
}
snownet::Event::ConnectionEstablished(_) => {}
}
}
for (conn_id, candidates) in added_ice_candidates.drain() {
self.buffered_events
.push_back(GatewayEvent::AddedIceCandidates {
conn_id,
candidates,
})
}
for (conn_id, candidates) in removed_ice_candidates.drain() {
self.buffered_events
.push_back(GatewayEvent::RemovedIceCandidates {
conn_id,
candidates,
})
}
}
pub(crate) fn poll_transmit(&mut self) -> Option<snownet::Transmit<'static>> {

View File

@@ -264,13 +264,13 @@ where
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClientEvent {
NewIceCandidate {
AddedIceCandidates {
conn_id: GatewayId,
candidate: String,
candidates: HashSet<String>,
},
InvalidatedIceCandidate {
RemovedIceCandidates {
conn_id: GatewayId,
candidate: String,
candidates: HashSet<String>,
},
ConnectionIntent {
resource: ResourceId,
@@ -296,13 +296,13 @@ pub enum ClientEvent {
#[derive(Debug, Clone)]
pub enum GatewayEvent {
NewIceCandidate {
AddedIceCandidates {
conn_id: ClientId,
candidate: String,
candidates: HashSet<String>,
},
InvalidIceCandidate {
RemovedIceCandidates {
conn_id: ClientId,
candidate: String,
candidates: HashSet<String>,
},
RefreshDns {
name: DomainName,

View File

@@ -591,15 +591,22 @@ impl TunnelTest {
global_dns_records: &BTreeMap<DomainName, HashSet<IpAddr>>,
) {
match event {
ClientEvent::NewIceCandidate { candidate, .. } => self.gateway.span.in_scope(|| {
self.gateway
.state
.add_ice_candidate(src, candidate, self.now)
}),
ClientEvent::InvalidatedIceCandidate { candidate, .. } => self
.gateway
.span
.in_scope(|| self.gateway.state.remove_ice_candidate(src, candidate)),
ClientEvent::AddedIceCandidates { candidates, .. } => {
self.gateway.span.in_scope(|| {
for candidate in candidates {
self.gateway
.state
.add_ice_candidate(src, candidate, self.now)
}
})
}
ClientEvent::RemovedIceCandidates { candidates, .. } => {
self.gateway.span.in_scope(|| {
for candidate in candidates {
self.gateway.state.remove_ice_candidate(src, candidate)
}
})
}
ClientEvent::ConnectionIntent {
resource,
connected_gateway_ids,
@@ -761,15 +768,22 @@ impl TunnelTest {
fn on_gateway_event(&mut self, src: GatewayId, event: GatewayEvent) {
match event {
GatewayEvent::NewIceCandidate { candidate, .. } => self.client.span.in_scope(|| {
self.client
.state
.add_ice_candidate(src, candidate, self.now)
}),
GatewayEvent::InvalidIceCandidate { candidate, .. } => self
.client
.span
.in_scope(|| self.client.state.remove_ice_candidate(src, candidate)),
GatewayEvent::AddedIceCandidates { candidates, .. } => {
self.client.span.in_scope(|| {
for candidate in candidates {
self.client
.state
.add_ice_candidate(src, candidate, self.now)
}
})
}
GatewayEvent::RemovedIceCandidates { candidates, .. } => {
self.client.span.in_scope(|| {
for candidate in candidates {
self.client.state.remove_ice_candidate(src, candidate)
}
})
}
GatewayEvent::RefreshDns { .. } => todo!(),
}
}

View File

@@ -107,27 +107,27 @@ impl Eventloop {
fn handle_tunnel_event(&mut self, event: firezone_tunnel::GatewayEvent) {
match event {
firezone_tunnel::GatewayEvent::NewIceCandidate {
firezone_tunnel::GatewayEvent::AddedIceCandidates {
conn_id: client,
candidate,
candidates,
} => {
self.portal.send(
PHOENIX_TOPIC,
EgressMessages::BroadcastIceCandidates(ClientsIceCandidates {
client_ids: vec![client],
candidates: vec![candidate],
candidates,
}),
);
}
firezone_tunnel::GatewayEvent::InvalidIceCandidate {
firezone_tunnel::GatewayEvent::RemovedIceCandidates {
conn_id: client,
candidate,
candidates,
} => {
self.portal.send(
PHOENIX_TOPIC,
EgressMessages::BroadcastInvalidatedIceCandidates(ClientsIceCandidates {
client_ids: vec![client],
candidates: vec![candidate],
candidates,
}),
);
}

View File

@@ -1,4 +1,4 @@
use std::net::IpAddr;
use std::{collections::HashSet, net::IpAddr};
use chrono::{serde::ts_seconds_option, DateTime, Utc};
use connlib_shared::{
@@ -122,7 +122,7 @@ pub struct ClientsIceCandidates {
/// Client's id the ice candidates are meant for
pub client_ids: Vec<ClientId>,
/// Actual RTC ice candidates
pub candidates: Vec<String>,
pub candidates: HashSet<String>,
}
/// A client's ice candidate message.