test(connlib): introduce network latency to tunnel_test (#5948)

Currently, `tunnel_test` executes all actions within the same `Instant`,
i.e. time is never advanced by itself. The difficulty with advancing
time compared to other actions like sending packets is that all
time-related actions "overlap". In other words, all timers within
connlib advance at the same time. This makes it difficult to model the
expected behaviour after a certain amount of time has passed as we'd
effectively need to model all timers and their relation to particular
actions (like resending of connection intents or STUN requests).

Instead of only advancing time by itself, we can model some aspect of it
by introducing latency on network messages. This allows us to define a
range of an "acceptable" network latency within everything is expected
to work.

Whilst this doesn't cover all failure cases, it gives us a solid
foundation of parameters within which we should not expect any
operational problems.
This commit is contained in:
Thomas Eizinger
2024-07-24 14:01:50 +10:00
committed by GitHub
parent 6d09344521
commit 7c8bbd550b
16 changed files with 493 additions and 230 deletions

View File

@@ -94,6 +94,7 @@ jobs:
# Needed to create tunnel interfaces in unit tests
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: "sudo --preserve-env"
PROPTEST_VERBOSE: 0 # Otherwise the output is very long.
CARGO_PROFILE_TEST_OPT_LEVEL: 1 # Otherwise the tests take forever.
name: "cargo test"
shell: bash

1
rust/Cargo.lock generated
View File

@@ -2065,6 +2065,7 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"tracing-appender",
"tracing-subscriber",
"tun",
]

View File

@@ -1203,7 +1203,7 @@ pub enum Event<TId> {
ConnectionClosed(TId),
}
#[derive(Clone, PartialEq)]
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct Transmit<'a> {
/// The local interface from which this packet should be sent.
///

View File

@@ -44,6 +44,7 @@ proptest-state-machine = "0.3"
rand = "0.8"
serde_json = "1.0"
test-strategy = "0.3.1"
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[features]

File diff suppressed because one or more lines are too long

View File

@@ -2,8 +2,11 @@ use crate::tests::sut::TunnelTest;
use proptest::test_runner::Config;
mod assertions;
mod buffered_transmits;
mod composite_strategy;
mod flux_capacitor;
mod reference;
mod run_count_appender;
mod sim_client;
mod sim_gateway;
mod sim_net;

View File

@@ -0,0 +1,139 @@
use super::sim_net::Host;
use snownet::Transmit;
use std::{
cmp::Reverse,
collections::BinaryHeap,
time::{Duration, Instant},
};
/// A buffer for network packets that need to be handled at a certain point in time.
#[derive(Debug, Clone, Default)]
pub(crate) struct BufferedTransmits {
// Transmits are stored in reverse ordering to emit the earliest first.
inner: BinaryHeap<Reverse<ByTime<Transmit<'static>>>>,
}
#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)]
struct ByTime<T> {
at: Instant,
value: T,
}
impl BufferedTransmits {
/// Pushes a new [`Transmit`] from a given [`Host`].
pub(crate) fn push_from<T>(
&mut self,
transmit: impl Into<Option<Transmit<'static>>>,
sending_host: &Host<T>,
now: Instant,
) {
let Some(transmit) = transmit.into() else {
return;
};
if transmit.src.is_some() {
self.push(transmit, sending_host.latency(), now);
return;
}
// The `src` of a [`Transmit`] is empty if we want to send if via the default interface.
// In production, the kernel does this for us.
// In this test, we need to always set a `src` so that the remote peer knows where the packet is coming from.
let Some(src) = sending_host.sending_socket_for(transmit.dst.ip()) else {
tracing::debug!(dst = %transmit.dst, "No socket");
return;
};
self.push(
Transmit {
src: Some(src),
..transmit
},
sending_host.latency(),
now,
);
}
pub(crate) fn push(
&mut self,
transmit: impl Into<Option<Transmit<'static>>>,
latency: Duration,
now: Instant,
) {
let Some(transmit) = transmit.into() else {
return;
};
debug_assert!(transmit.src.is_some(), "src must be set for `push`");
self.inner.push(Reverse(ByTime {
at: now + latency,
value: transmit,
}));
}
pub(crate) fn pop(&mut self, now: Instant) -> Option<Transmit<'static>> {
let next = self.inner.peek()?.0.at;
if next > now {
return None;
}
let next = self.inner.pop().unwrap().0;
Some(next.value)
}
pub(crate) fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn by_time_orders_from_earliest_to_latest() {
let mut heap = BinaryHeap::new();
let start = Instant::now();
heap.push(ByTime {
at: start + Duration::from_secs(1),
value: 1,
});
heap.push(ByTime {
at: start,
value: 0,
});
heap.push(ByTime {
at: start + Duration::from_secs(2),
value: 2,
});
assert_eq!(
heap.pop().unwrap(),
ByTime {
at: start + Duration::from_secs(2),
value: 2
},
);
assert_eq!(
heap.pop().unwrap(),
ByTime {
at: start + Duration::from_secs(1),
value: 1
}
);
assert_eq!(
heap.pop().unwrap(),
ByTime {
at: start,
value: 0
}
);
}
}

View File

@@ -0,0 +1,89 @@
use chrono::{DateTime, Utc};
use std::{
fmt,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use tracing_subscriber::fmt::{format::Writer, time::FormatTime};
/// A device that allows us to travel into the future.
#[derive(Debug, Clone)]
pub(crate) struct FluxCapacitor {
start: Instant,
now: Arc<Mutex<(Instant, DateTime<Utc>)>>,
}
impl FormatTime for FluxCapacitor {
fn format_time(&self, w: &mut Writer<'_>) -> fmt::Result {
let e = self.elapsed();
write!(w, "{:3}.{:03}s", e.as_secs(), e.subsec_millis())
}
}
impl Default for FluxCapacitor {
fn default() -> Self {
let start = Instant::now();
let utc_start = Utc::now();
Self {
start,
now: Arc::new(Mutex::new((start, utc_start))),
}
}
}
impl FluxCapacitor {
const SMALL_TICK: Duration = Duration::from_millis(10);
const LARGE_TICK: Duration = Duration::from_millis(100);
#[allow(private_bounds)]
pub(crate) fn now<T>(&self) -> T
where
T: PickNow,
{
let (now, utc_now) = *self.now.lock().unwrap();
T::pick_now(now, utc_now)
}
pub(crate) fn small_tick(&self) {
self.tick(Self::SMALL_TICK);
}
pub(crate) fn large_tick(&self) {
self.tick(Self::LARGE_TICK);
}
fn tick(&self, tick: Duration) {
{
let mut guard = self.now.lock().unwrap();
guard.0 += tick;
guard.1 += tick;
}
if self.elapsed().subsec_millis() == 0 {
tracing::trace!("Tick");
}
}
fn elapsed(&self) -> Duration {
self.now::<Instant>().duration_since(self.start)
}
}
trait PickNow {
fn pick_now(now: Instant, utc_now: DateTime<Utc>) -> Self;
}
impl PickNow for Instant {
fn pick_now(now: Instant, _: DateTime<Utc>) -> Self {
now
}
}
impl PickNow for DateTime<Utc> {
fn pick_now(_: Instant, utc_now: DateTime<Utc>) -> Self {
utc_now
}
}

View File

@@ -3,7 +3,6 @@ use super::{
strategies::*, stub_portal::StubPortal, transition::*,
};
use crate::dns::is_subdomain;
use chrono::{DateTime, Utc};
use connlib_shared::{
messages::{
client::{self, ResourceDescription},
@@ -20,7 +19,6 @@ use std::{
collections::{BTreeMap, HashSet},
fmt, iter,
net::IpAddr,
time::Instant,
};
/// The reference state machine of the tunnel.
@@ -28,8 +26,6 @@ use std::{
/// This is the "expected" part of our test.
#[derive(Clone, Debug)]
pub(crate) struct ReferenceState {
pub(crate) now: Instant,
pub(crate) utc_now: DateTime<Utc>,
pub(crate) client: Host<RefClient>,
pub(crate) gateways: BTreeMap<GatewayId, Host<RefGateway>>,
pub(crate) relays: BTreeMap<RelayId, Host<u64>>,
@@ -69,8 +65,6 @@ impl ReferenceStateMachine for ReferenceState {
collection::btree_map(relay_id(), relay_prototype(), 1..=2),
global_dns_records(), // Start out with a set of global DNS records so we have something to resolve outside of DNS resources.
any::<bool>(),
Just(Instant::now()),
Just(Utc::now()),
)
.prop_filter_map(
"network IPs must be unique",
@@ -80,8 +74,6 @@ impl ReferenceStateMachine for ReferenceState {
relays,
mut global_dns,
drop_direct_client_traffic,
now,
utc_now,
)| {
let mut routing_table = RoutingTable::default();
@@ -110,15 +102,13 @@ impl ReferenceStateMachine for ReferenceState {
portal,
global_dns,
drop_direct_client_traffic,
now,
utc_now,
routing_table,
))
},
)
.prop_filter(
"private keys must be unique",
|(c, gateways, _, _, _, _, _, _, _)| {
|(c, gateways, _, _, _, _, _)| {
let different_keys = gateways
.iter()
.map(|(_, g)| g.inner().key)
@@ -136,13 +126,9 @@ impl ReferenceStateMachine for ReferenceState {
portal,
global_dns_records,
drop_direct_client_traffic,
now,
utc_now,
network,
)| {
Self {
now,
utc_now,
client,
gateways,
relays,

View File

@@ -0,0 +1,11 @@
use std::sync::atomic::AtomicU32;
use tracing_appender::rolling::RollingFileAppender;
/// A file appender that rolls over to a new file for every instance that is created within the same process.
#[allow(dead_code)]
pub(crate) fn appender() -> RollingFileAppender {
static RUN_COUNT: AtomicU32 = AtomicU32::new(0);
let run_count = RUN_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
tracing_appender::rolling::never(".", format!("run_{run_count:04}.log"))
}

View File

@@ -1,7 +1,7 @@
use super::{
reference::{private_key, PrivateKey, ResourceDst},
sim_net::{any_ip_stack, any_port, host, Host},
strategies::{system_dns_servers, upstream_dns_servers},
strategies::{latency, system_dns_servers, upstream_dns_servers},
sut::domain_to_hickory_name,
IcmpIdentifier, IcmpSeq, QueryId,
};
@@ -144,17 +144,14 @@ impl SimClient {
Some(self.sut.encapsulate(packet, now)?.into_owned())
}
pub(crate) fn handle_packet(
&mut self,
payload: &[u8],
src: SocketAddr,
dst: SocketAddr,
now: Instant,
) {
let Some(packet) = self
.sut
.decapsulate(dst, src, payload, now, &mut self.buffer)
else {
pub(crate) fn receive(&mut self, transmit: Transmit, now: Instant) {
let Some(packet) = self.sut.decapsulate(
transmit.dst,
transmit.src.unwrap(),
&transmit.payload,
now,
&mut self.buffer,
) else {
return;
};
let packet = packet.to_owned();
@@ -593,6 +590,7 @@ pub(crate) fn ref_client_host(
any_ip_stack(),
any_port(),
ref_client(tunnel_ip4s, tunnel_ip6s),
latency(2000), // Clients might have a horrible Internet connection.
)
.prop_filter("at least one DNS server needs to be reachable", |host| {
// TODO: PRODUCTION CODE DOES NOT HANDLE THIS!

View File

@@ -1,6 +1,7 @@
use super::{
reference::{private_key, PrivateKey},
sim_net::{any_port, dual_ip_stack, host, Host},
strategies::latency,
};
use crate::{tests::sut::hickory_name_to_domain, GatewayState};
use connlib_shared::DomainName;
@@ -9,7 +10,7 @@ use proptest::prelude::*;
use snownet::Transmit;
use std::{
collections::{BTreeMap, HashSet, VecDeque},
net::{IpAddr, SocketAddr},
net::IpAddr,
time::Instant,
};
@@ -31,17 +32,21 @@ impl SimGateway {
}
}
pub(crate) fn handle_packet(
pub(crate) fn receive(
&mut self,
global_dns_records: &BTreeMap<DomainName, HashSet<IpAddr>>,
payload: &[u8],
src: SocketAddr,
dst: SocketAddr,
transmit: Transmit,
now: Instant,
) -> Option<Transmit<'static>> {
let packet = self
.sut
.decapsulate(dst, src, payload, now, &mut self.buffer)?
.decapsulate(
transmit.dst,
transmit.src.unwrap(),
&transmit.payload,
now,
&mut self.buffer,
)?
.to_owned();
self.on_received_packet(global_dns_records, packet, now)
@@ -99,7 +104,12 @@ impl RefGateway {
}
pub(crate) fn ref_gateway_host() -> impl Strategy<Value = Host<RefGateway>> {
host(dual_ip_stack(), any_port(), ref_gateway())
host(
dual_ip_stack(),
any_port(),
ref_gateway(),
latency(200), // We assume gateways have a somewhat decent Internet connection.
)
}
fn ref_gateway() -> impl Strategy<Value = RefGateway> {

View File

@@ -1,3 +1,4 @@
use crate::tests::buffered_transmits::BufferedTransmits;
use crate::tests::strategies::documentation_ip6s;
use connlib_shared::messages::{ClientId, GatewayId, RelayId};
use firezone_relay::{AddressFamily, IpStack};
@@ -6,11 +7,13 @@ use ip_network_table::IpNetworkTable;
use itertools::Itertools as _;
use prop::sample;
use proptest::prelude::*;
use snownet::Transmit;
use std::{
collections::HashSet,
fmt,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
num::NonZeroU16,
time::{Duration, Instant},
};
use tracing::Span;
@@ -29,12 +32,20 @@ pub(crate) struct Host<T> {
default_port: u16,
allocated_ports: HashSet<(u16, AddressFamily)>,
// The latency of incoming and outgoing packets.
latency: Duration,
#[derivative(Debug = "ignore")]
span: Span,
/// Messages that have "arrived" and are waiting to be dispatched.
///
/// We buffer them here because we need also apply our latency on inbound packets.
inbox: BufferedTransmits,
}
impl<T> Host<T> {
pub(crate) fn new(inner: T) -> Self {
pub(crate) fn new(inner: T, latency: Duration) -> Self {
Self {
inner,
ip4: None,
@@ -43,6 +54,8 @@ impl<T> Host<T> {
default_port: 0,
allocated_ports: HashSet::default(),
old_ports: HashSet::default(),
latency,
inbox: BufferedTransmits::default(),
}
}
@@ -105,6 +118,18 @@ impl<T> Host<T> {
IpAddr::V6(src) => self.ip6.is_some_and(|v6| v6 == src),
}
}
pub(crate) fn latency(&self) -> Duration {
self.latency
}
pub(crate) fn receive(&mut self, transmit: Transmit<'static>, now: Instant) {
self.inbox.push(transmit, self.latency, now);
}
pub(crate) fn poll_transmit(&mut self, now: Instant) -> Option<Transmit<'static>> {
self.inbox.pop(now)
}
}
impl<T> Host<T>
@@ -124,6 +149,8 @@ where
default_port: self.default_port,
allocated_ports: self.allocated_ports.clone(),
old_ports: self.old_ports.clone(),
latency: self.latency,
inbox: self.inbox.clone(),
}
}
}
@@ -241,12 +268,13 @@ pub(crate) fn host<T>(
socket_ips: impl Strategy<Value = IpStack>,
default_port: impl Strategy<Value = u16>,
state: impl Strategy<Value = T>,
latency: impl Strategy<Value = Duration>,
) -> impl Strategy<Value = Host<T>>
where
T: fmt::Debug,
{
(state, socket_ips, default_port).prop_map(move |(state, ip_stack, port)| {
let mut host = Host::new(state);
(state, socket_ips, default_port, latency).prop_map(move |(state, ip_stack, port, latency)| {
let mut host = Host::new(state, latency);
host.update_interface(ip_stack.as_v4().copied(), ip_stack.as_v6().copied(), port);
host

View File

@@ -1,4 +1,7 @@
use super::sim_net::{dual_ip_stack, host, Host};
use super::{
sim_net::{dual_ip_stack, host, Host},
strategies::latency,
};
use connlib_shared::messages::RelayId;
use firezone_relay::{AddressFamily, AllocationPort, ClientSocket, IpStack, PeerSocket};
use proptest::prelude::*;
@@ -82,13 +85,15 @@ impl SimRelay {
}
}
pub(crate) fn handle_packet(
pub(crate) fn receive(
&mut self,
payload: &[u8],
sender: SocketAddr,
dst: SocketAddr,
transmit: Transmit,
now: Instant,
) -> Option<Transmit<'static>> {
let dst = transmit.dst;
let payload = &transmit.payload;
let sender = transmit.src.unwrap();
if self
.matching_listen_socket(dst, self.sut.public_address())
.is_some_and(|s| s == dst)
@@ -196,5 +201,6 @@ pub(crate) fn relay_prototype() -> impl Strategy<Value = Host<u64>> {
dual_ip_stack(), // For this test, our relays always run in dual-stack mode to ensure connectivity!
Just(3478),
any::<u64>(),
latency(50), // We assume our relays have a good Internet connection.
)
}

View File

@@ -22,6 +22,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
net::{IpAddr, Ipv4Addr, Ipv6Addr},
str::FromStr as _,
time::Duration,
};
pub(crate) fn upstream_dns_servers() -> impl Strategy<Value = Vec<DnsServer>> {
@@ -93,6 +94,10 @@ pub(crate) fn tunnel_ip6s() -> impl Iterator<Item = Ipv6Addr> {
.map(|n| n.network_address())
}
pub(crate) fn latency(max: u64) -> impl Strategy<Value = Duration> {
(10..max).prop_map(Duration::from_millis)
}
/// A [`Strategy`] for sampling a set of gateways and a corresponding [`StubPortal`] that has a set of [`Site`]s configured with those gateways.
///
/// Similar as in production, the portal holds a list of DNS and CIDR resources (those are also sampled from the given sites).

View File

@@ -1,3 +1,4 @@
use super::buffered_transmits::BufferedTransmits;
use super::reference::ReferenceState;
use super::sim_client::SimClient;
use super::sim_gateway::SimGateway;
@@ -6,10 +7,11 @@ use super::sim_relay::SimRelay;
use super::stub_portal::StubPortal;
use crate::dns::is_subdomain;
use crate::tests::assertions::*;
use crate::tests::flux_capacitor::FluxCapacitor;
use crate::tests::sim_relay::map_explode;
use crate::tests::transition::Transition;
use crate::utils::earliest;
use crate::{dns::DnsQuery, ClientEvent, GatewayEvent, Request};
use chrono::{DateTime, Utc};
use connlib_shared::messages::client::ResourceDescription;
use connlib_shared::{
messages::{ClientId, GatewayId, Interface, RelayId},
@@ -23,8 +25,13 @@ use hickory_resolver::lookup::Lookup;
use proptest_state_machine::{ReferenceStateMachine, StateMachineTest};
use secrecy::ExposeSecret as _;
use snownet::Transmit;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::{collections::HashSet, net::IpAddr, str::FromStr as _, sync::Arc, time::Instant};
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
net::IpAddr,
str::FromStr as _,
sync::Arc,
time::{Duration, Instant},
};
use tracing::debug_span;
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::layer::SubscriberExt;
@@ -35,8 +42,7 @@ use tracing_subscriber::{util::SubscriberInitExt as _, EnvFilter};
///
/// [`proptest`] manipulates this using [`Transition`]s and we assert it against [`ReferenceState`].
pub(crate) struct TunnelTest {
now: Instant,
utc_now: DateTime<Utc>,
flux_capacitor: FluxCapacitor,
client: Host<SimClient>,
gateways: BTreeMap<GatewayId, Host<SimGateway>>,
@@ -57,8 +63,12 @@ impl StateMachineTest for TunnelTest {
fn init_test(
ref_state: &<Self::Reference as ReferenceStateMachine>::State,
) -> Self::SystemUnderTest {
let flux_capacitor = FluxCapacitor::default();
let logger = tracing_subscriber::fmt()
.with_test_writer()
// .with_writer(crate::tests::run_count_appender::appender()) // Useful for diffing logs between runs.
.with_timer(flux_capacitor.clone())
.with_env_filter(EnvFilter::from_default_env())
.finish()
.set_default();
@@ -96,7 +106,7 @@ impl StateMachineTest for TunnelTest {
c.sut.update_relays(
BTreeSet::default(),
BTreeSet::from_iter(map_explode(relays.iter(), "client")),
ref_state.now,
flux_capacitor.now(),
)
});
for (id, gateway) in &mut gateways {
@@ -104,14 +114,13 @@ impl StateMachineTest for TunnelTest {
g.sut.update_relays(
BTreeSet::default(),
BTreeSet::from_iter(map_explode(relays.iter(), &format!("gateway_{id}"))),
ref_state.now,
flux_capacitor.now(),
)
});
}
let mut this = Self {
now: ref_state.now,
utc_now: ref_state.utc_now,
flux_capacitor,
network: ref_state.network.clone(),
drop_direct_client_traffic: ref_state.drop_direct_client_traffic,
client,
@@ -123,8 +132,6 @@ impl StateMachineTest for TunnelTest {
let mut buffered_transmits = BufferedTransmits::default();
this.advance(ref_state, &mut buffered_transmits); // Perform initial setup before we apply the first transition.
debug_assert!(buffered_transmits.is_empty());
this
}
@@ -135,6 +142,7 @@ impl StateMachineTest for TunnelTest {
transition: <Self::Reference as ReferenceStateMachine>::Transition,
) -> Self::SystemUnderTest {
let mut buffered_transmits = BufferedTransmits::default();
let now = state.flux_capacitor.now();
// Act: Apply the transition
match transition {
@@ -176,11 +184,9 @@ impl StateMachineTest for TunnelTest {
} => {
let packet = ip_packet::make::icmp_request_packet(src, dst, seq, identifier);
let transmit = state
.client
.exec_mut(|sim| sim.encapsulate(packet, state.now));
let transmit = state.client.exec_mut(|sim| sim.encapsulate(packet, now));
buffered_transmits.push(transmit, &state.client);
buffered_transmits.push_from(transmit, &state.client, now);
}
Transition::SendICMPPacketToDnsResource {
src,
@@ -207,9 +213,9 @@ impl StateMachineTest for TunnelTest {
let transmit = state
.client
.exec_mut(|sim| Some(sim.encapsulate(packet, state.now)?.into_owned()));
.exec_mut(|sim| Some(sim.encapsulate(packet, now)?.into_owned()));
buffered_transmits.push(transmit, &state.client);
buffered_transmits.push_from(transmit, &state.client, now);
}
Transition::SendDnsQuery {
domain,
@@ -218,10 +224,10 @@ impl StateMachineTest for TunnelTest {
dns_server,
} => {
let transmit = state.client.exec_mut(|sim| {
sim.send_dns_query_for(domain, r_type, query_id, dns_server, state.now)
sim.send_dns_query_for(domain, r_type, query_id, dns_server, now)
});
buffered_transmits.push(transmit, &state.client);
buffered_transmits.push_from(transmit, &state.client, now);
}
Transition::UpdateSystemDnsServers { servers } => {
state
@@ -251,7 +257,7 @@ impl StateMachineTest for TunnelTest {
c.sut.update_relays(
BTreeSet::default(),
BTreeSet::from_iter(map_explode(state.relays.iter(), "client")),
ref_state.now,
now,
);
c.sut
.set_resources(ref_state.client.inner().all_resources());
@@ -271,14 +277,12 @@ impl StateMachineTest for TunnelTest {
ipv6,
upstream_dns,
});
c.sut
.update_relays(BTreeSet::default(), relays, ref_state.now);
c.sut.update_relays(BTreeSet::default(), relays, now);
c.sut.set_resources(all_resources);
});
}
};
state.advance(ref_state, &mut buffered_transmits);
assert!(buffered_transmits.is_empty()); // Sanity check to ensure we handled all packets.
state
}
@@ -292,6 +296,7 @@ impl StateMachineTest for TunnelTest {
.with(
tracing_subscriber::fmt::layer()
.with_test_writer()
.with_timer(state.flux_capacitor.clone())
.with_filter(EnvFilter::from_default_env()),
)
.with(PanicOnErrorEvents::default()) // Temporarily install a layer that panics when `_guard` goes out of scope if any of our assertions emitted an error.
@@ -330,15 +335,71 @@ impl TunnelTest {
/// Dispatching a [`Transmit`] (read: packet) to a host can trigger more packets, i.e. receiving a STUN request may trigger a STUN response.
///
/// Consequently, this function needs to loop until no host can make progress at which point we consider the [`Transition`] complete.
///
/// At most, we will spend 10s of "simulation time" advancing the state.
fn advance(&mut self, ref_state: &ReferenceState, buffered_transmits: &mut BufferedTransmits) {
'outer: loop {
if let Some(transmit) = buffered_transmits.pop() {
self.dispatch_transmit(transmit, buffered_transmits, &ref_state.global_dns_records);
continue;
let cut_off = self.flux_capacitor.now::<Instant>() + Duration::from_secs(10);
'outer: while self.flux_capacitor.now::<Instant>() < cut_off {
self.handle_timeout(&ref_state.global_dns_records, buffered_transmits);
let now = self.flux_capacitor.now();
for (_, relay) in self.relays.iter_mut() {
let Some(message) = relay.exec_mut(|r| r.sut.next_command()) else {
continue;
};
match message {
firezone_relay::Command::SendMessage { payload, recipient } => {
let dst = recipient.into_socket();
let src = relay
.sending_socket_for(dst.ip())
.expect("relay to never emit packets without a matching socket");
buffered_transmits.push_from(
Transmit {
src: Some(src),
dst,
payload: payload.into(),
},
relay,
now,
);
}
firezone_relay::Command::CreateAllocation { port, family } => {
relay.allocate_port(port.value(), family);
relay.exec_mut(|r| r.allocations.insert((family, port)));
}
firezone_relay::Command::FreeAllocation { port, family } => {
relay.deallocate_port(port.value(), family);
relay.exec_mut(|r| r.allocations.remove(&(family, port)));
}
}
continue 'outer;
}
for (_, gateway) in self.gateways.iter_mut() {
let Some(transmit) = gateway.exec_mut(|g| g.sut.poll_transmit()) else {
continue;
};
buffered_transmits.push_from(transmit, gateway, now);
continue 'outer;
}
for (id, gateway) in self.gateways.iter_mut() {
let Some(event) = gateway.exec_mut(|g| g.sut.poll_event()) else {
continue;
};
on_gateway_event(*id, event, &mut self.client, now);
continue 'outer;
}
if let Some(transmit) = self.client.exec_mut(|sim| sim.sut.poll_transmit()) {
buffered_transmits.push(transmit, &self.client);
buffered_transmits.push_from(transmit, &self.client, now);
continue;
}
if let Some(event) = self.client.exec_mut(|c| c.sut.poll_event()) {
@@ -360,105 +421,81 @@ impl TunnelTest {
}
});
for (_, gateway) in self.gateways.iter_mut() {
let Some(transmit) = gateway.exec_mut(|g| g.sut.poll_transmit()) else {
continue;
};
buffered_transmits.push(transmit, gateway);
continue 'outer;
}
for (id, gateway) in self.gateways.iter_mut() {
let Some(event) = gateway.exec_mut(|g| g.sut.poll_event()) else {
continue;
};
on_gateway_event(*id, event, &mut self.client, self.now);
continue 'outer;
}
for (_, relay) in self.relays.iter_mut() {
let Some(message) = relay.exec_mut(|r| r.sut.next_command()) else {
continue;
};
match message {
firezone_relay::Command::SendMessage { payload, recipient } => {
let dst = recipient.into_socket();
let src = relay
.sending_socket_for(dst.ip())
.expect("relay to never emit packets without a matching socket");
buffered_transmits.push(
Transmit {
src: Some(src),
dst,
payload: payload.into(),
},
relay,
);
}
firezone_relay::Command::CreateAllocation { port, family } => {
relay.allocate_port(port.value(), family);
relay.exec_mut(|r| r.allocations.insert((family, port)));
}
firezone_relay::Command::FreeAllocation { port, family } => {
relay.deallocate_port(port.value(), family);
relay.exec_mut(|r| r.allocations.remove(&(family, port)));
}
}
continue 'outer;
}
if self.handle_timeout(self.now, self.utc_now) {
if let Some(transmit) = buffered_transmits.pop(now) {
self.dispatch_transmit(transmit);
continue;
}
break;
if !buffered_transmits.is_empty() {
self.flux_capacitor.small_tick(); // Small tick to get to the next transmit.
continue;
}
let Some(time_to_next_action) = self.poll_timeout() else {
break; // Nothing to do.
};
if time_to_next_action > cut_off {
break; // Nothing to do before cut-off.
}
self.flux_capacitor.large_tick(); // Large tick to more quickly advance to potential next timeout.
}
}
/// Forwards time to the given instant iff the corresponding host would like that (i.e. returns a timestamp <= from `poll_timeout`).
///
/// Tying the forwarding of time to the result of `poll_timeout` gives us better coverage because in production, we suspend until the value of `poll_timeout`.
fn handle_timeout(&mut self, now: Instant, utc_now: DateTime<Utc>) -> bool {
let mut any_advanced = false;
fn handle_timeout(
&mut self,
global_dns_records: &BTreeMap<DomainName, HashSet<IpAddr>>,
buffered_transmits: &mut BufferedTransmits,
) {
let now = self.flux_capacitor.now();
if self
.client
.exec_mut(|c| c.sut.poll_timeout())
.is_some_and(|t| t <= now)
{
any_advanced = true;
self.client.exec_mut(|c| c.sut.handle_timeout(now));
};
while let Some(transmit) = self.client.poll_transmit(now) {
self.client.exec_mut(|c| c.receive(transmit, now))
}
self.client.exec_mut(|c| c.sut.handle_timeout(now));
for (_, gateway) in self.gateways.iter_mut() {
if gateway
.exec_mut(|g| g.sut.poll_timeout())
.is_some_and(|t| t <= now)
{
any_advanced = true;
while let Some(transmit) = gateway.poll_transmit(now) {
let Some(reply) =
gateway.exec_mut(|g| g.receive(global_dns_records, transmit, now))
else {
continue;
};
gateway.exec_mut(|g| g.sut.handle_timeout(now, utc_now))
};
buffered_transmits.push_from(reply, gateway, now);
}
gateway.exec_mut(|g| g.sut.handle_timeout(now, self.flux_capacitor.now()));
}
for (_, relay) in self.relays.iter_mut() {
if relay
.exec_mut(|r| r.sut.poll_timeout())
.is_some_and(|t| t <= now)
{
any_advanced = true;
while let Some(transmit) = relay.poll_transmit(now) {
let Some(reply) = relay.exec_mut(|g| g.receive(transmit, now)) else {
continue;
};
relay.exec_mut(|r| r.sut.handle_timeout(now))
};
buffered_transmits.push_from(reply, relay, now);
}
relay.exec_mut(|r| r.sut.handle_timeout(now))
}
}
any_advanced
fn poll_timeout(&mut self) -> Option<Instant> {
let client = self.client.exec_mut(|c| c.sut.poll_timeout());
let gateway = self
.gateways
.values_mut()
.flat_map(|g| g.exec_mut(|g| g.sut.poll_timeout()))
.min();
let relay = self
.relays
.values_mut()
.flat_map(|r| r.exec_mut(|r| r.sut.poll_timeout()))
.min();
earliest(client, earliest(gateway, relay))
}
/// Dispatches a [`Transmit`] to the correct host.
@@ -467,17 +504,12 @@ impl TunnelTest {
/// It takes a [`Transmit`] and checks, which host accepts it, i.e. has configured the correct IP address.
///
/// Currently, the network topology of our tests are a single subnet without NAT.
fn dispatch_transmit(
&mut self,
transmit: Transmit,
buffered_transmits: &mut BufferedTransmits,
global_dns_records: &BTreeMap<DomainName, HashSet<IpAddr>>,
) {
fn dispatch_transmit(&mut self, transmit: Transmit<'static>) {
let src = transmit
.src
.expect("`src` should always be set in these tests");
let dst = transmit.dst;
let payload = &transmit.payload;
let now = self.flux_capacitor.now();
let Some(host) = self.network.host_by_ip(dst.ip()) else {
panic!("Unhandled packet: {src} -> {dst}")
@@ -488,41 +520,30 @@ impl TunnelTest {
if self.drop_direct_client_traffic
&& self.gateways.values().any(|g| g.is_sender(src.ip()))
{
tracing::debug!(%src, %dst, "Dropping direct traffic");
tracing::trace!(%src, %dst, "Dropping direct traffic");
return;
}
self.client
.exec_mut(|c| c.handle_packet(payload, src, dst, self.now));
self.client.receive(transmit, now);
}
HostId::Gateway(id) => {
if self.drop_direct_client_traffic && self.client.is_sender(src.ip()) {
tracing::debug!(%src, %dst, "Dropping direct traffic");
tracing::trace!(%src, %dst, "Dropping direct traffic");
return;
}
let gateway = self.gateways.get_mut(&id).expect("unknown gateway");
let Some(transmit) = gateway
.exec_mut(|g| g.handle_packet(global_dns_records, payload, src, dst, self.now))
else {
return;
};
buffered_transmits.push(transmit, gateway);
self.gateways
.get_mut(&id)
.expect("unknown gateway")
.receive(transmit, now);
}
HostId::Relay(id) => {
let relay = self.relays.get_mut(&id).expect("unknown relay");
let Some(transmit) =
relay.exec_mut(|r| r.handle_packet(payload, src, dst, self.now))
else {
return;
};
buffered_transmits.push(transmit, relay);
self.relays
.get_mut(&id)
.expect("unknown relay")
.receive(transmit, now);
}
HostId::Stale => {
tracing::debug!(%dst, "Dropping packet because host roamed away or is offline");
@@ -537,6 +558,8 @@ impl TunnelTest {
portal: &StubPortal,
global_dns_records: &BTreeMap<DomainName, HashSet<IpAddr>>,
) {
let now = self.flux_capacitor.now();
match event {
ClientEvent::AddedIceCandidates {
candidates,
@@ -546,7 +569,7 @@ impl TunnelTest {
gateway.exec_mut(|g| {
for candidate in candidates {
g.sut.add_ice_candidate(src, candidate, self.now)
g.sut.add_ice_candidate(src, candidate, now)
}
})
}
@@ -626,7 +649,7 @@ impl TunnelTest {
.map(|r| (r.name, r.proxy_ips)),
None, // TODO: How to generate expiry?
resource,
self.now,
now,
)
})
.unwrap();
@@ -642,7 +665,7 @@ impl TunnelTest {
},
resource_id,
gateway.inner().sut.public_key(),
self.now,
now,
)
})
.unwrap();
@@ -660,7 +683,7 @@ impl TunnelTest {
self.client.inner().id,
None,
reuse_connection.payload.map(|r| (r.name, r.proxy_ips)),
self.now,
now,
)
})
.unwrap();
@@ -696,7 +719,7 @@ impl TunnelTest {
self.client.inner().id,
None,
reuse_connection.payload.map(|r| (r.name, r.proxy_ips)),
self.now,
now,
)
})
.unwrap();
@@ -790,44 +813,3 @@ pub(crate) fn domain_to_hickory_name(domain: DomainName) -> hickory_proto::rr::N
name
}
#[derive(Debug, Default)]
struct BufferedTransmits {
inner: VecDeque<Transmit<'static>>,
}
impl BufferedTransmits {
fn push<T>(&mut self, transmit: impl Into<Option<Transmit<'static>>>, sending_host: &Host<T>) {
let Some(transmit) = transmit.into() else {
return;
};
if transmit.src.is_some() {
self.inner.push_back(transmit);
return;
}
// The `src` of a [`Transmit`] is empty if we want to send if via the default interface.
// In production, the kernel does this for us.
// In this test, we need to always set a `src` so that the remote peer knows where the packet is coming from.
let Some(src) = sending_host.sending_socket_for(transmit.dst.ip()) else {
tracing::debug!(dst = %transmit.dst, "No socket");
return;
};
self.inner.push_back(Transmit {
src: Some(src),
..transmit
});
}
fn pop(&mut self) -> Option<Transmit<'static>> {
self.inner.pop_front()
}
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}