diff --git a/.github/actions/setup-rust/action.yml b/.github/actions/setup-rust/action.yml index c7910a3c1..d0288137c 100644 --- a/.github/actions/setup-rust/action.yml +++ b/.github/actions/setup-rust/action.yml @@ -92,6 +92,7 @@ runs: NIGHTLY="nightly-2024-12-13" rustup toolchain install $NIGHTLY + rustup component add rust-src --toolchain $NIGHTLY echo "nightly=$NIGHTLY" >> $GITHUB_OUTPUT shell: bash diff --git a/.github/workflows/_build_artifacts.yml b/.github/workflows/_build_artifacts.yml index aad4a972d..dc73792c2 100644 --- a/.github/workflows/_build_artifacts.yml +++ b/.github/workflows/_build_artifacts.yml @@ -264,6 +264,11 @@ jobs: targets: ${{ matrix.arch.target }} - name: Install dependencies run: ${{ matrix.arch.install_dependencies }} + - uses: taiki-e/install-action@0b63bc859f7224657cf7e39426848cabaa36f456 # v2.49.9 + with: + tool: bpf-linker + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Build binaries run: | set -xe diff --git a/.github/workflows/_rust.yml b/.github/workflows/_rust.yml index c8f0b4a3a..6dc6bec57 100644 --- a/.github/workflows/_rust.yml +++ b/.github/workflows/_rust.yml @@ -55,6 +55,12 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: tool: cargo-udeps,cargo-deny + - uses: taiki-e/install-action@0b63bc859f7224657cf7e39426848cabaa36f456 # v2.49.9 + if: ${{ runner.os == 'Linux' }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tool: bpf-linker - run: | cargo +${{ steps.setup-rust.outputs.nightly_version }} udeps --all-targets --all-features ${{ steps.setup-rust.outputs.packages }} name: Check for unused dependencies @@ -67,6 +73,7 @@ jobs: shell: bash - run: cargo deny check --hide-inclusion-graph shell: bash + test: name: test-${{ matrix.runs-on }} strategy: @@ -94,6 +101,12 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: tool: ripgrep + - uses: taiki-e/install-action@0b63bc859f7224657cf7e39426848cabaa36f456 # v2.49.9 + if: ${{ runner.os == 'Linux' }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tool: bpf-linker - name: "cargo test" shell: bash run: | diff --git a/rust/Cargo.lock b/rust/Cargo.lock index aa2dc3705..730cc2f75 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -207,6 +207,12 @@ dependencies = [ "zbus 5.5.0", ] +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-broadcast" version = "0.7.1" @@ -469,6 +475,131 @@ dependencies = [ "tower-service", ] +[[package]] +name = "aya" +version = "0.13.1" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "assert_matches", + "aya-obj", + "bitflags 2.6.0", + "bytes", + "hashbrown 0.15.2", + "libc", + "log", + "object", + "once_cell", + "thiserror 2.0.6", + "tokio", +] + +[[package]] +name = "aya-build" +version = "0.1.2" +source = "git+https://github.com/thomaseizinger/aya?branch=fix%2Faya-build-pin-nightly#c766051000cc41e3409477a8fec8547acc4d72fd" +dependencies = [ + "anyhow", + "cargo_metadata", +] + +[[package]] +name = "aya-ebpf" +version = "0.1.1" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "aya-ebpf-bindings", + "aya-ebpf-cty", + "aya-ebpf-macros", + "rustversion", +] + +[[package]] +name = "aya-ebpf-bindings" +version = "0.1.1" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "aya-ebpf-cty", +] + +[[package]] +name = "aya-ebpf-cty" +version = "0.2.2" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" + +[[package]] +name = "aya-ebpf-macros" +version = "0.1.1" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "aya-log" +version = "0.2.1" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "aya", + "aya-log-common", + "bytes", + "log", + "thiserror 2.0.6", + "tokio", +] + +[[package]] +name = "aya-log-common" +version = "0.1.15" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "num_enum", +] + +[[package]] +name = "aya-log-ebpf" +version = "0.1.1" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "aya-ebpf", + "aya-log-common", + "aya-log-ebpf-macros", +] + +[[package]] +name = "aya-log-ebpf-macros" +version = "0.1.0" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "aya-log-common", + "aya-log-parser", + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "aya-log-parser" +version = "0.1.13" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "aya-log-common", +] + +[[package]] +name = "aya-obj" +version = "0.2.1" +source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3" +dependencies = [ + "bytes", + "hashbrown 0.15.2", + "log", + "object", + "thiserror 2.0.6", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1748,6 +1879,27 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" +[[package]] +name = "ebpf-shared" +version = "0.1.0" +dependencies = [ + "aya", +] + +[[package]] +name = "ebpf-turn-router" +version = "0.1.0" +dependencies = [ + "aya-ebpf", + "aya-log-ebpf", + "ebpf-shared", + "etherparse", + "etherparse-ext", + "hex-literal", + "ip-packet", + "which", +] + [[package]] name = "either" version = "1.15.0" @@ -2183,6 +2335,9 @@ name = "firezone-relay" version = "0.1.0" dependencies = [ "anyhow", + "aya", + "aya-build", + "aya-log", "backoff", "base64 0.22.1", "bytecodec", @@ -2190,6 +2345,7 @@ dependencies = [ "clap", "derive_more 1.0.0", "difference", + "ebpf-shared", "env_logger", "firezone-bin-shared", "firezone-logging", @@ -4322,6 +4478,9 @@ version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ + "crc32fast", + "hashbrown 0.15.2", + "indexmap 2.6.0", "memchr", ] @@ -4919,6 +5078,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", + "version_check", +] + [[package]] name = "proptest" version = "1.6.0" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 5a9fc9076..8be346557 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -19,7 +19,9 @@ members = [ "ip-packet", "logging", "phoenix-channel", - "relay", + "relay/ebpf-shared", + "relay/ebpf-turn-router", + "relay/server", "socket-factory", "telemetry", "tests/gui-smoke-test", @@ -39,6 +41,11 @@ arboard = { version = "3.4.0", default-features = false } async-trait = { version = "0.1", default-features = false } atomicwrites = "0.4.4" axum = { version = "0.7.7", default-features = false } +aya = { git = "https://github.com/aya-rs/aya" } +aya-build = { git = "https://github.com/thomaseizinger/aya", branch = "fix/aya-build-pin-nightly" } +aya-ebpf = { git = "https://github.com/aya-rs/aya" } +aya-log = { git = "https://github.com/aya-rs/aya" } +aya-log-ebpf = { git = "https://github.com/aya-rs/aya" } backoff = { version = "0.4", features = ["tokio"] } base64 = { version = "0.22.1", default-features = false } bimap = "0.6" @@ -67,7 +74,8 @@ firezone-bin-shared = { path = "bin-shared" } firezone-gui-client-common = { path = "gui-client/src-common" } firezone-headless-client = { path = "headless-client" } firezone-logging = { path = "logging" } -firezone-relay = { path = "relay" } +firezone-relay = { path = "relay/server" } +ebpf-shared = { path = "relay/ebpf-shared" } firezone-telemetry = { path = "telemetry" } firezone-tunnel = { path = "connlib/tunnel" } flume = { version = "0.11.1", features = ["async"] } @@ -173,6 +181,7 @@ url = "2.5.2" uuid = "1.16.0" windows = "0.58.0" winreg = "0.52.0" +which = "4.4.2" zbus = "5.5.0" zip = { version = "2", default-features = false } @@ -220,3 +229,7 @@ split-debuginfo = "packed" [profile.release.package.firezone-gui-client] debug = "full" split-debuginfo = "packed" + +[profile.release.package.ebpf-turn-router] +debug = 2 +codegen-units = 1 diff --git a/rust/connlib/tunnel/src/tests/sut.rs b/rust/connlib/tunnel/src/tests/sut.rs index af1039b06..06bf4674a 100644 --- a/rust/connlib/tunnel/src/tests/sut.rs +++ b/rust/connlib/tunnel/src/tests/sut.rs @@ -474,6 +474,8 @@ impl TunnelTest { relay.deallocate_port(port.value(), family); relay.exec_mut(|r| r.allocations.remove(&(family, port))); } + firezone_relay::Command::CreateChannelBinding { .. } => {} + firezone_relay::Command::DeleteChannelBinding { .. } => {} } continue 'outer; diff --git a/rust/relay/ebpf-shared/Cargo.toml b/rust/relay/ebpf-shared/Cargo.toml new file mode 100644 index 000000000..979b4e164 --- /dev/null +++ b/rust/relay/ebpf-shared/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "ebpf-shared" +version = "0.1.0" +edition = { workspace = true } +license = { workspace = true } + +[features] +std = ["aya"] + +[target.'cfg(target_os = "linux")'.dependencies] +aya = { workspace = true, optional = true } diff --git a/rust/relay/ebpf-shared/src/lib.rs b/rust/relay/ebpf-shared/src/lib.rs new file mode 100644 index 000000000..097954388 --- /dev/null +++ b/rust/relay/ebpf-shared/src/lib.rs @@ -0,0 +1,127 @@ +//! Shared data structures between the kernel and userspace. +//! +//! To learn more about the layout requirements of these structs, read . + +#![cfg_attr(not(feature = "std"), no_std)] + +#[repr(C)] +#[derive(Clone, Copy)] +#[cfg_attr(feature = "std", derive(Debug))] +pub struct ClientAndChannelV4 { + ipv4_address: [u8; 4], + _padding_ipv4_address: [u8; 4], + + port: u16, + _padding_port: [u8; 6], + + channel: u16, + _padding_channel: [u8; 6], + + _padding_struct: [u8; 40], +} + +impl ClientAndChannelV4 { + pub fn new(ipv4_address: [u8; 4], port: u16, channel: u16) -> Self { + Self { + ipv4_address, + _padding_ipv4_address: [0u8; 4], + + port: port.to_be(), + _padding_port: [0u8; 6], + + channel: channel.to_be(), + _padding_channel: [0u8; 6], + + _padding_struct: [0u8; 40], + } + } + + pub fn from_socket(src: core::net::SocketAddrV4, channel: u16) -> Self { + Self::new(src.ip().octets(), src.port(), channel) + } + + pub fn client_ip(&self) -> [u8; 4] { + self.ipv4_address + } + + pub fn client_port(&self) -> u16 { + self.port + } + + pub fn channel(&self) -> u16 { + self.channel + } +} + +#[repr(C)] +#[derive(Clone, Copy)] +#[cfg_attr(feature = "std", derive(Debug))] +pub struct PortAndPeerV4 { + ipv4_address: [u8; 4], + _padding_ipv4_address: [u8; 4], + + allocation_port: u16, + _padding_allocation_port: [u8; 6], + + peer_port: u16, + _padding_dest_port: [u8; 6], +} + +impl PortAndPeerV4 { + pub fn new(ipv4_address: [u8; 4], allocation_port: u16, peer_port: u16) -> Self { + Self { + ipv4_address, + _padding_ipv4_address: [0u8; 4], + + allocation_port, + _padding_allocation_port: [0u8; 6], + + peer_port, + _padding_dest_port: [0u8; 6], + } + } + + pub fn from_socket(dst: core::net::SocketAddrV4, allocation_port: u16) -> Self { + Self::new(dst.ip().octets(), allocation_port, dst.port()) + } + + pub fn peer_ip(&self) -> [u8; 4] { + self.ipv4_address + } + + pub fn allocation_port(&self) -> u16 { + self.allocation_port + } + + pub fn peer_port(&self) -> u16 { + self.peer_port + } +} + +#[repr(C)] +#[derive(Clone, Copy, Default)] +#[cfg_attr(feature = "std", derive(Debug))] +pub struct Config { + pub udp_checksum_enabled: bool, +} + +#[cfg(all(feature = "std", target_os = "linux"))] +mod userspace { + use super::*; + + unsafe impl aya::Pod for ClientAndChannelV4 {} + + unsafe impl aya::Pod for PortAndPeerV4 {} + + unsafe impl aya::Pod for Config {} +} + +#[cfg(all(test, feature = "std"))] +mod tests { + use super::*; + + #[test] + fn client_and_channel_has_size_64() { + assert_eq!(std::mem::size_of::(), 64) + } +} diff --git a/rust/relay/ebpf-turn-router/Cargo.toml b/rust/relay/ebpf-turn-router/Cargo.toml new file mode 100644 index 000000000..0c9c16c7d --- /dev/null +++ b/rust/relay/ebpf-turn-router/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "ebpf-turn-router" +version = "0.1.0" +edition = { workspace = true } +license = { workspace = true } + +[dependencies] +aya-ebpf = { workspace = true } +aya-log-ebpf = { workspace = true } +ebpf-shared = { workspace = true } +etherparse = { workspace = true } +etherparse-ext = { workspace = true } + +[dev-dependencies] +hex-literal = { workspace = true } +ip-packet = { workspace = true } + +[build-dependencies] +which = { workspace = true } + +[[bin]] +name = "ebpf-turn-router-main" # This needs to be different from the package name otherwise the build-script fails to differentiate between the directory it is built in and the actual binary. +path = "src/main.rs" diff --git a/rust/relay/ebpf-turn-router/build.rs b/rust/relay/ebpf-turn-router/build.rs new file mode 100644 index 000000000..3655e0aba --- /dev/null +++ b/rust/relay/ebpf-turn-router/build.rs @@ -0,0 +1,17 @@ +use which::which; + +/// Building this crate has an undeclared dependency on the `bpf-linker` binary. This would be +/// better expressed by [artifact-dependencies][bindeps] but issues such as +/// https://github.com/rust-lang/cargo/issues/12385 make their use impractical for the time being. +/// +/// This file implements an imperfect solution: it causes cargo to rebuild the crate whenever the +/// mtime of `which bpf-linker` changes. Note that possibility that a new bpf-linker is added to +/// $PATH ahead of the one used as the cache key still exists. Solving this in the general case +/// would require rebuild-if-changed-env=PATH *and* rebuild-if-changed={every-directory-in-PATH} +/// which would likely mean far too much cache invalidation. +/// +/// [bindeps]: https://doc.rust-lang.org/nightly/cargo/reference/unstable.html?highlight=feature#artifact-dependencies +fn main() { + let bpf_linker = which("bpf-linker").expect("bpf-linker not found in $PATH"); + println!("cargo:rerun-if-changed={}", bpf_linker.to_str().unwrap()); +} diff --git a/rust/relay/ebpf-turn-router/src/channel_data.rs b/rust/relay/ebpf-turn-router/src/channel_data.rs new file mode 100644 index 000000000..a1cef8dba --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/channel_data.rs @@ -0,0 +1,64 @@ +use crate::{CHANNEL_DATA_HEADER_LEN, Error, slice_mut_at::slice_mut_at}; +use aya_ebpf::programs::XdpContext; +use etherparse::{Ethernet2Header, Ipv4Header, UdpHeader}; + +/// Represents a channel-data header within our packet. +pub struct ChannelData<'a> { + number: u16, + + _ctx: &'a XdpContext, +} + +impl<'a> ChannelData<'a> { + pub fn parse(ctx: &'a XdpContext) -> Result { + let cdhdr = slice_mut_at::<{ CHANNEL_DATA_HEADER_LEN }>( + ctx, + Ethernet2Header::LEN + Ipv4Header::MIN_LEN + UdpHeader::LEN, + )?; + + if !(64..=79).contains(&cdhdr[0]) { + return Err(Error::NotAChannelDataMessage); + } + + let number = u16::from_be_bytes([cdhdr[0], cdhdr[1]]); + + // Untrusted because we read it from the packet. + let untrusted_channel_data_length = u16::from_be_bytes([cdhdr[2], cdhdr[3]]); + + // The eBPF verifier doesn't allow you to use data read from the packet to index into the packet. + // Hence, we cannot read the length of the channel-data message and use it on the packet. + // So what we do instead is, we check how many bytes we have left in the packet and compare it to the length we read. + let length = remaining_bytes( + ctx, + Ethernet2Header::LEN + Ipv4Header::MIN_LEN + UdpHeader::LEN + CHANNEL_DATA_HEADER_LEN, + )?; + + // We received less (or more) data than the header said we would. + if length != usize::from(untrusted_channel_data_length) { + return Err(Error::BadChannelDataLength); + } + + // After we have verified the length, we don't need it anymore. + + Ok(Self { number, _ctx: ctx }) + } + + pub fn number(&self) -> u16 { + self.number + } +} + +/// Computes how many more bytes we have in the packet after a given offset. +#[inline(always)] +fn remaining_bytes(ctx: &XdpContext, offset: usize) -> Result { + let start = ctx.data() + offset; + let end = ctx.data_end(); + + if start > end { + return Err(Error::PacketTooShort); + } + + let len = end - start; + + Ok(len) +} diff --git a/rust/relay/ebpf-turn-router/src/checksum.rs b/rust/relay/ebpf-turn-router/src/checksum.rs new file mode 100644 index 000000000..f79accd3e --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/checksum.rs @@ -0,0 +1,131 @@ +//! Incremental updates to Internet checksums. +//! +//! The Internet checksum is the one's complement of the one's complement sum of certain 16-bit words. +//! The use of one's complement arithmetic allows us to make incremental updates to a checksum without requiring a full re-computation. +//! +//! That is what we are implementing in this module. +//! There are three things you need to know: +//! +//! 1. The one's complement of a number `x` is `!x`. +//! 2. Addition in one's complement arithmetic is the same as regular addition, except that upon overflow, we add an additional bit. +//! 3. Subtraction in one's complement arithmetic is implemented as the addition of the one's complement of the number to be subtracted. +//! +//! This allows us to e.g. take an existing IP header checksum and update it to account for just the destination address changing. + +#[derive(Default)] +pub struct ChecksumUpdate { + inner: u16, +} + +impl ChecksumUpdate { + pub fn new(checksum: u16) -> Self { + Self { inner: !checksum } + } + + pub fn remove_u16(self, val: u16) -> Self { + self.ones_complement_add(!val) + } + + pub fn remove_addr(self, val: [u8; 4]) -> Self { + self.remove_u16(fold_u32_into_u16(u32::from_be_bytes(val))) + } + + pub fn add_u16(self, val: u16) -> Self { + self.ones_complement_add(val) + } + + pub fn add_addr(self, val: [u8; 4]) -> Self { + self.add_u16(fold_u32_into_u16(u32::from_be_bytes(val))) + } + + pub fn add_update(self, update: ChecksumUpdate) -> Self { + self.add_u16(update.inner) + } + + fn ones_complement_add(self, val: u16) -> Self { + let (res, carry) = self.inner.overflowing_add(val); + + Self { + inner: res + (carry as u16), + } + } + + pub fn into_checksum(self) -> u16 { + !self.inner + } +} + +#[inline(always)] +fn fold_u32_into_u16(mut csum: u32) -> u16 { + csum = (csum & 0xffff) + (csum >> 16); + csum = (csum & 0xffff) + (csum >> 16); + + csum as u16 +} + +#[cfg(test)] +mod tests { + use core::net::Ipv4Addr; + + use super::*; + + #[test] + fn recompute_udp_checksum() { + let old_src_ip = Ipv4Addr::new(172, 28, 0, 100); + let old_dst_ip = Ipv4Addr::new(172, 28, 0, 101); + let old_src_port = 45088; + let old_dst_port = 3478; + let old_udp_payload = hex_literal::hex!( + "400100400101002c2112a44293d108418ca2a8fdf7e8930e002000080001b6c58d0ea42b00080014019672bb752bf292ecf95b498b8b4797eacef51d80280004a057ff1e" + ); + let channel_number = 0x4001; + let channel_data_len = 0x0040; + + let incoming_ip_packet = ip_packet::make::udp_packet( + old_src_ip, + old_dst_ip, + old_src_port, + old_dst_port, + old_udp_payload.to_vec(), + ) + .unwrap(); + let incoming_checksum = incoming_ip_packet.as_udp().unwrap().checksum(); + + let new_src_ip = Ipv4Addr::new(172, 28, 0, 101); + let new_dst_ip = Ipv4Addr::new(172, 28, 0, 105); + let new_src_port = 4324; + let new_dst_port = 59385; + let new_udp_payload = hex_literal::hex!( + "0101002c2112a44293d108418ca2a8fdf7e8930e002000080001b6c58d0ea42b00080014019672bb752bf292ecf95b498b8b4797eacef51d80280004a057ff1e" + ); + + let outgoing_ip_packet = ip_packet::make::udp_packet( + new_src_ip, + new_dst_ip, + new_src_port, + new_dst_port, + new_udp_payload.to_vec(), + ) + .unwrap(); + let outgoing_checksum = outgoing_ip_packet.as_udp().unwrap().checksum(); + + let computed_checksum = ChecksumUpdate::new(incoming_checksum) + .remove_addr(old_src_ip.octets()) + .remove_addr(old_dst_ip.octets()) + .remove_u16(old_src_port) + .remove_u16(old_dst_port) + .remove_u16(old_udp_payload.len() as u16) + .remove_u16(old_udp_payload.len() as u16) + .remove_u16(channel_number) + .remove_u16(channel_data_len) + .add_addr(new_src_ip.octets()) + .add_addr(new_dst_ip.octets()) + .add_u16(new_src_port) + .add_u16(new_dst_port) + .add_u16(new_udp_payload.len() as u16) + .add_u16(new_udp_payload.len() as u16) + .into_checksum(); + + assert_eq!(computed_checksum, outgoing_checksum) + } +} diff --git a/rust/relay/ebpf-turn-router/src/config.rs b/rust/relay/ebpf-turn-router/src/config.rs new file mode 100644 index 000000000..88bbb2ca6 --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/config.rs @@ -0,0 +1,12 @@ +use aya_ebpf::{macros::map, maps::Array}; +use ebpf_shared::Config; + +/// Dynamic configuration of the eBPF program. +#[map] +static CONFIG: Array = Array::with_max_entries(1, 0); + +pub fn udp_checksum_enabled() -> bool { + CONFIG + .get(0) + .is_some_and(|config| config.udp_checksum_enabled) +} diff --git a/rust/relay/ebpf-turn-router/src/error.rs b/rust/relay/ebpf-turn-router/src/error.rs new file mode 100644 index 000000000..d99151b87 --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/error.rs @@ -0,0 +1,46 @@ +use core::num::NonZeroUsize; + +use aya_ebpf::bindings::xdp_action; + +#[derive(Debug, Clone, Copy)] +pub enum Error { + ParseEthernet2Header, + ParseIpv4Header, + ParseUdpHeader, + PacketTooShort, + Ipv4PacketWithOptions, + NotAChannelDataMessage, + BadChannelDataLength, +} + +impl Error { + pub fn xdp_action(&self) -> xdp_action::Type { + match self { + Error::ParseEthernet2Header => xdp_action::XDP_PASS, + Error::ParseIpv4Header => xdp_action::XDP_PASS, + Error::ParseUdpHeader => xdp_action::XDP_PASS, + Error::PacketTooShort => xdp_action::XDP_PASS, + Error::Ipv4PacketWithOptions => xdp_action::XDP_PASS, + Error::BadChannelDataLength => xdp_action::XDP_DROP, + Error::NotAChannelDataMessage => xdp_action::XDP_PASS, + } + } +} + +impl aya_log_ebpf::WriteToBuf for Error { + fn write(self, buf: &mut [u8]) -> Option { + let msg = match self { + Error::ParseEthernet2Header => "Failed to parse Ethernet2 header", + Error::ParseIpv4Header => "Failed to parse IPv4 header", + Error::ParseUdpHeader => "Failed to parse UDP header", + Error::PacketTooShort => "Packet is too short", + Error::Ipv4PacketWithOptions => "IPv4 packet has optiosn", + Error::NotAChannelDataMessage => "Not a channel data message", + Error::BadChannelDataLength => "Channel data length does not match packet length", + }; + + msg.write(buf) + } +} + +impl aya_log_ebpf::macro_support::DefaultFormatter for Error {} diff --git a/rust/relay/ebpf-turn-router/src/eth2.rs b/rust/relay/ebpf-turn-router/src/eth2.rs new file mode 100644 index 000000000..86c5b8272 --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/eth2.rs @@ -0,0 +1,24 @@ +use aya_ebpf::programs::XdpContext; +use etherparse::{EtherType, Ethernet2Header, Ethernet2HeaderSlice}; + +use crate::{error::Error, slice_mut_at}; + +pub struct Eth2 { + ty: EtherType, +} + +impl Eth2 { + pub fn parse(ctx: &XdpContext) -> Result { + let slice = slice_mut_at::<{ Ethernet2Header::LEN }>(ctx, 0)?; + let eth = + Ethernet2HeaderSlice::from_slice(slice).map_err(|_| Error::ParseEthernet2Header)?; + + Ok(Self { + ty: eth.ether_type(), + }) + } + + pub fn ether_type(&self) -> EtherType { + self.ty + } +} diff --git a/rust/relay/ebpf-turn-router/src/ip4.rs b/rust/relay/ebpf-turn-router/src/ip4.rs new file mode 100644 index 000000000..45523aadf --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/ip4.rs @@ -0,0 +1,99 @@ +use crate::{Error, checksum::ChecksumUpdate, slice_mut_at::slice_mut_at}; +use aya_ebpf::programs::XdpContext; +use aya_log_ebpf::debug; +use etherparse::{Ethernet2Header, IpNumber, Ipv4Header, Ipv4HeaderSlice}; +use etherparse_ext::Ipv4HeaderSliceMut; + +/// Represents an IPv4 header within our packet. +pub struct Ip4<'a> { + src: [u8; 4], + dst: [u8; 4], + protocol: IpNumber, + checksum: u16, + total_len: u16, + + ctx: &'a XdpContext, + + /// Mutable slice of the IPv4 header, allows us to modify the header in-place. + slice_mut: Ipv4HeaderSliceMut<'a>, +} + +impl<'a> Ip4<'a> { + pub fn parse(ctx: &'a XdpContext) -> Result { + let slice_mut = slice_mut_at::<{ Ipv4Header::MIN_LEN }>(ctx, Ethernet2Header::LEN)?; + let ipv4_slice = + Ipv4HeaderSlice::from_slice(slice_mut).map_err(|_| Error::ParseIpv4Header)?; + + // IPv4 packets with options are handled in user-space. + if usize::from(ipv4_slice.ihl() * 4) != Ipv4Header::MIN_LEN { + return Err(Error::Ipv4PacketWithOptions); + } + + Ok(Self { + ctx, + src: ipv4_slice.source(), + dst: ipv4_slice.destination(), + protocol: ipv4_slice.protocol(), + checksum: ipv4_slice.header_checksum(), + total_len: ipv4_slice.total_len(), + slice_mut: { + // SAFETY: We parsed the slice as an IPv4 header above. + unsafe { Ipv4HeaderSliceMut::from_slice_unchecked(slice_mut) } + }, + }) + } + + pub fn src(&self) -> [u8; 4] { + self.src + } + + pub fn dst(&self) -> [u8; 4] { + self.dst + } + + pub fn protocol(&self) -> IpNumber { + self.protocol + } + + pub fn total_len(&self) -> u16 { + self.total_len + } + + /// Update this packet with a new source, destination, and total length. + /// + /// Returns a [`ChecksumUpdate`] representing the checksum-difference of the "IP pseudo-header." + /// which is used in certain L4 protocols (e.g. UDP). + pub fn update(mut self, new_src: [u8; 4], new_dst: [u8; 4], new_len: u16) -> ChecksumUpdate { + self.slice_mut.set_source(new_src); + self.slice_mut.set_destination(new_dst); + self.slice_mut.set_total_length(new_len.to_be_bytes()); + + let ip_pseudo_header = ChecksumUpdate::default() + .remove_addr(self.src) + .add_addr(new_src) + .remove_addr(self.dst) + .add_addr(new_dst); + + self.slice_mut.set_checksum( + ChecksumUpdate::new(self.checksum) + .remove_addr(self.src) + .add_addr(new_src) + .remove_addr(self.dst) + .add_addr(new_dst) + .remove_u16(self.total_len) + .add_u16(new_len) + .into_checksum(), + ); + + debug!( + self.ctx, + "IP4 header update: src {:i} -> {:i}; dst {:i} -> {:i}", + self.src, + new_src, + self.dst, + new_dst, + ); + + ip_pseudo_header + } +} diff --git a/rust/relay/ebpf-turn-router/src/main.rs b/rust/relay/ebpf-turn-router/src/main.rs new file mode 100644 index 000000000..8f658eb1c --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/main.rs @@ -0,0 +1,152 @@ +#![cfg_attr(target_arch = "bpf", no_std)] +#![cfg_attr(target_arch = "bpf", no_main)] + +use crate::error::Error; +use crate::slice_mut_at::slice_mut_at; +use aya_ebpf::{ + bindings::xdp_action, + macros::{map, xdp}, + maps::HashMap, + programs::XdpContext, +}; +use aya_log_ebpf::*; +use channel_data::ChannelData; +use ebpf_shared::{ClientAndChannelV4, PortAndPeerV4}; +use eth2::Eth2; +use etherparse::{EtherType, IpNumber}; +use ip4::Ip4; +use move_headers::remove_channel_data_header_ipv4; +use udp::Udp; + +mod channel_data; +mod checksum; +mod config; +mod error; +mod eth2; +mod ip4; +mod move_headers; +mod slice_mut_at; +mod udp; + +pub const CHANNEL_DATA_HEADER_LEN: usize = 4; + +/// Channel mappings from an IPv4 socket + channel number to an IPv4 socket + port. +/// +/// TODO: Update flags to `BPF_F_NO_PREALLOC` to guarantee atomicity? Needs research. +#[map] +static CHAN_TO_UDP_44: HashMap = + HashMap::with_max_entries(0x100000, 0); +#[map] +static UDP_TO_CHAN_44: HashMap = + HashMap::with_max_entries(0x100000, 0); + +#[xdp] +pub fn handle_turn(ctx: XdpContext) -> u32 { + try_handle_turn(&ctx).unwrap_or_else(|e| { + let action = e.xdp_action(); + + debug!(&ctx, "Failed to handle packet {}; action = {}", e, action); + + action + }) +} + +#[inline(always)] +fn try_handle_turn(ctx: &XdpContext) -> Result { + let eth = Eth2::parse(ctx)?; + + match eth.ether_type() { + EtherType::IPV4 => try_handle_turn_ipv4(ctx), + EtherType::IPV6 => try_handle_turn_ipv6(ctx), + _ => Ok(xdp_action::XDP_PASS), + } +} + +#[inline(always)] +fn try_handle_turn_ipv4(ctx: &XdpContext) -> Result { + let ipv4 = Ip4::parse(ctx)?; + + if ipv4.protocol() != IpNumber::UDP { + return Ok(xdp_action::XDP_PASS); + } + + let udp = Udp::parse(ctx)?; // TODO: Change the API so we parse the UDP header _from_ the ipv4 struct? + + trace!(ctx, "New packet from {:i}:{}", ipv4.src(), udp.src()); + + if udp.dst() == 3478 { + try_handle_ipv4_channel_data_to_udp(ctx, ipv4, udp) + } else { + try_handle_ipv4_udp_to_channel_data(ctx, ipv4, udp) + } +} + +fn try_handle_ipv4_channel_data_to_udp( + ctx: &XdpContext, + ipv4: Ip4, + udp: Udp, +) -> Result { + let cd = ChannelData::parse(ctx)?; + + // SAFETY: ??? + let maybe_peer = + unsafe { CHAN_TO_UDP_44.get(&ClientAndChannelV4::new(ipv4.src(), udp.src(), cd.number())) }; + let Some(port_and_peer) = maybe_peer else { + debug!( + ctx, + "No channel binding from {:i}:{} for channel {}", + ipv4.src(), + udp.src(), + cd.number(), + ); + + return Ok(xdp_action::XDP_PASS); + }; + + let new_src = ipv4.dst(); // The IP we received the packet on will be the new source IP. + let new_ipv4_total_len = ipv4.total_len() - CHANNEL_DATA_HEADER_LEN as u16; + let pseudo_header = ipv4.update(new_src, port_and_peer.peer_ip(), new_ipv4_total_len); + + let new_udp_len = udp.len() - CHANNEL_DATA_HEADER_LEN as u16; + udp.update( + pseudo_header, + port_and_peer.allocation_port(), + port_and_peer.peer_port(), + new_udp_len, + ); + + remove_channel_data_header_ipv4(ctx); + + Ok(xdp_action::XDP_TX) +} + +#[inline(always)] +#[expect(unused_variables, reason = "Will be used in the future.")] +fn try_handle_ipv4_udp_to_channel_data( + ctx: &XdpContext, + ipv4: Ip4, + udp: Udp, +) -> Result { + // Not yet implemented ... + + Ok(xdp_action::XDP_PASS) +} + +fn try_handle_turn_ipv6(_: &XdpContext) -> Result { + Ok(xdp_action::XDP_PASS) +} + +/// Defines our panic handler. +/// +/// This doesn't do anything because we can never actually panic in eBPF. +/// Attempting to link a program that wants to abort fails at compile time anyway. +#[cfg(target_arch = "bpf")] +#[panic_handler] +fn on_panic(_: &core::panic::PanicInfo) -> ! { + loop {} +} + +#[cfg(not(target_arch = "bpf"))] +fn main() { + panic!("This program is meant to be compiled as an eBPF program."); +} diff --git a/rust/relay/ebpf-turn-router/src/move_headers.rs b/rust/relay/ebpf-turn-router/src/move_headers.rs new file mode 100644 index 000000000..587c1c9c5 --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/move_headers.rs @@ -0,0 +1,61 @@ +use aya_ebpf::{ + cty::c_void, + helpers::{bpf_xdp_adjust_head, bpf_xdp_load_bytes, bpf_xdp_store_bytes}, + programs::XdpContext, +}; +use etherparse::{Ethernet2Header, Ipv4Header, Ipv6Header, UdpHeader}; + +use crate::CHANNEL_DATA_HEADER_LEN; + +pub fn remove_channel_data_header_ipv4(ctx: &XdpContext) { + move_headers::<{ CHANNEL_DATA_HEADER_LEN as i32 }, { Ipv4Header::MIN_LEN }>(ctx) +} + +#[expect(dead_code, reason = "Will be used in the future.")] +pub fn add_channel_data_header_ipv4(ctx: &XdpContext, mut header: [u8; 4]) { + move_headers::<{ -(CHANNEL_DATA_HEADER_LEN as i32) }, { Ipv4Header::MIN_LEN }>(ctx); + let offset = (Ethernet2Header::LEN + Ipv4Header::MIN_LEN + UdpHeader::LEN) as u32; + + unsafe { + bpf_xdp_store_bytes( + ctx.ctx, + offset, + header.as_mut_ptr() as *mut c_void, + CHANNEL_DATA_HEADER_LEN as u32, + ) + }; +} + +fn move_headers(ctx: &XdpContext) { + // Scratch space for our headers. + // IPv6 headers are always 40 bytes long. + // IPv4 headers are between 20 and 60 bytes long. + // We restrict the eBPF program to only handle 20 byte long IPv4 headers. + // Therefore, we only need to reserver space for IPv6 headers. + // + // Ideally, we would just use the const-generic argument here but that is not yet supported ... + let mut headers = [0u8; Ethernet2Header::LEN + Ipv6Header::LEN + UdpHeader::LEN]; + + // Copy headers into buffer. + unsafe { + bpf_xdp_load_bytes( + ctx.ctx, + 0, + headers.as_mut_ptr() as *mut c_void, + (Ethernet2Header::LEN + IP_HEADER_LEN + UdpHeader::LEN) as u32, + ); + } + + // Move the head for the packet by `DELTA`. + unsafe { bpf_xdp_adjust_head(ctx.ctx, DELTA) }; + + // Copy the headers back. + unsafe { + bpf_xdp_store_bytes( + ctx.ctx, + 0, + headers.as_mut_ptr() as *mut c_void, + (Ethernet2Header::LEN + IP_HEADER_LEN + UdpHeader::LEN) as u32, + ) + }; +} diff --git a/rust/relay/ebpf-turn-router/src/slice_mut_at.rs b/rust/relay/ebpf-turn-router/src/slice_mut_at.rs new file mode 100644 index 000000000..ff3e84d42 --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/slice_mut_at.rs @@ -0,0 +1,17 @@ +use aya_ebpf::programs::XdpContext; + +use crate::error::Error; + +/// Helper function to get a mutable slice of `LEN` bytes at `offset` from the packet data. +#[inline(always)] +pub fn slice_mut_at(ctx: &XdpContext, offset: usize) -> Result<&mut [u8], Error> { + let start = ctx.data(); + let end = ctx.data_end(); + + // Ensure our access is not out-of-bounds. + if start + offset + LEN > end { + return Err(Error::PacketTooShort); + } + + Ok(unsafe { core::slice::from_raw_parts_mut((start + offset) as *mut u8, LEN) }) +} diff --git a/rust/relay/ebpf-turn-router/src/udp.rs b/rust/relay/ebpf-turn-router/src/udp.rs new file mode 100644 index 000000000..9ebf502b5 --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/udp.rs @@ -0,0 +1,86 @@ +use crate::{Error, checksum::ChecksumUpdate, slice_mut_at::slice_mut_at}; +use aya_ebpf::programs::XdpContext; +use aya_log_ebpf::debug; +use etherparse::{Ethernet2Header, Ipv4Header, UdpHeader, UdpHeaderSlice}; +use etherparse_ext::UdpHeaderSliceMut; + +/// Represents a UDP header within our packet. +pub struct Udp<'a> { + src: u16, + dst: u16, + len: u16, + checksum: u16, + + ctx: &'a XdpContext, + + /// Mutable slice of the UDP header, allows us to modify the header in-place. + slice_mut: UdpHeaderSliceMut<'a>, +} + +impl<'a> Udp<'a> { + pub fn parse(ctx: &'a XdpContext) -> Result { + let slice = + slice_mut_at::<{ UdpHeader::LEN }>(ctx, Ethernet2Header::LEN + Ipv4Header::MIN_LEN)?; + let udp_slice = UdpHeaderSlice::from_slice(slice).map_err(|_| Error::ParseUdpHeader)?; + + Ok(Self { + src: udp_slice.source_port(), + dst: udp_slice.destination_port(), + len: udp_slice.length(), + checksum: udp_slice.checksum(), + ctx, + slice_mut: { + // SAFETY: We parsed the slice as a UDP header above. + unsafe { UdpHeaderSliceMut::from_slice_unchecked(slice) } + }, + }) + } + + pub fn src(&self) -> u16 { + self.src + } + + pub fn dst(&self) -> u16 { + self.dst + } + + pub fn len(&self) -> u16 { + self.len + } + + /// Update this packet with a new source, destination, and length. + pub fn update( + mut self, + ip_pseudo_header: ChecksumUpdate, + new_src: u16, + new_dst: u16, + new_len: u16, + ) { + self.slice_mut.set_source_port(new_src); + self.slice_mut.set_destination_port(new_dst); + self.slice_mut.set_length(new_len); + + let ip_pseudo_header = ip_pseudo_header.remove_u16(self.len).add_u16(new_len); + + if crate::config::udp_checksum_enabled() { + self.slice_mut.set_checksum( + ChecksumUpdate::new(self.checksum) + .add_update(ip_pseudo_header) + .remove_u16(self.len) + .add_u16(new_len) + .remove_u16(self.src) + .add_u16(self.src) + .remove_u16(self.dst) + .add_u16(self.dst) + .into_checksum(), + ); + } else { + self.slice_mut.set_checksum(0); + } + + debug!( + self.ctx, + "UDP header update: src {} -> {}; dst {} -> {}", self.src, new_src, self.dst, new_dst, + ); + } +} diff --git a/rust/relay/Cargo.toml b/rust/relay/server/Cargo.toml similarity index 84% rename from rust/relay/Cargo.toml rename to rust/relay/server/Cargo.toml index ba756c83a..126aa491d 100644 --- a/rust/relay/Cargo.toml +++ b/rust/relay/server/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" edition = { workspace = true } license = { workspace = true } +[package.metadata.cargo-udeps.ignore] +development = ["difference", "env_logger"] + [dependencies] anyhow = { workspace = true } backoff = { workspace = true } @@ -12,6 +15,7 @@ bytecodec = { workspace = true } bytes = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } derive_more = { workspace = true, features = ["from"] } +ebpf-shared = { workspace = true, features = ["std"] } firezone-bin-shared = { workspace = true } firezone-logging = { workspace = true } firezone-telemetry = { workspace = true } @@ -45,10 +49,19 @@ trackable = { workspace = true } url = { workspace = true } uuid = { workspace = true, features = ["v4"] } +[target.'cfg(target_os = "linux")'.dependencies] +aya = { workspace = true } +aya-log = { workspace = true } + [dev-dependencies] difference = { workspace = true } env_logger = { workspace = true } test-strategy = { workspace = true } +tokio = { workspace = true, features = ["process", "macros", "net"] } + +[build-dependencies] +anyhow = "1" +aya-build = { workspace = true } [[test]] name = "regression" diff --git a/rust/relay/README.md b/rust/relay/server/README.md similarity index 100% rename from rust/relay/README.md rename to rust/relay/server/README.md diff --git a/rust/relay/server/build.rs b/rust/relay/server/build.rs new file mode 100644 index 000000000..83f9c32cd --- /dev/null +++ b/rust/relay/server/build.rs @@ -0,0 +1,24 @@ +#[cfg(target_os = "linux")] +fn main() -> anyhow::Result<()> { + use anyhow::Context as _; + use aya_build::cargo_metadata::{MetadataCommand, Package}; + + let package = MetadataCommand::new() + .no_deps() + .exec() + .context("MetadataCommand::exec")? + .packages + .into_iter() + .find(|Package { name, .. }| name == "ebpf-turn-router") + .context("`ebpf-turn-router` package not found")?; + + aya_build::build_ebpf( + [package], + aya_build::Toolchain::Custom("+nightly-2024-12-13"), + )?; + + Ok(()) +} + +#[cfg(not(target_os = "linux"))] +fn main() {} diff --git a/rust/relay/proptest-regressions/server/channel_data.txt b/rust/relay/server/proptest-regressions/server/channel_data.txt similarity index 100% rename from rust/relay/proptest-regressions/server/channel_data.txt rename to rust/relay/server/proptest-regressions/server/channel_data.txt diff --git a/rust/relay/src/auth.rs b/rust/relay/server/src/auth.rs similarity index 100% rename from rust/relay/src/auth.rs rename to rust/relay/server/src/auth.rs diff --git a/rust/relay/server/src/ebpf.rs b/rust/relay/server/src/ebpf.rs new file mode 100644 index 000000000..6db0d39d5 --- /dev/null +++ b/rust/relay/server/src/ebpf.rs @@ -0,0 +1,8 @@ +#[cfg(target_os = "linux")] +#[path = "ebpf/linux.rs"] +mod platform; +#[cfg(not(target_os = "linux"))] +#[path = "ebpf/stub.rs"] +mod platform; + +pub use platform::Program; diff --git a/rust/relay/server/src/ebpf/linux.rs b/rust/relay/server/src/ebpf/linux.rs new file mode 100644 index 000000000..92ba1543e --- /dev/null +++ b/rust/relay/server/src/ebpf/linux.rs @@ -0,0 +1,148 @@ +use std::net::SocketAddr; + +use anyhow::{Context as _, Result}; +use aya::{ + Pod, + maps::{Array, HashMap, MapData}, + programs::{Xdp, XdpFlags}, +}; +use aya_log::EbpfLogger; +use ebpf_shared::{ClientAndChannelV4, Config, PortAndPeerV4}; +use stun_codec::rfc5766::attributes::ChannelNumber; + +use crate::{AllocationPort, ClientSocket, PeerSocket}; + +pub struct Program { + ebpf: aya::Ebpf, +} + +impl Program { + pub fn try_load(interface: &'static str) -> Result { + let mut ebpf = aya::Ebpf::load(aya::include_bytes_aligned!(concat!( + env!("OUT_DIR"), + "/ebpf-turn-router-main" + )))?; + let _ = EbpfLogger::init(&mut ebpf); + let program: &mut Xdp = ebpf + .program_mut("handle_turn") + .context("No program")? + .try_into()?; + program.load().context("Failed to load program")?; + program + .attach(interface, XdpFlags::default()) + .with_context(|| format!("Failed to attached to interface {interface}"))?; + + Ok(Self { ebpf }) + } + + pub fn add_channel_binding( + &mut self, + client: ClientSocket, + channel_number: ChannelNumber, + peer: PeerSocket, + allocation_port: AllocationPort, + ) -> Result<()> { + let client = client.into_socket(); + let peer = peer.into_socket(); + + match (client, peer) { + (SocketAddr::V4(client), SocketAddr::V4(peer)) => { + let client_and_channel = + ClientAndChannelV4::from_socket(client, channel_number.value()); + let port_and_peer = PortAndPeerV4::from_socket(peer, allocation_port.value()); + + self.chan_to_udp_44_map_mut()? + .insert(client_and_channel, port_and_peer, 0)?; + self.udp_to_chan_44_map_mut()? + .insert(port_and_peer, client_and_channel, 0)?; + } + (SocketAddr::V6(_), SocketAddr::V6(_)) => { + // IPv6 is not yet supported in the eBPF kernel. + } + (SocketAddr::V4(_), SocketAddr::V6(_)) | (SocketAddr::V6(_), SocketAddr::V4(_)) => { + // Relaying between IPv4 and IPv6 is not supported in the eBPF kernel. + } + } + + Ok(()) + } + + pub fn remove_channel_binding( + &mut self, + client: ClientSocket, + channel_number: ChannelNumber, + peer: PeerSocket, + allocation_port: AllocationPort, + ) -> Result<()> { + let client = client.into_socket(); + let peer = peer.into_socket(); + + match (client, peer) { + (SocketAddr::V4(client), SocketAddr::V4(peer)) => { + let client_and_channel = + ClientAndChannelV4::from_socket(client, channel_number.value()); + let port_and_peer = PortAndPeerV4::from_socket(peer, allocation_port.value()); + + self.chan_to_udp_44_map_mut()?.remove(&client_and_channel)?; + self.udp_to_chan_44_map_mut()?.remove(&port_and_peer)?; + } + (SocketAddr::V6(_), SocketAddr::V6(_)) => { + // IPv6 is not yet supported in the eBPF kernel. + } + (SocketAddr::V4(_), SocketAddr::V6(_)) | (SocketAddr::V6(_), SocketAddr::V4(_)) => { + // Relaying between IPv4 and IPv6 is not supported in the eBPF kernel. + } + } + + Ok(()) + } + + pub fn set_config(&mut self, config: Config) -> Result<()> { + self.config_array_mut()?.set(0, config, 0)?; + + Ok(()) + } + + fn chan_to_udp_44_map_mut( + &mut self, + ) -> Result> { + self.hash_map_mut("CHAN_TO_UDP_44") + } + + fn udp_to_chan_44_map_mut( + &mut self, + ) -> Result> { + self.hash_map_mut("UDP_TO_CHAN_44") + } + + fn config_array_mut(&mut self) -> Result> { + self.array_mut("CONFIG") + } + + fn hash_map_mut(&mut self, name: &'static str) -> Result> + where + K: Pod, + V: Pod, + { + let map = self + .ebpf + .map_mut(name) + .with_context(|| format!("Map `{name}` not found"))?; + let map = HashMap::<_, K, V>::try_from(map).context("Failed to convert map")?; + + Ok(map) + } + + fn array_mut(&mut self, name: &'static str) -> Result> + where + T: Pod, + { + let map = self + .ebpf + .map_mut(name) + .with_context(|| format!("Array `{name}` not found"))?; + let map = Array::<_, T>::try_from(map).context("Failed to convert array")?; + + Ok(map) + } +} diff --git a/rust/relay/server/src/ebpf/stub.rs b/rust/relay/server/src/ebpf/stub.rs new file mode 100644 index 000000000..59a497ab7 --- /dev/null +++ b/rust/relay/server/src/ebpf/stub.rs @@ -0,0 +1,42 @@ +#![expect( + clippy::unnecessary_wraps, + reason = "Function signatures must align with the Linux impl." +)] + +use anyhow::Result; +use ebpf_shared::Config; +use stun_codec::rfc5766::attributes::ChannelNumber; + +use crate::{AllocationPort, ClientSocket, PeerSocket}; + +pub struct Program {} + +impl Program { + pub fn try_load(_: &'static str) -> Result { + Err(anyhow::anyhow!("Platform not supported")) + } + + pub fn add_channel_binding_ipv4( + &mut self, + _: ClientSocket, + _: ChannelNumber, + _: PeerSocket, + _: AllocationPort, + ) -> Result<()> { + Ok(()) + } + + pub fn remove_channel_binding_ipv4( + &mut self, + _: ClientSocket, + _: ChannelNumber, + _: PeerSocket, + _: AllocationPort, + ) -> Result<()> { + Ok(()) + } + + pub fn set_config(&mut self, _: Config) -> Result<()> { + Ok(()) + } +} diff --git a/rust/relay/src/lib.rs b/rust/relay/server/src/lib.rs similarity index 99% rename from rust/relay/src/lib.rs rename to rust/relay/server/src/lib.rs index 2d3a2342d..3b9edb884 100644 --- a/rust/relay/src/lib.rs +++ b/rust/relay/server/src/lib.rs @@ -5,6 +5,7 @@ mod server; mod sleep; pub mod auth; +pub mod ebpf; #[cfg(feature = "proptest")] #[allow(clippy::unwrap_used)] pub mod proptest; diff --git a/rust/relay/src/main.rs b/rust/relay/server/src/main.rs similarity index 91% rename from rust/relay/src/main.rs rename to rust/relay/server/src/main.rs index 36089ddad..00fb3125a 100644 --- a/rust/relay/src/main.rs +++ b/rust/relay/server/src/main.rs @@ -8,7 +8,7 @@ use firezone_logging::{err_with_src, sentry_layer}; use firezone_relay::sockets::Sockets; use firezone_relay::{ AddressFamily, AllocationPort, ChannelData, ClientSocket, Command, IpStack, PeerSocket, Server, - Sleep, VERSION, sockets, + Sleep, VERSION, ebpf, sockets, }; use firezone_telemetry::{RELAY_DSN, Telemetry}; use futures::{FutureExt, future}; @@ -22,6 +22,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Poll, ready}; use std::time::{Duration, Instant}; +use stun_codec::rfc5766::attributes::ChannelNumber; use tracing::{Subscriber, level_filters::LevelFilter}; use tracing_core::Dispatch; use tracing_stackdriver::CloudTraceConfiguration; @@ -130,6 +131,10 @@ fn main() { async fn try_main(args: Args) -> Result<()> { setup_tracing(&args)?; + let ebpf = ebpf::Program::try_load("eth0") + .inspect_err(|e| tracing::info!("Failed to load eBPF TURN router: {e:#}")) + .ok(); + let public_addr = match (args.public_ip4_addr, args.public_ip6_addr) { (Some(ip4), Some(ip6)) => IpStack::Dual { ip4, ip6 }, (Some(ip4), None) => IpStack::Ip4(ip4), @@ -178,7 +183,7 @@ async fn try_main(args: Args) -> Result<()> { )?; channel.connect(NoParams); - let mut eventloop = Eventloop::new(server, channel, public_addr, last_heartbeat_sent)?; + let mut eventloop = Eventloop::new(server, ebpf, channel, public_addr, last_heartbeat_sent)?; tracing::info!(target: "relay", "Listening for incoming traffic on UDP port {0}", args.listen_port); @@ -331,6 +336,8 @@ struct Eventloop { channel: Option>, sleep: Sleep, + ebpf: Option, + #[cfg(unix)] sigterm: tokio::signal::unix::Signal, shutting_down: bool, @@ -349,6 +356,7 @@ where { fn new( server: Server, + ebpf: Option, channel: PhoenixChannel, public_address: IpStack, last_heartbeat_sent: Arc>>, @@ -383,6 +391,7 @@ where stats_log_interval: tokio::time::interval(STATS_LOG_INTERVAL), last_num_bytes_relayed: 0, sockets, + ebpf, buffer: [0u8; MAX_UDP_SIZE], last_heartbeat_sent, #[cfg(unix)] @@ -433,6 +442,36 @@ where tracing::info!(target: "relay", %port, %family, "Freeing allocation"); } + Command::CreateChannelBinding { + client, + channel_number, + peer, + allocation_port, + } => { + if let Err(e) = self.create_channel_binding_in_ebpf_map( + client, + channel_number, + peer, + allocation_port, + ) { + tracing::debug!(target: "relay", %client, "Failed to create channel binding in eBPF map: {e:#}"); + } + } + Command::DeleteChannelBinding { + client, + channel_number, + peer, + allocation_port, + } => { + if let Err(e) = self.delete_channel_binding_in_ebpf_map( + client, + channel_number, + peer, + allocation_port, + ) { + tracing::debug!(target: "relay", %client, "Failed to delete channel binding in eBPF map: {e:#}"); + } + } } ready = true; @@ -595,6 +634,38 @@ where } } + fn create_channel_binding_in_ebpf_map( + &mut self, + client: ClientSocket, + channel_number: ChannelNumber, + peer: PeerSocket, + allocation_port: AllocationPort, + ) -> Result<()> { + let Some(ebpf) = self.ebpf.as_mut() else { + return Ok(()); // ebPF program not loaded ... + }; + + ebpf.add_channel_binding(client, channel_number, peer, allocation_port)?; + + Ok(()) + } + + fn delete_channel_binding_in_ebpf_map( + &mut self, + client: ClientSocket, + channel_number: ChannelNumber, + peer: PeerSocket, + allocation_port: AllocationPort, + ) -> Result<()> { + let Some(ebpf) = self.ebpf.as_mut() else { + return Ok(()); // ebPF program not loaded ... + }; + + ebpf.remove_channel_binding(client, channel_number, peer, allocation_port)?; + + Ok(()) + } + fn handle_portal_event(&mut self, event: phoenix_channel::Event) { match event { Event::SuccessResponse { res: (), .. } => {} diff --git a/rust/relay/src/net_ext.rs b/rust/relay/server/src/net_ext.rs similarity index 100% rename from rust/relay/src/net_ext.rs rename to rust/relay/server/src/net_ext.rs diff --git a/rust/relay/src/proptest.rs b/rust/relay/server/src/proptest.rs similarity index 100% rename from rust/relay/src/proptest.rs rename to rust/relay/server/src/proptest.rs diff --git a/rust/relay/src/server.rs b/rust/relay/server/src/server.rs similarity index 98% rename from rust/relay/src/server.rs rename to rust/relay/server/src/server.rs index c4fa8b282..1d76cedeb 100644 --- a/rust/relay/src/server.rs +++ b/rust/relay/server/src/server.rs @@ -110,6 +110,20 @@ pub enum Command { port: AllocationPort, family: AddressFamily, }, + CreateChannelBinding { + client: ClientSocket, + channel_number: ChannelNumber, + + peer: PeerSocket, + allocation_port: AllocationPort, + }, + DeleteChannelBinding { + client: ClientSocket, + channel_number: ChannelNumber, + + peer: PeerSocket, + allocation_port: AllocationPort, + }, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] @@ -430,6 +444,14 @@ where { tracing::info!(target: "relay", channel = %number.value(), %client, peer = %channel.peer_address, allocation = %channel.allocation, "Channel is now expired"); + self.pending_commands + .push_back(Command::DeleteChannelBinding { + client: *client, + channel_number: *number, + peer: channel.peer_address, + allocation_port: channel.allocation, + }); + channel.bound = false; if let Some((cs, n)) = self .channel_and_client_by_port_and_peer @@ -707,6 +729,13 @@ where (channel.allocation, channel.peer_address), (sender, requested_channel), ); + self.pending_commands + .push_back(Command::CreateChannelBinding { + client: sender, + channel_number: requested_channel, + peer: peer_address, + allocation_port: channel.allocation, + }); tracing::info!(target: "relay", "Refreshed channel binding"); @@ -911,6 +940,7 @@ where bound: true, }, ); + debug_assert!(existing.is_none()); let existing = self diff --git a/rust/relay/src/server/channel_data.rs b/rust/relay/server/src/server/channel_data.rs similarity index 100% rename from rust/relay/src/server/channel_data.rs rename to rust/relay/server/src/server/channel_data.rs diff --git a/rust/relay/src/server/client_message.rs b/rust/relay/server/src/server/client_message.rs similarity index 100% rename from rust/relay/src/server/client_message.rs rename to rust/relay/server/src/server/client_message.rs diff --git a/rust/relay/src/sleep.rs b/rust/relay/server/src/sleep.rs similarity index 100% rename from rust/relay/src/sleep.rs rename to rust/relay/server/src/sleep.rs diff --git a/rust/relay/src/sockets.rs b/rust/relay/server/src/sockets.rs similarity index 100% rename from rust/relay/src/sockets.rs rename to rust/relay/server/src/sockets.rs diff --git a/rust/relay/server/tests/ebpf_ipv4.rs b/rust/relay/server/tests/ebpf_ipv4.rs new file mode 100644 index 000000000..f3ce2be2e --- /dev/null +++ b/rust/relay/server/tests/ebpf_ipv4.rs @@ -0,0 +1,63 @@ +#![allow(clippy::unwrap_used)] + +use firezone_relay::{AllocationPort, ClientSocket, PeerSocket}; +use std::time::Duration; +use tokio::net::UdpSocket; + +use ebpf_shared::Config; +use stun_codec::rfc5766::attributes::ChannelNumber; + +#[tokio::test] +async fn ping_pong() { + let _guard = firezone_logging::test("trace,mio=off"); + + let mut program = firezone_relay::ebpf::Program::try_load("lo").unwrap(); + + // Linux does not set the correct UDP checksum when sending the packet, so our updated checksum in the eBPF code will be wrong and later dropped. + // To make the test work, we therefore need to tell the eBPF program to disable UDP checksumming by just setting it to 0. + program + .set_config(Config { + udp_checksum_enabled: false, + }) + .unwrap(); + + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let peer = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + let client_socket = client.local_addr().unwrap(); + let peer_socket = peer.local_addr().unwrap(); + + let channel_number = ChannelNumber::new(0x4000).unwrap(); + let allocation_port = 50000; + + program + .add_channel_binding( + ClientSocket::new(client_socket), + channel_number, + PeerSocket::new(peer_socket), + AllocationPort::new(allocation_port), + ) + .unwrap(); + + let msg = b"ping"; + let msg_len = msg.len(); + let mut buf = [0u8; 512]; + + let (header, payload) = buf.split_at_mut(4); + payload[..msg_len].copy_from_slice(msg); + + let len = + firezone_relay::ChannelData::encode_header_to_slice(channel_number, msg_len as u16, header); + + client.send_to(&buf[..len], "127.0.0.1:3478").await.unwrap(); + + let mut recv_buf = [0u8; 512]; + + let (len, from) = tokio::time::timeout(Duration::from_secs(1), peer.recv_from(&mut recv_buf)) + .await + .unwrap() + .unwrap(); + + assert_eq!(from.port(), allocation_port); + assert_eq!(&recv_buf[..len], msg); +} diff --git a/rust/relay/tests/regression.proptest-regressions b/rust/relay/server/tests/regression.proptest-regressions similarity index 100% rename from rust/relay/tests/regression.proptest-regressions rename to rust/relay/server/tests/regression.proptest-regressions diff --git a/rust/relay/server/tests/regression.rs b/rust/relay/server/tests/regression.rs new file mode 100644 index 000000000..128d21698 --- /dev/null +++ b/rust/relay/server/tests/regression.rs @@ -0,0 +1,936 @@ +// #![allow(clippy::unwrap_used)] + +// use bytecodec::{DecodeExt, EncodeExt}; +// use firezone_relay::{ +// AddressFamily, Allocate, AllocationPort, Attribute, Binding, ChannelBind, ChannelData, +// ClientMessage, ClientSocket, Command, IpStack, PeerSocket, Refresh, Server, SOFTWARE, +// }; +// use rand::rngs::mock::StepRng; +// use secrecy::SecretString; +// use std::iter; +// use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +// use std::time::{Duration, Instant, SystemTime}; +// use stun_codec::rfc5389::attributes::{ +// ErrorCode, MessageIntegrity, Nonce, Realm, Username, XorMappedAddress, +// }; +// use stun_codec::rfc5389::errors::Unauthorized; +// use stun_codec::rfc5389::methods::BINDING; +// use stun_codec::rfc5766::attributes::{ChannelNumber, Lifetime, XorPeerAddress, XorRelayAddress}; +// use stun_codec::rfc5766::methods::{ALLOCATE, CHANNEL_BIND, REFRESH}; +// use stun_codec::{Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId}; +// use test_strategy::proptest; +// use uuid::Uuid; +// use Output::{CreateAllocation, FreeAllocation}; + +// #[proptest] +// fn can_answer_stun_request_from_ip4_address( +// #[strategy(firezone_relay::proptest::binding())] request: Binding, +// source: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// ) { +// let _ = env_logger::try_init(); +// let mut server = TestServer::new(public_relay_addr); + +// let transaction_id = request.transaction_id(); + +// server.assert_commands( +// from_client(source, request, Instant::now()), +// [send_message( +// source, +// binding_response(transaction_id, source), +// )], +// ); +// } + +// #[proptest] +// fn deallocate_once_time_expired( +// #[strategy(firezone_relay::proptest::transaction_id())] transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::allocation_lifetime())] lifetime: Lifetime, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// source: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid, +// ) { +// let now = Instant::now(); + +// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce); +// let secret = server.auth_secret(); + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_authenticated_udp_implicit_ip4( +// transaction_id, +// Some(lifetime.clone()), +// valid_username(&username_salt), +// secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// create_allocation(49152, AddressFamily::V4), +// send_message( +// source, +// allocate_response(transaction_id, public_relay_addr, 49152, source, &lifetime), +// ), +// ], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + lifetime.lifetime()) +// ); + +// server.assert_commands( +// forward_time_to(now + lifetime.lifetime() + Duration::from_secs(1)), +// [free_allocation(49152, AddressFamily::V4)], +// ); +// } + +// #[proptest] +// fn unauthenticated_allocate_triggers_authentication( +// #[strategy(firezone_relay::proptest::transaction_id())] transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::allocation_lifetime())] lifetime: Lifetime, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// source: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// ) { +// let now = Instant::now(); + +// // Nonces are generated randomly and we control the randomness in the test, thus this is deterministic. +// let first_nonce = Uuid::from_u128(0x0); + +// let mut server = TestServer::new(public_relay_addr); +// let secret = server.auth_secret().to_owned(); + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_unauthenticated_udp(transaction_id, Some(lifetime.clone())), +// now, +// ), +// [send_message( +// source, +// unauthorized_allocate_response(transaction_id, first_nonce), +// )], +// ); + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_authenticated_udp_implicit_ip4( +// transaction_id, +// Some(lifetime.clone()), +// valid_username(&username_salt), +// &secret, +// first_nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// create_allocation(49152, AddressFamily::V4), +// send_message( +// source, +// allocate_response(transaction_id, public_relay_addr, 49152, source, &lifetime), +// ), +// ], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + lifetime.lifetime()) +// ); +// } + +// #[proptest] +// fn when_refreshed_in_time_allocation_does_not_expire( +// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] refresh_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::allocation_lifetime())] allocate_lifetime: Lifetime, +// #[strategy(firezone_relay::proptest::allocation_lifetime())] refresh_lifetime: Lifetime, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// source: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid, +// ) { +// let now = Instant::now(); + +// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce); +// let secret = server.auth_secret().to_owned(); +// let first_wake = now + allocate_lifetime.lifetime(); + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_authenticated_udp_implicit_ip4( +// allocate_transaction_id, +// Some(allocate_lifetime.clone()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// create_allocation(49152, AddressFamily::V4), +// send_message( +// source, +// allocate_response( +// allocate_transaction_id, +// public_relay_addr, +// 49152, +// source, +// &allocate_lifetime, +// ), +// ), +// ], +// ); + +// assert_eq!(server.server.poll_timeout(), Some(first_wake)); + +// // Forward time +// let now = now + allocate_lifetime.lifetime() / 2; +// let second_wake = now + refresh_lifetime.lifetime(); + +// server.assert_commands( +// from_client( +// source, +// Refresh::new( +// refresh_transaction_id, +// Some(refresh_lifetime.clone()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [send_message( +// source, +// refresh_response(refresh_transaction_id, refresh_lifetime.clone()), +// )], +// ); + +// assert_eq!(server.server.poll_timeout(), Some(second_wake)); + +// // The allocation MUST NOT be expired 1 sec before its refresh lifetime. +// // Note that depending on how the lifetimes were generated, this may still be before the initial allocation lifetime. +// // This is okay because lifetimes do not roll over, i.e. a refresh is not "added" to the initial lifetime but the allocation's lifetime is simply computed from now + requested lifetime of the refresh request. +// server.assert_commands( +// forward_time_to(now + refresh_lifetime.lifetime() - Duration::from_secs(1)), +// [], +// ); +// } + +// #[proptest] +// fn when_receiving_lifetime_0_for_existing_allocation_then_delete( +// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] refresh_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::allocation_lifetime())] allocate_lifetime: Lifetime, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// source: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid, +// ) { +// let now = Instant::now(); + +// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce); +// let secret = server.auth_secret().to_owned(); +// let first_wake = now + allocate_lifetime.lifetime(); + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_authenticated_udp_implicit_ip4( +// allocate_transaction_id, +// Some(allocate_lifetime.clone()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// create_allocation(49152, AddressFamily::V4), +// send_message( +// source, +// allocate_response( +// allocate_transaction_id, +// public_relay_addr, +// 49152, +// source, +// &allocate_lifetime, +// ), +// ), +// ], +// ); + +// assert_eq!(server.server.poll_timeout(), Some(first_wake)); + +// // Forward time +// let now = now + allocate_lifetime.lifetime() / 2; + +// server.assert_commands( +// from_client( +// source, +// Refresh::new( +// refresh_transaction_id, +// Some(Lifetime::new(Duration::ZERO).unwrap()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// free_allocation(49152, AddressFamily::V4), +// send_message( +// source, +// refresh_response( +// refresh_transaction_id, +// Lifetime::new(Duration::ZERO).unwrap(), +// ), +// ), +// ], +// ); + +// // Assert that forwarding time does not produce an obsolete event. +// server.assert_commands(forward_time_to(first_wake + Duration::from_secs(1)), []); +// } + +// #[proptest] +// fn freeing_allocation_clears_all_channels( +// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] +// channel_bind_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] refresh_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::channel_number())] channel: ChannelNumber, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// source: SocketAddr, +// peer: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid, +// ) { +// let now = Instant::now(); + +// let _ = env_logger::try_init(); + +// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce); +// let secret = server.auth_secret().to_owned(); + +// let _ = server.server.handle_client_message( +// ClientMessage::Allocate( +// Allocate::new_authenticated_udp_implicit_ip4( +// allocate_transaction_id, +// None, +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// ), +// ClientSocket::new(source), +// now, +// ); +// let _ = server.server.handle_client_message( +// ClientMessage::ChannelBind( +// ChannelBind::new( +// channel_bind_transaction_id, +// channel, +// XorPeerAddress::new(peer.into()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// ), +// ClientSocket::new(source), +// now, +// ); +// let _ = server.server.handle_client_message( +// ClientMessage::Refresh( +// Refresh::new( +// refresh_transaction_id, +// Some(Lifetime::new(Duration::ZERO).unwrap()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// ), +// ClientSocket::new(source), +// now, +// ); + +// assert_eq!(server.server.num_active_channels(), 0); +// } + +// // #[test] +// // fn server_waits_for_5_minutes_before_allowing_reuse_of_channel_number_after_expiry() { +// // // todo!() +// // } + +// #[proptest] +// fn ping_pong_relay( +// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] +// channel_bind_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// source: SocketAddrV4, +// peer: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// peer_to_client_ping: [u8; 32], +// #[strategy(firezone_relay::proptest::channel_data())] client_to_peer_ping: ChannelData<'static>, +// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid, +// ) { +// let now = Instant::now(); + +// let _ = env_logger::try_init(); + +// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce); +// let secret = server.auth_secret().to_owned(); +// let lifetime = Lifetime::new(Duration::from_secs(60 * 60)).unwrap(); // Lifetime longer than channel expiry + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_authenticated_udp_implicit_ip4( +// allocate_transaction_id, +// Some(lifetime.clone()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// create_allocation(49152, AddressFamily::V4), +// send_message( +// source, +// allocate_response( +// allocate_transaction_id, +// public_relay_addr, +// 49152, +// source, +// &lifetime, +// ), +// ), +// ], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + lifetime.lifetime()) +// ); + +// let now = now + Duration::from_secs(1); + +// server.assert_commands( +// from_client( +// source, +// ChannelBind::new( +// channel_bind_transaction_id, +// client_to_peer_ping.channel(), +// XorPeerAddress::new(peer.into()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [send_message( +// source, +// channel_bind_response(channel_bind_transaction_id), +// )], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + Duration::from_secs(60 * 10)) +// ); + +// let now = now + Duration::from_secs(1); + +// let maybe_forward = server.server.handle_client_input( +// client_to_peer_ping.as_msg(), +// ClientSocket::new(source.into()), +// now, +// ); + +// assert_eq!( +// maybe_forward, +// Some((AllocationPort::new(49152), PeerSocket::new(peer.into()))) +// ); + +// let maybe_forward = server.server.handle_peer_traffic( +// peer_to_client_ping.as_slice(), +// PeerSocket::new(peer.into()), +// AllocationPort::new(49152), +// ); + +// assert_eq!( +// maybe_forward, +// Some(( +// ClientSocket::new(source.into()), +// client_to_peer_ping.channel() +// )) +// ); +// } + +// #[proptest] +// fn allows_rebind_channel_after_expiry( +// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] +// channel_bind_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] +// channel_bind_2_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// #[strategy(firezone_relay::proptest::channel_number())] channel: ChannelNumber, +// source: SocketAddrV4, +// peer: SocketAddrV4, +// peer2: SocketAddrV4, +// public_relay_addr: Ipv4Addr, +// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid, +// ) { +// let now = Instant::now(); + +// let _ = env_logger::try_init(); + +// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce); +// let secret = server.auth_secret().to_owned(); +// let lifetime = Lifetime::new(Duration::from_secs(60 * 60)).unwrap(); // Lifetime longer than channel expiry + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_authenticated_udp_implicit_ip4( +// allocate_transaction_id, +// Some(lifetime.clone()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// create_allocation(49152, AddressFamily::V4), +// send_message( +// source, +// allocate_response( +// allocate_transaction_id, +// public_relay_addr, +// 49152, +// source, +// &lifetime, +// ), +// ), +// ], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + lifetime.lifetime()) +// ); + +// let now = now + Duration::from_secs(1); + +// server.assert_commands( +// from_client( +// source, +// ChannelBind::new( +// channel_bind_transaction_id, +// channel, +// XorPeerAddress::new(peer.into()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [send_message( +// source, +// channel_bind_response(channel_bind_transaction_id), +// )], +// ); + +// let channel_expiry = now + Duration::from_secs(60 * 10); +// let channel_rebind = channel_expiry + Duration::from_secs(60 * 5); + +// assert_eq!(server.server.poll_timeout(), Some(channel_expiry)); + +// let now = now + Duration::from_secs(60 * 10 + 1); + +// server.server.handle_timeout(now); +// assert_eq!(server.server.poll_timeout(), Some(channel_rebind)); + +// let now = now + Duration::from_secs(60 * 5 + 1); + +// server.server.handle_timeout(now); + +// let now = now + Duration::from_secs(1); + +// server.assert_commands( +// from_client( +// source, +// ChannelBind::new( +// channel_bind_2_transaction_id, +// channel, +// XorPeerAddress::new(peer2.into()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [send_message( +// source, +// channel_bind_response(channel_bind_2_transaction_id), +// )], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + Duration::from_secs(60 * 10)) // For channel expiry +// ); +// } + +// #[proptest] +// fn ping_pong_ip6_relay( +// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::transaction_id())] +// channel_bind_transaction_id: TransactionId, +// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String, +// #[strategy(firezone_relay::proptest::channel_number())] channel: ChannelNumber, +// source: SocketAddrV6, +// peer: SocketAddrV6, +// public_relay_ip4_addr: Ipv4Addr, +// public_relay_ip6_addr: Ipv6Addr, +// peer_to_client_ping: [u8; 32], +// mut client_to_peer_ping: [u8; 36], +// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid, +// ) { +// let now = Instant::now(); + +// let _ = env_logger::try_init(); + +// let mut server = +// TestServer::new((public_relay_ip4_addr, public_relay_ip6_addr)).with_nonce(nonce); +// let secret = server.auth_secret().to_owned(); +// let lifetime = Lifetime::new(Duration::from_secs(60 * 60)).unwrap(); // Lifetime longer than channel expiry + +// server.assert_commands( +// from_client( +// source, +// Allocate::new_authenticated_udp_ip6( +// allocate_transaction_id, +// Some(lifetime.clone()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [ +// create_allocation(49152, AddressFamily::V6), +// send_message( +// source, +// allocate_response( +// allocate_transaction_id, +// public_relay_ip6_addr, +// 49152, +// source, +// &lifetime, +// ), +// ), +// ], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + lifetime.lifetime()) +// ); + +// let now = now + Duration::from_secs(1); + +// server.assert_commands( +// from_client( +// source, +// ChannelBind::new( +// channel_bind_transaction_id, +// channel, +// XorPeerAddress::new(peer.into()), +// valid_username(&username_salt), +// &secret, +// nonce, +// ) +// .unwrap(), +// now, +// ), +// [send_message( +// source, +// channel_bind_response(channel_bind_transaction_id), +// )], +// ); + +// assert_eq!( +// server.server.poll_timeout(), +// Some(now + Duration::from_secs(60 * 10)) +// ); + +// let now = now + Duration::from_secs(1); + +// ChannelData::encode_header_to_slice(channel, 32, &mut client_to_peer_ping[..4]); +// let maybe_forward = server.server.handle_client_input( +// client_to_peer_ping.as_slice(), +// ClientSocket::new(source.into()), +// now, +// ); + +// assert_eq!( +// maybe_forward, +// Some((AllocationPort::new(49152), PeerSocket::new(peer.into()))) +// ); + +// let maybe_forward = server.server.handle_peer_traffic( +// peer_to_client_ping.as_slice(), +// PeerSocket::new(peer.into()), +// AllocationPort::new(49152), +// ); + +// assert_eq!( +// maybe_forward, +// Some((ClientSocket::new(source.into()), channel)) +// ); +// } + +// struct TestServer { +// server: Server, +// } + +// impl TestServer { +// fn new(relay_public_addr: impl Into) -> Self { +// Self { +// server: Server::new(relay_public_addr, StepRng::new(0, 0), 3478, 49152..=65535), +// } +// } + +// fn with_nonce(mut self, nonce: Uuid) -> Self { +// self.server.add_nonce(nonce); + +// self +// } + +// fn auth_secret(&self) -> &SecretString { +// self.server.auth_secret() +// } + +// fn assert_commands(&mut self, input: Input, output: [Output; N]) { +// match input { +// Input::Client(sender, message, now) => { +// self.server.handle_client_message(message, sender, now); +// } +// Input::Time(now) => { +// self.server.handle_timeout(now); +// } +// } + +// for expected_output in output { +// let Some(actual_output) = self.server.next_command() else { +// let msg = match expected_output { +// Output::SendMessage((recipient, msg)) => { +// format!("to send message {:?} to {recipient}", msg) +// } +// CreateAllocation(port, family) => { +// format!("to create allocation on port {port} for address family {family}") +// } +// FreeAllocation(port, family) => { +// format!("to free allocation on port {port} for address family {family}") +// } +// }; + +// panic!("No commands produced but expected {msg}"); +// }; + +// match (expected_output, actual_output) { +// ( +// Output::SendMessage((to, mut message)), +// Command::SendMessage { payload, recipient }, +// ) => { +// let sent_message = parse_message(&payload); + +// // In order to avoid simulating authentication, we copy the MessageIntegrity attribute. +// if let Some(mi) = sent_message.get_attribute::() { +// message.add_attribute(mi.clone()); +// } + +// let expected_bytes = MessageEncoder::new() +// .encode_into_bytes(message.clone()) +// .unwrap(); + +// if expected_bytes != payload { +// let expected_message = format!("{:?}", message); +// let actual_message = format!("{:?}", sent_message); + +// difference::assert_diff!(&expected_message, &actual_message, "\n", 0); +// } + +// assert_eq!(recipient, to); +// } +// ( +// CreateAllocation(expected_port, expected_family), +// Command::CreateAllocation { +// port: actual_port, +// family: actual_family, +// }, +// ) => { +// assert_eq!(expected_port, actual_port); +// assert_eq!(expected_family, actual_family); +// } +// ( +// FreeAllocation(port, family), +// Command::FreeAllocation { +// port: actual_port, +// family: actual_family, +// }, +// ) => { +// assert_eq!(port, actual_port); +// assert_eq!(family, actual_family); +// } +// (expected, actual) => panic!("Unhandled combination: {expected:?} {actual:?}"), +// } +// } + +// let remaining_commands = iter::from_fn(|| self.server.next_command()).collect::>(); + +// assert_eq!(remaining_commands, vec![]) +// } +// } + +// fn valid_username(salt: &str) -> Username { +// let now_unix = SystemTime::now() +// .duration_since(SystemTime::UNIX_EPOCH) +// .unwrap() +// .as_secs(); + +// let expiry = now_unix + 1000; + +// Username::new(format!("{expiry}:{salt}")).unwrap() +// } + +// fn binding_response( +// transaction_id: TransactionId, +// address: impl Into, +// ) -> Message { +// let mut message = +// Message::::new(MessageClass::SuccessResponse, BINDING, transaction_id); +// message.add_attribute(SOFTWARE.clone()); +// message.add_attribute(XorMappedAddress::new(address.into())); + +// message +// } + +// fn allocate_response( +// transaction_id: TransactionId, +// public_relay_addr: impl Into, +// port: u16, +// source: impl Into, +// lifetime: &Lifetime, +// ) -> Message { +// let mut message = +// Message::::new(MessageClass::SuccessResponse, ALLOCATE, transaction_id); +// message.add_attribute(SOFTWARE.clone()); +// message.add_attribute(XorRelayAddress::new(SocketAddr::new( +// public_relay_addr.into(), +// port, +// ))); +// message.add_attribute(XorMappedAddress::new(source.into())); +// message.add_attribute(lifetime.clone()); + +// message +// } + +// fn unauthorized_allocate_response( +// transaction_id: TransactionId, +// nonce: Uuid, +// ) -> Message { +// let mut message = +// Message::::new(MessageClass::ErrorResponse, ALLOCATE, transaction_id); +// message.add_attribute(SOFTWARE.clone()); +// message.add_attribute(ErrorCode::from(Unauthorized)); +// message.add_attribute(Realm::new("firezone".to_owned()).unwrap()); +// message.add_attribute(Nonce::new(nonce.as_hyphenated().to_string()).unwrap()); + +// message +// } + +// fn refresh_response(transaction_id: TransactionId, lifetime: Lifetime) -> Message { +// let mut message = +// Message::::new(MessageClass::SuccessResponse, REFRESH, transaction_id); +// message.add_attribute(SOFTWARE.clone()); +// message.add_attribute(lifetime); + +// message +// } + +// fn channel_bind_response(transaction_id: TransactionId) -> Message { +// let mut message = +// Message::::new(MessageClass::SuccessResponse, CHANNEL_BIND, transaction_id); +// message.add_attribute(SOFTWARE.clone()); + +// message +// } + +// fn parse_message(message: &[u8]) -> Message { +// MessageDecoder::new() +// .decode_from_bytes(message) +// .unwrap() +// .unwrap() +// } + +// enum Input<'a> { +// Client(ClientSocket, ClientMessage<'a>, Instant), +// Time(Instant), +// } + +// fn from_client<'a>( +// from: impl Into, +// message: impl Into>, +// now: Instant, +// ) -> Input<'a> { +// Input::Client(ClientSocket::new(from.into()), message.into(), now) +// } + +// fn forward_time_to<'a>(when: Instant) -> Input<'a> { +// Input::Time(when) +// } + +// #[derive(Debug)] +// enum Output { +// SendMessage((ClientSocket, Message)), +// CreateAllocation(AllocationPort, AddressFamily), +// FreeAllocation(AllocationPort, AddressFamily), +// } + +// fn create_allocation(port: u16, fam: AddressFamily) -> Output { +// Output::CreateAllocation(AllocationPort::new(port), fam) +// } + +// fn free_allocation(port: u16, fam: AddressFamily) -> Output { +// Output::FreeAllocation(AllocationPort::new(port), fam) +// } + +// fn send_message(source: impl Into, message: Message) -> Output { +// Output::SendMessage((ClientSocket::new(source.into()), message)) +// }