mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 18:18:55 +00:00
refactor(relay): use tracing::instrument macro for spans (#2068)
This commit is contained in:
@@ -35,7 +35,7 @@ use stun_codec::rfc5766::attributes::{
|
||||
use stun_codec::rfc5766::errors::{AllocationMismatch, InsufficientCapacity};
|
||||
use stun_codec::rfc5766::methods::{ALLOCATE, CHANNEL_BIND, CREATE_PERMISSION, REFRESH};
|
||||
use stun_codec::{Message, MessageClass, MessageEncoder, Method, TransactionId};
|
||||
use tracing::{field, log};
|
||||
use tracing::{field, log, Span};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// A sans-IO STUN & TURN server.
|
||||
@@ -212,11 +212,8 @@ where
|
||||
/// Process the bytes received from a client.
|
||||
///
|
||||
/// After calling this method, you should call [`Server::next_command`] until it returns `None`.
|
||||
#[tracing::instrument(skip_all, fields(transaction_id, %sender), level = "error")]
|
||||
pub fn handle_client_input(&mut self, bytes: &[u8], sender: SocketAddr, now: SystemTime) {
|
||||
let span =
|
||||
tracing::error_span!("handle_client_input", %sender, transaction_id = field::Empty);
|
||||
let _guard = span.enter();
|
||||
|
||||
if tracing::enabled!(target: "wire", tracing::Level::TRACE) {
|
||||
let hex_bytes = hex::encode(bytes);
|
||||
tracing::trace!(target: "wire", %hex_bytes, "receiving bytes");
|
||||
@@ -225,7 +222,7 @@ where
|
||||
match self.decoder.decode(bytes) {
|
||||
Ok(Ok(message)) => {
|
||||
if let Some(id) = message.transaction_id() {
|
||||
span.record("transaction_id", hex::encode(id.as_bytes()));
|
||||
Span::current().record("transaction_id", hex::encode(id.as_bytes()));
|
||||
}
|
||||
|
||||
self.handle_client_message(message, sender, now);
|
||||
@@ -236,16 +233,10 @@ where
|
||||
}
|
||||
// Parsing the bytes failed.
|
||||
Err(client_message::Error::BadChannelData(ref error)) => {
|
||||
tracing::debug!(
|
||||
error = error as &dyn std::error::Error,
|
||||
"failed to decode channel data"
|
||||
)
|
||||
tracing::debug!(%error, "failed to decode channel data")
|
||||
}
|
||||
Err(client_message::Error::DecodeStun(ref error)) => {
|
||||
tracing::debug!(
|
||||
error = error as &dyn std::error::Error,
|
||||
"failed to decode stun packet"
|
||||
)
|
||||
tracing::debug!(%error, "failed to decode stun packet")
|
||||
}
|
||||
Err(client_message::Error::UnknownMessageType(t)) => {
|
||||
tracing::debug!(r#type = %t, "unknown STUN message type")
|
||||
@@ -308,15 +299,13 @@ where
|
||||
}
|
||||
|
||||
/// Process the bytes received from an allocation.
|
||||
#[tracing::instrument(skip_all, fields(%sender, %allocation_id, recipient, channel), level = "error")]
|
||||
pub fn handle_relay_input(
|
||||
&mut self,
|
||||
bytes: &[u8],
|
||||
sender: SocketAddr,
|
||||
allocation_id: AllocationId,
|
||||
) {
|
||||
let span = tracing::error_span!("handle_relay_input", %sender, %allocation_id, recipient = field::Empty, channel = field::Empty);
|
||||
let _guard = span.enter();
|
||||
|
||||
if tracing::enabled!(target: "wire", tracing::Level::TRACE) {
|
||||
let hex_bytes = hex::encode(bytes);
|
||||
tracing::trace!(target: "wire", %hex_bytes, "receiving bytes");
|
||||
@@ -327,14 +316,14 @@ where
|
||||
return;
|
||||
};
|
||||
|
||||
span.record("recipient", field::display(&recipient));
|
||||
Span::current().record("recipient", field::display(&recipient));
|
||||
|
||||
let Some(channel_number) = self.channel_numbers_by_peer.get(&sender) else {
|
||||
tracing::debug!(target: "relay", "no active channel, refusing to relay {} bytes", bytes.len());
|
||||
return;
|
||||
};
|
||||
|
||||
span.record("channel", channel_number);
|
||||
Span::current().record("channel", channel_number);
|
||||
|
||||
let Some(channel) = self.channels_by_number.get(channel_number) else {
|
||||
debug_assert!(false, "unknown channel {}", channel_number);
|
||||
@@ -369,10 +358,8 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self), level = "error")]
|
||||
pub fn handle_deadline_reached(&mut self, now: SystemTime) {
|
||||
let span = tracing::error_span!("handle_deadline_reached", ?now);
|
||||
let _guard = span.enter();
|
||||
|
||||
for action in self.time_events.pending_actions(now) {
|
||||
match action {
|
||||
TimedAction::ExpireAllocation(id) => {
|
||||
@@ -412,11 +399,9 @@ where
|
||||
}
|
||||
|
||||
/// An allocation failed.
|
||||
pub fn handle_allocation_failed(&mut self, id: AllocationId) {
|
||||
let span = tracing::error_span!("handle_allocation_failed", %id);
|
||||
let _guard = span.enter();
|
||||
|
||||
self.delete_allocation(id)
|
||||
#[tracing::instrument(skip(self), fields(%allocation_id), level = "error")]
|
||||
pub fn handle_allocation_failed(&mut self, allocation_id: AllocationId) {
|
||||
self.delete_allocation(allocation_id)
|
||||
}
|
||||
|
||||
/// Return the next command to be executed.
|
||||
@@ -444,17 +429,13 @@ where
|
||||
/// Handle a TURN allocate request.
|
||||
///
|
||||
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-receiving-an-allocate-reque> for details.
|
||||
#[tracing::instrument(skip(self, request, now), fields(%sender, effective_lifetime = ?request.effective_lifetime().lifetime()), level = "error")]
|
||||
fn handle_allocate_request(
|
||||
&mut self,
|
||||
request: Allocate,
|
||||
sender: SocketAddr,
|
||||
now: SystemTime,
|
||||
) -> Result<(), Message<Attribute>> {
|
||||
let effective_lifetime = request.effective_lifetime();
|
||||
|
||||
let span = tracing::error_span!("handle_allocate_request", lifetime = %effective_lifetime.lifetime().as_secs());
|
||||
let _guard = span.enter();
|
||||
|
||||
self.verify_auth(&request, now)?;
|
||||
|
||||
if self.allocations.contains_key(&sender) {
|
||||
@@ -478,6 +459,7 @@ where
|
||||
|
||||
// TODO: Do we need to handle DONT-FRAGMENT?
|
||||
// TODO: Do we need to handle EVEN/ODD-PORT?
|
||||
let effective_lifetime = request.effective_lifetime();
|
||||
|
||||
let allocation = self.create_new_allocation(
|
||||
now,
|
||||
@@ -552,17 +534,13 @@ where
|
||||
/// Handle a TURN refresh request.
|
||||
///
|
||||
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-receiving-a-refresh-request> for details.
|
||||
#[tracing::instrument(skip(self, request, now), fields(%sender, lifetime = ?request.effective_lifetime().lifetime()), level = "error")]
|
||||
fn handle_refresh_request(
|
||||
&mut self,
|
||||
request: Refresh,
|
||||
sender: SocketAddr,
|
||||
now: SystemTime,
|
||||
) -> Result<(), Message<Attribute>> {
|
||||
let effective_lifetime = request.effective_lifetime();
|
||||
|
||||
let span = tracing::error_span!("handle_refresh_request", lifetime = %effective_lifetime.lifetime().as_secs());
|
||||
let _guard = span.enter();
|
||||
|
||||
self.verify_auth(&request, now)?;
|
||||
|
||||
// TODO: Verify that this is the correct error code.
|
||||
@@ -571,6 +549,8 @@ where
|
||||
.get_mut(&sender)
|
||||
.ok_or(error_response(AllocationMismatch, &request))?;
|
||||
|
||||
let effective_lifetime = request.effective_lifetime();
|
||||
|
||||
if effective_lifetime.lifetime().is_zero() {
|
||||
let id = allocation.id;
|
||||
|
||||
@@ -609,19 +589,13 @@ where
|
||||
/// Handle a TURN channel bind request.
|
||||
///
|
||||
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-receiving-a-channelbind-req> for details.
|
||||
#[tracing::instrument(skip(self, request, now), fields(%sender, peer_address = %request.xor_peer_address().address(), requested_channel = %request.channel_number().value(), allocation), level = "error")]
|
||||
fn handle_channel_bind_request(
|
||||
&mut self,
|
||||
request: ChannelBind,
|
||||
sender: SocketAddr,
|
||||
now: SystemTime,
|
||||
) -> Result<(), Message<Attribute>> {
|
||||
// Note: `channel_number` is enforced to be in the correct range.
|
||||
let requested_channel = request.channel_number().value();
|
||||
let peer_address = request.xor_peer_address().address();
|
||||
|
||||
let span = tracing::error_span!("handle_channel_bind_request", %requested_channel, %peer_address, allocation = field::Empty);
|
||||
let _guard = span.enter();
|
||||
|
||||
self.verify_auth(&request, now)?;
|
||||
|
||||
let allocation = self
|
||||
@@ -629,7 +603,11 @@ where
|
||||
.get_mut(&sender)
|
||||
.ok_or(error_response(AllocationMismatch, &request))?;
|
||||
|
||||
span.record("allocation", allocation.id.to_string());
|
||||
Span::current().record("allocation", allocation.id.to_string());
|
||||
|
||||
// Note: `channel_number` is enforced to be in the correct range.
|
||||
let requested_channel = request.channel_number().value();
|
||||
let peer_address = request.xor_peer_address().address();
|
||||
|
||||
// Check that our allocation can handle the requested peer addr.
|
||||
if !allocation.can_relay_to(peer_address) {
|
||||
@@ -690,6 +668,7 @@ where
|
||||
///
|
||||
/// This TURN server implementation does not support relaying data other than through channels.
|
||||
/// Thus, creating a permission is a no-op that always succeeds.
|
||||
#[tracing::instrument(skip(self, message, now), fields(%sender), level = "error")]
|
||||
fn handle_create_permission_request(
|
||||
&mut self,
|
||||
message: CreatePermission,
|
||||
@@ -706,13 +685,16 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_channel_data_message(&mut self, message: ChannelData, _: SocketAddr, _: SystemTime) {
|
||||
#[tracing::instrument(skip(self, message), fields(allocation_id, %sender, channel = %message.channel(), recipient), level = "error")]
|
||||
fn handle_channel_data_message(
|
||||
&mut self,
|
||||
message: ChannelData,
|
||||
sender: SocketAddr,
|
||||
_: SystemTime,
|
||||
) {
|
||||
let channel_number = message.channel();
|
||||
let data = message.data();
|
||||
|
||||
let span = tracing::error_span!("handle_channel_data_message", channel = %channel_number, recipient = field::Empty);
|
||||
let _guard = span.enter();
|
||||
|
||||
let Some(channel) = self.channels_by_number.get(&channel_number) else {
|
||||
tracing::debug!(target: "relay", "Channel does not exist, refusing to forward data");
|
||||
return;
|
||||
@@ -727,7 +709,7 @@ where
|
||||
}
|
||||
|
||||
let recipient = channel.peer_address;
|
||||
span.record("recipient", field::display(&recipient));
|
||||
Span::current().record("recipient", field::display(&recipient));
|
||||
|
||||
tracing::debug!(target: "relay", "Relaying {} bytes", data.len());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user