From 7c8bbd550bf3cc31acf4e620b9883503f1807db4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 Jul 2024 14:01:50 +1000 Subject: [PATCH] test(connlib): introduce network latency to `tunnel_test` (#5948) Currently, `tunnel_test` executes all actions within the same `Instant`, i.e. time is never advanced by itself. The difficulty with advancing time compared to other actions like sending packets is that all time-related actions "overlap". In other words, all timers within connlib advance at the same time. This makes it difficult to model the expected behaviour after a certain amount of time has passed as we'd effectively need to model all timers and their relation to particular actions (like resending of connection intents or STUN requests). Instead of only advancing time by itself, we can model some aspect of it by introducing latency on network messages. This allows us to define a range of an "acceptable" network latency within everything is expected to work. Whilst this doesn't cover all failure cases, it gives us a solid foundation of parameters within which we should not expect any operational problems. --- .github/workflows/_rust.yml | 1 + rust/Cargo.lock | 1 + rust/connlib/snownet/src/node.rs | 2 +- rust/connlib/tunnel/Cargo.toml | 1 + .../tunnel/proptest-regressions/tests.txt | 3 + rust/connlib/tunnel/src/tests.rs | 3 + .../tunnel/src/tests/buffered_transmits.rs | 139 +++++++ .../tunnel/src/tests/flux_capacitor.rs | 89 +++++ rust/connlib/tunnel/src/tests/reference.rs | 16 +- .../tunnel/src/tests/run_count_appender.rs | 11 + rust/connlib/tunnel/src/tests/sim_client.rs | 22 +- rust/connlib/tunnel/src/tests/sim_gateway.rs | 24 +- rust/connlib/tunnel/src/tests/sim_net.rs | 34 +- rust/connlib/tunnel/src/tests/sim_relay.rs | 16 +- rust/connlib/tunnel/src/tests/strategies.rs | 5 + rust/connlib/tunnel/src/tests/sut.rs | 356 +++++++++--------- 16 files changed, 493 insertions(+), 230 deletions(-) create mode 100644 rust/connlib/tunnel/src/tests/buffered_transmits.rs create mode 100644 rust/connlib/tunnel/src/tests/flux_capacitor.rs create mode 100644 rust/connlib/tunnel/src/tests/run_count_appender.rs diff --git a/.github/workflows/_rust.yml b/.github/workflows/_rust.yml index 75998c941..92c20b1b1 100644 --- a/.github/workflows/_rust.yml +++ b/.github/workflows/_rust.yml @@ -94,6 +94,7 @@ jobs: # Needed to create tunnel interfaces in unit tests CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: "sudo --preserve-env" PROPTEST_VERBOSE: 0 # Otherwise the output is very long. + CARGO_PROFILE_TEST_OPT_LEVEL: 1 # Otherwise the tests take forever. name: "cargo test" shell: bash diff --git a/rust/Cargo.lock b/rust/Cargo.lock index e52a2a331..051dcc3a3 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2065,6 +2065,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "tun", ] diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 772e76f4b..111c558ff 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -1203,7 +1203,7 @@ pub enum Event { ConnectionClosed(TId), } -#[derive(Clone, PartialEq)] +#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] pub struct Transmit<'a> { /// The local interface from which this packet should be sent. /// diff --git a/rust/connlib/tunnel/Cargo.toml b/rust/connlib/tunnel/Cargo.toml index 186aa6b4d..93db08f80 100644 --- a/rust/connlib/tunnel/Cargo.toml +++ b/rust/connlib/tunnel/Cargo.toml @@ -44,6 +44,7 @@ proptest-state-machine = "0.3" rand = "0.8" serde_json = "1.0" test-strategy = "0.3.1" +tracing-appender = "0.2.3" tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] diff --git a/rust/connlib/tunnel/proptest-regressions/tests.txt b/rust/connlib/tunnel/proptest-regressions/tests.txt index 43690da50..55ba5aab9 100644 --- a/rust/connlib/tunnel/proptest-regressions/tests.txt +++ b/rust/connlib/tunnel/proptest-regressions/tests.txt @@ -57,3 +57,6 @@ cc cd529437ca42deb6fe43a3c3480aba20611ad23f3c1da91f4b4b19c8e9977edd # shrinks to cc f5a8e8fd890e576f208bcedf184f15be1163a70eb75c090bef33874973abc440 cc 03e85f5a4bc1e4d3eb44e2853e66e5ea9f89bb98f0d5a05bd5e9baa1b18def4f # shrinks to (initial_state, transitions, seen_counter) = (ReferenceState { now: Instant { tv_sec: 53253, tv_nsec: 940862355 }, utc_now: 2024-07-19T14:03:12.074710495Z, client: Host { inner: RefClient { id: ClientId(00000000-0000-0000-0000-000000000000), key: PrivateKey("0000000000000000000000000000000000000000000000000000000000000000"), known_hosts: {}, tunnel_ip4: 100.64.0.1, tunnel_ip6: fd00:2021:1111::, system_dns_resolvers: [0.0.0.0], upstream_dns_resolvers: [IpPort(IpDnsServer { address: 0.0.0.0:53 }), IpPort(IpDnsServer { address: [e44c:d620:c1e9:e326:361d:e6ad:1caf:25a2]:53 }), IpPort(IpDnsServer { address: [6603:13c5:6335:3984:4e59:6b0b:62a3:f068]:53 })], cidr_resources: {}, dns_resources: {}, dns_records: {}, connected_cidr_resources: {}, connected_dns_resources: {}, expected_icmp_handshakes: {}, expected_dns_handshakes: [] }, ip4: None, ip6: Some(2001:db80::7), old_ports: {0}, default_port: 46113, allocated_ports: {(46113, V6)} }, gateways: {GatewayId(dc30baba-eed8-0817-ec08-9af76ec6a705): Host { inner: RefGateway { key: PrivateKey("14afc0f1981355fe5efc1fc7986568a6f8eac259b7344c17a3be9a6a15cfaa44") }, ip4: Some(203.0.113.61), ip6: Some(2001:db80::29), old_ports: {0}, default_port: 47760, allocated_ports: {(47760, V4), (47760, V6)} }, GatewayId(351b6669-2ecc-d4ef-bfdb-ae7e67335f0f): Host { inner: RefGateway { key: PrivateKey("91e26f9b93c82a52358a25bfa38be88daeeb005ee76dbee1ee0eb670d3187fd0") }, ip4: Some(203.0.113.53), ip6: None, old_ports: {0}, default_port: 52416, allocated_ports: {(52416, V4)} }, GatewayId(986cb888-7e70-963c-2424-716987ce2b63): Host { inner: RefGateway { key: PrivateKey("46337def7696c2d69a86e69c37b9b5fc5a66a041665452bfb6871683f1c8623d") }, ip4: None, ip6: Some(2001:db80::45), old_ports: {0}, default_port: 44026, allocated_ports: {(44026, V6)} }}, relays: {RelayId(75d67559-7bfa-5a63-6a27-0e61219b0911): Host { inner: 17233114666030775146, ip4: Some(203.0.113.86), ip6: Some(2001:db80::8), old_ports: {0}, default_port: 3478, allocated_ports: {(3478, V4), (3478, V6)} }, RelayId(db5e85e5-c0d4-5ba8-5f2e-7edd8a257295): Host { inner: 10133780386335438457, ip4: Some(203.0.113.55), ip6: Some(2001:db80::24), old_ports: {0}, default_port: 3478, allocated_ports: {(3478, V6), (3478, V4)} }}, portal: StubPortal { gateways_by_site: {SiteId(dd8656de-e47d-c26a-c47c-313176f5d05e): {GatewayId(986cb888-7e70-963c-2424-716987ce2b63), GatewayId(dc30baba-eed8-0817-ec08-9af76ec6a705)}, SiteId(7d5bb747-fd32-ce19-9627-48579bc60347): {GatewayId(351b6669-2ecc-d4ef-bfdb-ae7e67335f0f)}}, sites_by_resource: {ResourceId(a5d7c295-c083-f35c-a7a6-bca3dc945df6): SiteId(dd8656de-e47d-c26a-c47c-313176f5d05e), ResourceId(00de7415-e811-9256-e3a0-c5f5c1c17575): SiteId(7d5bb747-fd32-ce19-9627-48579bc60347), ResourceId(96ec6dae-6a4b-b22c-e885-7e69b42d6bb9): SiteId(dd8656de-e47d-c26a-c47c-313176f5d05e)}, cidr_resources: {ResourceId(00de7415-e811-9256-e3a0-c5f5c1c17575): ResourceDescriptionCidr { id: ResourceId(00de7415-e811-9256-e3a0-c5f5c1c17575), address: V4(Ipv4Network { network_address: 127.0.0.0, netmask: 26 }), name: "pvwdyrsutv", address_description: None, sites: [Site { id: SiteId(7d5bb747-fd32-ce19-9627-48579bc60347), name: "miyzapusu" }] }}, dns_resources: {ResourceId(96ec6dae-6a4b-b22c-e885-7e69b42d6bb9): ResourceDescriptionDns { id: ResourceId(96ec6dae-6a4b-b22c-e885-7e69b42d6bb9), address: "tcdg.soggv", name: "xguldtdm", address_description: Some("cnrzwdta"), sites: [Site { id: SiteId(dd8656de-e47d-c26a-c47c-313176f5d05e), name: "opkgdiwul" }] }, ResourceId(a5d7c295-c083-f35c-a7a6-bca3dc945df6): ResourceDescriptionDns { id: ResourceId(a5d7c295-c083-f35c-a7a6-bca3dc945df6), address: "wotdd.nsc", name: "gamlub", address_description: Some("fnig"), sites: [Site { id: SiteId(dd8656de-e47d-c26a-c47c-313176f5d05e), name: "opkgdiwul" }] }}, gateway_selector: Selector { rng: TestRng { rng: ChaCha(ChaCha20Rng { rng: BlockRng { core: ChaChaXCore {}, result_len: 64, index: 64 } }) }, bias_increment: 0 } }, global_dns_records: {Name(gmvtch.micn.csycux.): {166.54.78.5, 89.218.96.4, a9b4:c51b:847:baa0:3d37:b7a7:7a41:a758, ::ffff:80.139.91.47}, Name(wotdd.nsc.): {198.51.100.89, 2001:db80::1d, 2001:db80::b, 2001:db80::41, 2001:db80::7}, Name(fauky.ojbl.): {32.237.235.35, ::ffff:127.0.0.1, 217.177.40.0}, Name(tcdg.soggv.): {198.51.100.101, 2001:db80::12}}, network: RoutingTable { routes: {(V4(Ipv4Network { network_address: 203.0.113.53, netmask: 32 }), Gateway(GatewayId(351b6669-2ecc-d4ef-bfdb-ae7e67335f0f))), (V4(Ipv4Network { network_address: 203.0.113.55, netmask: 32 }), Relay(RelayId(db5e85e5-c0d4-5ba8-5f2e-7edd8a257295))), (V4(Ipv4Network { network_address: 203.0.113.61, netmask: 32 }), Gateway(GatewayId(dc30baba-eed8-0817-ec08-9af76ec6a705))), (V4(Ipv4Network { network_address: 203.0.113.86, netmask: 32 }), Relay(RelayId(75d67559-7bfa-5a63-6a27-0e61219b0911))), (V6(Ipv6Network { network_address: 2001:db80::7, netmask: 128 }), Client(ClientId(00000000-0000-0000-0000-000000000000))), (V6(Ipv6Network { network_address: 2001:db80::8, netmask: 128 }), Relay(RelayId(75d67559-7bfa-5a63-6a27-0e61219b0911))), (V6(Ipv6Network { network_address: 2001:db80::24, netmask: 128 }), Relay(RelayId(db5e85e5-c0d4-5ba8-5f2e-7edd8a257295))), (V6(Ipv6Network { network_address: 2001:db80::29, netmask: 128 }), Gateway(GatewayId(dc30baba-eed8-0817-ec08-9af76ec6a705))), (V6(Ipv6Network { network_address: 2001:db80::45, netmask: 128 }), Gateway(GatewayId(986cb888-7e70-963c-2424-716987ce2b63)))} } }, [ActivateResource(Dns(ResourceDescriptionDns { id: ResourceId(96ec6dae-6a4b-b22c-e885-7e69b42d6bb9), address: "tcdg.soggv", name: "xguldtdm", address_description: Some("cnrzwdta"), sites: [Site { id: SiteId(dd8656de-e47d-c26a-c47c-313176f5d05e), name: "opkgdiwul" }] })), SendDnsQuery { domain: Name(tcdg.soggv.), r_type: A, query_id: 52671, dns_server: [6603:13c5:6335:3984:4e59:6b0b:62a3:f068]:53 }, SendICMPPacketToDnsResource { src: 100.64.0.1, dst: Name(tcdg.soggv.), seq: 0, identifier: 0 }, DeactivateResource(ResourceId(96ec6dae-6a4b-b22c-e885-7e69b42d6bb9)), ActivateResource(Dns(ResourceDescriptionDns { id: ResourceId(96ec6dae-6a4b-b22c-e885-7e69b42d6bb9), address: "tcdg.soggv", name: "xguldtdm", address_description: Some("cnrzwdta"), sites: [Site { id: SiteId(dd8656de-e47d-c26a-c47c-313176f5d05e), name: "opkgdiwul" }] })), SendDnsQuery { domain: Name(tcdg.soggv.), r_type: AAAA, query_id: 36641, dns_server: [e44c:d620:c1e9:e326:361d:e6ad:1caf:25a2]:53 }, SendICMPPacketToDnsResource { src: fd00:2021:1111::, dst: Name(tcdg.soggv.), seq: 0, identifier: 0 }], None) cc 547a1c29e31d0e03c3d9645dd7d644cf6ee24d4118333efae50c6a0d7e2dd1e6 # shrinks to (initial_state, transitions, seen_counter) = (ReferenceState { now: Instant { tv_sec: 294166, tv_nsec: 684186989 }, utc_now: 2024-07-20T03:30:33.641354967Z, client: Host { inner: RefClient { id: ClientId(00000000-0000-0000-0000-000000000000), key: PrivateKey("0000000000000000000000000000000000000000000000000000000000000000"), known_hosts: {}, tunnel_ip4: 100.64.0.1, tunnel_ip6: fd00:2021:1111::, system_dns_resolvers: [127.0.0.1], upstream_dns_resolvers: [], cidr_resources: {}, dns_resources: {}, dns_records: {}, connected_cidr_resources: {}, connected_dns_resources: {}, expected_icmp_handshakes: {}, expected_dns_handshakes: [] }, ip4: Some(203.0.113.41), ip6: None, old_ports: {0}, default_port: 51234, allocated_ports: {(51234, V4)} }, gateways: {GatewayId(29009c2e-c7bb-b747-2f2a-35b6a061e4ba): Host { inner: RefGateway { key: PrivateKey("5e8e55c90177c08697e3bd5b7217c7034717dcc22deec7cd5d724a56474b88c4") }, ip4: Some(203.0.113.97), ip6: Some(2001:db80::18), old_ports: {0}, default_port: 30743, allocated_ports: {(30743, V4), (30743, V6)} }, GatewayId(bec0b567-d5c2-abae-d198-b4e40558eb5d): Host { inner: RefGateway { key: PrivateKey("a861b5319dd811adf29a182e6ae6d441ea99b2da3de1f5d854323491d8791f75") }, ip4: Some(203.0.113.50), ip6: None, old_ports: {0}, default_port: 61420, allocated_ports: {(61420, V4)} }}, relays: {RelayId(f8821991-8760-432e-774e-9e9f10601a2d): Host { inner: 3060437218861478241, ip4: Some(203.0.113.76), ip6: Some(2001:db80::13), old_ports: {0}, default_port: 3478, allocated_ports: {(3478, V4), (3478, V6)} }, RelayId(15d626e3-4273-0112-ae12-b10d68e45670): Host { inner: 7324147672620279826, ip4: Some(203.0.113.6), ip6: Some(2001:db80::35), old_ports: {0}, default_port: 3478, allocated_ports: {(3478, V4), (3478, V6)} }}, portal: StubPortal { gateways_by_site: {SiteId(926baf76-74f7-9f10-33ce-ecc193225045): {GatewayId(bec0b567-d5c2-abae-d198-b4e40558eb5d), GatewayId(29009c2e-c7bb-b747-2f2a-35b6a061e4ba)}}, sites_by_resource: {ResourceId(4fb7cd6e-426f-9447-4874-63227a86bb1c): SiteId(926baf76-74f7-9f10-33ce-ecc193225045), ResourceId(95628b17-9f45-57b7-773c-ca1f9ba51b0d): SiteId(926baf76-74f7-9f10-33ce-ecc193225045), ResourceId(bde971c9-7a7d-2bb8-e4ed-4aabb9cd7046): SiteId(926baf76-74f7-9f10-33ce-ecc193225045), ResourceId(b28854b5-f5b3-fc05-f825-f199d538e2fd): SiteId(926baf76-74f7-9f10-33ce-ecc193225045), ResourceId(f61fd3bf-823e-0c47-bee7-3f2a566cd7c7): SiteId(926baf76-74f7-9f10-33ce-ecc193225045)}, cidr_resources: {ResourceId(f61fd3bf-823e-0c47-bee7-3f2a566cd7c7): ResourceDescriptionCidr { id: ResourceId(f61fd3bf-823e-0c47-bee7-3f2a566cd7c7), address: V6(Ipv6Network { network_address: f423:16f2:a47:2104:8706:c6b0:512b:d8a0, netmask: 124 }), name: "djwqojp", address_description: None, sites: [Site { id: SiteId(926baf76-74f7-9f10-33ce-ecc193225045), name: "autpi" }] }, ResourceId(4fb7cd6e-426f-9447-4874-63227a86bb1c): ResourceDescriptionCidr { id: ResourceId(4fb7cd6e-426f-9447-4874-63227a86bb1c), address: V6(Ipv6Network { network_address: ::ffff:64.186.64.56, netmask: 125 }), name: "ieybqj", address_description: Some("rxykuo"), sites: [Site { id: SiteId(926baf76-74f7-9f10-33ce-ecc193225045), name: "autpi" }] }, ResourceId(bde971c9-7a7d-2bb8-e4ed-4aabb9cd7046): ResourceDescriptionCidr { id: ResourceId(bde971c9-7a7d-2bb8-e4ed-4aabb9cd7046), address: V4(Ipv4Network { network_address: 127.0.0.0, netmask: 28 }), name: "cwagxmyay", address_description: None, sites: [Site { id: SiteId(926baf76-74f7-9f10-33ce-ecc193225045), name: "autpi" }] }, ResourceId(b28854b5-f5b3-fc05-f825-f199d538e2fd): ResourceDescriptionCidr { id: ResourceId(b28854b5-f5b3-fc05-f825-f199d538e2fd), address: V6(Ipv6Network { network_address: 183:10ea:2b4:e15:75ff:3012:18b7:e0d0, netmask: 124 }), name: "hnhmn", address_description: None, sites: [Site { id: SiteId(926baf76-74f7-9f10-33ce-ecc193225045), name: "autpi" }] }}, dns_resources: {ResourceId(95628b17-9f45-57b7-773c-ca1f9ba51b0d): ResourceDescriptionDns { id: ResourceId(95628b17-9f45-57b7-773c-ca1f9ba51b0d), address: "?.spspt.yxil.ycym", name: "fmumrt", address_description: Some("wwrfj"), sites: [Site { id: SiteId(926baf76-74f7-9f10-33ce-ecc193225045), name: "autpi" }] }}, gateway_selector: Selector { rng: TestRng { rng: ChaCha(ChaCha20Rng { rng: BlockRng { core: ChaChaXCore {}, result_len: 64, index: 64 } }) }, bias_increment: 0 } }, global_dns_records: {Name(nme.ectfdn.nxom.): {140d:e0d6:a055:84bd:6a07:ac77:e087:192a, 127.0.0.1, 59.107.216.84, 217.110.117.58}, Name(ynhth.vtdg.qzbhqk.): {b24d:5587:4408:336a:deab:790a:8959:889d, ::ffff:127.0.0.1}, Name(krsmtn.unzrvv.): {79.222.225.84, 127.0.0.1}, Name(drcf.spspt.yxil.ycym.): {198.51.100.134, 198.51.100.80}, Name(stvij.spspt.yxil.ycym.): {2001:db80::2, 2001:db80::aa, 2001:db80::d6}}, network: RoutingTable { routes: {(V4(Ipv4Network { network_address: 203.0.113.6, netmask: 32 }), Relay(RelayId(15d626e3-4273-0112-ae12-b10d68e45670))), (V4(Ipv4Network { network_address: 203.0.113.41, netmask: 32 }), Client(ClientId(00000000-0000-0000-0000-000000000000))), (V4(Ipv4Network { network_address: 203.0.113.50, netmask: 32 }), Gateway(GatewayId(bec0b567-d5c2-abae-d198-b4e40558eb5d))), (V4(Ipv4Network { network_address: 203.0.113.76, netmask: 32 }), Relay(RelayId(f8821991-8760-432e-774e-9e9f10601a2d))), (V4(Ipv4Network { network_address: 203.0.113.97, netmask: 32 }), Gateway(GatewayId(29009c2e-c7bb-b747-2f2a-35b6a061e4ba))), (V6(Ipv6Network { network_address: 2001:db80::13, netmask: 128 }), Relay(RelayId(f8821991-8760-432e-774e-9e9f10601a2d))), (V6(Ipv6Network { network_address: 2001:db80::18, netmask: 128 }), Gateway(GatewayId(29009c2e-c7bb-b747-2f2a-35b6a061e4ba))), (V6(Ipv6Network { network_address: 2001:db80::35, netmask: 128 }), Relay(RelayId(15d626e3-4273-0112-ae12-b10d68e45670)))} } }, [ActivateResource(Cidr(ResourceDescriptionCidr { id: ResourceId(bde971c9-7a7d-2bb8-e4ed-4aabb9cd7046), address: V4(Ipv4Network { network_address: 127.0.0.0, netmask: 28 }), name: "cwagxmyay", address_description: None, sites: [Site { id: SiteId(926baf76-74f7-9f10-33ce-ecc193225045), name: "autpi" }] })), SendDnsQuery { domain: Name(nme.ectfdn.nxom.), r_type: A, query_id: 0, dns_server: 127.0.0.1:53 }], None) +cc eab6084243b6d361bfd3a055870249f30f07d2d9ae808cfe440d2a92910805d5 # shrinks to (initial_state, transitions, seen_counter) = (ReferenceState { client: Host { inner: RefClient { id: ClientId(00000000-0000-0000-0000-000000000000), key: PrivateKey("0000000000000000000000000000000000000000000000000000000000000000"), known_hosts: {}, tunnel_ip4: 100.64.0.1, tunnel_ip6: fd00:2021:1111::, system_dns_resolvers: [0.0.0.0], upstream_dns_resolvers: [IpPort(IpDnsServer { address: 1.127.231.135:53 }), IpPort(IpDnsServer { address: [14c2:6a79:7420:f25f:1ce4:ff9d:dcc6:da21]:53 }), IpPort(IpDnsServer { address: [::ffff:127.0.0.1]:53 }), IpPort(IpDnsServer { address: [7d2e:d5bf:b3c3:b614:262a:bf39:7e4:8d54]:53 })], cidr_resources: {}, dns_resources: {}, dns_records: {}, connected_cidr_resources: {}, connected_dns_resources: {}, expected_icmp_handshakes: {}, expected_dns_handshakes: [] }, ip4: Some(203.0.113.99), ip6: None, old_ports: {0}, default_port: 53282, allocated_ports: {(53282, V4)}, latency: 65ms }, gateways: {GatewayId(546aa1d9-8309-7d32-6030-ee68906ba7a8): Host { inner: RefGateway { key: PrivateKey("fee4bb73bb216c4dbb9723a4aa154a7f3c8e12dbeedcd80448b3267f1e0475d7") }, ip4: Some(203.0.113.78), ip6: Some(2001:db80::59), old_ports: {0}, default_port: 4725, allocated_ports: {(4725, V4), (4725, V6)}, latency: 40ms }}, relays: {RelayId(71f81a67-4384-7479-b327-c3ad4b198e8f): Host { inner: 18265579232023912380, ip4: Some(203.0.113.39), ip6: Some(2001:db80::48), old_ports: {0}, default_port: 3478, allocated_ports: {(3478, V4), (3478, V6)}, latency: 88ms }}, portal: StubPortal { gateways_by_site: {SiteId(03cd8b72-a91e-f818-040b-78989a12c378): {GatewayId(546aa1d9-8309-7d32-6030-ee68906ba7a8)}}, sites_by_resource: {ResourceId(57f2e154-c37d-eb23-fe7c-7def269fa25a): SiteId(03cd8b72-a91e-f818-040b-78989a12c378), ResourceId(45d30f88-fce6-5633-afc5-07b66a5ccd60): SiteId(03cd8b72-a91e-f818-040b-78989a12c378), ResourceId(2816ea38-4f4f-f031-5cc8-46c9beed5209): SiteId(03cd8b72-a91e-f818-040b-78989a12c378), ResourceId(09fa3de9-3baf-8686-ceed-8830ee880d27): SiteId(03cd8b72-a91e-f818-040b-78989a12c378)}, cidr_resources: {ResourceId(57f2e154-c37d-eb23-fe7c-7def269fa25a): ResourceDescriptionCidr { id: ResourceId(57f2e154-c37d-eb23-fe7c-7def269fa25a), address: V4(Ipv4Network { network_address: 221.62.86.128, netmask: 25 }), name: "ocfaaizvoe", address_description: None, sites: [Site { id: SiteId(03cd8b72-a91e-f818-040b-78989a12c378), name: "iqnjedkq" }] }, ResourceId(09fa3de9-3baf-8686-ceed-8830ee880d27): ResourceDescriptionCidr { id: ResourceId(09fa3de9-3baf-8686-ceed-8830ee880d27), address: V4(Ipv4Network { network_address: 179.138.125.137, netmask: 32 }), name: "bzhnddmrp", address_description: Some("ffzmchohy"), sites: [Site { id: SiteId(03cd8b72-a91e-f818-040b-78989a12c378), name: "iqnjedkq" }] }}, dns_resources: {ResourceId(2816ea38-4f4f-f031-5cc8-46c9beed5209): ResourceDescriptionDns { id: ResourceId(2816ea38-4f4f-f031-5cc8-46c9beed5209), address: "xfkekv.pgmda", name: "mojmr", address_description: None, sites: [Site { id: SiteId(03cd8b72-a91e-f818-040b-78989a12c378), name: "iqnjedkq" }] }, ResourceId(45d30f88-fce6-5633-afc5-07b66a5ccd60): ResourceDescriptionDns { id: ResourceId(45d30f88-fce6-5633-afc5-07b66a5ccd60), address: "?.lysnno.daua", name: "mzwo", address_description: Some("wvwnkcnnf"), sites: [Site { id: SiteId(03cd8b72-a91e-f818-040b-78989a12c378), name: "iqnjedkq" }] }}, gateway_selector: Selector { rng: TestRng { rng: ChaCha(ChaCha20Rng { rng: BlockRng { core: ChaChaXCore {}, result_len: 64, index: 64 } }) }, bias_increment: 0 } }, global_dns_records: {Name(cohkip.lysnno.daua.): {198.51.100.176, 2001:db80::b, 198.51.100.196, 198.51.100.22}, Name(hedlv.vlt.hsn.): {254.0.175.188, 127.0.0.1, 74.56.14.172, 111.228.2.198, 0.0.0.0}, Name(ejxryi.dznv.mul.): {::ffff:26.186.154.63}, Name(ncx.nzrgl.): {4.37.52.12, 251.168.201.227, ::ffff:176.120.121.186, ::ffff:70.62.117.245, ::ffff:194.247.205.134}, Name(xfkekv.pgmda.): {2001:db80::bd, 2001:db80::8, 198.51.100.223}}, drop_direct_client_traffic: true, network: RoutingTable { routes: {(V4(Ipv4Network { network_address: 203.0.113.39, netmask: 32 }), Relay(RelayId(71f81a67-4384-7479-b327-c3ad4b198e8f))), (V4(Ipv4Network { network_address: 203.0.113.78, netmask: 32 }), Gateway(GatewayId(546aa1d9-8309-7d32-6030-ee68906ba7a8))), (V4(Ipv4Network { network_address: 203.0.113.99, netmask: 32 }), Client(ClientId(00000000-0000-0000-0000-000000000000))), (V6(Ipv6Network { network_address: 2001:db80::48, netmask: 128 }), Relay(RelayId(71f81a67-4384-7479-b327-c3ad4b198e8f))), (V6(Ipv6Network { network_address: 2001:db80::59, netmask: 128 }), Gateway(GatewayId(546aa1d9-8309-7d32-6030-ee68906ba7a8)))} } }, [ActivateResource(Dns(ResourceDescriptionDns { id: ResourceId(45d30f88-fce6-5633-afc5-07b66a5ccd60), address: "?.lysnno.daua", name: "mzwo", address_description: Some("wvwnkcnnf"), sites: [Site { id: SiteId(03cd8b72-a91e-f818-040b-78989a12c378), name: "iqnjedkq" }] })), SendDnsQuery { domain: Name(cohkip.lysnno.daua.), r_type: AAAA, query_id: 54648, dns_server: 1.127.231.135:53 }, SendICMPPacketToDnsResource { src: fd00:2021:1111::, dst: Name(cohkip.lysnno.daua.), seq: 0, identifier: 0 }, SendICMPPacketToDnsResource { src: fd00:2021:1111::, dst: Name(cohkip.lysnno.daua.), seq: 0, identifier: 0 }], None) +cc b72bd98dd2b6aa4d7fb2fb29000c7198ab3b6f93e043cf87fb207611a1d26838 # shrinks to (initial_state, transitions, seen_counter) = (ReferenceState { client: Host { inner: RefClient { id: ClientId(00000000-0000-0000-0000-000000000000), key: PrivateKey("0000000000000000000000000000000000000000000000000000000000000000"), known_hosts: {}, tunnel_ip4: 100.64.0.1, tunnel_ip6: fd00:2021:1111::, system_dns_resolvers: [::ffff:0.0.0.0], upstream_dns_resolvers: [], cidr_resources: {}, dns_resources: {}, dns_records: {}, connected_cidr_resources: {}, connected_dns_resources: {}, expected_icmp_handshakes: {}, expected_dns_handshakes: [] }, ip4: None, ip6: Some(2001:db80::), old_ports: {0}, default_port: 1, allocated_ports: {(1, V6)}, latency: 10ms }, gateways: {GatewayId(00000000-0000-0000-0000-0070596bacaa): Host { inner: RefGateway { key: PrivateKey("e04a93801603cb484f2be7f116c8a6ff2d114621e67c95e0ef8130e5dca04a05") }, ip4: Some(203.0.113.48), ip6: Some(2001:db80::d), old_ports: {0}, default_port: 33619, allocated_ports: {(33619, V6), (33619, V4)}, latency: 94ms }}, relays: {RelayId(82b81509-2e01-4310-af0c-1e2977a3055b): Host { inner: 4354275714811340498, ip4: Some(203.0.113.82), ip6: Some(2001:db80::40), old_ports: {0}, default_port: 3478, allocated_ports: {(3478, V4), (3478, V6)}, latency: 30ms }}, portal: StubPortal { gateways_by_site: {SiteId(68de952d-805b-b1b8-f7ce-e37910be6744): {GatewayId(00000000-0000-0000-0000-0070596bacaa)}}, sites_by_resource: {ResourceId(0b4f45d6-3b4c-19f3-917b-9bcbb1c548e4): SiteId(68de952d-805b-b1b8-f7ce-e37910be6744), ResourceId(5c5be57c-c89c-d331-2924-56b06cf55940): SiteId(2fe3bfba-5209-b8c7-eff7-0fa47212d60d), ResourceId(dd56bf56-e977-7526-3e9f-4e1992c03605): SiteId(2fe3bfba-5209-b8c7-eff7-0fa47212d60d)}, cidr_resources: {ResourceId(5c5be57c-c89c-d331-2924-56b06cf55940): ResourceDescriptionCidr { id: ResourceId(5c5be57c-c89c-d331-2924-56b06cf55940), address: V4(Ipv4Network { network_address: 127.0.0.0, netmask: 26 }), name: "groey", address_description: None, sites: [Site { id: SiteId(2fe3bfba-5209-b8c7-eff7-0fa47212d60d), name: "mstcnfiy" }] }, ResourceId(0b4f45d6-3b4c-19f3-917b-9bcbb1c548e4): ResourceDescriptionCidr { id: ResourceId(0b4f45d6-3b4c-19f3-917b-9bcbb1c548e4), address: V4(Ipv4Network { network_address: 127.0.0.0, netmask: 31 }), name: "vjjcjd", address_description: None, sites: [Site { id: SiteId(68de952d-805b-b1b8-f7ce-e37910be6744), name: "ikukl" }] }}, dns_resources: {ResourceId(dd56bf56-e977-7526-3e9f-4e1992c03605): ResourceDescriptionDns { id: ResourceId(dd56bf56-e977-7526-3e9f-4e1992c03605), address: "?.punzd.vdmfn.ipb", name: "jbzakvxoal", address_description: None, sites: [Site { id: SiteId(2fe3bfba-5209-b8c7-eff7-0fa47212d60d), name: "mstcnfiy" }] }}, gateway_selector: Selector { rng: TestRng { rng: ChaCha(ChaCha20Rng { rng: BlockRng { core: ChaChaXCore {}, result_len: 64, index: 64 } }) }, bias_increment: 0 } }, global_dns_records: {Name(aaa.punzd.vdmfn.ipb.): {198.51.100.15}, Name(cvyq.uhdtb.kvcid.): {127.0.0.1, 232.139.74.54, ::ffff:127.0.0.1}, Name(llmcod.nejw.): {3f1d:5290:e6c5:f8b4:544c:3874:17e4:d507, 9ebd:31bb:be04:ed50:5965:addf:1fef:958d, ::ffff:38.91.200.132, 5.97.63.64}}, drop_direct_client_traffic: true, network: RoutingTable { routes: {(V4(Ipv4Network { network_address: 203.0.113.48, netmask: 32 }), Gateway(GatewayId(00000000-0000-0000-0000-0070596bacaa))), (V4(Ipv4Network { network_address: 203.0.113.82, netmask: 32 }), Relay(RelayId(82b81509-2e01-4310-af0c-1e2977a3055b))), (V6(Ipv6Network { network_address: 2001:db80::, netmask: 128 }), Client(ClientId(00000000-0000-0000-0000-000000000000))), (V6(Ipv6Network { network_address: 2001:db80::d, netmask: 128 }), Gateway(GatewayId(00000000-0000-0000-0000-0070596bacaa))), (V6(Ipv6Network { network_address: 2001:db80::40, netmask: 128 }), Relay(RelayId(82b81509-2e01-4310-af0c-1e2977a3055b)))} } }, [ActivateResource(Cidr(ResourceDescriptionCidr { id: ResourceId(0b4f45d6-3b4c-19f3-917b-9bcbb1c548e4), address: V4(Ipv4Network { network_address: 127.0.0.0, netmask: 31 }), name: "vjjcjd", address_description: None, sites: [Site { id: SiteId(68de952d-805b-b1b8-f7ce-e37910be6744), name: "ikukl" }] })), SendICMPPacketToCidrResource { src: 100.64.0.1, dst: 127.0.0.0, seq: 0, identifier: 0 }, SendICMPPacketToCidrResource { src: 100.64.0.1, dst: 127.0.0.0, seq: 0, identifier: 0 }], None) +cc 6f00b4fe571ce46735ae49e5729677e7f098b354ccf1449a14b77ee46a564c40 # shrinks to (initial_state, transitions, seen_counter) = (ReferenceState { client: Host { inner: RefClient { id: 0, key: PrivateKey("0000000000000000000000000000000000000000000000000000000000000000"), known_hosts: {}, tunnel_ip4: 100.64.0.1, tunnel_ip6: fd00:2021:1111::, system_dns_resolvers: [0.0.0.0], upstream_dns_resolvers: [], cidr_resources: {}, dns_resources: {}, dns_records: {}, connected_cidr_resources: {}, connected_dns_resources: {}, expected_icmp_handshakes: {}, expected_dns_handshakes: [] }, ip4: Some(203.0.113.4), ip6: None, old_ports: {0}, default_port: 1, allocated_ports: {(1, V4)}, latency: 10ms }, gateways: {2F8575348E1C955FC: Host { inner: RefGateway { key: PrivateKey("a7ddde752b29d54dd8490c1d09ff3c57cea567538ff3969880df030302ce0186") }, ip4: Some(203.0.113.3), ip6: Some(2001:db80::2), old_ports: {0}, default_port: 34528, allocated_ports: {(34528, V4), (34528, V6)}, latency: 614ms }}, relays: {3BFC7AFF2563AA02AE270F8C169E0582: Host { inner: 18328713063552352787, ip4: Some(203.0.113.86), ip6: Some(2001:db80::26), old_ports: {0}, default_port: 3478, allocated_ports: {(3478, V6), (3478, V4)}, latency: 795ms }}, portal: StubPortal { gateways_by_site: {5724BEB2110961ABB05F3F32AC10E0FF: {2F8575348E1C955FC}}, sites_by_resource: {FF519F2CB514A2C99CD5C1CB0CB82D04: 5724BEB2110961ABB05F3F32AC10E0FF, 5BA249689A1A8355B04834C18F4B535B: 5724BEB2110961ABB05F3F32AC10E0FF, C9675631BBFA99A4BCA5BF13DB5F48EC: 5724BEB2110961ABB05F3F32AC10E0FF, DA21A49524864F9DC41178CCCEE652F2: 5724BEB2110961ABB05F3F32AC10E0FF, FBF8B2918FA52D4A9DF542CE0AF991A2: 5724BEB2110961ABB05F3F32AC10E0FF}, cidr_resources: {DA21A49524864F9DC41178CCCEE652F2: ResourceDescriptionCidr { id: DA21A49524864F9DC41178CCCEE652F2, address: V6(Ipv6Network { network_address: ::ffff:39.157.87.248, netmask: 125 }), name: "lrmenlrqn", address_description: None, sites: [Site { id: 5724BEB2110961ABB05F3F32AC10E0FF, name: "fciscdj" }] }, 5BA249689A1A8355B04834C18F4B535B: ResourceDescriptionCidr { id: 5BA249689A1A8355B04834C18F4B535B, address: V6(Ipv6Network { network_address: ::ffff:40.236.238.234, netmask: 127 }), name: "jlpvfrmhx", address_description: None, sites: [Site { id: 5724BEB2110961ABB05F3F32AC10E0FF, name: "fciscdj" }] }, FBF8B2918FA52D4A9DF542CE0AF991A2: ResourceDescriptionCidr { id: FBF8B2918FA52D4A9DF542CE0AF991A2, address: V6(Ipv6Network { network_address: ::ffff:0.0.0.0, netmask: 127 }), name: "gzaggml", address_description: Some("qtamwxq"), sites: [Site { id: 5724BEB2110961ABB05F3F32AC10E0FF, name: "fciscdj" }] }}, dns_resources: {FF519F2CB514A2C99CD5C1CB0CB82D04: ResourceDescriptionDns { id: FF519F2CB514A2C99CD5C1CB0CB82D04, address: "?.fcr.baj", name: "qzvuasnzdp", address_description: Some("imsuq"), sites: [Site { id: 5724BEB2110961ABB05F3F32AC10E0FF, name: "fciscdj" }] }, C9675631BBFA99A4BCA5BF13DB5F48EC: ResourceDescriptionDns { id: C9675631BBFA99A4BCA5BF13DB5F48EC, address: "?.rivxqn.icm", name: "oikpvyh", address_description: None, sites: [Site { id: 5724BEB2110961ABB05F3F32AC10E0FF, name: "fciscdj" }] }}, gateway_selector: Selector { rng: TestRng { rng: ChaCha(ChaCha20Rng { rng: BlockRng { core: ChaChaXCore {}, result_len: 64, index: 64 } }) }, bias_increment: 0 } }, global_dns_records: {Name(aaa.fcr.baj.): {198.51.100.1}, Name(aaa.rivxqn.icm.): {198.51.100.1}}, drop_direct_client_traffic: true, network: RoutingTable { routes: {(V4(Ipv4Network { network_address: 203.0.113.3, netmask: 32 }), Gateway(2F8575348E1C955FC)), (V4(Ipv4Network { network_address: 203.0.113.4, netmask: 32 }), Client(0)), (V4(Ipv4Network { network_address: 203.0.113.86, netmask: 32 }), Relay(3BFC7AFF2563AA02AE270F8C169E0582)), (V6(Ipv6Network { network_address: 2001:db80::2, netmask: 128 }), Gateway(2F8575348E1C955FC)), (V6(Ipv6Network { network_address: 2001:db80::26, netmask: 128 }), Relay(3BFC7AFF2563AA02AE270F8C169E0582))} } }, [ActivateResource(Cidr(ResourceDescriptionCidr { id: DA21A49524864F9DC41178CCCEE652F2, address: V6(Ipv6Network { network_address: ::ffff:39.157.87.248, netmask: 125 }), name: "lrmenlrqn", address_description: None, sites: [Site { id: 5724BEB2110961ABB05F3F32AC10E0FF, name: "fciscdj" }] })), SendICMPPacketToCidrResource { src: fd00:2021:1111::, dst: ::ffff:39.157.87.248, seq: 0, identifier: 0 }, SendICMPPacketToCidrResource { src: fd00:2021:1111::, dst: ::ffff:39.157.87.248, seq: 0, identifier: 0 }], None) diff --git a/rust/connlib/tunnel/src/tests.rs b/rust/connlib/tunnel/src/tests.rs index 6c4d74a85..8be1f5cdf 100644 --- a/rust/connlib/tunnel/src/tests.rs +++ b/rust/connlib/tunnel/src/tests.rs @@ -2,8 +2,11 @@ use crate::tests::sut::TunnelTest; use proptest::test_runner::Config; mod assertions; +mod buffered_transmits; mod composite_strategy; +mod flux_capacitor; mod reference; +mod run_count_appender; mod sim_client; mod sim_gateway; mod sim_net; diff --git a/rust/connlib/tunnel/src/tests/buffered_transmits.rs b/rust/connlib/tunnel/src/tests/buffered_transmits.rs new file mode 100644 index 000000000..f79045b76 --- /dev/null +++ b/rust/connlib/tunnel/src/tests/buffered_transmits.rs @@ -0,0 +1,139 @@ +use super::sim_net::Host; +use snownet::Transmit; +use std::{ + cmp::Reverse, + collections::BinaryHeap, + time::{Duration, Instant}, +}; + +/// A buffer for network packets that need to be handled at a certain point in time. +#[derive(Debug, Clone, Default)] +pub(crate) struct BufferedTransmits { + // Transmits are stored in reverse ordering to emit the earliest first. + inner: BinaryHeap>>>, +} + +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)] +struct ByTime { + at: Instant, + value: T, +} + +impl BufferedTransmits { + /// Pushes a new [`Transmit`] from a given [`Host`]. + pub(crate) fn push_from( + &mut self, + transmit: impl Into>>, + sending_host: &Host, + now: Instant, + ) { + let Some(transmit) = transmit.into() else { + return; + }; + + if transmit.src.is_some() { + self.push(transmit, sending_host.latency(), now); + return; + } + + // The `src` of a [`Transmit`] is empty if we want to send if via the default interface. + // In production, the kernel does this for us. + // In this test, we need to always set a `src` so that the remote peer knows where the packet is coming from. + + let Some(src) = sending_host.sending_socket_for(transmit.dst.ip()) else { + tracing::debug!(dst = %transmit.dst, "No socket"); + + return; + }; + + self.push( + Transmit { + src: Some(src), + ..transmit + }, + sending_host.latency(), + now, + ); + } + + pub(crate) fn push( + &mut self, + transmit: impl Into>>, + latency: Duration, + now: Instant, + ) { + let Some(transmit) = transmit.into() else { + return; + }; + + debug_assert!(transmit.src.is_some(), "src must be set for `push`"); + + self.inner.push(Reverse(ByTime { + at: now + latency, + value: transmit, + })); + } + + pub(crate) fn pop(&mut self, now: Instant) -> Option> { + let next = self.inner.peek()?.0.at; + + if next > now { + return None; + } + + let next = self.inner.pop().unwrap().0; + + Some(next.value) + } + + pub(crate) fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn by_time_orders_from_earliest_to_latest() { + let mut heap = BinaryHeap::new(); + let start = Instant::now(); + + heap.push(ByTime { + at: start + Duration::from_secs(1), + value: 1, + }); + heap.push(ByTime { + at: start, + value: 0, + }); + heap.push(ByTime { + at: start + Duration::from_secs(2), + value: 2, + }); + + assert_eq!( + heap.pop().unwrap(), + ByTime { + at: start + Duration::from_secs(2), + value: 2 + }, + ); + assert_eq!( + heap.pop().unwrap(), + ByTime { + at: start + Duration::from_secs(1), + value: 1 + } + ); + assert_eq!( + heap.pop().unwrap(), + ByTime { + at: start, + value: 0 + } + ); + } +} diff --git a/rust/connlib/tunnel/src/tests/flux_capacitor.rs b/rust/connlib/tunnel/src/tests/flux_capacitor.rs new file mode 100644 index 000000000..884cde376 --- /dev/null +++ b/rust/connlib/tunnel/src/tests/flux_capacitor.rs @@ -0,0 +1,89 @@ +use chrono::{DateTime, Utc}; +use std::{ + fmt, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; +use tracing_subscriber::fmt::{format::Writer, time::FormatTime}; + +/// A device that allows us to travel into the future. +#[derive(Debug, Clone)] +pub(crate) struct FluxCapacitor { + start: Instant, + now: Arc)>>, +} + +impl FormatTime for FluxCapacitor { + fn format_time(&self, w: &mut Writer<'_>) -> fmt::Result { + let e = self.elapsed(); + write!(w, "{:3}.{:03}s", e.as_secs(), e.subsec_millis()) + } +} + +impl Default for FluxCapacitor { + fn default() -> Self { + let start = Instant::now(); + let utc_start = Utc::now(); + + Self { + start, + now: Arc::new(Mutex::new((start, utc_start))), + } + } +} + +impl FluxCapacitor { + const SMALL_TICK: Duration = Duration::from_millis(10); + const LARGE_TICK: Duration = Duration::from_millis(100); + + #[allow(private_bounds)] + pub(crate) fn now(&self) -> T + where + T: PickNow, + { + let (now, utc_now) = *self.now.lock().unwrap(); + + T::pick_now(now, utc_now) + } + + pub(crate) fn small_tick(&self) { + self.tick(Self::SMALL_TICK); + } + + pub(crate) fn large_tick(&self) { + self.tick(Self::LARGE_TICK); + } + + fn tick(&self, tick: Duration) { + { + let mut guard = self.now.lock().unwrap(); + + guard.0 += tick; + guard.1 += tick; + } + + if self.elapsed().subsec_millis() == 0 { + tracing::trace!("Tick"); + } + } + + fn elapsed(&self) -> Duration { + self.now::().duration_since(self.start) + } +} + +trait PickNow { + fn pick_now(now: Instant, utc_now: DateTime) -> Self; +} + +impl PickNow for Instant { + fn pick_now(now: Instant, _: DateTime) -> Self { + now + } +} + +impl PickNow for DateTime { + fn pick_now(_: Instant, utc_now: DateTime) -> Self { + utc_now + } +} diff --git a/rust/connlib/tunnel/src/tests/reference.rs b/rust/connlib/tunnel/src/tests/reference.rs index abe310af1..06797b121 100644 --- a/rust/connlib/tunnel/src/tests/reference.rs +++ b/rust/connlib/tunnel/src/tests/reference.rs @@ -3,7 +3,6 @@ use super::{ strategies::*, stub_portal::StubPortal, transition::*, }; use crate::dns::is_subdomain; -use chrono::{DateTime, Utc}; use connlib_shared::{ messages::{ client::{self, ResourceDescription}, @@ -20,7 +19,6 @@ use std::{ collections::{BTreeMap, HashSet}, fmt, iter, net::IpAddr, - time::Instant, }; /// The reference state machine of the tunnel. @@ -28,8 +26,6 @@ use std::{ /// This is the "expected" part of our test. #[derive(Clone, Debug)] pub(crate) struct ReferenceState { - pub(crate) now: Instant, - pub(crate) utc_now: DateTime, pub(crate) client: Host, pub(crate) gateways: BTreeMap>, pub(crate) relays: BTreeMap>, @@ -69,8 +65,6 @@ impl ReferenceStateMachine for ReferenceState { collection::btree_map(relay_id(), relay_prototype(), 1..=2), global_dns_records(), // Start out with a set of global DNS records so we have something to resolve outside of DNS resources. any::(), - Just(Instant::now()), - Just(Utc::now()), ) .prop_filter_map( "network IPs must be unique", @@ -80,8 +74,6 @@ impl ReferenceStateMachine for ReferenceState { relays, mut global_dns, drop_direct_client_traffic, - now, - utc_now, )| { let mut routing_table = RoutingTable::default(); @@ -110,15 +102,13 @@ impl ReferenceStateMachine for ReferenceState { portal, global_dns, drop_direct_client_traffic, - now, - utc_now, routing_table, )) }, ) .prop_filter( "private keys must be unique", - |(c, gateways, _, _, _, _, _, _, _)| { + |(c, gateways, _, _, _, _, _)| { let different_keys = gateways .iter() .map(|(_, g)| g.inner().key) @@ -136,13 +126,9 @@ impl ReferenceStateMachine for ReferenceState { portal, global_dns_records, drop_direct_client_traffic, - now, - utc_now, network, )| { Self { - now, - utc_now, client, gateways, relays, diff --git a/rust/connlib/tunnel/src/tests/run_count_appender.rs b/rust/connlib/tunnel/src/tests/run_count_appender.rs new file mode 100644 index 000000000..0642655ce --- /dev/null +++ b/rust/connlib/tunnel/src/tests/run_count_appender.rs @@ -0,0 +1,11 @@ +use std::sync::atomic::AtomicU32; +use tracing_appender::rolling::RollingFileAppender; + +/// A file appender that rolls over to a new file for every instance that is created within the same process. +#[allow(dead_code)] +pub(crate) fn appender() -> RollingFileAppender { + static RUN_COUNT: AtomicU32 = AtomicU32::new(0); + let run_count = RUN_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + + tracing_appender::rolling::never(".", format!("run_{run_count:04}.log")) +} diff --git a/rust/connlib/tunnel/src/tests/sim_client.rs b/rust/connlib/tunnel/src/tests/sim_client.rs index 1e550dc84..5e6736d18 100644 --- a/rust/connlib/tunnel/src/tests/sim_client.rs +++ b/rust/connlib/tunnel/src/tests/sim_client.rs @@ -1,7 +1,7 @@ use super::{ reference::{private_key, PrivateKey, ResourceDst}, sim_net::{any_ip_stack, any_port, host, Host}, - strategies::{system_dns_servers, upstream_dns_servers}, + strategies::{latency, system_dns_servers, upstream_dns_servers}, sut::domain_to_hickory_name, IcmpIdentifier, IcmpSeq, QueryId, }; @@ -144,17 +144,14 @@ impl SimClient { Some(self.sut.encapsulate(packet, now)?.into_owned()) } - pub(crate) fn handle_packet( - &mut self, - payload: &[u8], - src: SocketAddr, - dst: SocketAddr, - now: Instant, - ) { - let Some(packet) = self - .sut - .decapsulate(dst, src, payload, now, &mut self.buffer) - else { + pub(crate) fn receive(&mut self, transmit: Transmit, now: Instant) { + let Some(packet) = self.sut.decapsulate( + transmit.dst, + transmit.src.unwrap(), + &transmit.payload, + now, + &mut self.buffer, + ) else { return; }; let packet = packet.to_owned(); @@ -593,6 +590,7 @@ pub(crate) fn ref_client_host( any_ip_stack(), any_port(), ref_client(tunnel_ip4s, tunnel_ip6s), + latency(2000), // Clients might have a horrible Internet connection. ) .prop_filter("at least one DNS server needs to be reachable", |host| { // TODO: PRODUCTION CODE DOES NOT HANDLE THIS! diff --git a/rust/connlib/tunnel/src/tests/sim_gateway.rs b/rust/connlib/tunnel/src/tests/sim_gateway.rs index 24573278e..06656bffd 100644 --- a/rust/connlib/tunnel/src/tests/sim_gateway.rs +++ b/rust/connlib/tunnel/src/tests/sim_gateway.rs @@ -1,6 +1,7 @@ use super::{ reference::{private_key, PrivateKey}, sim_net::{any_port, dual_ip_stack, host, Host}, + strategies::latency, }; use crate::{tests::sut::hickory_name_to_domain, GatewayState}; use connlib_shared::DomainName; @@ -9,7 +10,7 @@ use proptest::prelude::*; use snownet::Transmit; use std::{ collections::{BTreeMap, HashSet, VecDeque}, - net::{IpAddr, SocketAddr}, + net::IpAddr, time::Instant, }; @@ -31,17 +32,21 @@ impl SimGateway { } } - pub(crate) fn handle_packet( + pub(crate) fn receive( &mut self, global_dns_records: &BTreeMap>, - payload: &[u8], - src: SocketAddr, - dst: SocketAddr, + transmit: Transmit, now: Instant, ) -> Option> { let packet = self .sut - .decapsulate(dst, src, payload, now, &mut self.buffer)? + .decapsulate( + transmit.dst, + transmit.src.unwrap(), + &transmit.payload, + now, + &mut self.buffer, + )? .to_owned(); self.on_received_packet(global_dns_records, packet, now) @@ -99,7 +104,12 @@ impl RefGateway { } pub(crate) fn ref_gateway_host() -> impl Strategy> { - host(dual_ip_stack(), any_port(), ref_gateway()) + host( + dual_ip_stack(), + any_port(), + ref_gateway(), + latency(200), // We assume gateways have a somewhat decent Internet connection. + ) } fn ref_gateway() -> impl Strategy { diff --git a/rust/connlib/tunnel/src/tests/sim_net.rs b/rust/connlib/tunnel/src/tests/sim_net.rs index 002b8d25a..94d0d3228 100644 --- a/rust/connlib/tunnel/src/tests/sim_net.rs +++ b/rust/connlib/tunnel/src/tests/sim_net.rs @@ -1,3 +1,4 @@ +use crate::tests::buffered_transmits::BufferedTransmits; use crate::tests::strategies::documentation_ip6s; use connlib_shared::messages::{ClientId, GatewayId, RelayId}; use firezone_relay::{AddressFamily, IpStack}; @@ -6,11 +7,13 @@ use ip_network_table::IpNetworkTable; use itertools::Itertools as _; use prop::sample; use proptest::prelude::*; +use snownet::Transmit; use std::{ collections::HashSet, fmt, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, num::NonZeroU16, + time::{Duration, Instant}, }; use tracing::Span; @@ -29,12 +32,20 @@ pub(crate) struct Host { default_port: u16, allocated_ports: HashSet<(u16, AddressFamily)>, + // The latency of incoming and outgoing packets. + latency: Duration, + #[derivative(Debug = "ignore")] span: Span, + + /// Messages that have "arrived" and are waiting to be dispatched. + /// + /// We buffer them here because we need also apply our latency on inbound packets. + inbox: BufferedTransmits, } impl Host { - pub(crate) fn new(inner: T) -> Self { + pub(crate) fn new(inner: T, latency: Duration) -> Self { Self { inner, ip4: None, @@ -43,6 +54,8 @@ impl Host { default_port: 0, allocated_ports: HashSet::default(), old_ports: HashSet::default(), + latency, + inbox: BufferedTransmits::default(), } } @@ -105,6 +118,18 @@ impl Host { IpAddr::V6(src) => self.ip6.is_some_and(|v6| v6 == src), } } + + pub(crate) fn latency(&self) -> Duration { + self.latency + } + + pub(crate) fn receive(&mut self, transmit: Transmit<'static>, now: Instant) { + self.inbox.push(transmit, self.latency, now); + } + + pub(crate) fn poll_transmit(&mut self, now: Instant) -> Option> { + self.inbox.pop(now) + } } impl Host @@ -124,6 +149,8 @@ where default_port: self.default_port, allocated_ports: self.allocated_ports.clone(), old_ports: self.old_ports.clone(), + latency: self.latency, + inbox: self.inbox.clone(), } } } @@ -241,12 +268,13 @@ pub(crate) fn host( socket_ips: impl Strategy, default_port: impl Strategy, state: impl Strategy, + latency: impl Strategy, ) -> impl Strategy> where T: fmt::Debug, { - (state, socket_ips, default_port).prop_map(move |(state, ip_stack, port)| { - let mut host = Host::new(state); + (state, socket_ips, default_port, latency).prop_map(move |(state, ip_stack, port, latency)| { + let mut host = Host::new(state, latency); host.update_interface(ip_stack.as_v4().copied(), ip_stack.as_v6().copied(), port); host diff --git a/rust/connlib/tunnel/src/tests/sim_relay.rs b/rust/connlib/tunnel/src/tests/sim_relay.rs index 8fdd0e1d1..2c1a997c6 100644 --- a/rust/connlib/tunnel/src/tests/sim_relay.rs +++ b/rust/connlib/tunnel/src/tests/sim_relay.rs @@ -1,4 +1,7 @@ -use super::sim_net::{dual_ip_stack, host, Host}; +use super::{ + sim_net::{dual_ip_stack, host, Host}, + strategies::latency, +}; use connlib_shared::messages::RelayId; use firezone_relay::{AddressFamily, AllocationPort, ClientSocket, IpStack, PeerSocket}; use proptest::prelude::*; @@ -82,13 +85,15 @@ impl SimRelay { } } - pub(crate) fn handle_packet( + pub(crate) fn receive( &mut self, - payload: &[u8], - sender: SocketAddr, - dst: SocketAddr, + transmit: Transmit, now: Instant, ) -> Option> { + let dst = transmit.dst; + let payload = &transmit.payload; + let sender = transmit.src.unwrap(); + if self .matching_listen_socket(dst, self.sut.public_address()) .is_some_and(|s| s == dst) @@ -196,5 +201,6 @@ pub(crate) fn relay_prototype() -> impl Strategy> { dual_ip_stack(), // For this test, our relays always run in dual-stack mode to ensure connectivity! Just(3478), any::(), + latency(50), // We assume our relays have a good Internet connection. ) } diff --git a/rust/connlib/tunnel/src/tests/strategies.rs b/rust/connlib/tunnel/src/tests/strategies.rs index 0e4c239fd..48b87ffc1 100644 --- a/rust/connlib/tunnel/src/tests/strategies.rs +++ b/rust/connlib/tunnel/src/tests/strategies.rs @@ -22,6 +22,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, net::{IpAddr, Ipv4Addr, Ipv6Addr}, str::FromStr as _, + time::Duration, }; pub(crate) fn upstream_dns_servers() -> impl Strategy> { @@ -93,6 +94,10 @@ pub(crate) fn tunnel_ip6s() -> impl Iterator { .map(|n| n.network_address()) } +pub(crate) fn latency(max: u64) -> impl Strategy { + (10..max).prop_map(Duration::from_millis) +} + /// A [`Strategy`] for sampling a set of gateways and a corresponding [`StubPortal`] that has a set of [`Site`]s configured with those gateways. /// /// Similar as in production, the portal holds a list of DNS and CIDR resources (those are also sampled from the given sites). diff --git a/rust/connlib/tunnel/src/tests/sut.rs b/rust/connlib/tunnel/src/tests/sut.rs index 4d6999430..b467cfb7e 100644 --- a/rust/connlib/tunnel/src/tests/sut.rs +++ b/rust/connlib/tunnel/src/tests/sut.rs @@ -1,3 +1,4 @@ +use super::buffered_transmits::BufferedTransmits; use super::reference::ReferenceState; use super::sim_client::SimClient; use super::sim_gateway::SimGateway; @@ -6,10 +7,11 @@ use super::sim_relay::SimRelay; use super::stub_portal::StubPortal; use crate::dns::is_subdomain; use crate::tests::assertions::*; +use crate::tests::flux_capacitor::FluxCapacitor; use crate::tests::sim_relay::map_explode; use crate::tests::transition::Transition; +use crate::utils::earliest; use crate::{dns::DnsQuery, ClientEvent, GatewayEvent, Request}; -use chrono::{DateTime, Utc}; use connlib_shared::messages::client::ResourceDescription; use connlib_shared::{ messages::{ClientId, GatewayId, Interface, RelayId}, @@ -23,8 +25,13 @@ use hickory_resolver::lookup::Lookup; use proptest_state_machine::{ReferenceStateMachine, StateMachineTest}; use secrecy::ExposeSecret as _; use snownet::Transmit; -use std::collections::{BTreeMap, BTreeSet, VecDeque}; -use std::{collections::HashSet, net::IpAddr, str::FromStr as _, sync::Arc, time::Instant}; +use std::{ + collections::{BTreeMap, BTreeSet, HashSet}, + net::IpAddr, + str::FromStr as _, + sync::Arc, + time::{Duration, Instant}, +}; use tracing::debug_span; use tracing::subscriber::DefaultGuard; use tracing_subscriber::layer::SubscriberExt; @@ -35,8 +42,7 @@ use tracing_subscriber::{util::SubscriberInitExt as _, EnvFilter}; /// /// [`proptest`] manipulates this using [`Transition`]s and we assert it against [`ReferenceState`]. pub(crate) struct TunnelTest { - now: Instant, - utc_now: DateTime, + flux_capacitor: FluxCapacitor, client: Host, gateways: BTreeMap>, @@ -57,8 +63,12 @@ impl StateMachineTest for TunnelTest { fn init_test( ref_state: &::State, ) -> Self::SystemUnderTest { + let flux_capacitor = FluxCapacitor::default(); + let logger = tracing_subscriber::fmt() .with_test_writer() + // .with_writer(crate::tests::run_count_appender::appender()) // Useful for diffing logs between runs. + .with_timer(flux_capacitor.clone()) .with_env_filter(EnvFilter::from_default_env()) .finish() .set_default(); @@ -96,7 +106,7 @@ impl StateMachineTest for TunnelTest { c.sut.update_relays( BTreeSet::default(), BTreeSet::from_iter(map_explode(relays.iter(), "client")), - ref_state.now, + flux_capacitor.now(), ) }); for (id, gateway) in &mut gateways { @@ -104,14 +114,13 @@ impl StateMachineTest for TunnelTest { g.sut.update_relays( BTreeSet::default(), BTreeSet::from_iter(map_explode(relays.iter(), &format!("gateway_{id}"))), - ref_state.now, + flux_capacitor.now(), ) }); } let mut this = Self { - now: ref_state.now, - utc_now: ref_state.utc_now, + flux_capacitor, network: ref_state.network.clone(), drop_direct_client_traffic: ref_state.drop_direct_client_traffic, client, @@ -123,8 +132,6 @@ impl StateMachineTest for TunnelTest { let mut buffered_transmits = BufferedTransmits::default(); this.advance(ref_state, &mut buffered_transmits); // Perform initial setup before we apply the first transition. - debug_assert!(buffered_transmits.is_empty()); - this } @@ -135,6 +142,7 @@ impl StateMachineTest for TunnelTest { transition: ::Transition, ) -> Self::SystemUnderTest { let mut buffered_transmits = BufferedTransmits::default(); + let now = state.flux_capacitor.now(); // Act: Apply the transition match transition { @@ -176,11 +184,9 @@ impl StateMachineTest for TunnelTest { } => { let packet = ip_packet::make::icmp_request_packet(src, dst, seq, identifier); - let transmit = state - .client - .exec_mut(|sim| sim.encapsulate(packet, state.now)); + let transmit = state.client.exec_mut(|sim| sim.encapsulate(packet, now)); - buffered_transmits.push(transmit, &state.client); + buffered_transmits.push_from(transmit, &state.client, now); } Transition::SendICMPPacketToDnsResource { src, @@ -207,9 +213,9 @@ impl StateMachineTest for TunnelTest { let transmit = state .client - .exec_mut(|sim| Some(sim.encapsulate(packet, state.now)?.into_owned())); + .exec_mut(|sim| Some(sim.encapsulate(packet, now)?.into_owned())); - buffered_transmits.push(transmit, &state.client); + buffered_transmits.push_from(transmit, &state.client, now); } Transition::SendDnsQuery { domain, @@ -218,10 +224,10 @@ impl StateMachineTest for TunnelTest { dns_server, } => { let transmit = state.client.exec_mut(|sim| { - sim.send_dns_query_for(domain, r_type, query_id, dns_server, state.now) + sim.send_dns_query_for(domain, r_type, query_id, dns_server, now) }); - buffered_transmits.push(transmit, &state.client); + buffered_transmits.push_from(transmit, &state.client, now); } Transition::UpdateSystemDnsServers { servers } => { state @@ -251,7 +257,7 @@ impl StateMachineTest for TunnelTest { c.sut.update_relays( BTreeSet::default(), BTreeSet::from_iter(map_explode(state.relays.iter(), "client")), - ref_state.now, + now, ); c.sut .set_resources(ref_state.client.inner().all_resources()); @@ -271,14 +277,12 @@ impl StateMachineTest for TunnelTest { ipv6, upstream_dns, }); - c.sut - .update_relays(BTreeSet::default(), relays, ref_state.now); + c.sut.update_relays(BTreeSet::default(), relays, now); c.sut.set_resources(all_resources); }); } }; state.advance(ref_state, &mut buffered_transmits); - assert!(buffered_transmits.is_empty()); // Sanity check to ensure we handled all packets. state } @@ -292,6 +296,7 @@ impl StateMachineTest for TunnelTest { .with( tracing_subscriber::fmt::layer() .with_test_writer() + .with_timer(state.flux_capacitor.clone()) .with_filter(EnvFilter::from_default_env()), ) .with(PanicOnErrorEvents::default()) // Temporarily install a layer that panics when `_guard` goes out of scope if any of our assertions emitted an error. @@ -330,15 +335,71 @@ impl TunnelTest { /// Dispatching a [`Transmit`] (read: packet) to a host can trigger more packets, i.e. receiving a STUN request may trigger a STUN response. /// /// Consequently, this function needs to loop until no host can make progress at which point we consider the [`Transition`] complete. + /// + /// At most, we will spend 10s of "simulation time" advancing the state. fn advance(&mut self, ref_state: &ReferenceState, buffered_transmits: &mut BufferedTransmits) { - 'outer: loop { - if let Some(transmit) = buffered_transmits.pop() { - self.dispatch_transmit(transmit, buffered_transmits, &ref_state.global_dns_records); - continue; + let cut_off = self.flux_capacitor.now::() + Duration::from_secs(10); + + 'outer: while self.flux_capacitor.now::() < cut_off { + self.handle_timeout(&ref_state.global_dns_records, buffered_transmits); + let now = self.flux_capacitor.now(); + + for (_, relay) in self.relays.iter_mut() { + let Some(message) = relay.exec_mut(|r| r.sut.next_command()) else { + continue; + }; + + match message { + firezone_relay::Command::SendMessage { payload, recipient } => { + let dst = recipient.into_socket(); + let src = relay + .sending_socket_for(dst.ip()) + .expect("relay to never emit packets without a matching socket"); + + buffered_transmits.push_from( + Transmit { + src: Some(src), + dst, + payload: payload.into(), + }, + relay, + now, + ); + } + + firezone_relay::Command::CreateAllocation { port, family } => { + relay.allocate_port(port.value(), family); + relay.exec_mut(|r| r.allocations.insert((family, port))); + } + firezone_relay::Command::FreeAllocation { port, family } => { + relay.deallocate_port(port.value(), family); + relay.exec_mut(|r| r.allocations.remove(&(family, port))); + } + } + + continue 'outer; + } + + for (_, gateway) in self.gateways.iter_mut() { + let Some(transmit) = gateway.exec_mut(|g| g.sut.poll_transmit()) else { + continue; + }; + + buffered_transmits.push_from(transmit, gateway, now); + continue 'outer; + } + + for (id, gateway) in self.gateways.iter_mut() { + let Some(event) = gateway.exec_mut(|g| g.sut.poll_event()) else { + continue; + }; + + on_gateway_event(*id, event, &mut self.client, now); + continue 'outer; } if let Some(transmit) = self.client.exec_mut(|sim| sim.sut.poll_transmit()) { - buffered_transmits.push(transmit, &self.client); + buffered_transmits.push_from(transmit, &self.client, now); continue; } if let Some(event) = self.client.exec_mut(|c| c.sut.poll_event()) { @@ -360,105 +421,81 @@ impl TunnelTest { } }); - for (_, gateway) in self.gateways.iter_mut() { - let Some(transmit) = gateway.exec_mut(|g| g.sut.poll_transmit()) else { - continue; - }; - - buffered_transmits.push(transmit, gateway); - continue 'outer; - } - - for (id, gateway) in self.gateways.iter_mut() { - let Some(event) = gateway.exec_mut(|g| g.sut.poll_event()) else { - continue; - }; - - on_gateway_event(*id, event, &mut self.client, self.now); - continue 'outer; - } - - for (_, relay) in self.relays.iter_mut() { - let Some(message) = relay.exec_mut(|r| r.sut.next_command()) else { - continue; - }; - - match message { - firezone_relay::Command::SendMessage { payload, recipient } => { - let dst = recipient.into_socket(); - let src = relay - .sending_socket_for(dst.ip()) - .expect("relay to never emit packets without a matching socket"); - - buffered_transmits.push( - Transmit { - src: Some(src), - dst, - payload: payload.into(), - }, - relay, - ); - } - - firezone_relay::Command::CreateAllocation { port, family } => { - relay.allocate_port(port.value(), family); - relay.exec_mut(|r| r.allocations.insert((family, port))); - } - firezone_relay::Command::FreeAllocation { port, family } => { - relay.deallocate_port(port.value(), family); - relay.exec_mut(|r| r.allocations.remove(&(family, port))); - } - } - - continue 'outer; - } - - if self.handle_timeout(self.now, self.utc_now) { + if let Some(transmit) = buffered_transmits.pop(now) { + self.dispatch_transmit(transmit); continue; } - break; + + if !buffered_transmits.is_empty() { + self.flux_capacitor.small_tick(); // Small tick to get to the next transmit. + continue; + } + + let Some(time_to_next_action) = self.poll_timeout() else { + break; // Nothing to do. + }; + + if time_to_next_action > cut_off { + break; // Nothing to do before cut-off. + } + + self.flux_capacitor.large_tick(); // Large tick to more quickly advance to potential next timeout. } } - /// Forwards time to the given instant iff the corresponding host would like that (i.e. returns a timestamp <= from `poll_timeout`). - /// - /// Tying the forwarding of time to the result of `poll_timeout` gives us better coverage because in production, we suspend until the value of `poll_timeout`. - fn handle_timeout(&mut self, now: Instant, utc_now: DateTime) -> bool { - let mut any_advanced = false; + fn handle_timeout( + &mut self, + global_dns_records: &BTreeMap>, + buffered_transmits: &mut BufferedTransmits, + ) { + let now = self.flux_capacitor.now(); - if self - .client - .exec_mut(|c| c.sut.poll_timeout()) - .is_some_and(|t| t <= now) - { - any_advanced = true; - - self.client.exec_mut(|c| c.sut.handle_timeout(now)); - }; + while let Some(transmit) = self.client.poll_transmit(now) { + self.client.exec_mut(|c| c.receive(transmit, now)) + } + self.client.exec_mut(|c| c.sut.handle_timeout(now)); for (_, gateway) in self.gateways.iter_mut() { - if gateway - .exec_mut(|g| g.sut.poll_timeout()) - .is_some_and(|t| t <= now) - { - any_advanced = true; + while let Some(transmit) = gateway.poll_transmit(now) { + let Some(reply) = + gateway.exec_mut(|g| g.receive(global_dns_records, transmit, now)) + else { + continue; + }; - gateway.exec_mut(|g| g.sut.handle_timeout(now, utc_now)) - }; + buffered_transmits.push_from(reply, gateway, now); + } + + gateway.exec_mut(|g| g.sut.handle_timeout(now, self.flux_capacitor.now())); } for (_, relay) in self.relays.iter_mut() { - if relay - .exec_mut(|r| r.sut.poll_timeout()) - .is_some_and(|t| t <= now) - { - any_advanced = true; + while let Some(transmit) = relay.poll_transmit(now) { + let Some(reply) = relay.exec_mut(|g| g.receive(transmit, now)) else { + continue; + }; - relay.exec_mut(|r| r.sut.handle_timeout(now)) - }; + buffered_transmits.push_from(reply, relay, now); + } + + relay.exec_mut(|r| r.sut.handle_timeout(now)) } + } - any_advanced + fn poll_timeout(&mut self) -> Option { + let client = self.client.exec_mut(|c| c.sut.poll_timeout()); + let gateway = self + .gateways + .values_mut() + .flat_map(|g| g.exec_mut(|g| g.sut.poll_timeout())) + .min(); + let relay = self + .relays + .values_mut() + .flat_map(|r| r.exec_mut(|r| r.sut.poll_timeout())) + .min(); + + earliest(client, earliest(gateway, relay)) } /// Dispatches a [`Transmit`] to the correct host. @@ -467,17 +504,12 @@ impl TunnelTest { /// It takes a [`Transmit`] and checks, which host accepts it, i.e. has configured the correct IP address. /// /// Currently, the network topology of our tests are a single subnet without NAT. - fn dispatch_transmit( - &mut self, - transmit: Transmit, - buffered_transmits: &mut BufferedTransmits, - global_dns_records: &BTreeMap>, - ) { + fn dispatch_transmit(&mut self, transmit: Transmit<'static>) { let src = transmit .src .expect("`src` should always be set in these tests"); let dst = transmit.dst; - let payload = &transmit.payload; + let now = self.flux_capacitor.now(); let Some(host) = self.network.host_by_ip(dst.ip()) else { panic!("Unhandled packet: {src} -> {dst}") @@ -488,41 +520,30 @@ impl TunnelTest { if self.drop_direct_client_traffic && self.gateways.values().any(|g| g.is_sender(src.ip())) { - tracing::debug!(%src, %dst, "Dropping direct traffic"); + tracing::trace!(%src, %dst, "Dropping direct traffic"); return; } - self.client - .exec_mut(|c| c.handle_packet(payload, src, dst, self.now)); + self.client.receive(transmit, now); } HostId::Gateway(id) => { if self.drop_direct_client_traffic && self.client.is_sender(src.ip()) { - tracing::debug!(%src, %dst, "Dropping direct traffic"); + tracing::trace!(%src, %dst, "Dropping direct traffic"); return; } - let gateway = self.gateways.get_mut(&id).expect("unknown gateway"); - - let Some(transmit) = gateway - .exec_mut(|g| g.handle_packet(global_dns_records, payload, src, dst, self.now)) - else { - return; - }; - - buffered_transmits.push(transmit, gateway); + self.gateways + .get_mut(&id) + .expect("unknown gateway") + .receive(transmit, now); } HostId::Relay(id) => { - let relay = self.relays.get_mut(&id).expect("unknown relay"); - - let Some(transmit) = - relay.exec_mut(|r| r.handle_packet(payload, src, dst, self.now)) - else { - return; - }; - - buffered_transmits.push(transmit, relay); + self.relays + .get_mut(&id) + .expect("unknown relay") + .receive(transmit, now); } HostId::Stale => { tracing::debug!(%dst, "Dropping packet because host roamed away or is offline"); @@ -537,6 +558,8 @@ impl TunnelTest { portal: &StubPortal, global_dns_records: &BTreeMap>, ) { + let now = self.flux_capacitor.now(); + match event { ClientEvent::AddedIceCandidates { candidates, @@ -546,7 +569,7 @@ impl TunnelTest { gateway.exec_mut(|g| { for candidate in candidates { - g.sut.add_ice_candidate(src, candidate, self.now) + g.sut.add_ice_candidate(src, candidate, now) } }) } @@ -626,7 +649,7 @@ impl TunnelTest { .map(|r| (r.name, r.proxy_ips)), None, // TODO: How to generate expiry? resource, - self.now, + now, ) }) .unwrap(); @@ -642,7 +665,7 @@ impl TunnelTest { }, resource_id, gateway.inner().sut.public_key(), - self.now, + now, ) }) .unwrap(); @@ -660,7 +683,7 @@ impl TunnelTest { self.client.inner().id, None, reuse_connection.payload.map(|r| (r.name, r.proxy_ips)), - self.now, + now, ) }) .unwrap(); @@ -696,7 +719,7 @@ impl TunnelTest { self.client.inner().id, None, reuse_connection.payload.map(|r| (r.name, r.proxy_ips)), - self.now, + now, ) }) .unwrap(); @@ -790,44 +813,3 @@ pub(crate) fn domain_to_hickory_name(domain: DomainName) -> hickory_proto::rr::N name } - -#[derive(Debug, Default)] -struct BufferedTransmits { - inner: VecDeque>, -} - -impl BufferedTransmits { - fn push(&mut self, transmit: impl Into>>, sending_host: &Host) { - let Some(transmit) = transmit.into() else { - return; - }; - - if transmit.src.is_some() { - self.inner.push_back(transmit); - return; - } - - // The `src` of a [`Transmit`] is empty if we want to send if via the default interface. - // In production, the kernel does this for us. - // In this test, we need to always set a `src` so that the remote peer knows where the packet is coming from. - - let Some(src) = sending_host.sending_socket_for(transmit.dst.ip()) else { - tracing::debug!(dst = %transmit.dst, "No socket"); - - return; - }; - - self.inner.push_back(Transmit { - src: Some(src), - ..transmit - }); - } - - fn pop(&mut self) -> Option> { - self.inner.pop_front() - } - - fn is_empty(&self) -> bool { - self.inner.is_empty() - } -}