feat(relay): record metrics about bytes relayed via eBPF (#8556)

Perf events are designed to be an extremely efficient way of
transferring data from an eBPF kernel to the user-space program. In
order to monitor, how much traffic we are actually relaying via eBPF, we
introduce a dedicated `STATS` map that is a `PerfEventArray`.

The events from that array are read asynchronously in user-space and fed
into our OTEL metrics. They will show up in our Google Cloud metrics as
`data_relayed_ebpf_bytes`. We already have a metric for the total
relayed bytes. That counter is renamed to `data_relayed_userspace_bytes`
so we can clearly differentiate the two.
This commit is contained in:
Thomas Eizinger
2025-04-01 08:57:31 +11:00
committed by GitHub
parent b51a68def0
commit 1d0ecf94b8
11 changed files with 254 additions and 67 deletions

108
rust/Cargo.lock generated
View File

@@ -219,12 +219,23 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e"
dependencies = [
"event-listener",
"event-listener 5.3.1",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener 2.5.3",
"futures-core",
]
[[package]]
name = "async-channel"
version = "2.3.1"
@@ -261,6 +272,21 @@ dependencies = [
"futures-lite",
]
[[package]]
name = "async-global-executor"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c"
dependencies = [
"async-channel 2.3.1",
"async-executor",
"async-io",
"async-lock",
"blocking",
"futures-lite",
"once_cell",
]
[[package]]
name = "async-io"
version = "2.3.4"
@@ -286,7 +312,7 @@ version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
dependencies = [
"event-listener",
"event-listener 5.3.1",
"event-listener-strategy",
"pin-project-lite",
]
@@ -297,14 +323,14 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb"
dependencies = [
"async-channel",
"async-channel 2.3.1",
"async-io",
"async-lock",
"async-signal",
"async-task",
"blocking",
"cfg-if",
"event-listener",
"event-listener 5.3.1",
"futures-lite",
"rustix",
"tracing",
@@ -339,6 +365,33 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "async-std"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615"
dependencies = [
"async-channel 1.9.0",
"async-global-executor",
"async-io",
"async-lock",
"async-process",
"crossbeam-utils",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log",
"memchr",
"once_cell",
"pin-project-lite",
"pin-utils",
"slab",
"wasm-bindgen-futures",
]
[[package]]
name = "async-stream"
version = "0.3.6"
@@ -725,7 +778,7 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea"
dependencies = [
"async-channel",
"async-channel 2.3.1",
"async-task",
"futures-io",
"futures-lite",
@@ -2032,6 +2085,12 @@ dependencies = [
"etherparse",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "event-listener"
version = "5.3.1"
@@ -2049,7 +2108,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1"
dependencies = [
"event-listener",
"event-listener 5.3.1",
"pin-project-lite",
]
@@ -2923,6 +2982,18 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
[[package]]
name = "gloo-timers"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "gobject-sys"
version = "0.18.0"
@@ -3647,6 +3718,15 @@ dependencies = [
"selectors",
]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log",
]
[[package]]
name = "l4-tcp-dns-server"
version = "0.1.0"
@@ -3771,6 +3851,9 @@ name = "log"
version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
dependencies = [
"value-bag",
]
[[package]]
name = "loom"
@@ -3937,7 +4020,7 @@ dependencies = [
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"event-listener",
"event-listener 5.3.1",
"futures-util",
"loom",
"parking_lot",
@@ -4578,6 +4661,7 @@ version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3"
dependencies = [
"async-std",
"async-trait",
"futures-channel",
"futures-executor",
@@ -7855,6 +7939,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "value-bag"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2"
[[package]]
name = "version-compare"
version = "0.2.0"
@@ -8875,7 +8965,7 @@ dependencies = [
"async-recursion",
"async-trait",
"enumflags2",
"event-listener",
"event-listener 5.3.1",
"futures-core",
"futures-sink",
"futures-util",
@@ -8913,7 +9003,7 @@ dependencies = [
"async-trait",
"blocking",
"enumflags2",
"event-listener",
"event-listener 5.3.1",
"futures-core",
"futures-lite",
"hex",

View File

@@ -228,6 +228,7 @@ deny = [
# Certain crates/versions that will be skipped when doing duplicate detection.
skip = [
"async-channel",
"base64",
"bitflags",
"core-foundation",
@@ -236,6 +237,7 @@ skip = [
"derive_more",
"dirs",
"dirs-sys",
"event-listener",
"getrandom",
"hashbrown",
"heck",

View File

@@ -219,6 +219,23 @@ impl Default for Config {
}
}
#[repr(C)]
#[derive(Clone, Copy)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct StatsEvent {
pub relayed_data: u64,
}
impl StatsEvent {
#[cfg(feature = "std")]
pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
let (chunk, _) = bytes.split_first_chunk()?;
let relayed_data = u64::from_ne_bytes(*chunk);
Some(Self { relayed_data })
}
}
#[cfg(all(feature = "std", target_os = "linux"))]
mod userspace {
use super::*;

View File

@@ -8,6 +8,7 @@ pub enum Error {
Ipv4PacketWithOptions,
NotAChannelDataMessage,
BadChannelDataLength,
NoChannelBinding,
}
impl Error {
@@ -17,6 +18,7 @@ impl Error {
Error::Ipv4PacketWithOptions => xdp_action::XDP_PASS,
Error::BadChannelDataLength => xdp_action::XDP_DROP,
Error::NotAChannelDataMessage => xdp_action::XDP_PASS,
Error::NoChannelBinding => xdp_action::XDP_PASS,
}
}
}
@@ -28,6 +30,7 @@ impl aya_log_ebpf::WriteToBuf for Error {
Error::Ipv4PacketWithOptions => "IPv4 packet has options",
Error::NotAChannelDataMessage => "Not a channel data message",
Error::BadChannelDataLength => "Channel data length does not match packet length",
Error::NoChannelBinding => "No channel binding",
};
msg.write(buf)

View File

@@ -33,6 +33,7 @@ mod ip4;
mod ip6;
mod move_headers;
mod slice_mut_at;
mod stats;
mod udp;
/// Channel mappings from an IPv4 socket + channel number to an IPv4 socket + port.
@@ -90,6 +91,7 @@ fn try_handle_turn_ipv4(ctx: &XdpContext) -> Result<u32, Error> {
}
let udp = Udp::parse(ctx, Ipv4Hdr::LEN)?; // TODO: Change the API so we parse the UDP header _from_ the ipv4 struct?
let udp_payload_len = udp.payload_len();
trace!(
ctx,
@@ -98,17 +100,19 @@ fn try_handle_turn_ipv4(ctx: &XdpContext) -> Result<u32, Error> {
udp.src(),
ipv4.dst(),
udp.dst(),
udp.len()
udp_payload_len
);
if config::allocation_range().contains(&udp.dst()) {
let action = try_handle_ipv4_udp_to_channel_data(ctx, ipv4, udp)?;
stats::emit_data_relayed(ctx, udp_payload_len);
return Ok(action);
}
if udp.dst() == 3478 {
let action = try_handle_ipv4_channel_data_to_udp(ctx, ipv4, udp)?;
stats::emit_data_relayed(ctx, udp_payload_len);
return Ok(action);
}
@@ -125,19 +129,9 @@ fn try_handle_ipv4_channel_data_to_udp(
let cd = ChannelData::parse(ctx, Ipv4Hdr::LEN)?;
// SAFETY: ???
let maybe_peer =
unsafe { CHAN_TO_UDP_44.get(&ClientAndChannelV4::new(ipv4.src(), udp.src(), cd.number())) };
let Some(port_and_peer) = maybe_peer else {
debug!(
ctx,
"No channel binding from {:i}:{} for channel {}",
ipv4.src(),
udp.src(),
cd.number(),
);
return Ok(xdp_action::XDP_PASS);
};
let port_and_peer =
unsafe { CHAN_TO_UDP_44.get(&ClientAndChannelV4::new(ipv4.src(), udp.src(), cd.number())) }
.ok_or(Error::NoChannelBinding)?;
let new_src = ipv4.dst(); // The IP we received the packet on will be the new source IP.
let new_ipv4_total_len = ipv4.total_len() - CdHdr::LEN as u16;
@@ -162,19 +156,9 @@ fn try_handle_ipv4_udp_to_channel_data(
ipv4: Ip4,
udp: Udp,
) -> Result<u32, Error> {
let maybe_client =
unsafe { UDP_TO_CHAN_44.get(&PortAndPeerV4::new(ipv4.src(), udp.dst(), udp.src())) };
let Some(client_and_channel) = maybe_client else {
debug!(
ctx,
"No channel binding from {:i}:{} on allocation {}",
ipv4.src(),
udp.src(),
udp.dst(),
);
return Ok(xdp_action::XDP_PASS);
};
let client_and_channel =
unsafe { UDP_TO_CHAN_44.get(&PortAndPeerV4::new(ipv4.src(), udp.dst(), udp.src())) }
.ok_or(Error::NoChannelBinding)?;
let new_src = ipv4.dst(); // The IP we received the packet on will be the new source IP.
let new_ipv4_total_len = ipv4.total_len() + CdHdr::LEN as u16;
@@ -208,6 +192,8 @@ fn try_handle_turn_ipv6(ctx: &XdpContext) -> Result<u32, Error> {
}
let udp = Udp::parse(ctx, Ipv6Hdr::LEN)?; // TODO: Change the API so we parse the UDP header _from_ the ipv6 struct?
let udp_payload_len = udp.payload_len();
trace!(
ctx,
"New packet from {:i}:{} for {:i}:{} with UDP payload {}",
@@ -215,17 +201,19 @@ fn try_handle_turn_ipv6(ctx: &XdpContext) -> Result<u32, Error> {
udp.src(),
ipv6.dst(),
udp.dst(),
udp.len()
udp_payload_len
);
if config::allocation_range().contains(&udp.dst()) {
let action = try_handle_ipv6_udp_to_channel_data(ctx, ipv6, udp)?;
stats::emit_data_relayed(ctx, udp_payload_len);
return Ok(action);
}
if udp.dst() == 3478 {
let action = try_handle_ipv6_channel_data_to_udp(ctx, ipv6, udp)?;
stats::emit_data_relayed(ctx, udp_payload_len);
return Ok(action);
}
@@ -238,19 +226,9 @@ fn try_handle_ipv6_udp_to_channel_data(
ipv6: Ip6,
udp: Udp,
) -> Result<u32, Error> {
let maybe_client =
unsafe { UDP_TO_CHAN_66.get(&PortAndPeerV6::new(ipv6.src(), udp.dst(), udp.src())) };
let Some(client_and_channel) = maybe_client else {
debug!(
ctx,
"No channel binding from {:i}:{} on allocation {}",
ipv6.src(),
udp.src(),
udp.dst(),
);
return Ok(xdp_action::XDP_PASS);
};
let client_and_channel =
unsafe { UDP_TO_CHAN_66.get(&PortAndPeerV6::new(ipv6.src(), udp.dst(), udp.src())) }
.ok_or(Error::NoChannelBinding)?;
let new_src = ipv6.dst(); // The IP we received the packet on will be the new source IP.
let new_ipv6_total_len = ipv6.payload_len() + CdHdr::LEN as u16;
@@ -283,19 +261,9 @@ fn try_handle_ipv6_channel_data_to_udp(
let cd = ChannelData::parse(ctx, Ipv6Hdr::LEN)?;
// SAFETY: ???
let maybe_peer =
unsafe { CHAN_TO_UDP_66.get(&ClientAndChannelV6::new(ipv6.src(), udp.src(), cd.number())) };
let Some(port_and_peer) = maybe_peer else {
debug!(
ctx,
"No channel binding from {:i}:{} for channel {}",
ipv6.src(),
udp.src(),
cd.number(),
);
return Ok(xdp_action::XDP_PASS);
};
let port_and_peer =
unsafe { CHAN_TO_UDP_66.get(&ClientAndChannelV6::new(ipv6.src(), udp.src(), cd.number())) }
.ok_or(Error::NoChannelBinding)?;
let new_src = ipv6.dst(); // The IP we received the packet on will be the new source IP.
let new_ipv6_payload_len = ipv6.payload_len() - CdHdr::LEN as u16;

View File

@@ -0,0 +1,15 @@
use aya_ebpf::{macros::map, maps::PerfEventArray, programs::XdpContext};
use ebpf_shared::StatsEvent;
#[map]
static STATS: PerfEventArray<StatsEvent> = PerfEventArray::new(0);
pub fn emit_data_relayed(ctx: &XdpContext, bytes: impl Into<u64>) {
STATS.output(
ctx,
&StatsEvent {
relayed_data: bytes.into(),
},
0,
);
}

View File

@@ -30,6 +30,10 @@ impl<'a> Udp<'a> {
u16::from_be_bytes(self.inner.len)
}
pub fn payload_len(&self) -> u16 {
self.len() - UdpHdr::LEN as u16
}
/// Update this packet with a new source, destination, and length.
#[inline(always)]
pub fn update(

View File

@@ -50,12 +50,13 @@ url = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
[target.'cfg(target_os = "linux")'.dependencies]
aya = { workspace = true }
aya = { workspace = true, features = ["tokio"] }
aya-log = { workspace = true }
[dev-dependencies]
difference = { workspace = true }
env_logger = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["testing"] }
test-strategy = { workspace = true }
tokio = { workspace = true, features = ["process", "macros", "net"] }

View File

@@ -3,11 +3,14 @@ use std::net::SocketAddr;
use anyhow::{Context as _, Result};
use aya::{
Pod,
maps::{Array, HashMap, MapData},
maps::{Array, AsyncPerfEventArray, HashMap, MapData},
programs::{Xdp, XdpFlags},
};
use aya_log::EbpfLogger;
use ebpf_shared::{ClientAndChannelV4, ClientAndChannelV6, Config, PortAndPeerV4, PortAndPeerV6};
use bytes::BytesMut;
use ebpf_shared::{
ClientAndChannelV4, ClientAndChannelV6, Config, PortAndPeerV4, PortAndPeerV6, StatsEvent,
};
use stun_codec::rfc5766::attributes::ChannelNumber;
use crate::{AllocationPort, ClientSocket, PeerSocket};
@@ -32,6 +35,56 @@ impl Program {
.attach(interface, XdpFlags::default())
.with_context(|| format!("Failed to attached to interface {interface}"))?;
let mut stats = AsyncPerfEventArray::try_from(
ebpf.take_map("STATS")
.context("`STATS` perf array not found")?,
)?;
let data_relayed = opentelemetry::global::meter("relay")
.u64_counter("data_relayed_ebpf_bytes")
.with_description("The number of bytes relayed by the eBPF kernel")
.with_unit("b")
.init();
for cpu_id in aya::util::online_cpus()
.map_err(|(_, error)| error)
.context("Failed to determine number of CPUs")?
{
// open a separate perf buffer for each cpu
let mut stats_array_buf = stats.open(cpu_id, None)?;
// process each perf buffer in a separate task
tokio::task::spawn({
let data_relayed = data_relayed.clone();
async move {
let mut buffers = (0..1000)
.map(|_| BytesMut::with_capacity(std::mem::size_of::<StatsEvent>()))
.collect::<Vec<_>>();
loop {
let events = match stats_array_buf.read_events(&mut buffers).await {
Ok(events) => events,
Err(e) => {
tracing::warn!("Failed to read perf events: {e}");
break;
}
};
tracing::debug!(%cpu_id, num_read = %events.read, "Read perf events from eBPF kernel");
for bytes in buffers.iter().take(events.read) {
let Some(stats) = StatsEvent::from_bytes(bytes) else {
continue;
};
data_relayed.add(stats.relayed_data, &[]);
}
}
}
});
}
Ok(Self { ebpf })
}

View File

@@ -189,7 +189,7 @@ where
.with_description("The number of responses")
.init();
let data_relayed_counter = meter
.u64_counter("data_relayed_bytes")
.u64_counter("data_relayed_userspace_bytes")
.with_description("The number of bytes relayed")
.with_unit("b")
.init();

View File

@@ -1,6 +1,11 @@
#![allow(clippy::unwrap_used)]
use firezone_relay::{AllocationPort, ClientSocket, PeerSocket};
use opentelemetry::global;
use opentelemetry_sdk::{
metrics::{PeriodicReader, SdkMeterProvider, data::Sum},
testing::metrics::InMemoryMetricsExporter,
};
use std::time::Duration;
use tokio::net::UdpSocket;
@@ -12,6 +17,8 @@ use stun_codec::rfc5766::attributes::ChannelNumber;
async fn ping_pong() {
let _guard = firezone_logging::test("trace,mio=off");
let (_meter_provider, exporter) = init_meter_provider();
let mut program = firezone_relay::ebpf::Program::try_load("lo").unwrap();
// Linux does not set the correct UDP checksum when sending the packet, so our updated checksum in the eBPF code will be wrong and later dropped.
@@ -88,4 +95,31 @@ async fn ping_pong() {
assert_eq!(from.port(), 3478);
assert_eq!(channel_data.data(), msg);
}
tokio::time::sleep(Duration::from_millis(10)).await; // Wait for metrics to be exported.
let metrics = exporter.get_finished_metrics().unwrap();
assert!(!metrics.is_empty());
let metric = &metrics.iter().last().unwrap().scope_metrics[0].metrics[0];
let sum = metric.data.as_any().downcast_ref::<Sum<u64>>().unwrap();
assert_eq!(metric.name, "data_relayed_ebpf_bytes");
assert_eq!(sum.data_points[0].value, 12); // "ping" and "pong" are both 4 bytes, we also send 1 CD message, meaning + 4 bytes for that header.
}
fn init_meter_provider() -> (SdkMeterProvider, InMemoryMetricsExporter) {
let exporter = InMemoryMetricsExporter::default();
let provider = SdkMeterProvider::builder()
.with_reader(
PeriodicReader::builder(exporter.clone(), opentelemetry_sdk::runtime::Tokio)
.with_interval(Duration::from_millis(1))
.build(),
)
.build();
global::set_meter_provider(provider.clone());
(provider, exporter)
}