fix(connlib): don't add new relays after nomination (#6876)

When relays reboot or get redeployed, the portal sends us new relays to
use and/or tells us which relays we should discontinue using. To be more
efficient with battery and network usage, `connlib` only ever samples a
single relay out of all existing ones for a particular connection.
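The sampling itself can be pictured as picking one relay out of the set of currently known relays. A minimal, dependency-free sketch of that idea (the real code uses `rand`'s `IteratorRandom`, as visible in the imports below; `RelayId`, `sample_relay` and the seed parameter are made up for illustration):

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for connlib's relay ID type.
type RelayId = u64;

/// Picks a single relay for a new connection, or `None` if no relays are known.
/// A seed stands in for a proper RNG to keep the sketch self-contained.
fn sample_relay(relays: &BTreeSet<RelayId>, seed: u64) -> Option<RelayId> {
    if relays.is_empty() {
        return None;
    }
    let index = (seed as usize) % relays.len();
    relays.iter().copied().nth(index)
}

fn main() {
    let relays: BTreeSet<RelayId> = [10, 20, 30].into_iter().collect();

    // Whatever the seed, the sampled relay is one of the known ones.
    assert_eq!(sample_relay(&relays, 0), Some(10));
    assert_eq!(sample_relay(&relays, 4), Some(20));
    assert_eq!(sample_relay(&BTreeSet::new(), 7), None);
    println!("ok");
}
```

The key point for this patch is that only this one sampled relay is used per connection, so a connection's fate is tied to the health of exactly one relay on each side.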

In a network topology where we need to use relays, we can end up in one
of two situations:

- The client connects to the gateway's relay, i.e. to the port the
gateway allocated on the relay.
- The gateway connects to the client's relay, i.e. to the port the client
allocated on the relay.

When we detect that a relay is down, the party that allocated the port
will now fail the connection immediately (once #6666 is merged). The
other party needs to wait until it receives the invalidated candidates
from its peer. Invalidating that candidate also invalidates the
currently nominated socket and fails the connection. In theory, at
least: that only works if there are no other candidates left to try.
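Put differently, an invalidated candidate only takes the connection down when it hits the nominated socket and nothing is left to probe. A toy sketch of that rule (all names hypothetical; the real ICE agent tracks candidate pairs, not strings):

```rust
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Keep probing: other candidate pairs may still succeed.
    KeepProbing,
    /// The nominated socket is gone and no candidates remain: fail.
    Failed,
}

/// Decides what happens after `invalidated` is removed from the candidate set.
fn after_invalidation(nominated: &str, invalidated: &str, remaining: &[&str]) -> Outcome {
    if invalidated == nominated && remaining.is_empty() {
        Outcome::Failed
    } else {
        Outcome::KeepProbing
    }
}

fn main() {
    // Losing the nominated candidate with nothing left fails the connection ...
    assert_eq!(after_invalidation("relay-b", "relay-b", &[]), Outcome::Failed);
    // ... but with other candidates still around, we keep probing instead.
    assert_eq!(
        after_invalidation("relay-b", "relay-b", &["srflx-1"]),
        Outcome::KeepProbing
    );
    println!("ok");
}
```

This second branch is exactly the loophole the scenario below exploits: freshly added candidates keep the connection in "probing" even though the remote peer has already given up.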

This is where this patch becomes important. Say we have the following
setup:

- Client samples relay A.
- Gateway samples relay B.
- The nominated candidate pair is "client server-reflexive <=> relay B",
i.e. the client talks to the port the gateway allocated on relay B.

Next:

1. Client and portal get network-partitioned.
2. Relay B disappears.
3. Relay C appears.
4. Relay A reboots.
5. Client reconnects.

At this point, the portal tells the client to use relays A & C. Note
that relay A rebooted, so the allocation previously held by the client
is no longer valid. With #6666, we detect this by comparing credentials
& IPs. The gateway is told about the same relays and, as part of that,
tests whether relay B is still there. It learns that it isn't,
invalidates the candidates, and thereby fails the connection to the
client (but only locally!).
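The "comparing credentials & IPs" detection from #6666 boils down to checking whether what the portal now advertises still matches what we remember about the relay. A rough sketch, under the assumption that a rebooted relay hands out fresh credentials (`RelayInfo` and its field names are made up for illustration):

```rust
use std::net::SocketAddr;

/// What we remember about a relay, and what the portal (re-)advertises.
#[derive(PartialEq)]
struct RelayInfo {
    addr: SocketAddr,
    username: String, // TURN credential
    password: String,
}

/// An existing allocation only survives a portal update if the relay still
/// has the same address AND the same credentials; a reboot changes the
/// credentials, so the allocation must be recreated.
fn allocation_still_valid(existing: &RelayInfo, advertised: &RelayInfo) -> bool {
    existing == advertised
}

fn main() {
    let before = RelayInfo {
        addr: "203.0.113.1:3478".parse().unwrap(),
        username: "u1".to_string(),
        password: "p1".to_string(),
    };
    let after_reboot = RelayInfo {
        addr: "203.0.113.1:3478".parse().unwrap(),
        username: "u2".to_string(), // fresh credentials after reboot
        password: "p2".to_string(),
    };

    assert!(allocation_still_valid(&before, &before));
    assert!(!allocation_still_valid(&before, &after_reboot));
    println!("ok");
}
```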

Meanwhile, as part of the regular `init` procedure, the client made a
new allocation with relays A & C. Because it had previously selected
relay A for the connection with the gateway, the new candidates are
added to the agent, forming new pairs. However, the gateway has already
given up on this connection, so it will never answer these STUN
requests.

Concurrently, the gateway's invalidated candidates arrive at the client.
However, they don't fail the connection because the client is busy
probing the newly added candidates. This creates a state mismatch
between client and gateway that is only resolved once the candidates
start timing out, adding an additional delay during which the connection
isn't working.

With this PR, we prevent this from happening by only ever adding new
candidates while we are still in the nomination process for a socket. In
theory, this opens a race condition in which we nominate a relay
candidate first and then miss a server-reflexive candidate that arrives
later and is never added. In practice, this won't happen because:

- Our host candidates are always available first.
- We already learn server-reflexive candidates as part of the initial
BINDING, before creating the allocation.
- We learn server-reflexive candidates from all relays, not just the one
that has been assigned.
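The gating in this patch amounts to only handing new candidates to connections that are still nominating. A condensed sketch of the state check (heavily simplified from the real `ConnectionState` in the diff below; all other fields elided):

```rust
/// Simplified version of connlib's per-connection state machine.
enum ConnectionState {
    /// Still running ICE; new candidates are welcome.
    Connecting { relay: Option<u64> },
    /// A socket has been nominated; adding candidates now would only
    /// desynchronise us from the remote peer.
    Connected,
    Failed,
    Idle,
}

/// New local candidates are only added while we are still nominating.
fn accepts_new_candidates(state: &ConnectionState) -> bool {
    matches!(state, ConnectionState::Connecting { .. })
}

fn main() {
    assert!(accepts_new_candidates(&ConnectionState::Connecting { relay: Some(1) }));
    assert!(!accepts_new_candidates(&ConnectionState::Connected));
    assert!(!accepts_new_candidates(&ConnectionState::Failed));
    println!("ok");
}
```

In the actual patch this check lives in `connecting_agent_mut` and `connecting_agents_by_relay_mut`, which simply return no agent for connections that are past the `Connecting` state.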

Related: #6666.
Thomas Eizinger
2024-10-02 12:00:03 +10:00
committed by GitHub
parent 14544b27cc
commit d4e9384a08
3 changed files with 73 additions and 59 deletions

rust/Cargo.lock (generated)

@@ -6898,6 +6898,7 @@ dependencies = [
  "hex",
  "hex-display",
  "ip-packet",
+ "itertools 0.13.0",
  "once_cell",
  "rand 0.8.5",
  "secrecy",


@@ -11,6 +11,7 @@ bytes = "1.7.1"
 hex = "0.4.0"
 hex-display = "0.3.0"
 ip-packet = { workspace = true }
+itertools = "0.13"
 once_cell = "1.17.1"
 rand = "0.8"
 secrecy = { workspace = true }


@@ -10,6 +10,7 @@ use boringtun::{noise::rate_limiter::RateLimiter, x25519::StaticSecret};
 use core::fmt;
 use hex_display::HexDisplayExt;
 use ip_packet::{ConvertibleIpv4Packet, ConvertibleIpv6Packet, IpPacket, IpPacketBuf};
+use itertools::Itertools as _;
 use rand::rngs::StdRng;
 use rand::seq::IteratorRandom;
 use rand::{random, SeedableRng};
@@ -233,8 +234,8 @@ where
            }
        };

-        let Some(agent) = self.connections.agent_mut(cid) else {
-            tracing::debug!("Unknown connection");
+        let Some((agent, relay)) = self.connections.connecting_agent_mut(cid) else {
+            tracing::debug!("Unknown connection or socket has already been nominated");
             return;
         };
@@ -251,7 +252,7 @@ where
            | CandidateKind::PeerReflexive => {}
        }

-        let Some(rid) = self.connections.relay(cid) else {
+        let Some(rid) = relay else {
             tracing::debug!("No relay selected for connection");
             return;
         };
@@ -572,10 +573,10 @@ where
             signalling_completed_at: now,
             remote_pub_key: remote,
             state: ConnectionState::Connecting {
+                relay,
                 buffered: RingBuffer::new(10),
             },
             possible_sockets: BTreeSet::default(),
-            relay,
             span: info_span!(parent: tracing::Span::none(), "connection", %cid),
         }
     }
@@ -749,13 +750,18 @@ where
         for (rid, event) in allocation_events {
             match event {
+                CandidateEvent::New(candidate)
+                    if candidate.kind() == CandidateKind::ServerReflexive =>
+                {
+                    for (cid, agent, _span) in self.connections.agents_mut() {
+                        add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events)
+                    }
+                }
                 CandidateEvent::New(candidate) => {
-                    add_local_candidate_to_all(
-                        rid,
-                        candidate,
-                        &mut self.connections,
-                        &mut self.pending_events,
-                    );
+                    for (cid, agent, _span) in self.connections.connecting_agents_by_relay_mut(rid)
+                    {
+                        add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events)
+                    }
                 }
                 CandidateEvent::Invalid(candidate) => {
                     for (cid, agent, _span) in self.connections.agents_mut() {
@@ -933,7 +939,7 @@ where
 impl<T, TId, RId> Node<T, TId, RId>
 where
     TId: Eq + Hash + Copy + fmt::Display,
-    RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display,
+    RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display,
 {
     fn seed_agent_with_local_candidates(
         &mut self,
@@ -945,23 +951,31 @@ where
             add_local_candidate(connection, agent, candidate, &mut self.pending_events);
         }

+        for candidate in self
+            .allocations
+            .values()
+            .flat_map(|a| a.current_candidates())
+            .filter(|c| c.kind() == CandidateKind::ServerReflexive)
+            .unique()
+        {
+            add_local_candidate(connection, agent, candidate, &mut self.pending_events);
+        }
+
         let Some(selected_relay) = selected_relay else {
             tracing::debug!("Skipping seeding of relay candidates: No relay selected");
             return;
         };

-        for candidate in self
-            .allocations
-            .iter()
-            .filter_map(|(rid, allocation)| (*rid == selected_relay).then_some(allocation))
-            .flat_map(|allocation| allocation.current_candidates())
+        let Some(allocation) = self.allocations.get(&selected_relay) else {
+            tracing::debug!(%selected_relay, "Cannot seed relay candidates: Unknown relay");
+            return;
+        };
+
+        for candidate in allocation
+            .current_candidates()
             .filter(|c| c.kind() == CandidateKind::Relayed)
         {
-            add_local_candidate(
-                connection,
-                agent,
-                candidate.clone(),
-                &mut self.pending_events,
-            );
+            add_local_candidate(connection, agent, candidate, &mut self.pending_events);
         }
     }
 }
@@ -1016,11 +1030,37 @@ where
         maybe_initial_connection.or(maybe_established_connection)
     }

-    fn relay(&mut self, id: TId) -> Option<RId> {
-        let maybe_initial_connection = self.initial.get_mut(&id).and_then(|i| i.relay);
-        let maybe_established_connection = self.established.get_mut(&id).and_then(|c| c.relay);
+    fn connecting_agent_mut(&mut self, id: TId) -> Option<(&mut IceAgent, Option<RId>)> {
+        let maybe_initial_connection = self.initial.get_mut(&id).map(|i| (&mut i.agent, i.relay));
+        let maybe_pending_connection = self.established.get_mut(&id).and_then(|c| match c.state {
+            ConnectionState::Connecting { relay, .. } => Some((&mut c.agent, relay)),
+            ConnectionState::Failed
+            | ConnectionState::Idle { .. }
+            | ConnectionState::Connected { .. } => None,
+        });

-        maybe_initial_connection.or(maybe_established_connection)
+        maybe_initial_connection.or(maybe_pending_connection)
     }

+    fn connecting_agents_by_relay_mut(
+        &mut self,
+        id: RId,
+    ) -> impl Iterator<Item = (TId, &mut IceAgent, tracing::span::Entered<'_>)> + '_ {
+        let initial_connections = self.initial.iter_mut().filter_map(move |(cid, i)| {
+            (i.relay? == id).then_some((*cid, &mut i.agent, i.span.enter()))
+        });
+        let pending_connections = self.established.iter_mut().filter_map(move |(cid, c)| {
+            use ConnectionState::*;
+
+            match c.state {
+                Connecting {
+                    relay: Some(relay), ..
+                } if relay == id => Some((*cid, &mut c.agent, c.span.enter())),
+                Failed | Idle { .. } | Connecting { .. } | Connected { .. } => None,
+            }
+        });
+
+        initial_connections.chain(pending_connections)
+    }
+
     fn agents_mut(
@@ -1099,34 +1139,6 @@ enum EncodeError {
     NoChannel,
 }

-fn add_local_candidate_to_all<TId, RId>(
-    rid: RId,
-    candidate: Candidate,
-    connections: &mut Connections<TId, RId>,
-    pending_events: &mut VecDeque<Event<TId>>,
-) where
-    TId: Copy + fmt::Display,
-    RId: Copy + PartialEq,
-{
-    let initial_connections = connections
-        .initial
-        .iter_mut()
-        .flat_map(|(id, c)| Some((*id, &mut c.agent, c.relay?)));
-    let established_connections = connections
-        .established
-        .iter_mut()
-        .flat_map(|(id, c)| Some((*id, &mut c.agent, c.relay?)));
-
-    for (cid, agent, _) in initial_connections
-        .chain(established_connections)
-        .filter(|(_, _, selected_relay)| *selected_relay == rid)
-    {
-        let _span = info_span!("connection", %cid).entered();
-
-        add_local_candidate(cid, agent, candidate.clone(), pending_events);
-    }
-}
-
 fn add_local_candidate<TId>(
     id: TId,
     agent: &mut IceAgent,
@@ -1362,11 +1374,6 @@ struct Connection<RId> {
     /// Socket addresses from which we might receive data (even before we are connected).
     possible_sockets: BTreeSet<SocketAddr>,

-    /// The relay we have selected for this connection.
-    ///
-    /// `None` if we didn't have any relays available.
-    relay: Option<RId>,
-
     stats: ConnectionStats,
     intent_sent_at: Instant,
     signalling_completed_at: Instant,
@@ -1379,6 +1386,11 @@ struct Connection<RId> {
 enum ConnectionState<RId> {
     /// We are still running ICE to figure out, which socket to use to send data.
     Connecting {
+        /// The relay we have selected for this connection.
+        ///
+        /// `None` if we didn't have any relays available.
+        relay: Option<RId>,
+
         /// Packets emitted by wireguard whilst are still running ICE.
         ///
         /// This can happen if the remote's WG session initiation arrives at our socket before we nominate it.