feat(relay): MVP for routing channel data message in eBPF kernel (#8496)

## Abstract

This pull-request implements the first stage of off-loading routing of
TURN data channel messages to the kernel via an eBPF XDP program. In
particular, the eBPF kernel implemented here **only** handles the
decapsulation of IPv4 data channel messages into their embedded UDP
payload. Implementation of other data paths, such as the receiving of
UDP traffic on an allocation and wrapping it in a TURN channel data
message is deferred to a later point for reasons explained further down.
As it stands, this PR implements the bare minimum for us to start
experimenting and benefiting from eBPF. It is already massive as it is
due to the infrastructure required for actually doing this. Let's dive
into it!

## A refresher on TURN channel-data messages

TURN specifies a channel-data message for relaying data between two
peers. A channel data message has a fixed 4-byte header:

- The first two bytes specify the channel number
- The second two bytes specify the length of the encapsulated payload

Like all TURN traffic, channel data messages run over UDP by default,
meaning this header sits at the very front of the UDP payload. This will
be important later.

After making an allocation with a TURN server (i.e. reserving a port on
the TURN server's interfaces), a TURN client can bind channels on that
allocation. As such, channel numbers are scoped to a client's
allocation. Channel numbers are allocated by the client within a given
range (0x4000 - 0x4FFF). When binding a channel, the client specifies
the remote's peer address that they'd like the data sent on the channel
to be sent to.

Given this setup, when a TURN server receives a channel data message, it
first looks at the sender's IP + port to infer the allocation (a client
can only ever have 1 allocation at a time). Within that allocation, the
server then looks for the channel number and retrieves the target socket
address from that. The allocation itself is a port on the relay's
interface. With that, we can now "unpack" the payload of the channel
data message and rewrite it to the new receiver:

- The new source IP can be set from the old dst IP (when operating in
user-space mode this is irrelevant because we are working with the
socket API).
- The new source port is the client's allocation.
- The new destination IP is retrieved from the mapping retrieved via the
channel number.
- The new destination port is retrieved from the mapping retrieved via
the channel number.

Last but not least, all that is left is removing the channel data header
from the UDP payload and we can send out the packet. In other words, we
need to cut off the first 4 bytes of the UDP payload.

## User-space relaying

At present, we implement the above flow in user-space. This is tricky to
do because we need to bind _many_ sockets, one for each possible
allocation port (of which there can be 16383). The actual work to be
done on these packets is also extremely minimal. All we do is cut off
(or add on) the data-channel header. Benchmarks show that we spend
pretty much all of our time copying data between user-space and
kernel-space. Cutting this out should give us a massive increase in
performance.

## Implementing an eBPF XDP TURN router

eBPF has been shown to be a very efficient way of speeding up a TURN
server [0]. After many failed experiments (e.g. using TC instead of XDP)
and countless rabbit-holes, we have also arrived at the design
documented within the paper. Most notably:

- The eBPF program is entirely optional. We try to load it on startup,
but if that fails, we will simply use the user-space mode.
- Retaining the user-space mode is also important because under certain
circumstances, the eBPF kernel needs to pass on the packet, for example,
when receiving IPv4 packets with options. Those make the header
dynamically-sized which makes further processing difficult because the
eBPF verifier disallows indexing into the packet with data derived from
the packet itself.
- In order to add/remove the channel-data header, we shift the packet
headers backwards / forwards and leave the payload in place as the
packet headers are constant in size and can thus easily and cheaply be
copied out.

In order to perform the relaying flow explained above, we introduce maps
that are shared with user-space. These maps go from a tuple of
(client-socket, channel-number) to a tuple of (allocation-port,
peer-socket) and thus give us all the data necessary to rewrite the
packet.

## Integration with our relay

Last but not least, to actually integrate the eBPF kernel with our
relay, we need to extend the `Server` with two more events so we can
learn, when channel bindings are created and when they expire. Using
these events, we can then update the eBPF maps accordingly and therefore
influence the routing behaviour in the kernel.

## Scope

What is implemented here is only one of several possible data paths.
Implementing the others isn't conceptually difficult but it does
increase the scope. Landing something that already works allows us to
gain experience running it in staging (and possibly production).
Additionally, I've hit some issues with the eBPF verifier when adding
more codepaths to the kernel. I expect those to be possible to resolve
given sufficient debugging but I'd like to do so after merging this.

---

Depends-On: #8506
Depends-On: #8507
Depends-On: #8500
Resolves: #8501

[0]: https://dl.acm.org/doi/pdf/10.1145/3609021.3609296
This commit is contained in:
Thomas Eizinger
2025-03-27 21:59:40 +11:00
committed by GitHub
parent fb64c8b971
commit 3c7ac084c0
40 changed files with 2415 additions and 4 deletions

View File

@@ -92,6 +92,7 @@ runs:
NIGHTLY="nightly-2024-12-13"
rustup toolchain install $NIGHTLY
rustup component add rust-src --toolchain $NIGHTLY
echo "nightly=$NIGHTLY" >> $GITHUB_OUTPUT
shell: bash

View File

@@ -264,6 +264,11 @@ jobs:
targets: ${{ matrix.arch.target }}
- name: Install dependencies
run: ${{ matrix.arch.install_dependencies }}
- uses: taiki-e/install-action@0b63bc859f7224657cf7e39426848cabaa36f456 # v2.49.9
with:
tool: bpf-linker
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Build binaries
run: |
set -xe

View File

@@ -55,6 +55,12 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tool: cargo-udeps,cargo-deny
- uses: taiki-e/install-action@0b63bc859f7224657cf7e39426848cabaa36f456 # v2.49.9
if: ${{ runner.os == 'Linux' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tool: bpf-linker
- run: |
cargo +${{ steps.setup-rust.outputs.nightly_version }} udeps --all-targets --all-features ${{ steps.setup-rust.outputs.packages }}
name: Check for unused dependencies
@@ -67,6 +73,7 @@ jobs:
shell: bash
- run: cargo deny check --hide-inclusion-graph
shell: bash
test:
name: test-${{ matrix.runs-on }}
strategy:
@@ -94,6 +101,12 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tool: ripgrep
- uses: taiki-e/install-action@0b63bc859f7224657cf7e39426848cabaa36f456 # v2.49.9
if: ${{ runner.os == 'Linux' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tool: bpf-linker
- name: "cargo test"
shell: bash
run: |

171
rust/Cargo.lock generated
View File

@@ -207,6 +207,12 @@ dependencies = [
"zbus 5.5.0",
]
[[package]]
name = "assert_matches"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-broadcast"
version = "0.7.1"
@@ -469,6 +475,131 @@ dependencies = [
"tower-service",
]
[[package]]
name = "aya"
version = "0.13.1"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"assert_matches",
"aya-obj",
"bitflags 2.6.0",
"bytes",
"hashbrown 0.15.2",
"libc",
"log",
"object",
"once_cell",
"thiserror 2.0.6",
"tokio",
]
[[package]]
name = "aya-build"
version = "0.1.2"
source = "git+https://github.com/thomaseizinger/aya?branch=fix%2Faya-build-pin-nightly#c766051000cc41e3409477a8fec8547acc4d72fd"
dependencies = [
"anyhow",
"cargo_metadata",
]
[[package]]
name = "aya-ebpf"
version = "0.1.1"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"aya-ebpf-bindings",
"aya-ebpf-cty",
"aya-ebpf-macros",
"rustversion",
]
[[package]]
name = "aya-ebpf-bindings"
version = "0.1.1"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"aya-ebpf-cty",
]
[[package]]
name = "aya-ebpf-cty"
version = "0.2.2"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
[[package]]
name = "aya-ebpf-macros"
version = "0.1.1"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.87",
]
[[package]]
name = "aya-log"
version = "0.2.1"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"aya",
"aya-log-common",
"bytes",
"log",
"thiserror 2.0.6",
"tokio",
]
[[package]]
name = "aya-log-common"
version = "0.1.15"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"num_enum",
]
[[package]]
name = "aya-log-ebpf"
version = "0.1.1"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"aya-ebpf",
"aya-log-common",
"aya-log-ebpf-macros",
]
[[package]]
name = "aya-log-ebpf-macros"
version = "0.1.0"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"aya-log-common",
"aya-log-parser",
"proc-macro2",
"quote",
"syn 2.0.87",
]
[[package]]
name = "aya-log-parser"
version = "0.1.13"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"aya-log-common",
]
[[package]]
name = "aya-obj"
version = "0.2.1"
source = "git+https://github.com/aya-rs/aya#8724cc1b2d61a006f74bc0aab77a5df6dd6b60f3"
dependencies = [
"bytes",
"hashbrown 0.15.2",
"log",
"object",
"thiserror 2.0.6",
]
[[package]]
name = "backoff"
version = "0.4.0"
@@ -1748,6 +1879,27 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
[[package]]
name = "ebpf-shared"
version = "0.1.0"
dependencies = [
"aya",
]
[[package]]
name = "ebpf-turn-router"
version = "0.1.0"
dependencies = [
"aya-ebpf",
"aya-log-ebpf",
"ebpf-shared",
"etherparse",
"etherparse-ext",
"hex-literal",
"ip-packet",
"which",
]
[[package]]
name = "either"
version = "1.15.0"
@@ -2183,6 +2335,9 @@ name = "firezone-relay"
version = "0.1.0"
dependencies = [
"anyhow",
"aya",
"aya-build",
"aya-log",
"backoff",
"base64 0.22.1",
"bytecodec",
@@ -2190,6 +2345,7 @@ dependencies = [
"clap",
"derive_more 1.0.0",
"difference",
"ebpf-shared",
"env_logger",
"firezone-bin-shared",
"firezone-logging",
@@ -4322,6 +4478,9 @@ version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
dependencies = [
"crc32fast",
"hashbrown 0.15.2",
"indexmap 2.6.0",
"memchr",
]
@@ -4919,6 +5078,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "proc-macro2-diagnostics"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"version_check",
]
[[package]]
name = "proptest"
version = "1.6.0"

View File

@@ -19,7 +19,9 @@ members = [
"ip-packet",
"logging",
"phoenix-channel",
"relay",
"relay/ebpf-shared",
"relay/ebpf-turn-router",
"relay/server",
"socket-factory",
"telemetry",
"tests/gui-smoke-test",
@@ -39,6 +41,11 @@ arboard = { version = "3.4.0", default-features = false }
async-trait = { version = "0.1", default-features = false }
atomicwrites = "0.4.4"
axum = { version = "0.7.7", default-features = false }
aya = { git = "https://github.com/aya-rs/aya" }
aya-build = { git = "https://github.com/thomaseizinger/aya", branch = "fix/aya-build-pin-nightly" }
aya-ebpf = { git = "https://github.com/aya-rs/aya" }
aya-log = { git = "https://github.com/aya-rs/aya" }
aya-log-ebpf = { git = "https://github.com/aya-rs/aya" }
backoff = { version = "0.4", features = ["tokio"] }
base64 = { version = "0.22.1", default-features = false }
bimap = "0.6"
@@ -67,7 +74,8 @@ firezone-bin-shared = { path = "bin-shared" }
firezone-gui-client-common = { path = "gui-client/src-common" }
firezone-headless-client = { path = "headless-client" }
firezone-logging = { path = "logging" }
firezone-relay = { path = "relay" }
firezone-relay = { path = "relay/server" }
ebpf-shared = { path = "relay/ebpf-shared" }
firezone-telemetry = { path = "telemetry" }
firezone-tunnel = { path = "connlib/tunnel" }
flume = { version = "0.11.1", features = ["async"] }
@@ -173,6 +181,7 @@ url = "2.5.2"
uuid = "1.16.0"
windows = "0.58.0"
winreg = "0.52.0"
which = "4.4.2"
zbus = "5.5.0"
zip = { version = "2", default-features = false }
@@ -220,3 +229,7 @@ split-debuginfo = "packed"
[profile.release.package.firezone-gui-client]
debug = "full"
split-debuginfo = "packed"
[profile.release.package.ebpf-turn-router]
debug = 2
codegen-units = 1

View File

@@ -474,6 +474,8 @@ impl TunnelTest {
relay.deallocate_port(port.value(), family);
relay.exec_mut(|r| r.allocations.remove(&(family, port)));
}
firezone_relay::Command::CreateChannelBinding { .. } => {}
firezone_relay::Command::DeleteChannelBinding { .. } => {}
}
continue 'outer;

View File

@@ -0,0 +1,11 @@
[package]
name = "ebpf-shared"
version = "0.1.0"
edition = { workspace = true }
license = { workspace = true }
[features]
std = ["aya"]
[target.'cfg(target_os = "linux")'.dependencies]
aya = { workspace = true, optional = true }

View File

@@ -0,0 +1,127 @@
//! Shared data structures between the kernel and userspace.
//!
//! To learn more about the layout requirements of these structs, read <https://github.com/foniod/redbpf/issues/150#issuecomment-964017857>.
#![cfg_attr(not(feature = "std"), no_std)]
#[repr(C)]
#[derive(Clone, Copy)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct ClientAndChannelV4 {
ipv4_address: [u8; 4],
_padding_ipv4_address: [u8; 4],
port: u16,
_padding_port: [u8; 6],
channel: u16,
_padding_channel: [u8; 6],
_padding_struct: [u8; 40],
}
impl ClientAndChannelV4 {
pub fn new(ipv4_address: [u8; 4], port: u16, channel: u16) -> Self {
Self {
ipv4_address,
_padding_ipv4_address: [0u8; 4],
port: port.to_be(),
_padding_port: [0u8; 6],
channel: channel.to_be(),
_padding_channel: [0u8; 6],
_padding_struct: [0u8; 40],
}
}
pub fn from_socket(src: core::net::SocketAddrV4, channel: u16) -> Self {
Self::new(src.ip().octets(), src.port(), channel)
}
pub fn client_ip(&self) -> [u8; 4] {
self.ipv4_address
}
pub fn client_port(&self) -> u16 {
self.port
}
pub fn channel(&self) -> u16 {
self.channel
}
}
#[repr(C)]
#[derive(Clone, Copy)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct PortAndPeerV4 {
ipv4_address: [u8; 4],
_padding_ipv4_address: [u8; 4],
allocation_port: u16,
_padding_allocation_port: [u8; 6],
peer_port: u16,
_padding_dest_port: [u8; 6],
}
impl PortAndPeerV4 {
pub fn new(ipv4_address: [u8; 4], allocation_port: u16, peer_port: u16) -> Self {
Self {
ipv4_address,
_padding_ipv4_address: [0u8; 4],
allocation_port,
_padding_allocation_port: [0u8; 6],
peer_port,
_padding_dest_port: [0u8; 6],
}
}
pub fn from_socket(dst: core::net::SocketAddrV4, allocation_port: u16) -> Self {
Self::new(dst.ip().octets(), allocation_port, dst.port())
}
pub fn peer_ip(&self) -> [u8; 4] {
self.ipv4_address
}
pub fn allocation_port(&self) -> u16 {
self.allocation_port
}
pub fn peer_port(&self) -> u16 {
self.peer_port
}
}
#[repr(C)]
#[derive(Clone, Copy, Default)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct Config {
pub udp_checksum_enabled: bool,
}
#[cfg(all(feature = "std", target_os = "linux"))]
mod userspace {
use super::*;
unsafe impl aya::Pod for ClientAndChannelV4 {}
unsafe impl aya::Pod for PortAndPeerV4 {}
unsafe impl aya::Pod for Config {}
}
#[cfg(all(test, feature = "std"))]
mod tests {
use super::*;
#[test]
fn client_and_channel_has_size_64() {
assert_eq!(std::mem::size_of::<ClientAndChannelV4>(), 64)
}
}

View File

@@ -0,0 +1,23 @@
[package]
name = "ebpf-turn-router"
version = "0.1.0"
edition = { workspace = true }
license = { workspace = true }
[dependencies]
aya-ebpf = { workspace = true }
aya-log-ebpf = { workspace = true }
ebpf-shared = { workspace = true }
etherparse = { workspace = true }
etherparse-ext = { workspace = true }
[dev-dependencies]
hex-literal = { workspace = true }
ip-packet = { workspace = true }
[build-dependencies]
which = { workspace = true }
[[bin]]
name = "ebpf-turn-router-main" # This needs to be different from the package name otherwise the build-script fails to differentiate between the directory it is built in and the actual binary.
path = "src/main.rs"

View File

@@ -0,0 +1,17 @@
use which::which;
/// Building this crate has an undeclared dependency on the `bpf-linker` binary. This would be
/// better expressed by [artifact-dependencies][bindeps] but issues such as
/// https://github.com/rust-lang/cargo/issues/12385 make their use impractical for the time being.
///
/// This file implements an imperfect solution: it causes cargo to rebuild the crate whenever the
/// mtime of `which bpf-linker` changes. Note that possibility that a new bpf-linker is added to
/// $PATH ahead of the one used as the cache key still exists. Solving this in the general case
/// would require rebuild-if-changed-env=PATH *and* rebuild-if-changed={every-directory-in-PATH}
/// which would likely mean far too much cache invalidation.
///
/// [bindeps]: https://doc.rust-lang.org/nightly/cargo/reference/unstable.html?highlight=feature#artifact-dependencies
fn main() {
let bpf_linker = which("bpf-linker").expect("bpf-linker not found in $PATH");
println!("cargo:rerun-if-changed={}", bpf_linker.to_str().unwrap());
}

View File

@@ -0,0 +1,64 @@
use crate::{CHANNEL_DATA_HEADER_LEN, Error, slice_mut_at::slice_mut_at};
use aya_ebpf::programs::XdpContext;
use etherparse::{Ethernet2Header, Ipv4Header, UdpHeader};
/// Represents a channel-data header within our packet.
pub struct ChannelData<'a> {
number: u16,
_ctx: &'a XdpContext,
}
impl<'a> ChannelData<'a> {
pub fn parse(ctx: &'a XdpContext) -> Result<Self, Error> {
let cdhdr = slice_mut_at::<{ CHANNEL_DATA_HEADER_LEN }>(
ctx,
Ethernet2Header::LEN + Ipv4Header::MIN_LEN + UdpHeader::LEN,
)?;
if !(64..=79).contains(&cdhdr[0]) {
return Err(Error::NotAChannelDataMessage);
}
let number = u16::from_be_bytes([cdhdr[0], cdhdr[1]]);
// Untrusted because we read it from the packet.
let untrusted_channel_data_length = u16::from_be_bytes([cdhdr[2], cdhdr[3]]);
// The eBPF verifier doesn't allow you to use data read from the packet to index into the packet.
// Hence, we cannot read the length of the channel-data message and use it on the packet.
// So what we do instead is, we check how many bytes we have left in the packet and compare it to the length we read.
let length = remaining_bytes(
ctx,
Ethernet2Header::LEN + Ipv4Header::MIN_LEN + UdpHeader::LEN + CHANNEL_DATA_HEADER_LEN,
)?;
// We received less (or more) data than the header said we would.
if length != usize::from(untrusted_channel_data_length) {
return Err(Error::BadChannelDataLength);
}
// After we have verified the length, we don't need it anymore.
Ok(Self { number, _ctx: ctx })
}
pub fn number(&self) -> u16 {
self.number
}
}
/// Computes how many more bytes we have in the packet after a given offset.
#[inline(always)]
fn remaining_bytes(ctx: &XdpContext, offset: usize) -> Result<usize, Error> {
let start = ctx.data() + offset;
let end = ctx.data_end();
if start > end {
return Err(Error::PacketTooShort);
}
let len = end - start;
Ok(len)
}

View File

@@ -0,0 +1,131 @@
//! Incremental updates to Internet checksums.
//!
//! The Internet checksum is the one's complement of the one's complement sum of certain 16-bit words.
//! The use of one's complement arithmetic allows us to make incremental updates to a checksum without requiring a full re-computation.
//!
//! That is what we are implementing in this module.
//! There are three things you need to know:
//!
//! 1. The one's complement of a number `x` is `!x`.
//! 2. Addition in one's complement arithmetic is the same as regular addition, except that upon overflow, we add an additional bit.
//! 3. Subtraction in one's complement arithmetic is implemented as the addition of the one's complement of the number to be subtracted.
//!
//! This allows us to e.g. take an existing IP header checksum and update it to account for just the destination address changing.
#[derive(Default)]
pub struct ChecksumUpdate {
inner: u16,
}
impl ChecksumUpdate {
pub fn new(checksum: u16) -> Self {
Self { inner: !checksum }
}
pub fn remove_u16(self, val: u16) -> Self {
self.ones_complement_add(!val)
}
pub fn remove_addr(self, val: [u8; 4]) -> Self {
self.remove_u16(fold_u32_into_u16(u32::from_be_bytes(val)))
}
pub fn add_u16(self, val: u16) -> Self {
self.ones_complement_add(val)
}
pub fn add_addr(self, val: [u8; 4]) -> Self {
self.add_u16(fold_u32_into_u16(u32::from_be_bytes(val)))
}
pub fn add_update(self, update: ChecksumUpdate) -> Self {
self.add_u16(update.inner)
}
fn ones_complement_add(self, val: u16) -> Self {
let (res, carry) = self.inner.overflowing_add(val);
Self {
inner: res + (carry as u16),
}
}
pub fn into_checksum(self) -> u16 {
!self.inner
}
}
#[inline(always)]
fn fold_u32_into_u16(mut csum: u32) -> u16 {
csum = (csum & 0xffff) + (csum >> 16);
csum = (csum & 0xffff) + (csum >> 16);
csum as u16
}
#[cfg(test)]
mod tests {
use core::net::Ipv4Addr;
use super::*;
#[test]
fn recompute_udp_checksum() {
let old_src_ip = Ipv4Addr::new(172, 28, 0, 100);
let old_dst_ip = Ipv4Addr::new(172, 28, 0, 101);
let old_src_port = 45088;
let old_dst_port = 3478;
let old_udp_payload = hex_literal::hex!(
"400100400101002c2112a44293d108418ca2a8fdf7e8930e002000080001b6c58d0ea42b00080014019672bb752bf292ecf95b498b8b4797eacef51d80280004a057ff1e"
);
let channel_number = 0x4001;
let channel_data_len = 0x0040;
let incoming_ip_packet = ip_packet::make::udp_packet(
old_src_ip,
old_dst_ip,
old_src_port,
old_dst_port,
old_udp_payload.to_vec(),
)
.unwrap();
let incoming_checksum = incoming_ip_packet.as_udp().unwrap().checksum();
let new_src_ip = Ipv4Addr::new(172, 28, 0, 101);
let new_dst_ip = Ipv4Addr::new(172, 28, 0, 105);
let new_src_port = 4324;
let new_dst_port = 59385;
let new_udp_payload = hex_literal::hex!(
"0101002c2112a44293d108418ca2a8fdf7e8930e002000080001b6c58d0ea42b00080014019672bb752bf292ecf95b498b8b4797eacef51d80280004a057ff1e"
);
let outgoing_ip_packet = ip_packet::make::udp_packet(
new_src_ip,
new_dst_ip,
new_src_port,
new_dst_port,
new_udp_payload.to_vec(),
)
.unwrap();
let outgoing_checksum = outgoing_ip_packet.as_udp().unwrap().checksum();
let computed_checksum = ChecksumUpdate::new(incoming_checksum)
.remove_addr(old_src_ip.octets())
.remove_addr(old_dst_ip.octets())
.remove_u16(old_src_port)
.remove_u16(old_dst_port)
.remove_u16(old_udp_payload.len() as u16)
.remove_u16(old_udp_payload.len() as u16)
.remove_u16(channel_number)
.remove_u16(channel_data_len)
.add_addr(new_src_ip.octets())
.add_addr(new_dst_ip.octets())
.add_u16(new_src_port)
.add_u16(new_dst_port)
.add_u16(new_udp_payload.len() as u16)
.add_u16(new_udp_payload.len() as u16)
.into_checksum();
assert_eq!(computed_checksum, outgoing_checksum)
}
}

View File

@@ -0,0 +1,12 @@
use aya_ebpf::{macros::map, maps::Array};
use ebpf_shared::Config;
/// Dynamic configuration of the eBPF program.
#[map]
static CONFIG: Array<Config> = Array::with_max_entries(1, 0);
pub fn udp_checksum_enabled() -> bool {
CONFIG
.get(0)
.is_some_and(|config| config.udp_checksum_enabled)
}

View File

@@ -0,0 +1,46 @@
use core::num::NonZeroUsize;
use aya_ebpf::bindings::xdp_action;
#[derive(Debug, Clone, Copy)]
pub enum Error {
ParseEthernet2Header,
ParseIpv4Header,
ParseUdpHeader,
PacketTooShort,
Ipv4PacketWithOptions,
NotAChannelDataMessage,
BadChannelDataLength,
}
impl Error {
pub fn xdp_action(&self) -> xdp_action::Type {
match self {
Error::ParseEthernet2Header => xdp_action::XDP_PASS,
Error::ParseIpv4Header => xdp_action::XDP_PASS,
Error::ParseUdpHeader => xdp_action::XDP_PASS,
Error::PacketTooShort => xdp_action::XDP_PASS,
Error::Ipv4PacketWithOptions => xdp_action::XDP_PASS,
Error::BadChannelDataLength => xdp_action::XDP_DROP,
Error::NotAChannelDataMessage => xdp_action::XDP_PASS,
}
}
}
impl aya_log_ebpf::WriteToBuf for Error {
fn write(self, buf: &mut [u8]) -> Option<NonZeroUsize> {
let msg = match self {
Error::ParseEthernet2Header => "Failed to parse Ethernet2 header",
Error::ParseIpv4Header => "Failed to parse IPv4 header",
Error::ParseUdpHeader => "Failed to parse UDP header",
Error::PacketTooShort => "Packet is too short",
Error::Ipv4PacketWithOptions => "IPv4 packet has optiosn",
Error::NotAChannelDataMessage => "Not a channel data message",
Error::BadChannelDataLength => "Channel data length does not match packet length",
};
msg.write(buf)
}
}
impl aya_log_ebpf::macro_support::DefaultFormatter for Error {}

View File

@@ -0,0 +1,24 @@
use aya_ebpf::programs::XdpContext;
use etherparse::{EtherType, Ethernet2Header, Ethernet2HeaderSlice};
use crate::{error::Error, slice_mut_at};
pub struct Eth2 {
ty: EtherType,
}
impl Eth2 {
pub fn parse(ctx: &XdpContext) -> Result<Self, Error> {
let slice = slice_mut_at::<{ Ethernet2Header::LEN }>(ctx, 0)?;
let eth =
Ethernet2HeaderSlice::from_slice(slice).map_err(|_| Error::ParseEthernet2Header)?;
Ok(Self {
ty: eth.ether_type(),
})
}
pub fn ether_type(&self) -> EtherType {
self.ty
}
}

View File

@@ -0,0 +1,99 @@
use crate::{Error, checksum::ChecksumUpdate, slice_mut_at::slice_mut_at};
use aya_ebpf::programs::XdpContext;
use aya_log_ebpf::debug;
use etherparse::{Ethernet2Header, IpNumber, Ipv4Header, Ipv4HeaderSlice};
use etherparse_ext::Ipv4HeaderSliceMut;
/// Represents an IPv4 header within our packet.
pub struct Ip4<'a> {
src: [u8; 4],
dst: [u8; 4],
protocol: IpNumber,
checksum: u16,
total_len: u16,
ctx: &'a XdpContext,
/// Mutable slice of the IPv4 header, allows us to modify the header in-place.
slice_mut: Ipv4HeaderSliceMut<'a>,
}
impl<'a> Ip4<'a> {
pub fn parse(ctx: &'a XdpContext) -> Result<Self, Error> {
let slice_mut = slice_mut_at::<{ Ipv4Header::MIN_LEN }>(ctx, Ethernet2Header::LEN)?;
let ipv4_slice =
Ipv4HeaderSlice::from_slice(slice_mut).map_err(|_| Error::ParseIpv4Header)?;
// IPv4 packets with options are handled in user-space.
if usize::from(ipv4_slice.ihl() * 4) != Ipv4Header::MIN_LEN {
return Err(Error::Ipv4PacketWithOptions);
}
Ok(Self {
ctx,
src: ipv4_slice.source(),
dst: ipv4_slice.destination(),
protocol: ipv4_slice.protocol(),
checksum: ipv4_slice.header_checksum(),
total_len: ipv4_slice.total_len(),
slice_mut: {
// SAFETY: We parsed the slice as an IPv4 header above.
unsafe { Ipv4HeaderSliceMut::from_slice_unchecked(slice_mut) }
},
})
}
pub fn src(&self) -> [u8; 4] {
self.src
}
pub fn dst(&self) -> [u8; 4] {
self.dst
}
pub fn protocol(&self) -> IpNumber {
self.protocol
}
pub fn total_len(&self) -> u16 {
self.total_len
}
/// Update this packet with a new source, destination, and total length.
///
/// Returns a [`ChecksumUpdate`] representing the checksum-difference of the "IP pseudo-header."
/// which is used in certain L4 protocols (e.g. UDP).
pub fn update(mut self, new_src: [u8; 4], new_dst: [u8; 4], new_len: u16) -> ChecksumUpdate {
self.slice_mut.set_source(new_src);
self.slice_mut.set_destination(new_dst);
self.slice_mut.set_total_length(new_len.to_be_bytes());
let ip_pseudo_header = ChecksumUpdate::default()
.remove_addr(self.src)
.add_addr(new_src)
.remove_addr(self.dst)
.add_addr(new_dst);
self.slice_mut.set_checksum(
ChecksumUpdate::new(self.checksum)
.remove_addr(self.src)
.add_addr(new_src)
.remove_addr(self.dst)
.add_addr(new_dst)
.remove_u16(self.total_len)
.add_u16(new_len)
.into_checksum(),
);
debug!(
self.ctx,
"IP4 header update: src {:i} -> {:i}; dst {:i} -> {:i}",
self.src,
new_src,
self.dst,
new_dst,
);
ip_pseudo_header
}
}

View File

@@ -0,0 +1,152 @@
#![cfg_attr(target_arch = "bpf", no_std)]
#![cfg_attr(target_arch = "bpf", no_main)]
use crate::error::Error;
use crate::slice_mut_at::slice_mut_at;
use aya_ebpf::{
bindings::xdp_action,
macros::{map, xdp},
maps::HashMap,
programs::XdpContext,
};
use aya_log_ebpf::*;
use channel_data::ChannelData;
use ebpf_shared::{ClientAndChannelV4, PortAndPeerV4};
use eth2::Eth2;
use etherparse::{EtherType, IpNumber};
use ip4::Ip4;
use move_headers::remove_channel_data_header_ipv4;
use udp::Udp;
mod channel_data;
mod checksum;
mod config;
mod error;
mod eth2;
mod ip4;
mod move_headers;
mod slice_mut_at;
mod udp;
pub const CHANNEL_DATA_HEADER_LEN: usize = 4;
/// Channel mappings from an IPv4 socket + channel number to an IPv4 socket + port.
///
/// TODO: Update flags to `BPF_F_NO_PREALLOC` to guarantee atomicity? Needs research.
#[map]
static CHAN_TO_UDP_44: HashMap<ClientAndChannelV4, PortAndPeerV4> =
HashMap::with_max_entries(0x100000, 0);
#[map]
static UDP_TO_CHAN_44: HashMap<PortAndPeerV4, ClientAndChannelV4> =
HashMap::with_max_entries(0x100000, 0);
#[xdp]
pub fn handle_turn(ctx: XdpContext) -> u32 {
try_handle_turn(&ctx).unwrap_or_else(|e| {
let action = e.xdp_action();
debug!(&ctx, "Failed to handle packet {}; action = {}", e, action);
action
})
}
#[inline(always)]
fn try_handle_turn(ctx: &XdpContext) -> Result<u32, Error> {
let eth = Eth2::parse(ctx)?;
match eth.ether_type() {
EtherType::IPV4 => try_handle_turn_ipv4(ctx),
EtherType::IPV6 => try_handle_turn_ipv6(ctx),
_ => Ok(xdp_action::XDP_PASS),
}
}
#[inline(always)]
fn try_handle_turn_ipv4(ctx: &XdpContext) -> Result<u32, Error> {
let ipv4 = Ip4::parse(ctx)?;
if ipv4.protocol() != IpNumber::UDP {
return Ok(xdp_action::XDP_PASS);
}
let udp = Udp::parse(ctx)?; // TODO: Change the API so we parse the UDP header _from_ the ipv4 struct?
trace!(ctx, "New packet from {:i}:{}", ipv4.src(), udp.src());
if udp.dst() == 3478 {
try_handle_ipv4_channel_data_to_udp(ctx, ipv4, udp)
} else {
try_handle_ipv4_udp_to_channel_data(ctx, ipv4, udp)
}
}
fn try_handle_ipv4_channel_data_to_udp(
ctx: &XdpContext,
ipv4: Ip4,
udp: Udp,
) -> Result<u32, Error> {
let cd = ChannelData::parse(ctx)?;
// SAFETY: ???
let maybe_peer =
unsafe { CHAN_TO_UDP_44.get(&ClientAndChannelV4::new(ipv4.src(), udp.src(), cd.number())) };
let Some(port_and_peer) = maybe_peer else {
debug!(
ctx,
"No channel binding from {:i}:{} for channel {}",
ipv4.src(),
udp.src(),
cd.number(),
);
return Ok(xdp_action::XDP_PASS);
};
let new_src = ipv4.dst(); // The IP we received the packet on will be the new source IP.
let new_ipv4_total_len = ipv4.total_len() - CHANNEL_DATA_HEADER_LEN as u16;
let pseudo_header = ipv4.update(new_src, port_and_peer.peer_ip(), new_ipv4_total_len);
let new_udp_len = udp.len() - CHANNEL_DATA_HEADER_LEN as u16;
udp.update(
pseudo_header,
port_and_peer.allocation_port(),
port_and_peer.peer_port(),
new_udp_len,
);
remove_channel_data_header_ipv4(ctx);
Ok(xdp_action::XDP_TX)
}
#[inline(always)]
#[expect(unused_variables, reason = "Will be used in the future.")]
fn try_handle_ipv4_udp_to_channel_data(
ctx: &XdpContext,
ipv4: Ip4,
udp: Udp,
) -> Result<u32, Error> {
// Not yet implemented ...
Ok(xdp_action::XDP_PASS)
}
fn try_handle_turn_ipv6(_: &XdpContext) -> Result<u32, Error> {
Ok(xdp_action::XDP_PASS)
}
/// Defines our panic handler.
///
/// This doesn't do anything because we can never actually panic in eBPF.
/// Attempting to link a program that wants to abort fails at compile time anyway.
#[cfg(target_arch = "bpf")]
#[panic_handler]
fn on_panic(_: &core::panic::PanicInfo) -> ! {
loop {}
}
#[cfg(not(target_arch = "bpf"))]
fn main() {
panic!("This program is meant to be compiled as an eBPF program.");
}

View File

@@ -0,0 +1,61 @@
use aya_ebpf::{
cty::c_void,
helpers::{bpf_xdp_adjust_head, bpf_xdp_load_bytes, bpf_xdp_store_bytes},
programs::XdpContext,
};
use etherparse::{Ethernet2Header, Ipv4Header, Ipv6Header, UdpHeader};
use crate::CHANNEL_DATA_HEADER_LEN;
pub fn remove_channel_data_header_ipv4(ctx: &XdpContext) {
move_headers::<{ CHANNEL_DATA_HEADER_LEN as i32 }, { Ipv4Header::MIN_LEN }>(ctx)
}
#[expect(dead_code, reason = "Will be used in the future.")]
pub fn add_channel_data_header_ipv4(ctx: &XdpContext, mut header: [u8; 4]) {
move_headers::<{ -(CHANNEL_DATA_HEADER_LEN as i32) }, { Ipv4Header::MIN_LEN }>(ctx);
let offset = (Ethernet2Header::LEN + Ipv4Header::MIN_LEN + UdpHeader::LEN) as u32;
unsafe {
bpf_xdp_store_bytes(
ctx.ctx,
offset,
header.as_mut_ptr() as *mut c_void,
CHANNEL_DATA_HEADER_LEN as u32,
)
};
}
fn move_headers<const DELTA: i32, const IP_HEADER_LEN: usize>(ctx: &XdpContext) {
// Scratch space for our headers.
// IPv6 headers are always 40 bytes long.
// IPv4 headers are between 20 and 60 bytes long.
// We restrict the eBPF program to only handle 20 byte long IPv4 headers.
// Therefore, we only need to reserver space for IPv6 headers.
//
// Ideally, we would just use the const-generic argument here but that is not yet supported ...
let mut headers = [0u8; Ethernet2Header::LEN + Ipv6Header::LEN + UdpHeader::LEN];
// Copy headers into buffer.
unsafe {
bpf_xdp_load_bytes(
ctx.ctx,
0,
headers.as_mut_ptr() as *mut c_void,
(Ethernet2Header::LEN + IP_HEADER_LEN + UdpHeader::LEN) as u32,
);
}
// Move the head for the packet by `DELTA`.
unsafe { bpf_xdp_adjust_head(ctx.ctx, DELTA) };
// Copy the headers back.
unsafe {
bpf_xdp_store_bytes(
ctx.ctx,
0,
headers.as_mut_ptr() as *mut c_void,
(Ethernet2Header::LEN + IP_HEADER_LEN + UdpHeader::LEN) as u32,
)
};
}

View File

@@ -0,0 +1,17 @@
use aya_ebpf::programs::XdpContext;
use crate::error::Error;
/// Helper function to get a mutable slice of `LEN` bytes at `offset` from the packet data.
#[inline(always)]
pub fn slice_mut_at<const LEN: usize>(ctx: &XdpContext, offset: usize) -> Result<&mut [u8], Error> {
let start = ctx.data();
let end = ctx.data_end();
// Ensure our access is not out-of-bounds.
if start + offset + LEN > end {
return Err(Error::PacketTooShort);
}
Ok(unsafe { core::slice::from_raw_parts_mut((start + offset) as *mut u8, LEN) })
}

View File

@@ -0,0 +1,86 @@
use crate::{Error, checksum::ChecksumUpdate, slice_mut_at::slice_mut_at};
use aya_ebpf::programs::XdpContext;
use aya_log_ebpf::debug;
use etherparse::{Ethernet2Header, Ipv4Header, UdpHeader, UdpHeaderSlice};
use etherparse_ext::UdpHeaderSliceMut;
/// Represents a UDP header within our packet.
pub struct Udp<'a> {
src: u16,
dst: u16,
len: u16,
checksum: u16,
ctx: &'a XdpContext,
/// Mutable slice of the UDP header, allows us to modify the header in-place.
slice_mut: UdpHeaderSliceMut<'a>,
}
impl<'a> Udp<'a> {
pub fn parse(ctx: &'a XdpContext) -> Result<Self, Error> {
let slice =
slice_mut_at::<{ UdpHeader::LEN }>(ctx, Ethernet2Header::LEN + Ipv4Header::MIN_LEN)?;
let udp_slice = UdpHeaderSlice::from_slice(slice).map_err(|_| Error::ParseUdpHeader)?;
Ok(Self {
src: udp_slice.source_port(),
dst: udp_slice.destination_port(),
len: udp_slice.length(),
checksum: udp_slice.checksum(),
ctx,
slice_mut: {
// SAFETY: We parsed the slice as a UDP header above.
unsafe { UdpHeaderSliceMut::from_slice_unchecked(slice) }
},
})
}
pub fn src(&self) -> u16 {
self.src
}
pub fn dst(&self) -> u16 {
self.dst
}
pub fn len(&self) -> u16 {
self.len
}
/// Update this packet with a new source, destination, and length.
pub fn update(
mut self,
ip_pseudo_header: ChecksumUpdate,
new_src: u16,
new_dst: u16,
new_len: u16,
) {
self.slice_mut.set_source_port(new_src);
self.slice_mut.set_destination_port(new_dst);
self.slice_mut.set_length(new_len);
let ip_pseudo_header = ip_pseudo_header.remove_u16(self.len).add_u16(new_len);
if crate::config::udp_checksum_enabled() {
self.slice_mut.set_checksum(
ChecksumUpdate::new(self.checksum)
.add_update(ip_pseudo_header)
.remove_u16(self.len)
.add_u16(new_len)
.remove_u16(self.src)
.add_u16(self.src)
.remove_u16(self.dst)
.add_u16(self.dst)
.into_checksum(),
);
} else {
self.slice_mut.set_checksum(0);
}
debug!(
self.ctx,
"UDP header update: src {} -> {}; dst {} -> {}", self.src, new_src, self.dst, new_dst,
);
}
}

View File

@@ -4,6 +4,9 @@ version = "0.1.0"
edition = { workspace = true }
license = { workspace = true }
[package.metadata.cargo-udeps.ignore]
development = ["difference", "env_logger"]
[dependencies]
anyhow = { workspace = true }
backoff = { workspace = true }
@@ -12,6 +15,7 @@ bytecodec = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
derive_more = { workspace = true, features = ["from"] }
ebpf-shared = { workspace = true, features = ["std"] }
firezone-bin-shared = { workspace = true }
firezone-logging = { workspace = true }
firezone-telemetry = { workspace = true }
@@ -45,10 +49,19 @@ trackable = { workspace = true }
url = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
[target.'cfg(target_os = "linux")'.dependencies]
aya = { workspace = true }
aya-log = { workspace = true }
[dev-dependencies]
difference = { workspace = true }
env_logger = { workspace = true }
test-strategy = { workspace = true }
tokio = { workspace = true, features = ["process", "macros", "net"] }
[build-dependencies]
anyhow = "1"
aya-build = { workspace = true }
[[test]]
name = "regression"

View File

@@ -0,0 +1,24 @@
#[cfg(target_os = "linux")]
fn main() -> anyhow::Result<()> {
use anyhow::Context as _;
use aya_build::cargo_metadata::{MetadataCommand, Package};
let package = MetadataCommand::new()
.no_deps()
.exec()
.context("MetadataCommand::exec")?
.packages
.into_iter()
.find(|Package { name, .. }| name == "ebpf-turn-router")
.context("`ebpf-turn-router` package not found")?;
aya_build::build_ebpf(
[package],
aya_build::Toolchain::Custom("+nightly-2024-12-13"),
)?;
Ok(())
}
#[cfg(not(target_os = "linux"))]
fn main() {}

View File

@@ -0,0 +1,8 @@
#[cfg(target_os = "linux")]
#[path = "ebpf/linux.rs"]
mod platform;
#[cfg(not(target_os = "linux"))]
#[path = "ebpf/stub.rs"]
mod platform;
pub use platform::Program;

View File

@@ -0,0 +1,148 @@
use std::net::SocketAddr;
use anyhow::{Context as _, Result};
use aya::{
Pod,
maps::{Array, HashMap, MapData},
programs::{Xdp, XdpFlags},
};
use aya_log::EbpfLogger;
use ebpf_shared::{ClientAndChannelV4, Config, PortAndPeerV4};
use stun_codec::rfc5766::attributes::ChannelNumber;
use crate::{AllocationPort, ClientSocket, PeerSocket};
pub struct Program {
ebpf: aya::Ebpf,
}
impl Program {
pub fn try_load(interface: &'static str) -> Result<Self> {
let mut ebpf = aya::Ebpf::load(aya::include_bytes_aligned!(concat!(
env!("OUT_DIR"),
"/ebpf-turn-router-main"
)))?;
let _ = EbpfLogger::init(&mut ebpf);
let program: &mut Xdp = ebpf
.program_mut("handle_turn")
.context("No program")?
.try_into()?;
program.load().context("Failed to load program")?;
program
.attach(interface, XdpFlags::default())
.with_context(|| format!("Failed to attached to interface {interface}"))?;
Ok(Self { ebpf })
}
pub fn add_channel_binding(
&mut self,
client: ClientSocket,
channel_number: ChannelNumber,
peer: PeerSocket,
allocation_port: AllocationPort,
) -> Result<()> {
let client = client.into_socket();
let peer = peer.into_socket();
match (client, peer) {
(SocketAddr::V4(client), SocketAddr::V4(peer)) => {
let client_and_channel =
ClientAndChannelV4::from_socket(client, channel_number.value());
let port_and_peer = PortAndPeerV4::from_socket(peer, allocation_port.value());
self.chan_to_udp_44_map_mut()?
.insert(client_and_channel, port_and_peer, 0)?;
self.udp_to_chan_44_map_mut()?
.insert(port_and_peer, client_and_channel, 0)?;
}
(SocketAddr::V6(_), SocketAddr::V6(_)) => {
// IPv6 is not yet supported in the eBPF kernel.
}
(SocketAddr::V4(_), SocketAddr::V6(_)) | (SocketAddr::V6(_), SocketAddr::V4(_)) => {
// Relaying between IPv4 and IPv6 is not supported in the eBPF kernel.
}
}
Ok(())
}
pub fn remove_channel_binding(
&mut self,
client: ClientSocket,
channel_number: ChannelNumber,
peer: PeerSocket,
allocation_port: AllocationPort,
) -> Result<()> {
let client = client.into_socket();
let peer = peer.into_socket();
match (client, peer) {
(SocketAddr::V4(client), SocketAddr::V4(peer)) => {
let client_and_channel =
ClientAndChannelV4::from_socket(client, channel_number.value());
let port_and_peer = PortAndPeerV4::from_socket(peer, allocation_port.value());
self.chan_to_udp_44_map_mut()?.remove(&client_and_channel)?;
self.udp_to_chan_44_map_mut()?.remove(&port_and_peer)?;
}
(SocketAddr::V6(_), SocketAddr::V6(_)) => {
// IPv6 is not yet supported in the eBPF kernel.
}
(SocketAddr::V4(_), SocketAddr::V6(_)) | (SocketAddr::V6(_), SocketAddr::V4(_)) => {
// Relaying between IPv4 and IPv6 is not supported in the eBPF kernel.
}
}
Ok(())
}
pub fn set_config(&mut self, config: Config) -> Result<()> {
self.config_array_mut()?.set(0, config, 0)?;
Ok(())
}
fn chan_to_udp_44_map_mut(
&mut self,
) -> Result<HashMap<&mut MapData, ClientAndChannelV4, PortAndPeerV4>> {
self.hash_map_mut("CHAN_TO_UDP_44")
}
fn udp_to_chan_44_map_mut(
&mut self,
) -> Result<HashMap<&mut MapData, PortAndPeerV4, ClientAndChannelV4>> {
self.hash_map_mut("UDP_TO_CHAN_44")
}
fn config_array_mut(&mut self) -> Result<Array<&mut MapData, Config>> {
self.array_mut("CONFIG")
}
fn hash_map_mut<K, V>(&mut self, name: &'static str) -> Result<HashMap<&mut MapData, K, V>>
where
K: Pod,
V: Pod,
{
let map = self
.ebpf
.map_mut(name)
.with_context(|| format!("Map `{name}` not found"))?;
let map = HashMap::<_, K, V>::try_from(map).context("Failed to convert map")?;
Ok(map)
}
fn array_mut<T>(&mut self, name: &'static str) -> Result<Array<&mut MapData, T>>
where
T: Pod,
{
let map = self
.ebpf
.map_mut(name)
.with_context(|| format!("Array `{name}` not found"))?;
let map = Array::<_, T>::try_from(map).context("Failed to convert array")?;
Ok(map)
}
}

View File

@@ -0,0 +1,42 @@
#![expect(
clippy::unnecessary_wraps,
reason = "Function signatures must align with the Linux impl."
)]
use anyhow::Result;
use ebpf_shared::Config;
use stun_codec::rfc5766::attributes::ChannelNumber;
use crate::{AllocationPort, ClientSocket, PeerSocket};
pub struct Program {}
impl Program {
pub fn try_load(_: &'static str) -> Result<Self> {
Err(anyhow::anyhow!("Platform not supported"))
}
pub fn add_channel_binding_ipv4(
&mut self,
_: ClientSocket,
_: ChannelNumber,
_: PeerSocket,
_: AllocationPort,
) -> Result<()> {
Ok(())
}
pub fn remove_channel_binding_ipv4(
&mut self,
_: ClientSocket,
_: ChannelNumber,
_: PeerSocket,
_: AllocationPort,
) -> Result<()> {
Ok(())
}
pub fn set_config(&mut self, _: Config) -> Result<()> {
Ok(())
}
}

View File

@@ -5,6 +5,7 @@ mod server;
mod sleep;
pub mod auth;
pub mod ebpf;
#[cfg(feature = "proptest")]
#[allow(clippy::unwrap_used)]
pub mod proptest;

View File

@@ -8,7 +8,7 @@ use firezone_logging::{err_with_src, sentry_layer};
use firezone_relay::sockets::Sockets;
use firezone_relay::{
AddressFamily, AllocationPort, ChannelData, ClientSocket, Command, IpStack, PeerSocket, Server,
Sleep, VERSION, sockets,
Sleep, VERSION, ebpf, sockets,
};
use firezone_telemetry::{RELAY_DSN, Telemetry};
use futures::{FutureExt, future};
@@ -22,6 +22,7 @@ use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Poll, ready};
use std::time::{Duration, Instant};
use stun_codec::rfc5766::attributes::ChannelNumber;
use tracing::{Subscriber, level_filters::LevelFilter};
use tracing_core::Dispatch;
use tracing_stackdriver::CloudTraceConfiguration;
@@ -130,6 +131,10 @@ fn main() {
async fn try_main(args: Args) -> Result<()> {
setup_tracing(&args)?;
let ebpf = ebpf::Program::try_load("eth0")
.inspect_err(|e| tracing::info!("Failed to load eBPF TURN router: {e:#}"))
.ok();
let public_addr = match (args.public_ip4_addr, args.public_ip6_addr) {
(Some(ip4), Some(ip6)) => IpStack::Dual { ip4, ip6 },
(Some(ip4), None) => IpStack::Ip4(ip4),
@@ -178,7 +183,7 @@ async fn try_main(args: Args) -> Result<()> {
)?;
channel.connect(NoParams);
let mut eventloop = Eventloop::new(server, channel, public_addr, last_heartbeat_sent)?;
let mut eventloop = Eventloop::new(server, ebpf, channel, public_addr, last_heartbeat_sent)?;
tracing::info!(target: "relay", "Listening for incoming traffic on UDP port {0}", args.listen_port);
@@ -331,6 +336,8 @@ struct Eventloop<R> {
channel: Option<PhoenixChannel<JoinMessage, IngressMessage, (), NoParams>>,
sleep: Sleep,
ebpf: Option<ebpf::Program>,
#[cfg(unix)]
sigterm: tokio::signal::unix::Signal,
shutting_down: bool,
@@ -349,6 +356,7 @@ where
{
fn new(
server: Server<R>,
ebpf: Option<ebpf::Program>,
channel: PhoenixChannel<JoinMessage, IngressMessage, (), NoParams>,
public_address: IpStack,
last_heartbeat_sent: Arc<Mutex<Option<Instant>>>,
@@ -383,6 +391,7 @@ where
stats_log_interval: tokio::time::interval(STATS_LOG_INTERVAL),
last_num_bytes_relayed: 0,
sockets,
ebpf,
buffer: [0u8; MAX_UDP_SIZE],
last_heartbeat_sent,
#[cfg(unix)]
@@ -433,6 +442,36 @@ where
tracing::info!(target: "relay", %port, %family, "Freeing allocation");
}
Command::CreateChannelBinding {
client,
channel_number,
peer,
allocation_port,
} => {
if let Err(e) = self.create_channel_binding_in_ebpf_map(
client,
channel_number,
peer,
allocation_port,
) {
tracing::debug!(target: "relay", %client, "Failed to create channel binding in eBPF map: {e:#}");
}
}
Command::DeleteChannelBinding {
client,
channel_number,
peer,
allocation_port,
} => {
if let Err(e) = self.delete_channel_binding_in_ebpf_map(
client,
channel_number,
peer,
allocation_port,
) {
tracing::debug!(target: "relay", %client, "Failed to delete channel binding in eBPF map: {e:#}");
}
}
}
ready = true;
@@ -595,6 +634,38 @@ where
}
}
fn create_channel_binding_in_ebpf_map(
&mut self,
client: ClientSocket,
channel_number: ChannelNumber,
peer: PeerSocket,
allocation_port: AllocationPort,
) -> Result<()> {
let Some(ebpf) = self.ebpf.as_mut() else {
return Ok(()); // ebPF program not loaded ...
};
ebpf.add_channel_binding(client, channel_number, peer, allocation_port)?;
Ok(())
}
fn delete_channel_binding_in_ebpf_map(
&mut self,
client: ClientSocket,
channel_number: ChannelNumber,
peer: PeerSocket,
allocation_port: AllocationPort,
) -> Result<()> {
let Some(ebpf) = self.ebpf.as_mut() else {
return Ok(()); // ebPF program not loaded ...
};
ebpf.remove_channel_binding(client, channel_number, peer, allocation_port)?;
Ok(())
}
fn handle_portal_event(&mut self, event: phoenix_channel::Event<IngressMessage, ()>) {
match event {
Event::SuccessResponse { res: (), .. } => {}

View File

@@ -110,6 +110,20 @@ pub enum Command {
port: AllocationPort,
family: AddressFamily,
},
CreateChannelBinding {
client: ClientSocket,
channel_number: ChannelNumber,
peer: PeerSocket,
allocation_port: AllocationPort,
},
DeleteChannelBinding {
client: ClientSocket,
channel_number: ChannelNumber,
peer: PeerSocket,
allocation_port: AllocationPort,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
@@ -430,6 +444,14 @@ where
{
tracing::info!(target: "relay", channel = %number.value(), %client, peer = %channel.peer_address, allocation = %channel.allocation, "Channel is now expired");
self.pending_commands
.push_back(Command::DeleteChannelBinding {
client: *client,
channel_number: *number,
peer: channel.peer_address,
allocation_port: channel.allocation,
});
channel.bound = false;
if let Some((cs, n)) = self
.channel_and_client_by_port_and_peer
@@ -707,6 +729,13 @@ where
(channel.allocation, channel.peer_address),
(sender, requested_channel),
);
self.pending_commands
.push_back(Command::CreateChannelBinding {
client: sender,
channel_number: requested_channel,
peer: peer_address,
allocation_port: channel.allocation,
});
tracing::info!(target: "relay", "Refreshed channel binding");
@@ -911,6 +940,7 @@ where
bound: true,
},
);
debug_assert!(existing.is_none());
let existing = self

View File

@@ -0,0 +1,63 @@
#![allow(clippy::unwrap_used)]
use firezone_relay::{AllocationPort, ClientSocket, PeerSocket};
use std::time::Duration;
use tokio::net::UdpSocket;
use ebpf_shared::Config;
use stun_codec::rfc5766::attributes::ChannelNumber;
#[tokio::test]
async fn ping_pong() {
let _guard = firezone_logging::test("trace,mio=off");
let mut program = firezone_relay::ebpf::Program::try_load("lo").unwrap();
// Linux does not set the correct UDP checksum when sending the packet, so our updated checksum in the eBPF code will be wrong and later dropped.
// To make the test work, we therefore need to tell the eBPF program to disable UDP checksumming by just setting it to 0.
program
.set_config(Config {
udp_checksum_enabled: false,
})
.unwrap();
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let peer = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let client_socket = client.local_addr().unwrap();
let peer_socket = peer.local_addr().unwrap();
let channel_number = ChannelNumber::new(0x4000).unwrap();
let allocation_port = 50000;
program
.add_channel_binding(
ClientSocket::new(client_socket),
channel_number,
PeerSocket::new(peer_socket),
AllocationPort::new(allocation_port),
)
.unwrap();
let msg = b"ping";
let msg_len = msg.len();
let mut buf = [0u8; 512];
let (header, payload) = buf.split_at_mut(4);
payload[..msg_len].copy_from_slice(msg);
let len =
firezone_relay::ChannelData::encode_header_to_slice(channel_number, msg_len as u16, header);
client.send_to(&buf[..len], "127.0.0.1:3478").await.unwrap();
let mut recv_buf = [0u8; 512];
let (len, from) = tokio::time::timeout(Duration::from_secs(1), peer.recv_from(&mut recv_buf))
.await
.unwrap()
.unwrap();
assert_eq!(from.port(), allocation_port);
assert_eq!(&recv_buf[..len], msg);
}

View File

@@ -0,0 +1,936 @@
// #![allow(clippy::unwrap_used)]
// use bytecodec::{DecodeExt, EncodeExt};
// use firezone_relay::{
// AddressFamily, Allocate, AllocationPort, Attribute, Binding, ChannelBind, ChannelData,
// ClientMessage, ClientSocket, Command, IpStack, PeerSocket, Refresh, Server, SOFTWARE,
// };
// use rand::rngs::mock::StepRng;
// use secrecy::SecretString;
// use std::iter;
// use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
// use std::time::{Duration, Instant, SystemTime};
// use stun_codec::rfc5389::attributes::{
// ErrorCode, MessageIntegrity, Nonce, Realm, Username, XorMappedAddress,
// };
// use stun_codec::rfc5389::errors::Unauthorized;
// use stun_codec::rfc5389::methods::BINDING;
// use stun_codec::rfc5766::attributes::{ChannelNumber, Lifetime, XorPeerAddress, XorRelayAddress};
// use stun_codec::rfc5766::methods::{ALLOCATE, CHANNEL_BIND, REFRESH};
// use stun_codec::{Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId};
// use test_strategy::proptest;
// use uuid::Uuid;
// use Output::{CreateAllocation, FreeAllocation};
// #[proptest]
// fn can_answer_stun_request_from_ip4_address(
// #[strategy(firezone_relay::proptest::binding())] request: Binding,
// source: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// ) {
// let _ = env_logger::try_init();
// let mut server = TestServer::new(public_relay_addr);
// let transaction_id = request.transaction_id();
// server.assert_commands(
// from_client(source, request, Instant::now()),
// [send_message(
// source,
// binding_response(transaction_id, source),
// )],
// );
// }
// #[proptest]
// fn deallocate_once_time_expired(
// #[strategy(firezone_relay::proptest::transaction_id())] transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::allocation_lifetime())] lifetime: Lifetime,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// source: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid,
// ) {
// let now = Instant::now();
// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce);
// let secret = server.auth_secret();
// server.assert_commands(
// from_client(
// source,
// Allocate::new_authenticated_udp_implicit_ip4(
// transaction_id,
// Some(lifetime.clone()),
// valid_username(&username_salt),
// secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [
// create_allocation(49152, AddressFamily::V4),
// send_message(
// source,
// allocate_response(transaction_id, public_relay_addr, 49152, source, &lifetime),
// ),
// ],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + lifetime.lifetime())
// );
// server.assert_commands(
// forward_time_to(now + lifetime.lifetime() + Duration::from_secs(1)),
// [free_allocation(49152, AddressFamily::V4)],
// );
// }
// #[proptest]
// fn unauthenticated_allocate_triggers_authentication(
// #[strategy(firezone_relay::proptest::transaction_id())] transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::allocation_lifetime())] lifetime: Lifetime,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// source: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// ) {
// let now = Instant::now();
// // Nonces are generated randomly and we control the randomness in the test, thus this is deterministic.
// let first_nonce = Uuid::from_u128(0x0);
// let mut server = TestServer::new(public_relay_addr);
// let secret = server.auth_secret().to_owned();
// server.assert_commands(
// from_client(
// source,
// Allocate::new_unauthenticated_udp(transaction_id, Some(lifetime.clone())),
// now,
// ),
// [send_message(
// source,
// unauthorized_allocate_response(transaction_id, first_nonce),
// )],
// );
// server.assert_commands(
// from_client(
// source,
// Allocate::new_authenticated_udp_implicit_ip4(
// transaction_id,
// Some(lifetime.clone()),
// valid_username(&username_salt),
// &secret,
// first_nonce,
// )
// .unwrap(),
// now,
// ),
// [
// create_allocation(49152, AddressFamily::V4),
// send_message(
// source,
// allocate_response(transaction_id, public_relay_addr, 49152, source, &lifetime),
// ),
// ],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + lifetime.lifetime())
// );
// }
// #[proptest]
// fn when_refreshed_in_time_allocation_does_not_expire(
// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())] refresh_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::allocation_lifetime())] allocate_lifetime: Lifetime,
// #[strategy(firezone_relay::proptest::allocation_lifetime())] refresh_lifetime: Lifetime,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// source: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid,
// ) {
// let now = Instant::now();
// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce);
// let secret = server.auth_secret().to_owned();
// let first_wake = now + allocate_lifetime.lifetime();
// server.assert_commands(
// from_client(
// source,
// Allocate::new_authenticated_udp_implicit_ip4(
// allocate_transaction_id,
// Some(allocate_lifetime.clone()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [
// create_allocation(49152, AddressFamily::V4),
// send_message(
// source,
// allocate_response(
// allocate_transaction_id,
// public_relay_addr,
// 49152,
// source,
// &allocate_lifetime,
// ),
// ),
// ],
// );
// assert_eq!(server.server.poll_timeout(), Some(first_wake));
// // Forward time
// let now = now + allocate_lifetime.lifetime() / 2;
// let second_wake = now + refresh_lifetime.lifetime();
// server.assert_commands(
// from_client(
// source,
// Refresh::new(
// refresh_transaction_id,
// Some(refresh_lifetime.clone()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [send_message(
// source,
// refresh_response(refresh_transaction_id, refresh_lifetime.clone()),
// )],
// );
// assert_eq!(server.server.poll_timeout(), Some(second_wake));
// // The allocation MUST NOT be expired 1 sec before its refresh lifetime.
// // Note that depending on how the lifetimes were generated, this may still be before the initial allocation lifetime.
// // This is okay because lifetimes do not roll over, i.e. a refresh is not "added" to the initial lifetime but the allocation's lifetime is simply computed from now + requested lifetime of the refresh request.
// server.assert_commands(
// forward_time_to(now + refresh_lifetime.lifetime() - Duration::from_secs(1)),
// [],
// );
// }
// #[proptest]
// fn when_receiving_lifetime_0_for_existing_allocation_then_delete(
// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())] refresh_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::allocation_lifetime())] allocate_lifetime: Lifetime,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// source: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid,
// ) {
// let now = Instant::now();
// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce);
// let secret = server.auth_secret().to_owned();
// let first_wake = now + allocate_lifetime.lifetime();
// server.assert_commands(
// from_client(
// source,
// Allocate::new_authenticated_udp_implicit_ip4(
// allocate_transaction_id,
// Some(allocate_lifetime.clone()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [
// create_allocation(49152, AddressFamily::V4),
// send_message(
// source,
// allocate_response(
// allocate_transaction_id,
// public_relay_addr,
// 49152,
// source,
// &allocate_lifetime,
// ),
// ),
// ],
// );
// assert_eq!(server.server.poll_timeout(), Some(first_wake));
// // Forward time
// let now = now + allocate_lifetime.lifetime() / 2;
// server.assert_commands(
// from_client(
// source,
// Refresh::new(
// refresh_transaction_id,
// Some(Lifetime::new(Duration::ZERO).unwrap()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [
// free_allocation(49152, AddressFamily::V4),
// send_message(
// source,
// refresh_response(
// refresh_transaction_id,
// Lifetime::new(Duration::ZERO).unwrap(),
// ),
// ),
// ],
// );
// // Assert that forwarding time does not produce an obsolete event.
// server.assert_commands(forward_time_to(first_wake + Duration::from_secs(1)), []);
// }
// #[proptest]
// fn freeing_allocation_clears_all_channels(
// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())]
// channel_bind_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())] refresh_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::channel_number())] channel: ChannelNumber,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// source: SocketAddr,
// peer: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid,
// ) {
// let now = Instant::now();
// let _ = env_logger::try_init();
// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce);
// let secret = server.auth_secret().to_owned();
// let _ = server.server.handle_client_message(
// ClientMessage::Allocate(
// Allocate::new_authenticated_udp_implicit_ip4(
// allocate_transaction_id,
// None,
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// ),
// ClientSocket::new(source),
// now,
// );
// let _ = server.server.handle_client_message(
// ClientMessage::ChannelBind(
// ChannelBind::new(
// channel_bind_transaction_id,
// channel,
// XorPeerAddress::new(peer.into()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// ),
// ClientSocket::new(source),
// now,
// );
// let _ = server.server.handle_client_message(
// ClientMessage::Refresh(
// Refresh::new(
// refresh_transaction_id,
// Some(Lifetime::new(Duration::ZERO).unwrap()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// ),
// ClientSocket::new(source),
// now,
// );
// assert_eq!(server.server.num_active_channels(), 0);
// }
// // #[test]
// // fn server_waits_for_5_minutes_before_allowing_reuse_of_channel_number_after_expiry() {
// // // todo!()
// // }
// #[proptest]
// fn ping_pong_relay(
// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())]
// channel_bind_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// source: SocketAddrV4,
// peer: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// peer_to_client_ping: [u8; 32],
// #[strategy(firezone_relay::proptest::channel_data())] client_to_peer_ping: ChannelData<'static>,
// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid,
// ) {
// let now = Instant::now();
// let _ = env_logger::try_init();
// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce);
// let secret = server.auth_secret().to_owned();
// let lifetime = Lifetime::new(Duration::from_secs(60 * 60)).unwrap(); // Lifetime longer than channel expiry
// server.assert_commands(
// from_client(
// source,
// Allocate::new_authenticated_udp_implicit_ip4(
// allocate_transaction_id,
// Some(lifetime.clone()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [
// create_allocation(49152, AddressFamily::V4),
// send_message(
// source,
// allocate_response(
// allocate_transaction_id,
// public_relay_addr,
// 49152,
// source,
// &lifetime,
// ),
// ),
// ],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + lifetime.lifetime())
// );
// let now = now + Duration::from_secs(1);
// server.assert_commands(
// from_client(
// source,
// ChannelBind::new(
// channel_bind_transaction_id,
// client_to_peer_ping.channel(),
// XorPeerAddress::new(peer.into()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [send_message(
// source,
// channel_bind_response(channel_bind_transaction_id),
// )],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + Duration::from_secs(60 * 10))
// );
// let now = now + Duration::from_secs(1);
// let maybe_forward = server.server.handle_client_input(
// client_to_peer_ping.as_msg(),
// ClientSocket::new(source.into()),
// now,
// );
// assert_eq!(
// maybe_forward,
// Some((AllocationPort::new(49152), PeerSocket::new(peer.into())))
// );
// let maybe_forward = server.server.handle_peer_traffic(
// peer_to_client_ping.as_slice(),
// PeerSocket::new(peer.into()),
// AllocationPort::new(49152),
// );
// assert_eq!(
// maybe_forward,
// Some((
// ClientSocket::new(source.into()),
// client_to_peer_ping.channel()
// ))
// );
// }
// #[proptest]
// fn allows_rebind_channel_after_expiry(
// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())]
// channel_bind_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())]
// channel_bind_2_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// #[strategy(firezone_relay::proptest::channel_number())] channel: ChannelNumber,
// source: SocketAddrV4,
// peer: SocketAddrV4,
// peer2: SocketAddrV4,
// public_relay_addr: Ipv4Addr,
// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid,
// ) {
// let now = Instant::now();
// let _ = env_logger::try_init();
// let mut server = TestServer::new(public_relay_addr).with_nonce(nonce);
// let secret = server.auth_secret().to_owned();
// let lifetime = Lifetime::new(Duration::from_secs(60 * 60)).unwrap(); // Lifetime longer than channel expiry
// server.assert_commands(
// from_client(
// source,
// Allocate::new_authenticated_udp_implicit_ip4(
// allocate_transaction_id,
// Some(lifetime.clone()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [
// create_allocation(49152, AddressFamily::V4),
// send_message(
// source,
// allocate_response(
// allocate_transaction_id,
// public_relay_addr,
// 49152,
// source,
// &lifetime,
// ),
// ),
// ],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + lifetime.lifetime())
// );
// let now = now + Duration::from_secs(1);
// server.assert_commands(
// from_client(
// source,
// ChannelBind::new(
// channel_bind_transaction_id,
// channel,
// XorPeerAddress::new(peer.into()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [send_message(
// source,
// channel_bind_response(channel_bind_transaction_id),
// )],
// );
// let channel_expiry = now + Duration::from_secs(60 * 10);
// let channel_rebind = channel_expiry + Duration::from_secs(60 * 5);
// assert_eq!(server.server.poll_timeout(), Some(channel_expiry));
// let now = now + Duration::from_secs(60 * 10 + 1);
// server.server.handle_timeout(now);
// assert_eq!(server.server.poll_timeout(), Some(channel_rebind));
// let now = now + Duration::from_secs(60 * 5 + 1);
// server.server.handle_timeout(now);
// let now = now + Duration::from_secs(1);
// server.assert_commands(
// from_client(
// source,
// ChannelBind::new(
// channel_bind_2_transaction_id,
// channel,
// XorPeerAddress::new(peer2.into()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [send_message(
// source,
// channel_bind_response(channel_bind_2_transaction_id),
// )],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + Duration::from_secs(60 * 10)) // For channel expiry
// );
// }
// #[proptest]
// fn ping_pong_ip6_relay(
// #[strategy(firezone_relay::proptest::transaction_id())] allocate_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::transaction_id())]
// channel_bind_transaction_id: TransactionId,
// #[strategy(firezone_relay::proptest::username_salt())] username_salt: String,
// #[strategy(firezone_relay::proptest::channel_number())] channel: ChannelNumber,
// source: SocketAddrV6,
// peer: SocketAddrV6,
// public_relay_ip4_addr: Ipv4Addr,
// public_relay_ip6_addr: Ipv6Addr,
// peer_to_client_ping: [u8; 32],
// mut client_to_peer_ping: [u8; 36],
// #[strategy(firezone_relay::proptest::nonce())] nonce: Uuid,
// ) {
// let now = Instant::now();
// let _ = env_logger::try_init();
// let mut server =
// TestServer::new((public_relay_ip4_addr, public_relay_ip6_addr)).with_nonce(nonce);
// let secret = server.auth_secret().to_owned();
// let lifetime = Lifetime::new(Duration::from_secs(60 * 60)).unwrap(); // Lifetime longer than channel expiry
// server.assert_commands(
// from_client(
// source,
// Allocate::new_authenticated_udp_ip6(
// allocate_transaction_id,
// Some(lifetime.clone()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [
// create_allocation(49152, AddressFamily::V6),
// send_message(
// source,
// allocate_response(
// allocate_transaction_id,
// public_relay_ip6_addr,
// 49152,
// source,
// &lifetime,
// ),
// ),
// ],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + lifetime.lifetime())
// );
// let now = now + Duration::from_secs(1);
// server.assert_commands(
// from_client(
// source,
// ChannelBind::new(
// channel_bind_transaction_id,
// channel,
// XorPeerAddress::new(peer.into()),
// valid_username(&username_salt),
// &secret,
// nonce,
// )
// .unwrap(),
// now,
// ),
// [send_message(
// source,
// channel_bind_response(channel_bind_transaction_id),
// )],
// );
// assert_eq!(
// server.server.poll_timeout(),
// Some(now + Duration::from_secs(60 * 10))
// );
// let now = now + Duration::from_secs(1);
// ChannelData::encode_header_to_slice(channel, 32, &mut client_to_peer_ping[..4]);
// let maybe_forward = server.server.handle_client_input(
// client_to_peer_ping.as_slice(),
// ClientSocket::new(source.into()),
// now,
// );
// assert_eq!(
// maybe_forward,
// Some((AllocationPort::new(49152), PeerSocket::new(peer.into())))
// );
// let maybe_forward = server.server.handle_peer_traffic(
// peer_to_client_ping.as_slice(),
// PeerSocket::new(peer.into()),
// AllocationPort::new(49152),
// );
// assert_eq!(
// maybe_forward,
// Some((ClientSocket::new(source.into()), channel))
// );
// }
// struct TestServer {
// server: Server<StepRng>,
// }
// impl TestServer {
// fn new(relay_public_addr: impl Into<IpStack>) -> Self {
// Self {
// server: Server::new(relay_public_addr, StepRng::new(0, 0), 3478, 49152..=65535),
// }
// }
// fn with_nonce(mut self, nonce: Uuid) -> Self {
// self.server.add_nonce(nonce);
// self
// }
// fn auth_secret(&self) -> &SecretString {
// self.server.auth_secret()
// }
// fn assert_commands<const N: usize>(&mut self, input: Input, output: [Output; N]) {
// match input {
// Input::Client(sender, message, now) => {
// self.server.handle_client_message(message, sender, now);
// }
// Input::Time(now) => {
// self.server.handle_timeout(now);
// }
// }
// for expected_output in output {
// let Some(actual_output) = self.server.next_command() else {
// let msg = match expected_output {
// Output::SendMessage((recipient, msg)) => {
// format!("to send message {:?} to {recipient}", msg)
// }
// CreateAllocation(port, family) => {
// format!("to create allocation on port {port} for address family {family}")
// }
// FreeAllocation(port, family) => {
// format!("to free allocation on port {port} for address family {family}")
// }
// };
// panic!("No commands produced but expected {msg}");
// };
// match (expected_output, actual_output) {
// (
// Output::SendMessage((to, mut message)),
// Command::SendMessage { payload, recipient },
// ) => {
// let sent_message = parse_message(&payload);
// // In order to avoid simulating authentication, we copy the MessageIntegrity attribute.
// if let Some(mi) = sent_message.get_attribute::<MessageIntegrity>() {
// message.add_attribute(mi.clone());
// }
// let expected_bytes = MessageEncoder::new()
// .encode_into_bytes(message.clone())
// .unwrap();
// if expected_bytes != payload {
// let expected_message = format!("{:?}", message);
// let actual_message = format!("{:?}", sent_message);
// difference::assert_diff!(&expected_message, &actual_message, "\n", 0);
// }
// assert_eq!(recipient, to);
// }
// (
// CreateAllocation(expected_port, expected_family),
// Command::CreateAllocation {
// port: actual_port,
// family: actual_family,
// },
// ) => {
// assert_eq!(expected_port, actual_port);
// assert_eq!(expected_family, actual_family);
// }
// (
// FreeAllocation(port, family),
// Command::FreeAllocation {
// port: actual_port,
// family: actual_family,
// },
// ) => {
// assert_eq!(port, actual_port);
// assert_eq!(family, actual_family);
// }
// (expected, actual) => panic!("Unhandled combination: {expected:?} {actual:?}"),
// }
// }
// let remaining_commands = iter::from_fn(|| self.server.next_command()).collect::<Vec<_>>();
// assert_eq!(remaining_commands, vec![])
// }
// }
// fn valid_username(salt: &str) -> Username {
// let now_unix = SystemTime::now()
// .duration_since(SystemTime::UNIX_EPOCH)
// .unwrap()
// .as_secs();
// let expiry = now_unix + 1000;
// Username::new(format!("{expiry}:{salt}")).unwrap()
// }
// fn binding_response(
// transaction_id: TransactionId,
// address: impl Into<SocketAddr>,
// ) -> Message<Attribute> {
// let mut message =
// Message::<Attribute>::new(MessageClass::SuccessResponse, BINDING, transaction_id);
// message.add_attribute(SOFTWARE.clone());
// message.add_attribute(XorMappedAddress::new(address.into()));
// message
// }
// fn allocate_response(
// transaction_id: TransactionId,
// public_relay_addr: impl Into<IpAddr>,
// port: u16,
// source: impl Into<SocketAddr>,
// lifetime: &Lifetime,
// ) -> Message<Attribute> {
// let mut message =
// Message::<Attribute>::new(MessageClass::SuccessResponse, ALLOCATE, transaction_id);
// message.add_attribute(SOFTWARE.clone());
// message.add_attribute(XorRelayAddress::new(SocketAddr::new(
// public_relay_addr.into(),
// port,
// )));
// message.add_attribute(XorMappedAddress::new(source.into()));
// message.add_attribute(lifetime.clone());
// message
// }
// fn unauthorized_allocate_response(
// transaction_id: TransactionId,
// nonce: Uuid,
// ) -> Message<Attribute> {
// let mut message =
// Message::<Attribute>::new(MessageClass::ErrorResponse, ALLOCATE, transaction_id);
// message.add_attribute(SOFTWARE.clone());
// message.add_attribute(ErrorCode::from(Unauthorized));
// message.add_attribute(Realm::new("firezone".to_owned()).unwrap());
// message.add_attribute(Nonce::new(nonce.as_hyphenated().to_string()).unwrap());
// message
// }
// fn refresh_response(transaction_id: TransactionId, lifetime: Lifetime) -> Message<Attribute> {
// let mut message =
// Message::<Attribute>::new(MessageClass::SuccessResponse, REFRESH, transaction_id);
// message.add_attribute(SOFTWARE.clone());
// message.add_attribute(lifetime);
// message
// }
// fn channel_bind_response(transaction_id: TransactionId) -> Message<Attribute> {
// let mut message =
// Message::<Attribute>::new(MessageClass::SuccessResponse, CHANNEL_BIND, transaction_id);
// message.add_attribute(SOFTWARE.clone());
// message
// }
// fn parse_message(message: &[u8]) -> Message<Attribute> {
// MessageDecoder::new()
// .decode_from_bytes(message)
// .unwrap()
// .unwrap()
// }
// enum Input<'a> {
// Client(ClientSocket, ClientMessage<'a>, Instant),
// Time(Instant),
// }
// fn from_client<'a>(
// from: impl Into<SocketAddr>,
// message: impl Into<ClientMessage<'a>>,
// now: Instant,
// ) -> Input<'a> {
// Input::Client(ClientSocket::new(from.into()), message.into(), now)
// }
// fn forward_time_to<'a>(when: Instant) -> Input<'a> {
// Input::Time(when)
// }
// #[derive(Debug)]
// enum Output {
// SendMessage((ClientSocket, Message<Attribute>)),
// CreateAllocation(AllocationPort, AddressFamily),
// FreeAllocation(AllocationPort, AddressFamily),
// }
// fn create_allocation(port: u16, fam: AddressFamily) -> Output {
// Output::CreateAllocation(AllocationPort::new(port), fam)
// }
// fn free_allocation(port: u16, fam: AddressFamily) -> Output {
// Output::FreeAllocation(AllocationPort::new(port), fam)
// }
// fn send_message(source: impl Into<SocketAddr>, message: Message<Attribute>) -> Output {
// Output::SendMessage((ClientSocket::new(source.into()), message))
// }