chore(rust): initialise OTEL with useful metadata (#8945)

Once we start collecting metrics across various Clients and Gateways,
these metrics need to be tagged with the correct `service.name` and
`service.version`, as well as an instance ID to differentiate metrics
from different instances.
This commit is contained in:
Thomas Eizinger
2025-05-01 15:19:07 +10:00
committed by GitHub
parent 42b2420c00
commit ea5709e8da
12 changed files with 215 additions and 132 deletions

3
rust/Cargo.lock generated
View File

@@ -2473,6 +2473,9 @@ name = "firezone-telemetry"
version = "0.1.0"
dependencies = [
"anyhow",
"ip-packet",
"opentelemetry",
"opentelemetry_sdk",
"parking_lot",
"reqwest",
"sentry",

View File

@@ -3,7 +3,7 @@ mod nameserver_set;
mod tcp_dns;
mod udp_dns;
use crate::{device_channel::Device, dns, sockets::Sockets};
use crate::{device_channel::Device, dns, otel, sockets::Sockets};
use anyhow::{Context as _, Result};
use firezone_logging::{telemetry_event, telemetry_span};
use futures::FutureExt as _;
@@ -194,15 +194,15 @@ impl Io {
self.packet_counter.add(
num_ipv4 as u64,
&[
crate::otel::network_type_ipv4(),
crate::otel::network_io_direction_receive(),
otel::attr::network_type_ipv4(),
otel::attr::network_io_direction_receive(),
],
);
self.packet_counter.add(
num_ipv6 as u64,
&[
crate::otel::network_type_ipv6(),
crate::otel::network_io_direction_receive(),
otel::attr::network_type_ipv6(),
otel::attr::network_io_direction_receive(),
],
);
@@ -312,8 +312,8 @@ impl Io {
self.packet_counter.add(
1,
&[
crate::otel::network_type_for_packet(&packet),
crate::otel::network_io_direction_transmit(),
otel::attr::network_type_for_packet(&packet),
otel::attr::network_io_direction_transmit(),
],
);
@@ -351,9 +351,9 @@ impl Io {
self.packet_counter.add(
1,
&[
crate::otel::network_protocol_name(payload),
crate::otel::network_transport_udp(),
crate::otel::network_io_direction_transmit(),
otel::attr::network_protocol_name(payload),
otel::attr::network_transport_udp(),
otel::attr::network_io_direction_transmit(),
],
);
}

View File

@@ -195,9 +195,9 @@ impl ClientTunnel {
self.packet_counter.add(
1,
&[
crate::otel::network_protocol_name(received.packet),
crate::otel::network_transport_udp(),
crate::otel::network_io_direction_receive(),
otel::attr::network_protocol_name(received.packet),
otel::attr::network_transport_udp(),
otel::attr::network_io_direction_receive(),
],
);
@@ -327,9 +327,9 @@ impl GatewayTunnel {
self.packet_counter.add(
1,
&[
crate::otel::network_protocol_name(received.packet),
crate::otel::network_transport_udp(),
crate::otel::network_io_direction_receive(),
otel::attr::network_protocol_name(received.packet),
otel::attr::network_transport_udp(),
otel::attr::network_io_direction_receive(),
],
);

View File

@@ -1,93 +1,23 @@
use std::{io, net::SocketAddr};
pub mod attr {
pub use firezone_telemetry::otel::attr::*;
use ip_packet::IpPacket;
use opentelemetry::{KeyValue, Value};
use opentelemetry::KeyValue;
pub fn network_transport_udp() -> KeyValue {
KeyValue::new("network.transport", "udp")
}
pub fn network_protocol_name(payload: &[u8]) -> KeyValue {
const KEY: &str = "network.protocol.name";
pub fn network_type_for_packet(p: &IpPacket) -> KeyValue {
match p {
IpPacket::Ipv4(_) => network_type_ipv4(),
IpPacket::Ipv6(_) => network_type_ipv6(),
}
}
pub fn network_type_for_addr(addr: SocketAddr) -> KeyValue {
match addr {
SocketAddr::V4(_) => network_type_ipv4(),
SocketAddr::V6(_) => network_type_ipv6(),
}
}
pub fn network_protocol_name(payload: &[u8]) -> KeyValue {
const KEY: &str = "network.protocol.name";
match payload {
[0..3, ..] => KeyValue::new(KEY, "stun"),
// Channel-data is a 4-byte header so the actual payload starts on the 5th byte
[64..=79, _, _, _, 0..3, ..] => KeyValue::new(KEY, "stun-over-turn"),
[64..=79, _, _, _, payload @ ..] if snownet::is_wireguard(payload) => {
KeyValue::new(KEY, "wireguard-over-turn")
match payload {
[0..3, ..] => KeyValue::new(KEY, "stun"),
// Channel-data is a 4-byte header so the actual payload starts on the 5th byte
[64..=79, _, _, _, 0..3, ..] => KeyValue::new(KEY, "stun-over-turn"),
[64..=79, _, _, _, payload @ ..] if snownet::is_wireguard(payload) => {
KeyValue::new(KEY, "wireguard-over-turn")
}
[64..=79, _, _, _, ..] => KeyValue::new(KEY, "unknown-over-turn"),
payload if snownet::is_wireguard(payload) => KeyValue::new(KEY, "wireguard"),
_ => KeyValue::new(KEY, "unknown"),
}
[64..=79, _, _, _, ..] => KeyValue::new(KEY, "unknown-over-turn"),
payload if snownet::is_wireguard(payload) => KeyValue::new(KEY, "wireguard"),
_ => KeyValue::new(KEY, "unknown"),
}
}
pub fn network_type_ipv4() -> KeyValue {
KeyValue::new("network.type", "ipv4")
}
pub fn network_type_ipv6() -> KeyValue {
KeyValue::new("network.type", "ipv6")
}
pub fn network_io_direction_receive() -> KeyValue {
KeyValue::new("network.io.direction", "receive")
}
pub fn network_io_direction_transmit() -> KeyValue {
KeyValue::new("network.io.direction", "transmit")
}
pub fn io_error_code(e: &io::Error) -> KeyValue {
KeyValue::new("error.code", e.raw_os_error().unwrap_or_default() as i64)
}
pub fn io_error_type(e: &io::Error) -> KeyValue {
error_type(format!("io::ErrorKind::{:?}", e.kind()))
}
pub fn error_type(ty: impl Into<Value>) -> KeyValue {
KeyValue::new("error.type", ty)
}
pub mod metrics {
use opentelemetry::metrics::Counter;
pub fn network_packet_dropped() -> Counter<u64> {
opentelemetry::global::meter("connlib")
.u64_counter("network.packet.dropped")
.with_description("Count of packets that are dropped or discarded")
.with_unit("{packet}")
.init()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn io_error_type_serialisation() {
let error = io::Error::from(io::ErrorKind::NetworkUnreachable);
assert_eq!(
io_error_type(&error),
KeyValue::new("error.type", "io::ErrorKind::NetworkUnreachable")
);
}
}
pub use firezone_telemetry::otel::metrics;

View File

@@ -15,7 +15,7 @@ use ip_network_table::IpNetworkTable;
use ip_packet::{IpPacket, PacketBuilder, Protocol, UnsupportedProtocol, icmpv4, icmpv6};
use crate::utils::network_contains_network;
use crate::{GatewayEvent, IpConfig};
use crate::{GatewayEvent, IpConfig, otel};
use anyhow::{Context, Result, bail};
use nat_table::{NatTable, TranslateIncomingResult};
@@ -95,7 +95,7 @@ impl ClientOnGateway {
nat_table: Default::default(),
buffered_events: Default::default(),
internet_resource_enabled: false,
num_dropped_packets: crate::otel::metrics::network_packet_dropped(),
num_dropped_packets: otel::metrics::network_packet_dropped(),
}
}
@@ -364,9 +364,9 @@ impl ClientOnGateway {
self.num_dropped_packets.add(
1,
&[
crate::otel::network_type_for_packet(&packet),
crate::otel::network_io_direction_receive(),
crate::otel::error_type(e.root_cause().to_string()),
otel::attr::network_type_for_packet(&packet),
otel::attr::network_io_direction_receive(),
otel::attr::error_type(e.root_cause().to_string()),
],
);
@@ -401,9 +401,9 @@ impl ClientOnGateway {
self.num_dropped_packets.add(
1,
&[
crate::otel::network_type_for_packet(&packet),
crate::otel::network_io_direction_receive(),
crate::otel::error_type("ExpiredNatSession"),
otel::attr::network_type_for_packet(&packet),
otel::attr::network_io_direction_receive(),
otel::attr::error_type("ExpiredNatSession"),
],
);

View File

@@ -1,3 +1,4 @@
use crate::otel;
use anyhow::Result;
use futures::{SinkExt, StreamExt, ready};
use gat_lending_iterator::LendingIterator;
@@ -201,10 +202,10 @@ impl ThreadedUdpSocket {
if let Err(e) = socket.send(datagram).await {
if let Some(io) = e.downcast_ref::<io::Error>() {
io_error_counter.add(1, &[
crate::otel::network_io_direction_transmit(),
crate::otel::network_type_for_addr(addr),
crate::otel::io_error_type(io),
crate::otel::io_error_code(io)
otel::attr::network_io_direction_transmit(),
otel::attr::network_type_for_addr(addr),
otel::attr::io_error_type(io),
otel::attr::io_error_code(io)
]);
}
@@ -226,10 +227,10 @@ impl ThreadedUdpSocket {
if let Some(io) = result.as_ref().err().and_then(|e| e.downcast_ref::<io::Error>()) {
io_error_counter.add(1, &[
crate::otel::network_io_direction_receive(),
crate::otel::network_type_for_addr(addr),
crate::otel::io_error_type(io),
crate::otel::io_error_code(io)
otel::attr::network_io_direction_receive(),
otel::attr::network_type_for_addr(addr),
otel::attr::io_error_type(io),
otel::attr::io_error_code(io)
]);
}

View File

@@ -2,6 +2,8 @@ use ip_packet::IpPacket;
use opentelemetry::KeyValue;
use ringbuffer::{AllocRingBuffer, RingBuffer};
use crate::otel;
pub struct UniquePacketBuffer {
buffer: AllocRingBuffer<IpPacket>,
tag: &'static str,
@@ -41,10 +43,10 @@ impl UniquePacketBuffer {
self.num_dropped_packets.add(
1,
&[
crate::otel::network_type_for_packet(&new),
crate::otel::network_io_direction_transmit(),
otel::attr::network_type_for_packet(&new),
otel::attr::network_io_direction_transmit(),
KeyValue::new("system.buffer.pool.name", self.tag),
crate::otel::error_type("BufferFull"),
otel::attr::error_type("BufferFull"),
],
);
}

View File

@@ -11,7 +11,7 @@ use firezone_bin_shared::{
platform::{tcp_socket_factory, udp_socket_factory},
};
use firezone_telemetry::Telemetry;
use firezone_telemetry::{Telemetry, otel};
use firezone_tunnel::GatewayTunnel;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use phoenix_channel::LoginUrl;
@@ -105,18 +105,25 @@ async fn try_main(cli: Cli) -> Result<ExitCode> {
firezone_logging::setup_global_subscriber(layer::Identity::default())
.context("Failed to set up logging")?;
if cli.metrics {
let exporter = opentelemetry_stdout::MetricsExporter::default();
let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
let provider = SdkMeterProvider::builder().with_reader(reader).build();
opentelemetry::global::set_meter_provider(provider);
}
let firezone_id = get_firezone_id(cli.firezone_id).await
.context("Couldn't read FIREZONE_ID or write it to disk: Please provide it through the env variable or provide rw access to /var/lib/firezone/")?;
Telemetry::set_firezone_id(firezone_id.clone());
if cli.metrics {
let exporter = opentelemetry_stdout::MetricsExporter::default();
let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(otel::default_resource_with([
otel::attr::service_name!(),
otel::attr::service_version!(),
otel::attr::service_instance_id(firezone_id.clone()),
]))
.build();
opentelemetry::global::set_meter_provider(provider);
}
let login = LoginUrl::gateway(
cli.api_url,
&SecretString::new(cli.token),

View File

@@ -15,6 +15,7 @@ use firezone_headless_client::{
};
use firezone_logging::telemetry_span;
use firezone_telemetry::Telemetry;
use firezone_telemetry::otel;
use futures::StreamExt as _;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use phoenix_channel::LoginUrl;
@@ -192,7 +193,7 @@ fn main() -> Result<()> {
let url = LoginUrl::client(
cli.api_url,
&token,
firezone_id,
firezone_id.clone(),
cli.firezone_name,
device_id::device_info(),
)?;
@@ -213,7 +214,14 @@ fn main() -> Result<()> {
let exporter = opentelemetry_stdout::MetricsExporter::default();
let reader =
PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build();
let provider = SdkMeterProvider::builder().with_reader(reader).build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(otel::default_resource_with([
otel::attr::service_name!(),
otel::attr::service_version!(),
otel::attr::service_instance_id(firezone_id),
]))
.build();
opentelemetry::global::set_meter_provider(provider);
}

View File

@@ -6,6 +6,9 @@ license = { workspace = true }
[dependencies]
anyhow = { workspace = true }
ip-packet = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
parking_lot = { workspace = true }
reqwest = { workspace = true }
sentry = { workspace = true, features = ["contexts", "backtrace", "debug-images", "panic", "reqwest", "rustls", "tracing"] }

View File

@@ -6,6 +6,7 @@ use env::ON_PREM;
use sentry::protocol::SessionStatus;
pub mod feature_flags;
pub mod otel;
pub struct Dsn(&'static str);

128
rust/telemetry/src/otel.rs Normal file
View File

@@ -0,0 +1,128 @@
use std::time::Duration;
use opentelemetry::KeyValue;
use opentelemetry_sdk::{
Resource,
resource::{ResourceDetector, TelemetryResourceDetector},
};
pub mod attr {
use ip_packet::IpPacket;
use opentelemetry::Value;
use std::{io, net::SocketAddr};
use super::*;
#[macro_export]
macro_rules! service_name {
() => {
::opentelemetry::KeyValue::new("service.name", env!("CARGO_PKG_NAME"))
};
}
#[macro_export]
macro_rules! service_version {
() => {
::opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION"))
};
}
pub use service_name;
pub use service_version;
pub fn service_instance_id(firezone_id: String) -> KeyValue {
KeyValue::new("service.instance.id", firezone_id)
}
pub fn network_transport_udp() -> KeyValue {
KeyValue::new("network.transport", "udp")
}
pub fn network_type_for_packet(p: &IpPacket) -> KeyValue {
match p {
IpPacket::Ipv4(_) => network_type_ipv4(),
IpPacket::Ipv6(_) => network_type_ipv6(),
}
}
pub fn network_type_for_addr(addr: SocketAddr) -> KeyValue {
match addr {
SocketAddr::V4(_) => network_type_ipv4(),
SocketAddr::V6(_) => network_type_ipv6(),
}
}
pub fn network_type_ipv4() -> KeyValue {
KeyValue::new("network.type", "ipv4")
}
pub fn network_type_ipv6() -> KeyValue {
KeyValue::new("network.type", "ipv6")
}
pub fn network_io_direction_receive() -> KeyValue {
KeyValue::new("network.io.direction", "receive")
}
pub fn network_io_direction_transmit() -> KeyValue {
KeyValue::new("network.io.direction", "transmit")
}
pub fn io_error_code(e: &io::Error) -> KeyValue {
KeyValue::new("error.code", e.raw_os_error().unwrap_or_default() as i64)
}
pub fn io_error_type(e: &io::Error) -> KeyValue {
error_type(format!("io::ErrorKind::{:?}", e.kind()))
}
pub fn error_type(value: impl Into<Value>) -> KeyValue {
KeyValue::new("error.type", value)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn io_error_type_serialisation() {
let error = io::Error::from(io::ErrorKind::NetworkUnreachable);
assert_eq!(
io_error_type(&error),
KeyValue::new("error.type", "io::ErrorKind::NetworkUnreachable")
);
}
}
}
/// Factories for metric instruments.
pub mod metrics {
    use opentelemetry::metrics::Counter;

    /// Creates the `network.packet.dropped` counter (unit `{packet}`) on the
    /// global meter named `connlib`.
    pub fn network_packet_dropped() -> Counter<u64> {
        let meter = opentelemetry::global::meter("connlib");

        meter
            .u64_counter("network.packet.dropped")
            .with_description("Count of packets that are dropped or discarded")
            .with_unit("{packet}")
            .init()
    }
}
pub fn default_resource_with<const N: usize>(attributes: [KeyValue; N]) -> Resource {
Resource::from_detectors(
Duration::from_secs(0),
vec![
Box::new(TelemetryResourceDetector),
Box::new(OsResourceDetector),
],
)
.merge(&Resource::new(attributes))
}
/// Resource detector that reports the operating system as `os.type`.
pub struct OsResourceDetector;

impl ResourceDetector for OsResourceDetector {
    /// Returns a resource with a single `os.type` attribute set to
    /// [`std::env::consts::OS`]; the timeout is ignored.
    fn detect(&self, _timeout: Duration) -> Resource {
        let os_type = KeyValue::new("os.type", std::env::consts::OS);

        Resource::new([os_type])
    }
}