fix(connlib): handle silently rebooted / disconnected relays (#6666)

Our relays are essential for connectivity because they also perform STUN
for us, which is how we learn our server-reflexive address. Thus, we
must at all times be able to reach at least one relay in order to
establish a connection.

The portal tracks connectivity to the relays for us and, if any of them
go down, sends us a `relays_presence` message so we can stop using that
relay and migrate any relayed connections to a new one. This works well
as long as we are connected to the portal while the relay reboots or
goes down. If we are not connected to the portal when a relay we are
using reboots, we don't learn about it. If we are actively using the
relay, that is still tolerable: the connection will fail, further
communication with the relay will time out, and we will stop using it.

If we aren't currently using the relay, things get a bit trickier. If
the relay rebooted while we were partitioned from the portal, logging in
again might return the same relay to us in the `init` message, but this
time with different credentials.

The first bug we are fixing in this PR is that we previously ignored
those credentials because we already knew about the relay, assuming that
our existing credentials were still valid. The fix is to also compare
the credentials and discard the local state if they differ.
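
In sketch form (placeholder types and a plain `u64` relay ID, not connlib's
real `Allocation` API), the upsert logic now looks roughly like this:

```rust
use std::collections::BTreeMap;

// Placeholder stand-ins for connlib's real types; for illustration only.
#[derive(PartialEq)]
struct Credentials {
    username: String,
    password: String,
}

struct Allocation {
    credentials: Credentials,
}

/// Upsert a relay learned from `init`: keep the existing allocation only if
/// the credentials still match, otherwise start over.
fn upsert_relay(allocations: &mut BTreeMap<u64, Allocation>, rid: u64, new: Credentials) {
    match allocations.get(&rid) {
        // Same relay, same credentials: our existing allocation remains valid.
        Some(existing) if existing.credentials == new => {}
        // Unknown relay, or the relay rebooted while we were partitioned from
        // the portal and handed out fresh credentials: ditch the local state.
        _ => {
            allocations.insert(rid, Allocation { credentials: new });
        }
    }
}
```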

The second bug, identified while fixing the first, is that we need to
proactively probe whether all other relays we know about are actually
still responsive. For that, we issue a `REFRESH` message to them. If the
`REFRESH` times out or otherwise fails, we remove that relay from our
list of `Allocation`s too.
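
Sketched with placeholder types (again not connlib's real API), the idea is:
after applying the relays from `init`, every allocation that was not just
(re)created gets refreshed, and allocations whose refresh is never answered
are dropped later as part of the regular timeout handling:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Placeholder stand-in for connlib's `Allocation`; only what the sketch needs.
struct Allocation;

impl Allocation {
    /// Queue a REFRESH towards the relay (placeholder).
    fn refresh(&mut self) {}
    /// Whether the REFRESH exhausted its backoff without an answer (placeholder).
    fn refresh_failed(&self) -> bool {
        false
    }
}

/// After applying the relays from `init`, proactively probe every relay we
/// already knew about but that was not part of the new set.
fn probe_known_relays(allocations: &mut BTreeMap<u64, Allocation>, newly_added: &BTreeSet<u64>) {
    for (_, allocation) in allocations
        .iter_mut()
        .filter(|(rid, _)| !newly_added.contains(*rid))
    {
        allocation.refresh();
    }
}

/// Part of the regular timeout handling: drop relays whose REFRESH was never answered.
fn drop_unresponsive_relays(allocations: &mut BTreeMap<u64, Allocation>) {
    allocations.retain(|_, allocation| !allocation.refresh_failed());
}
```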

Fixing the second bug required several changes:

1. We lower the log level of `Disconnecting from relay` from ERROR to
WARN. Any ERROR emitted during a test run fails our test suite, which
partially motivated this: the suite builds on the assumption that ERRORs
are fatal and thus should never happen during our tests. This change
surfaces that disconnecting from a relay can indeed happen during normal
operation, which justifies lowering it to WARN. Users should monitor at
WARN level or above to be alerted about problems.
2. We reduce the total backoff duration for requests to relays from 60s
to 10s (see the arithmetic sketch after this list). The previous 60s
resulted in a total of 8 retries. UDP is unreliable, but not so
unreliable that it justifies retrying everything for 60s. We also use a
10s timeout for ICE, so the two are now aligned. Another reason for the
change is that the tests only idle-spin for at most 10s, so the previous
60s was too long to detect that a relay had actually disappeared.
3. We shuffled around some function calls to make sure all intermediary
event buffers are emptied at the right point in time, making the test
deterministic.
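
For intuition on the retry counts: with an initial interval of 1s and the 1.5
multiplier from the backoff config (matching the test's `steps` helper;
randomization ignored), a 60s cap allows 8 retries totalling roughly 49s,
while the new 10s cap allows 4 retries totalling roughly 8.1s. A minimal
sketch of that arithmetic, as a simplified model of the schedule:

```rust
/// Back-of-the-envelope check of the retry schedule: initial interval 1s,
/// multiplier 1.5, retrying only while the cumulative elapsed time stays
/// under `max_elapsed_secs`. (Randomization is ignored for simplicity.)
fn retries_within(max_elapsed_secs: f64) -> u32 {
    let mut interval = 1.0;
    let mut elapsed = 0.0;
    let mut retries = 0;

    while elapsed + interval <= max_elapsed_secs {
        elapsed += interval;
        interval *= 1.5;
        retries += 1;
    }

    retries
}

fn main() {
    assert_eq!(retries_within(60.0), 8); // old schedule: ~49s of retrying
    assert_eq!(retries_within(10.0), 4); // new schedule: ~8.1s of retrying
}
```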

Fixes: #6648.
Thomas Eizinger, 2024-10-03 07:14:51 +10:00 (committed by GitHub)
commit 7e0fa50cae, parent 9ff1b2aa78
10 changed files with 304 additions and 131 deletions


@@ -259,9 +259,7 @@ impl Allocation {
tracing::debug!("Refreshing allocation");
// Allocation is not suspended here, we check as part of `handle_input` whether we need to `ALLOCATE` or `REFRESH`
self.active_socket = None;
self.send_binding_requests();
self.authenticate_and_queue(make_refresh_request(self.software.clone()), None);
}
#[tracing::instrument(level = "debug", skip_all, fields(%from, tid, method, class, rtt))]
@@ -620,6 +618,7 @@ impl Allocation {
// If we fail to queue the refresh message because we've exceeded our backoff, give up.
if !queued && is_refresh {
self.active_socket = None; // The socket seems to no longer be reachable.
self.invalidate_allocation();
}
@@ -788,6 +787,16 @@ impl Allocation {
self.credentials.is_some()
}
pub fn matches_credentials(&self, username: &Username, password: &str) -> bool {
self.credentials
.as_ref()
.is_some_and(|c| &c.username == username && c.password == password)
}
pub fn matches_socket(&self, socket: &RelaySocket) -> bool {
&self.server == socket
}
fn log_update(&self, now: Instant) {
tracing::info!(
srflx_ip4 = ?self.ip4_srflx_candidate.as_ref().map(|c| c.addr()),
@@ -814,6 +823,8 @@ impl Allocation {
}
fn invalidate_allocation(&mut self) {
tracing::info!(active_socket = ?self.active_socket, "Invalidating allocation");
if let Some(candidate) = self.ip4_allocation.take() {
self.events.push_back(CandidateEvent::Invalid(candidate))
}
@@ -1933,10 +1944,6 @@ mod tests {
allocation.refresh_with_same_credentials();
let binding = allocation.next_message().unwrap();
assert_eq!(binding.method(), BINDING);
allocation.handle_test_input_ip4(&binding_response(&binding, PEER1), Instant::now());
let refresh = allocation.next_message().unwrap();
assert_eq!(refresh.method(), REFRESH);
@@ -2033,10 +2040,6 @@ mod tests {
allocation.refresh_with_same_credentials();
let binding = allocation.next_message().unwrap();
assert_eq!(binding.method(), BINDING);
allocation.handle_test_input_ip4(&binding_response(&binding, PEER1), Instant::now());
let refresh = allocation.next_message().unwrap();
allocation.handle_test_input_ip4(&allocation_mismatch(&refresh), Instant::now());
@@ -2337,10 +2340,6 @@ mod tests {
let now = now + Duration::from_secs(1);
allocation.refresh(now);
let binding = allocation.next_message().unwrap();
assert_eq!(binding.method(), BINDING);
allocation.handle_test_input_ip4(&binding_response(&binding, PEER1), Instant::now());
// If the relay is restarted, our current credentials will be invalid. Simulate with an "unauthorized" response.
let now = now + Duration::from_secs(1);
let refresh = allocation.next_message().unwrap();


@@ -24,7 +24,7 @@ pub fn new(
multiplier: backoff::default::MULTIPLIER,
max_interval: Duration::from_millis(backoff::default::MAX_INTERVAL_MILLIS),
start_time: now,
max_elapsed_time: Some(Duration::from_secs(60)),
max_elapsed_time: Some(Duration::from_secs(10)),
clock: ManualClock { now },
}
}
@@ -33,7 +33,7 @@ pub fn new(
///
/// The current strategy is multiplying the previous interval by 1.5 and adding them up.
#[cfg(test)]
pub fn steps(start: Instant) -> [Instant; 8] {
pub fn steps(start: Instant) -> [Instant; 4] {
fn secs(secs: f64) -> Duration {
Duration::from_nanos((secs * 1_000_000_000.0) as u64)
}
@@ -43,9 +43,5 @@ pub fn steps(start: Instant) -> [Instant; 8] {
start + secs(1.0 + 1.5),
start + secs(1.0 + 1.5 + 2.25),
start + secs(1.0 + 1.5 + 2.25 + 3.375),
start + secs(1.0 + 1.5 + 2.25 + 3.375 + 5.0625),
start + secs(1.0 + 1.5 + 2.25 + 3.375 + 5.0625 + 7.59375),
start + secs(1.0 + 1.5 + 2.25 + 3.375 + 5.0625 + 7.59375 + 11.390625),
start + secs(1.0 + 1.5 + 2.25 + 3.375 + 5.0625 + 7.59375 + 11.390625 + 17.0859375),
]
}


@@ -13,10 +13,11 @@ use ip_packet::{ConvertibleIpv4Packet, ConvertibleIpv6Packet, IpPacket, IpPacket
use itertools::Itertools as _;
use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::{random, SeedableRng};
use rand::{random, Rng, SeedableRng};
use secrecy::{ExposeSecret, Secret};
use sha2::Digest;
use std::borrow::Cow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::Hash;
use std::marker::PhantomData;
@@ -266,7 +267,7 @@ where
}
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
pub fn remove_remote_candidate(&mut self, cid: TId, candidate: String) {
pub fn remove_remote_candidate(&mut self, cid: TId, candidate: String, now: Instant) {
let candidate = match Candidate::from_sdp_string(&candidate) {
Ok(c) => c,
Err(e) => {
@@ -277,6 +278,7 @@ where
if let Some(agent) = self.connections.agent_mut(cid) {
agent.invalidate_candidate(&candidate);
agent.handle_timeout(now); // We may have invalidated the last candidate, ensure we check our nomination state.
}
}
@@ -423,7 +425,11 @@ where
///
/// As such, it ends up being cleaner to "drain" all lower-level components of their events, transmits etc within this function.
pub fn handle_timeout(&mut self, now: Instant) {
self.bindings_and_allocations_drain_events();
for allocation in self.allocations.values_mut() {
allocation.handle_timeout(now);
}
self.allocations_drain_events();
for (id, connection) in self.connections.iter_established_mut() {
connection.handle_timeout(id, now, &mut self.allocations, &mut self.buffered_transmits);
@@ -433,10 +439,6 @@ where
connection.handle_timeout(id, now);
}
for allocation in self.allocations.values_mut() {
allocation.handle_timeout(now);
}
let next_reset = *self.next_rate_limiter_reset.get_or_insert(now);
if now >= next_reset {
@@ -447,12 +449,14 @@ where
self.allocations
.retain(|rid, allocation| match allocation.can_be_freed() {
Some(e) => {
tracing::error!(%rid, "Disconnecting from relay; {e}");
tracing::warn!(%rid, "Disconnecting from relay; {e}");
false
}
None => true,
});
self.connections
.check_relays_available(&self.allocations, &mut self.rng);
self.connections.gc(&mut self.pending_events);
}
@@ -485,21 +489,18 @@ where
now: Instant,
) {
// First, invalidate all candidates from relays that we should stop using.
for rid in to_remove {
let Some(allocation) = self.allocations.remove(&rid) else {
for rid in &to_remove {
let Some(allocation) = self.allocations.remove(rid) else {
tracing::debug!(%rid, "Cannot delete unknown allocation");
continue;
};
for (cid, agent, _guard) in self.connections.agents_mut() {
for candidate in allocation
.current_candidates()
.filter(|c| c.kind() == CandidateKind::Relayed)
{
remove_local_candidate(cid, agent, &candidate, &mut self.pending_events);
}
}
invalidate_allocation_candidates(
&mut self.connections,
&allocation,
&mut self.pending_events,
);
tracing::info!(%rid, address = ?allocation.server(), "Removed TURN server");
}
@@ -515,24 +516,61 @@ where
continue;
};
if self.allocations.contains_key(rid) {
tracing::info!(%rid, address = ?server, "Skipping known TURN server");
continue;
match self.allocations.entry(*rid) {
Entry::Vacant(v) => {
v.insert(Allocation::new(
*server,
username,
password.clone(),
realm,
now,
self.session_id.clone(),
));
tracing::info!(%rid, address = ?server, "Added new TURN server");
}
Entry::Occupied(mut o) => {
let allocation = o.get();
if allocation.matches_credentials(&username, password)
&& allocation.matches_socket(server)
{
tracing::info!(%rid, address = ?server, "Skipping known TURN server");
continue;
}
invalidate_allocation_candidates(
&mut self.connections,
allocation,
&mut self.pending_events,
);
o.insert(Allocation::new(
*server,
username,
password.clone(),
realm,
now,
self.session_id.clone(),
));
tracing::info!(%rid, address = ?server, "Replaced TURN server");
}
}
}
self.allocations.insert(
*rid,
Allocation::new(
*server,
username,
password.clone(),
realm,
now,
self.session_id.clone(),
),
);
let newly_added_relays = to_add
.iter()
.map(|(id, _, _, _, _)| *id)
.collect::<BTreeSet<_>>();
tracing::info!(%rid, address = ?server, "Added new TURN server");
// Third, check if other relays are still present.
for (_, previous_allocation) in self
.allocations
.iter_mut()
.filter(|(id, _)| !newly_added_relays.contains(id))
{
previous_allocation.refresh(now);
}
}
@@ -742,13 +780,14 @@ where
}))
}
fn bindings_and_allocations_drain_events(&mut self) {
let allocation_events = self
.allocations
.iter_mut()
.flat_map(|(rid, allocation)| Some((*rid, allocation.poll_event()?)));
fn allocations_drain_events(&mut self) {
let allocation_events = self.allocations.iter_mut().flat_map(|(rid, allocation)| {
std::iter::from_fn(|| allocation.poll_event()).map(|e| (*rid, e))
});
for (rid, event) in allocation_events {
tracing::trace!(%rid, ?event);
match event {
CandidateEvent::New(candidate)
if candidate.kind() == CandidateKind::ServerReflexive =>
@@ -774,7 +813,11 @@ where
/// Sample a relay to use for a new connection.
fn sample_relay(&mut self) -> Option<RId> {
self.allocations.keys().copied().choose(&mut self.rng)
let rid = self.allocations.keys().copied().choose(&mut self.rng)?;
tracing::debug!(%rid, "Sampled relay");
Some(rid)
}
}
@@ -1019,6 +1062,66 @@ where
});
}
fn check_relays_available(
&mut self,
allocations: &BTreeMap<RId, Allocation>,
rng: &mut impl Rng,
) {
// For initial connections, we can just update the relay to be used.
for (_, c) in self.iter_initial_mut() {
if c.relay.is_some_and(|r| allocations.contains_key(&r)) {
continue;
}
let _guard = c.span.enter();
let Some(new_rid) = allocations.keys().copied().choose(rng) else {
continue;
};
tracing::info!(old_rid = ?c.relay, %new_rid, "Updating relay");
c.relay = Some(new_rid);
}
// For established connections, we check if we are currently using the relay.
for (_, c) in self.iter_established_mut() {
let _guard = c.span.enter();
use ConnectionState::*;
let peer_socket = match &mut c.state {
Connected { peer_socket, .. } | Idle { peer_socket } => peer_socket,
Failed => continue,
Connecting {
relay: maybe_relay, ..
} => {
let Some(relay) = maybe_relay else {
continue;
};
if allocations.contains_key(relay) {
continue;
}
tracing::debug!("Selected relay disconnected during ICE; connection may fail");
*maybe_relay = None;
continue;
}
};
let relay = match peer_socket {
PeerSocket::Direct { .. } => continue, // Don't care if relay of direct connection disappears, we weren't using it anyway.
PeerSocket::Relay { relay, .. } => relay,
};
if allocations.contains_key(relay) {
continue; // Our relay is still there, no problems.
}
tracing::info!("Connection failed (relay disconnected)");
c.state = ConnectionState::Failed;
}
}
fn stats(&self) -> impl Iterator<Item = (TId, ConnectionStats)> + '_ {
self.established.iter().map(move |(id, c)| (*id, c.stats))
}
@@ -1166,6 +1269,24 @@ fn add_local_candidate<TId>(
}
}
fn invalidate_allocation_candidates<TId, RId>(
connections: &mut Connections<TId, RId>,
allocation: &Allocation,
pending_events: &mut VecDeque<Event<TId>>,
) where
TId: Eq + Hash + Copy + Ord + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display,
{
for (cid, agent, _guard) in connections.agents_mut() {
for candidate in allocation
.current_candidates()
.filter(|c| c.kind() == CandidateKind::Relayed)
{
remove_local_candidate(cid, agent, &candidate, pending_events);
}
}
}
fn remove_local_candidate<TId>(
id: TId,
agent: &mut IceAgent,

File diff suppressed because one or more lines are too long


@@ -150,7 +150,8 @@ impl ClientTunnel {
}
pub fn remove_ice_candidate(&mut self, conn_id: GatewayId, ice_candidate: String) {
self.role_state.remove_ice_candidate(conn_id, ice_candidate);
self.role_state
.remove_ice_candidate(conn_id, ice_candidate, Instant::now());
}
pub fn on_routing_details(
@@ -501,8 +502,14 @@ impl ClientState {
self.node.add_remote_candidate(conn_id, ice_candidate, now);
}
pub fn remove_ice_candidate(&mut self, conn_id: GatewayId, ice_candidate: String) {
self.node.remove_remote_candidate(conn_id, ice_candidate);
pub fn remove_ice_candidate(
&mut self,
conn_id: GatewayId,
ice_candidate: String,
now: Instant,
) {
self.node
.remove_remote_candidate(conn_id, ice_candidate, now);
}
#[tracing::instrument(level = "trace", skip_all, fields(%resource_id))]
@@ -906,10 +913,10 @@ impl ClientState {
pub fn handle_timeout(&mut self, now: Instant) {
self.node.handle_timeout(now);
self.drain_node_events();
self.mangled_dns_queries.retain(|_, exp| now < *exp);
self.forwarded_dns_queries.retain(|_, (_, exp)| now < *exp);
self.drain_node_events();
}
fn maybe_update_tun_routes(&mut self) {
@@ -1256,6 +1263,7 @@ impl ClientState {
now: Instant,
) {
self.node.update_relays(to_remove, &to_add, now);
self.drain_node_events(); // Ensure all state changes are fully-propagated.
}
}


@@ -125,7 +125,8 @@ impl GatewayTunnel {
}
pub fn remove_ice_candidate(&mut self, conn_id: ClientId, ice_candidate: String) {
self.role_state.remove_ice_candidate(conn_id, ice_candidate);
self.role_state
.remove_ice_candidate(conn_id, ice_candidate, Instant::now());
}
}
@@ -245,8 +246,9 @@ impl GatewayState {
self.node.add_remote_candidate(conn_id, ice_candidate, now);
}
pub fn remove_ice_candidate(&mut self, conn_id: ClientId, ice_candidate: String) {
self.node.remove_remote_candidate(conn_id, ice_candidate);
pub fn remove_ice_candidate(&mut self, conn_id: ClientId, ice_candidate: String, now: Instant) {
self.node
.remove_remote_candidate(conn_id, ice_candidate, now);
}
/// Accept a connection request from a client.
@@ -330,6 +332,7 @@ impl GatewayState {
pub fn handle_timeout(&mut self, now: Instant, utc_now: DateTime<Utc>) {
self.node.handle_timeout(now);
self.drain_node_events();
match self.next_expiry_resources_check {
Some(next_expiry_resources_check) if now >= next_expiry_resources_check => {
@@ -344,7 +347,9 @@ impl GatewayState {
None => self.next_expiry_resources_check = Some(now + EXPIRE_RESOURCES_INTERVAL),
Some(_) => {}
}
}
fn drain_node_events(&mut self) {
let mut added_ice_candidates = BTreeMap::<ClientId, BTreeSet<String>>::default();
let mut removed_ice_candidates = BTreeMap::<ClientId, BTreeSet<String>>::default();
@@ -417,6 +422,7 @@ impl GatewayState {
now: Instant,
) {
self.node.update_relays(to_remove, &to_add, now);
self.drain_node_events()
}
}


@@ -2,7 +2,7 @@ use super::{
composite_strategy::CompositeStrategy, sim_client::*, sim_dns::*, sim_gateway::*, sim_net::*,
strategies::*, stub_portal::StubPortal, transition::*,
};
use crate::dns::is_subdomain;
use crate::{dns::is_subdomain, proptest::relay_id};
use connlib_shared::{
messages::{
client::{self, ResourceDescription},
@@ -67,7 +67,7 @@ impl ReferenceStateMachine for ReferenceState {
system_dns_servers(dns_servers.values().cloned().collect()),
upstream_dns_servers(dns_servers.values().cloned().collect()),
);
let relays = relays();
let relays = relays(relay_id());
let global_dns_records = global_dns_records(); // Start out with a set of global DNS records so we have something to resolve outside of DNS resources.
let drop_direct_client_traffic = any::<bool>();
@@ -195,8 +195,15 @@ impl ReferenceStateMachine for ReferenceState {
sample::select(resource_ids).prop_map(Transition::DeactivateResource)
})
.with(1, roam_client())
.with(1, relays().prop_map(Transition::DeployNewRelays))
.with(1, relays(relay_id()).prop_map(Transition::DeployNewRelays))
.with(1, Just(Transition::PartitionRelaysFromPortal))
.with(
1,
relays(sample::select(
state.relays.keys().copied().collect::<Vec<_>>(),
))
.prop_map(Transition::RebootRelaysWhilePartitioned),
)
.with(1, Just(Transition::ReconnectPortal))
.with(1, Just(Transition::Idle))
.with_if_not_empty(1, state.client.inner().all_resource_ids(), |resources_id| {
@@ -454,22 +461,9 @@ impl ReferenceStateMachine for ReferenceState {
// We do re-add all resources though so depending on the order they are added in, overlapping CIDR resources may change.
state.client.exec_mut(|c| c.readd_all_resources());
}
Transition::DeployNewRelays(new_relays) => {
// Always take down all relays because we can't know which one was sampled for the connection.
for relay in state.relays.values() {
state.network.remove_host(relay);
}
state.relays.clear();
for (rid, new_relay) in new_relays {
state.relays.insert(*rid, new_relay.clone());
debug_assert!(state.network.add_host(*rid, new_relay));
}
// In case we were using the relays, all connections will be cut and require us to make a new one.
if state.drop_direct_client_traffic {
state.client.exec_mut(|client| client.reset_connections());
}
Transition::DeployNewRelays(new_relays) => state.deploy_new_relays(new_relays),
Transition::RebootRelaysWhilePartitioned(new_relays) => {
state.deploy_new_relays(new_relays)
}
Transition::Idle => {}
Transition::PartitionRelaysFromPortal => {
@@ -620,7 +614,8 @@ impl ReferenceStateMachine for ReferenceState {
Transition::DeactivateResource(r) => {
state.client.inner().all_resource_ids().contains(r)
}
Transition::DeployNewRelays(new_relays) => {
Transition::RebootRelaysWhilePartitioned(new_relays)
| Transition::DeployNewRelays(new_relays) => {
let mut additional_routes = RoutingTable::default();
for (rid, relay) in new_relays {
if !additional_routes.add_host(*rid, relay) {
@@ -672,6 +667,24 @@ impl ReferenceState {
all_resources
}
fn deploy_new_relays(&mut self, new_relays: &BTreeMap<RelayId, Host<u64>>) {
// Always take down all relays because we can't know which one was sampled for the connection.
for relay in self.relays.values() {
self.network.remove_host(relay);
}
self.relays.clear();
for (rid, new_relay) in new_relays {
self.relays.insert(*rid, new_relay.clone());
debug_assert!(self.network.add_host(*rid, new_relay));
}
// In case we were using the relays, all connections will be cut and require us to make a new one.
if self.drop_direct_client_traffic {
self.client.exec_mut(|client| client.reset_connections());
}
}
}
pub(crate) fn private_key() -> impl Strategy<Value = PrivateKey> {


@@ -109,8 +109,10 @@ pub(crate) fn stub_portal() -> impl Strategy<Value = StubPortal> {
)
}
pub(crate) fn relays() -> impl Strategy<Value = BTreeMap<RelayId, Host<u64>>> {
collection::btree_map(relay_id(), ref_relay_host(), 1..=2)
pub(crate) fn relays(
id: impl Strategy<Value = RelayId>,
) -> impl Strategy<Value = BTreeMap<RelayId, Host<u64>>> {
collection::btree_map(id, ref_relay_host(), 1..=2)
}
/// Sample a list of DNS servers.


@@ -273,28 +273,10 @@ impl TunnelTest {
});
}
Transition::DeployNewRelays(new_relays) => {
for relay in state.relays.values() {
state.network.remove_host(relay);
}
// If we are connected to the portal, we will learn, which ones went down, i.e. `relays_presence`.
let to_remove = state.relays.keys().copied().collect();
let online = new_relays
.into_iter()
.map(|(rid, relay)| (rid, relay.map(SimRelay::new, debug_span!("relay", %rid))))
.collect::<BTreeMap<_, _>>();
for (rid, relay) in &online {
debug_assert!(state.network.add_host(*rid, relay));
}
state.client.exec_mut(|c| {
c.update_relays(state.relays.keys().copied(), online.iter(), now);
});
for gateway in state.gateways.values_mut() {
gateway.exec_mut(|g| {
g.update_relays(state.relays.keys().copied(), online.iter(), now)
});
}
state.relays = online; // Override all relays.
state.deploy_new_relays(new_relays, now, to_remove);
}
Transition::Idle => {
const IDLE_DURATION: Duration = Duration::from_secs(6 * 60); // Ensure idling twice in a row puts us in the 10-15 minute window where TURN data channels are cooling down.
@@ -334,6 +316,8 @@ impl TunnelTest {
// 2. Advance state to ensure this is reflected.
state.advance(ref_state, &mut buffered_transmits);
let now = state.flux_capacitor.now();
// 3. Reconnect all relays.
state
.client
@@ -342,6 +326,12 @@ impl TunnelTest {
gateway.exec_mut(|g| g.update_relays(iter::empty(), state.relays.iter(), now));
}
}
Transition::RebootRelaysWhilePartitioned(new_relays) => {
// If we are partitioned from the portal, we will only learn which relays to use, potentially replacing existing ones.
let to_remove = Vec::default();
state.deploy_new_relays(new_relays, now, to_remove);
}
};
state.advance(ref_state, &mut buffered_transmits);
@@ -390,6 +380,24 @@ impl TunnelTest {
self.handle_timeout(&ref_state.global_dns_records, buffered_transmits);
let now = self.flux_capacitor.now();
for (id, gateway) in self.gateways.iter_mut() {
let Some(event) = gateway.exec_mut(|g| g.sut.poll_event()) else {
continue;
};
on_gateway_event(*id, event, &mut self.client, now);
continue 'outer;
}
if let Some(event) = self.client.exec_mut(|c| c.sut.poll_event()) {
self.on_client_event(
self.client.inner().id,
event,
&ref_state.portal,
&ref_state.global_dns_records,
);
continue;
}
for (_, relay) in self.relays.iter_mut() {
let Some(message) = relay.exec_mut(|r| r.sut.next_command()) else {
continue;
@@ -435,28 +443,10 @@ impl TunnelTest {
continue 'outer;
}
for (id, gateway) in self.gateways.iter_mut() {
let Some(event) = gateway.exec_mut(|g| g.sut.poll_event()) else {
continue;
};
on_gateway_event(*id, event, &mut self.client, now);
continue 'outer;
}
if let Some(transmit) = self.client.exec_mut(|sim| sim.sut.poll_transmit()) {
buffered_transmits.push_from(transmit, &self.client, now);
continue;
}
if let Some(event) = self.client.exec_mut(|c| c.sut.poll_event()) {
self.on_client_event(
self.client.inner().id,
event,
&ref_state.portal,
&ref_state.global_dns_records,
);
continue;
}
self.client.exec_mut(|sim| {
while let Some(packet) = sim.sut.poll_packets() {
sim.on_received_packet(packet)
@@ -654,7 +644,7 @@ impl TunnelTest {
gateway.exec_mut(|g| {
for candidate in candidates {
g.sut.remove_ice_candidate(src, candidate)
g.sut.remove_ice_candidate(src, candidate, now)
}
})
}
@@ -797,6 +787,34 @@ impl TunnelTest {
}
}
}
fn deploy_new_relays(
&mut self,
new_relays: BTreeMap<RelayId, Host<u64>>,
now: Instant,
to_remove: Vec<RelayId>,
) {
for relay in self.relays.values() {
self.network.remove_host(relay);
}
let online = new_relays
.into_iter()
.map(|(rid, relay)| (rid, relay.map(SimRelay::new, debug_span!("relay", %rid))))
.collect::<BTreeMap<_, _>>();
for (rid, relay) in &online {
debug_assert!(self.network.add_host(*rid, relay));
}
self.client.exec_mut(|c| {
c.update_relays(to_remove.iter().copied(), online.iter(), now);
});
for gateway in self.gateways.values_mut() {
gateway.exec_mut(|g| g.update_relays(to_remove.iter().copied(), online.iter(), now));
}
self.relays = online; // Override all relays.
}
}
fn on_gateway_event(
@@ -813,7 +831,7 @@ fn on_gateway_event(
}),
GatewayEvent::RemovedIceCandidates { candidates, .. } => client.exec_mut(|c| {
for candidate in candidates {
c.sut.remove_ice_candidate(src, candidate)
c.sut.remove_ice_candidate(src, candidate, now)
}
}),
GatewayEvent::RefreshDns { .. } => todo!(),


@@ -84,6 +84,11 @@ pub(crate) enum Transition {
/// Idle connlib for a while.
Idle,
/// Simulate all relays rebooting while we are network partitioned from the portal.
///
/// In this case, we won't receive a `relays_presence` but instead we will receive relays with the same ID yet different credentials.
RebootRelaysWhilePartitioned(BTreeMap<RelayId, Host<u64>>),
}
#[derive(Debug, Clone)]