From 1d0ecf94b8aa837fcac973e63a0808825653c412 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 1 Apr 2025 08:57:31 +1100 Subject: [PATCH] feat(relay): record metrics about bytes relayed via eBPF (#8556) Perf events are designed to be an extremely efficient way of transferring data from an eBPF kernel to the user-space program. In order to monitor, how much traffic we are actually relaying via eBPF, we introduce a dedicated `STATS` map that is a `PerfEventArray`. The events from that array are read asynchronously in user-space and fed into our OTEL metrics. They will show up in our Google Cloud metrics as `data_relayed_ebpf_bytes`. We already have a metric for the total relayed bytes. That counter is renamed to `data_relayed_userspace_bytes` so we can clearly differentiate the two. --- rust/Cargo.lock | 108 +++++++++++++++++++++-- rust/deny.toml | 2 + rust/relay/ebpf-shared/src/lib.rs | 17 ++++ rust/relay/ebpf-turn-router/src/error.rs | 3 + rust/relay/ebpf-turn-router/src/main.rs | 76 +++++----------- rust/relay/ebpf-turn-router/src/stats.rs | 15 ++++ rust/relay/ebpf-turn-router/src/udp.rs | 4 + rust/relay/server/Cargo.toml | 3 +- rust/relay/server/src/ebpf/linux.rs | 57 +++++++++++- rust/relay/server/src/server.rs | 2 +- rust/relay/server/tests/ebpf_ipv4.rs | 34 +++++++ 11 files changed, 254 insertions(+), 67 deletions(-) create mode 100644 rust/relay/ebpf-turn-router/src/stats.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 7bf4968ac..46e9b5c6b 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -219,12 +219,23 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e" dependencies = [ - "event-listener", + "event-listener 5.3.1", "event-listener-strategy", "futures-core", "pin-project-lite", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + [[package]] name = "async-channel" version = "2.3.1" @@ -261,6 +272,21 @@ dependencies = [ "futures-lite", ] +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + [[package]] name = "async-io" version = "2.3.4" @@ -286,7 +312,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener", + "event-listener 5.3.1", "event-listener-strategy", "pin-project-lite", ] @@ -297,14 +323,14 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" dependencies = [ - "async-channel", + "async-channel 2.3.1", "async-io", "async-lock", "async-signal", "async-task", "blocking", "cfg-if", - "event-listener", + "event-listener 5.3.1", "futures-lite", "rustix", "tracing", @@ -339,6 +365,33 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-std" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io", + "async-lock", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -725,7 +778,7 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" dependencies = [ - "async-channel", + "async-channel 2.3.1", "async-task", "futures-io", "futures-lite", @@ -2032,6 +2085,12 @@ dependencies = [ "etherparse", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.3.1" @@ -2049,7 +2108,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" dependencies = [ - "event-listener", + "event-listener 5.3.1", "pin-project-lite", ] @@ -2923,6 +2982,18 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "gobject-sys" version = "0.18.0" @@ -3647,6 +3718,15 @@ dependencies = [ "selectors", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "l4-tcp-dns-server" version = "0.1.0" @@ -3771,6 +3851,9 @@ name = "log" version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" +dependencies = [ + "value-bag", +] [[package]] name = "loom" @@ -3937,7 +4020,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "event-listener", + "event-listener 5.3.1", "futures-util", "loom", "parking_lot", @@ -4578,6 +4661,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" dependencies = [ + "async-std", "async-trait", "futures-channel", "futures-executor", @@ -7855,6 +7939,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" + [[package]] name = "version-compare" version = "0.2.0" @@ -8875,7 +8965,7 @@ dependencies = [ "async-recursion", "async-trait", "enumflags2", - "event-listener", + "event-listener 5.3.1", "futures-core", "futures-sink", "futures-util", @@ -8913,7 +9003,7 @@ dependencies = [ "async-trait", "blocking", "enumflags2", - "event-listener", + "event-listener 5.3.1", "futures-core", "futures-lite", "hex", diff --git a/rust/deny.toml b/rust/deny.toml index c8ebc7fa3..f116bd381 100644 --- a/rust/deny.toml +++ b/rust/deny.toml @@ -228,6 +228,7 @@ deny = [ # Certain crates/versions that will be skipped when doing duplicate detection. skip = [ + "async-channel", "base64", "bitflags", "core-foundation", @@ -236,6 +237,7 @@ skip = [ "derive_more", "dirs", "dirs-sys", + "event-listener", "getrandom", "hashbrown", "heck", diff --git a/rust/relay/ebpf-shared/src/lib.rs b/rust/relay/ebpf-shared/src/lib.rs index 3110ff0d6..3d8853153 100644 --- a/rust/relay/ebpf-shared/src/lib.rs +++ b/rust/relay/ebpf-shared/src/lib.rs @@ -219,6 +219,23 @@ impl Default for Config { } } +#[repr(C)] +#[derive(Clone, Copy)] +#[cfg_attr(feature = "std", derive(Debug))] +pub struct StatsEvent { + pub relayed_data: u64, +} + +impl StatsEvent { + #[cfg(feature = "std")] + pub fn from_bytes(bytes: &[u8]) -> Option { + let (chunk, _) = bytes.split_first_chunk()?; + let relayed_data = u64::from_ne_bytes(*chunk); + + Some(Self { relayed_data }) + } +} + #[cfg(all(feature = "std", target_os = "linux"))] mod userspace { use super::*; diff --git a/rust/relay/ebpf-turn-router/src/error.rs b/rust/relay/ebpf-turn-router/src/error.rs index 080181c0e..9de2cd50e 100644 --- a/rust/relay/ebpf-turn-router/src/error.rs +++ b/rust/relay/ebpf-turn-router/src/error.rs @@ -8,6 +8,7 @@ pub enum Error { Ipv4PacketWithOptions, NotAChannelDataMessage, BadChannelDataLength, + NoChannelBinding, } impl Error { @@ -17,6 +18,7 @@ impl Error { Error::Ipv4PacketWithOptions => xdp_action::XDP_PASS, Error::BadChannelDataLength => xdp_action::XDP_DROP, Error::NotAChannelDataMessage => xdp_action::XDP_PASS, + Error::NoChannelBinding => xdp_action::XDP_PASS, } } } @@ -28,6 +30,7 @@ impl aya_log_ebpf::WriteToBuf for Error { Error::Ipv4PacketWithOptions => "IPv4 packet has options", Error::NotAChannelDataMessage => "Not a channel data message", Error::BadChannelDataLength => "Channel data length does not match packet length", + Error::NoChannelBinding => "No channel binding", }; msg.write(buf) diff --git a/rust/relay/ebpf-turn-router/src/main.rs b/rust/relay/ebpf-turn-router/src/main.rs index 221d30a64..f6d5e9d6e 100644 --- a/rust/relay/ebpf-turn-router/src/main.rs +++ b/rust/relay/ebpf-turn-router/src/main.rs @@ -33,6 +33,7 @@ mod ip4; mod ip6; mod move_headers; mod slice_mut_at; +mod stats; mod udp; /// Channel mappings from an IPv4 socket + channel number to an IPv4 socket + port. @@ -90,6 +91,7 @@ fn try_handle_turn_ipv4(ctx: &XdpContext) -> Result { } let udp = Udp::parse(ctx, Ipv4Hdr::LEN)?; // TODO: Change the API so we parse the UDP header _from_ the ipv4 struct? + let udp_payload_len = udp.payload_len(); trace!( ctx, @@ -98,17 +100,19 @@ fn try_handle_turn_ipv4(ctx: &XdpContext) -> Result { udp.src(), ipv4.dst(), udp.dst(), - udp.len() + udp_payload_len ); if config::allocation_range().contains(&udp.dst()) { let action = try_handle_ipv4_udp_to_channel_data(ctx, ipv4, udp)?; + stats::emit_data_relayed(ctx, udp_payload_len); return Ok(action); } if udp.dst() == 3478 { let action = try_handle_ipv4_channel_data_to_udp(ctx, ipv4, udp)?; + stats::emit_data_relayed(ctx, udp_payload_len); return Ok(action); } @@ -125,19 +129,9 @@ fn try_handle_ipv4_channel_data_to_udp( let cd = ChannelData::parse(ctx, Ipv4Hdr::LEN)?; // SAFETY: ??? - let maybe_peer = - unsafe { CHAN_TO_UDP_44.get(&ClientAndChannelV4::new(ipv4.src(), udp.src(), cd.number())) }; - let Some(port_and_peer) = maybe_peer else { - debug!( - ctx, - "No channel binding from {:i}:{} for channel {}", - ipv4.src(), - udp.src(), - cd.number(), - ); - - return Ok(xdp_action::XDP_PASS); - }; + let port_and_peer = + unsafe { CHAN_TO_UDP_44.get(&ClientAndChannelV4::new(ipv4.src(), udp.src(), cd.number())) } + .ok_or(Error::NoChannelBinding)?; let new_src = ipv4.dst(); // The IP we received the packet on will be the new source IP. let new_ipv4_total_len = ipv4.total_len() - CdHdr::LEN as u16; @@ -162,19 +156,9 @@ fn try_handle_ipv4_udp_to_channel_data( ipv4: Ip4, udp: Udp, ) -> Result { - let maybe_client = - unsafe { UDP_TO_CHAN_44.get(&PortAndPeerV4::new(ipv4.src(), udp.dst(), udp.src())) }; - let Some(client_and_channel) = maybe_client else { - debug!( - ctx, - "No channel binding from {:i}:{} on allocation {}", - ipv4.src(), - udp.src(), - udp.dst(), - ); - - return Ok(xdp_action::XDP_PASS); - }; + let client_and_channel = + unsafe { UDP_TO_CHAN_44.get(&PortAndPeerV4::new(ipv4.src(), udp.dst(), udp.src())) } + .ok_or(Error::NoChannelBinding)?; let new_src = ipv4.dst(); // The IP we received the packet on will be the new source IP. let new_ipv4_total_len = ipv4.total_len() + CdHdr::LEN as u16; @@ -208,6 +192,8 @@ fn try_handle_turn_ipv6(ctx: &XdpContext) -> Result { } let udp = Udp::parse(ctx, Ipv6Hdr::LEN)?; // TODO: Change the API so we parse the UDP header _from_ the ipv6 struct? + let udp_payload_len = udp.payload_len(); + trace!( ctx, "New packet from {:i}:{} for {:i}:{} with UDP payload {}", @@ -215,17 +201,19 @@ fn try_handle_turn_ipv6(ctx: &XdpContext) -> Result { udp.src(), ipv6.dst(), udp.dst(), - udp.len() + udp_payload_len ); if config::allocation_range().contains(&udp.dst()) { let action = try_handle_ipv6_udp_to_channel_data(ctx, ipv6, udp)?; + stats::emit_data_relayed(ctx, udp_payload_len); return Ok(action); } if udp.dst() == 3478 { let action = try_handle_ipv6_channel_data_to_udp(ctx, ipv6, udp)?; + stats::emit_data_relayed(ctx, udp_payload_len); return Ok(action); } @@ -238,19 +226,9 @@ fn try_handle_ipv6_udp_to_channel_data( ipv6: Ip6, udp: Udp, ) -> Result { - let maybe_client = - unsafe { UDP_TO_CHAN_66.get(&PortAndPeerV6::new(ipv6.src(), udp.dst(), udp.src())) }; - let Some(client_and_channel) = maybe_client else { - debug!( - ctx, - "No channel binding from {:i}:{} on allocation {}", - ipv6.src(), - udp.src(), - udp.dst(), - ); - - return Ok(xdp_action::XDP_PASS); - }; + let client_and_channel = + unsafe { UDP_TO_CHAN_66.get(&PortAndPeerV6::new(ipv6.src(), udp.dst(), udp.src())) } + .ok_or(Error::NoChannelBinding)?; let new_src = ipv6.dst(); // The IP we received the packet on will be the new source IP. let new_ipv6_total_len = ipv6.payload_len() + CdHdr::LEN as u16; @@ -283,19 +261,9 @@ fn try_handle_ipv6_channel_data_to_udp( let cd = ChannelData::parse(ctx, Ipv6Hdr::LEN)?; // SAFETY: ??? - let maybe_peer = - unsafe { CHAN_TO_UDP_66.get(&ClientAndChannelV6::new(ipv6.src(), udp.src(), cd.number())) }; - let Some(port_and_peer) = maybe_peer else { - debug!( - ctx, - "No channel binding from {:i}:{} for channel {}", - ipv6.src(), - udp.src(), - cd.number(), - ); - - return Ok(xdp_action::XDP_PASS); - }; + let port_and_peer = + unsafe { CHAN_TO_UDP_66.get(&ClientAndChannelV6::new(ipv6.src(), udp.src(), cd.number())) } + .ok_or(Error::NoChannelBinding)?; let new_src = ipv6.dst(); // The IP we received the packet on will be the new source IP. let new_ipv6_payload_len = ipv6.payload_len() - CdHdr::LEN as u16; diff --git a/rust/relay/ebpf-turn-router/src/stats.rs b/rust/relay/ebpf-turn-router/src/stats.rs new file mode 100644 index 000000000..90cd7ad76 --- /dev/null +++ b/rust/relay/ebpf-turn-router/src/stats.rs @@ -0,0 +1,15 @@ +use aya_ebpf::{macros::map, maps::PerfEventArray, programs::XdpContext}; +use ebpf_shared::StatsEvent; + +#[map] +static STATS: PerfEventArray = PerfEventArray::new(0); + +pub fn emit_data_relayed(ctx: &XdpContext, bytes: impl Into) { + STATS.output( + ctx, + &StatsEvent { + relayed_data: bytes.into(), + }, + 0, + ); +} diff --git a/rust/relay/ebpf-turn-router/src/udp.rs b/rust/relay/ebpf-turn-router/src/udp.rs index adcabe574..eb201ef09 100644 --- a/rust/relay/ebpf-turn-router/src/udp.rs +++ b/rust/relay/ebpf-turn-router/src/udp.rs @@ -30,6 +30,10 @@ impl<'a> Udp<'a> { u16::from_be_bytes(self.inner.len) } + pub fn payload_len(&self) -> u16 { + self.len() - UdpHdr::LEN as u16 + } + /// Update this packet with a new source, destination, and length. #[inline(always)] pub fn update( diff --git a/rust/relay/server/Cargo.toml b/rust/relay/server/Cargo.toml index 126aa491d..1c8de4090 100644 --- a/rust/relay/server/Cargo.toml +++ b/rust/relay/server/Cargo.toml @@ -50,12 +50,13 @@ url = { workspace = true } uuid = { workspace = true, features = ["v4"] } [target.'cfg(target_os = "linux")'.dependencies] -aya = { workspace = true } +aya = { workspace = true, features = ["tokio"] } aya-log = { workspace = true } [dev-dependencies] difference = { workspace = true } env_logger = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["testing"] } test-strategy = { workspace = true } tokio = { workspace = true, features = ["process", "macros", "net"] } diff --git a/rust/relay/server/src/ebpf/linux.rs b/rust/relay/server/src/ebpf/linux.rs index c2120e707..1cf9d7a95 100644 --- a/rust/relay/server/src/ebpf/linux.rs +++ b/rust/relay/server/src/ebpf/linux.rs @@ -3,11 +3,14 @@ use std::net::SocketAddr; use anyhow::{Context as _, Result}; use aya::{ Pod, - maps::{Array, HashMap, MapData}, + maps::{Array, AsyncPerfEventArray, HashMap, MapData}, programs::{Xdp, XdpFlags}, }; use aya_log::EbpfLogger; -use ebpf_shared::{ClientAndChannelV4, ClientAndChannelV6, Config, PortAndPeerV4, PortAndPeerV6}; +use bytes::BytesMut; +use ebpf_shared::{ + ClientAndChannelV4, ClientAndChannelV6, Config, PortAndPeerV4, PortAndPeerV6, StatsEvent, +}; use stun_codec::rfc5766::attributes::ChannelNumber; use crate::{AllocationPort, ClientSocket, PeerSocket}; @@ -32,6 +35,56 @@ impl Program { .attach(interface, XdpFlags::default()) .with_context(|| format!("Failed to attached to interface {interface}"))?; + let mut stats = AsyncPerfEventArray::try_from( + ebpf.take_map("STATS") + .context("`STATS` perf array not found")?, + )?; + + let data_relayed = opentelemetry::global::meter("relay") + .u64_counter("data_relayed_ebpf_bytes") + .with_description("The number of bytes relayed by the eBPF kernel") + .with_unit("b") + .init(); + + for cpu_id in aya::util::online_cpus() + .map_err(|(_, error)| error) + .context("Failed to determine number of CPUs")? + { + // open a separate perf buffer for each cpu + let mut stats_array_buf = stats.open(cpu_id, None)?; + + // process each perf buffer in a separate task + tokio::task::spawn({ + let data_relayed = data_relayed.clone(); + + async move { + let mut buffers = (0..1000) + .map(|_| BytesMut::with_capacity(std::mem::size_of::())) + .collect::>(); + + loop { + let events = match stats_array_buf.read_events(&mut buffers).await { + Ok(events) => events, + Err(e) => { + tracing::warn!("Failed to read perf events: {e}"); + break; + } + }; + + tracing::debug!(%cpu_id, num_read = %events.read, "Read perf events from eBPF kernel"); + + for bytes in buffers.iter().take(events.read) { + let Some(stats) = StatsEvent::from_bytes(bytes) else { + continue; + }; + + data_relayed.add(stats.relayed_data, &[]); + } + } + } + }); + } + Ok(Self { ebpf }) } diff --git a/rust/relay/server/src/server.rs b/rust/relay/server/src/server.rs index 34c3b79cb..5f33cb9d2 100644 --- a/rust/relay/server/src/server.rs +++ b/rust/relay/server/src/server.rs @@ -189,7 +189,7 @@ where .with_description("The number of responses") .init(); let data_relayed_counter = meter - .u64_counter("data_relayed_bytes") + .u64_counter("data_relayed_userspace_bytes") .with_description("The number of bytes relayed") .with_unit("b") .init(); diff --git a/rust/relay/server/tests/ebpf_ipv4.rs b/rust/relay/server/tests/ebpf_ipv4.rs index 7a87cfb30..8e1855679 100644 --- a/rust/relay/server/tests/ebpf_ipv4.rs +++ b/rust/relay/server/tests/ebpf_ipv4.rs @@ -1,6 +1,11 @@ #![allow(clippy::unwrap_used)] use firezone_relay::{AllocationPort, ClientSocket, PeerSocket}; +use opentelemetry::global; +use opentelemetry_sdk::{ + metrics::{PeriodicReader, SdkMeterProvider, data::Sum}, + testing::metrics::InMemoryMetricsExporter, +}; use std::time::Duration; use tokio::net::UdpSocket; @@ -12,6 +17,8 @@ use stun_codec::rfc5766::attributes::ChannelNumber; async fn ping_pong() { let _guard = firezone_logging::test("trace,mio=off"); + let (_meter_provider, exporter) = init_meter_provider(); + let mut program = firezone_relay::ebpf::Program::try_load("lo").unwrap(); // Linux does not set the correct UDP checksum when sending the packet, so our updated checksum in the eBPF code will be wrong and later dropped. @@ -88,4 +95,31 @@ async fn ping_pong() { assert_eq!(from.port(), 3478); assert_eq!(channel_data.data(), msg); } + + tokio::time::sleep(Duration::from_millis(10)).await; // Wait for metrics to be exported. + + let metrics = exporter.get_finished_metrics().unwrap(); + + assert!(!metrics.is_empty()); + + let metric = &metrics.iter().last().unwrap().scope_metrics[0].metrics[0]; + let sum = metric.data.as_any().downcast_ref::>().unwrap(); + + assert_eq!(metric.name, "data_relayed_ebpf_bytes"); + assert_eq!(sum.data_points[0].value, 12); // "ping" and "pong" are both 4 bytes, we also send 1 CD message, meaning + 4 bytes for that header. +} + +fn init_meter_provider() -> (SdkMeterProvider, InMemoryMetricsExporter) { + let exporter = InMemoryMetricsExporter::default(); + + let provider = SdkMeterProvider::builder() + .with_reader( + PeriodicReader::builder(exporter.clone(), opentelemetry_sdk::runtime::Tokio) + .with_interval(Duration::from_millis(1)) + .build(), + ) + .build(); + global::set_meter_provider(provider.clone()); + + (provider, exporter) }