chore(connlib): minor logging tweaks (#5746)

Noticed a few things that caused unnecessary verbosity in the logs.
This commit is contained in:
Thomas Eizinger
2024-07-06 00:45:32 +10:00
committed by GitHub
parent 90ea603584
commit 28d5b8574c
6 changed files with 90 additions and 75 deletions

8
rust/Cargo.lock generated
View File

@@ -1989,6 +1989,7 @@ dependencies = [
"env_logger",
"futures",
"hex",
"hex-display",
"http-health-check",
"mio",
"once_cell",
@@ -2698,6 +2699,12 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hex-display"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b53d6a634507c5d9fdee77261ae54a8d1ff7887f5304389025b03c3292a1756"
[[package]]
name = "hickory-proto"
version = "0.24.0"
@@ -5764,6 +5771,7 @@ dependencies = [
"bytes",
"firezone-relay",
"hex",
"hex-display",
"ip-packet",
"once_cell",
"rand 0.8.5",

View File

@@ -17,6 +17,7 @@ bytes = "1.4.0"
once_cell = "1.17.1"
backoff = "0.4.0"
hex = "0.4.0"
hex-display = "0.3.0"
[dev-dependencies]
tracing-subscriber = {version = "0.3", features = ["env-filter"]}

View File

@@ -6,6 +6,7 @@ use crate::{
};
use ::backoff::backoff::Backoff;
use bytecodec::{DecodeExt as _, EncodeExt as _};
use hex_display::HexDisplayExt as _;
use rand::random;
use std::{
borrow::Cow,
@@ -262,7 +263,7 @@ impl Allocation {
/// Refresh this allocation.
///
/// In case refreshing the allocation fails, we will attempt to make a new one.
#[tracing::instrument(level = "debug", skip_all, fields(relay = ?self.active_socket))]
#[tracing::instrument(level = "debug", skip_all, fields(active_socket = ?self.active_socket))]
pub fn refresh(&mut self, now: Instant) {
self.update_now(now);
@@ -286,7 +287,7 @@ impl Allocation {
self.send_binding_requests();
}
#[tracing::instrument(level = "debug", skip_all, fields(id, method, class, rtt))]
#[tracing::instrument(level = "debug", skip_all, fields(tid, method, class, rtt))]
pub fn handle_input(
&mut self,
from: SocketAddr,
@@ -312,7 +313,10 @@ impl Allocation {
let transaction_id = message.transaction_id();
Span::current().record("id", field::debug(transaction_id));
Span::current().record(
"tid",
field::display(format_args!("{:X}", transaction_id.as_bytes().hex())),
);
Span::current().record("method", field::display(message.method()));
Span::current().record("class", field::display(message.class()));
@@ -433,13 +437,17 @@ impl Allocation {
// We send 2 BINDING requests to start with (one for each IP version) and the first one coming back wins.
// Thus, if we already have a socket set, we are done with processing this binding request.
if self.active_socket.is_some() {
if let Some(active_socket) = self.active_socket {
tracing::debug!(%active_socket, additional_socket = %original_dst, "Relay supports dual-stack but we've already picked a socket");
return true;
}
// If the socket isn't set yet, use the `original_dst` as the primary socket.
self.active_socket = Some(original_dst);
tracing::debug!(active_socket = %original_dst, "Updating active socket");
if self.has_allocation() {
self.authenticate_and_queue(make_refresh_request(), None);
} else {
@@ -549,7 +557,7 @@ impl Allocation {
Some((peer, payload, socket))
}
#[tracing::instrument(level = "debug", skip_all, fields(relay = ?self.active_socket))]
#[tracing::instrument(level = "debug", skip_all, fields(active_socket = ?self.active_socket))]
pub fn handle_timeout(&mut self, now: Instant) {
self.update_now(now);
@@ -619,11 +627,7 @@ impl Allocation {
}
pub fn poll_event(&mut self) -> Option<CandidateEvent> {
let next_event = self.events.pop_front()?;
tracing::debug!(?next_event);
Some(next_event)
self.events.pop_front()
}
pub fn poll_transmit(&mut self) -> Option<Transmit<'static>> {
@@ -644,7 +648,7 @@ impl Allocation {
earliest_timeout
}
#[tracing::instrument(level = "debug", skip(self, now), fields(relay = ?self.active_socket))]
#[tracing::instrument(level = "debug", skip(self, now), fields(active_socket = ?self.active_socket))]
pub fn bind_channel(&mut self, peer: SocketAddr, now: Instant) {
if self.is_suspended() {
tracing::debug!("Allocation is suspended");
@@ -757,10 +761,10 @@ impl Allocation {
fn log_update(&self) {
tracing::info!(
srflx_ip4 = ?self.ip4_srflx_candidate,
srflx_ip6 = ?self.ip6_srflx_candidate,
relay_ip4 = ?self.ip4_allocation,
relay_ip6 = ?self.ip6_allocation,
srflx_ip4 = ?self.ip4_srflx_candidate.as_ref().map(|c| c.addr()),
srflx_ip6 = ?self.ip6_srflx_candidate.as_ref().map(|c| c.addr()),
relay_ip4 = ?self.ip4_allocation.as_ref().map(|c| c.addr()),
relay_ip6 = ?self.ip6_allocation.as_ref().map(|c| c.addr()),
lifetime = ?self.allocation_lifetime,
"Updated allocation"
);

View File

@@ -218,8 +218,8 @@ where
Ok(())
}
#[tracing::instrument(level = "info", skip_all, fields(%id))]
pub fn add_remote_candidate(&mut self, id: TId, candidate: String, now: Instant) {
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
pub fn add_remote_candidate(&mut self, cid: TId, candidate: String, now: Instant) {
let candidate = match Candidate::from_sdp_string(&candidate) {
Ok(c) => c,
Err(e) => {
@@ -228,7 +228,7 @@ where
}
};
if let Some(agent) = self.connections.agent_mut(id) {
if let Some(agent) = self.connections.agent_mut(cid) {
agent.add_remote_candidate(candidate.clone());
}
@@ -255,8 +255,8 @@ where
}
}
#[tracing::instrument(level = "info", skip_all, fields(%id))]
pub fn remove_remote_candidate(&mut self, id: TId, candidate: String) {
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
pub fn remove_remote_candidate(&mut self, cid: TId, candidate: String) {
let candidate = match Candidate::from_sdp_string(&candidate) {
Ok(c) => c,
Err(e) => {
@@ -265,7 +265,7 @@ where
}
};
if let Some(agent) = self.connections.agent_mut(id) {
if let Some(agent) = self.connections.agent_mut(cid) {
agent.invalidate_candidate(&candidate);
}
}
@@ -291,7 +291,7 @@ where
/// - `Ok(None)` if the packet was handled internally, for example, a response from a TURN server.
/// - `Ok(Some)` if the packet was an encrypted wireguard packet from a peer.
/// The `Option` contains the connection on which the packet was decrypted.
#[tracing::instrument(level = "debug", skip_all, fields(%from, num_bytes = %packet.len()))]
#[tracing::instrument(level = "debug", skip_all, fields(%from))]
pub fn decapsulate<'s>(
&mut self,
local: SocketAddr,
@@ -455,9 +455,9 @@ where
}
self.allocations
.retain(|id, allocation| match allocation.can_be_freed() {
.retain(|rid, allocation| match allocation.can_be_freed() {
Some(e) => {
tracing::info!(%id, "Disconnecting from relay; {e}");
tracing::info!(%rid, "Disconnecting from relay; {e}");
false
}
@@ -495,29 +495,29 @@ where
now: Instant,
) {
// First, invalidate all candidates from relays that we should stop using.
for id in to_remove {
let Some(allocation) = self.allocations.remove(&id) else {
tracing::debug!(%id, "Cannot delete unknown allocation");
for rid in to_remove {
let Some(allocation) = self.allocations.remove(&rid) else {
tracing::debug!(%rid, "Cannot delete unknown allocation");
continue;
};
for (id, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %id).entered();
for (cid, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %cid).entered();
for candidate in allocation
.current_candidates()
.filter(|c| c.kind() == CandidateKind::Relayed)
{
remove_local_candidate(id, agent, &candidate, &mut self.pending_events);
remove_local_candidate(cid, agent, &candidate, &mut self.pending_events);
}
}
tracing::info!(%id, address = ?allocation.server(), "Removed TURN server");
tracing::info!(%rid, address = ?allocation.server(), "Removed TURN server");
}
// Second, upsert all new relays.
for (id, server, username, password, realm) in to_add {
for (rid, server, username, password, realm) in to_add {
let Ok(username) = Username::new(username.to_owned()) else {
tracing::debug!(%username, "Invalid TURN username");
continue;
@@ -527,17 +527,17 @@ where
continue;
};
if let Some(existing) = self.allocations.get_mut(id) {
if let Some(existing) = self.allocations.get_mut(rid) {
existing.update_credentials(*server, username, password, realm, now);
continue;
}
self.allocations.insert(
*id,
*rid,
Allocation::new(*server, username, password.clone(), realm, now),
);
tracing::info!(%id, address = ?server, "Added new TURN server");
tracing::info!(%rid, address = ?server, "Added new TURN server");
}
}
@@ -596,10 +596,10 @@ where
return Ok(());
}
for (id, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %id).entered();
for (cid, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %cid).entered();
add_local_candidate(id, agent, host_candidate.clone(), &mut self.pending_events);
add_local_candidate(cid, agent, host_candidate.clone(), &mut self.pending_events);
}
Ok(())
@@ -684,8 +684,8 @@ where
return ControlFlow::Continue(());
};
for (id, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %id).entered();
for (cid, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %cid).entered();
if agent.accepts_message(&message) {
agent.handle_packet(
@@ -715,8 +715,8 @@ where
buffer: &'b mut [u8],
now: Instant,
) -> ControlFlow<Result<(), Error>, (TId, MutableIpPacket<'b>)> {
for (id, conn) in self.connections.iter_established_mut() {
let _span = info_span!("connection", %id).entered();
for (cid, conn) in self.connections.iter_established_mut() {
let _span = info_span!("connection", %cid).entered();
if !conn.accepts(&from) {
continue;
@@ -739,11 +739,11 @@ where
tracing::info!(duration_since_intent = ?conn.duration_since_intent(now), "Completed wireguard handshake");
self.pending_events
.push_back(Event::ConnectionEstablished(id))
.push_back(Event::ConnectionEstablished(cid))
}
return match control_flow {
ControlFlow::Continue(c) => ControlFlow::Continue((id, c)),
ControlFlow::Continue(c) => ControlFlow::Continue((cid, c)),
ControlFlow::Break(b) => ControlFlow::Break(b),
};
}
@@ -769,10 +769,10 @@ where
);
}
CandidateEvent::Invalid(candidate) => {
for (id, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %id).entered();
for (cid, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %cid).entered();
remove_local_candidate(id, agent, &candidate, &mut self.pending_events);
remove_local_candidate(cid, agent, &candidate, &mut self.pending_events);
}
}
}
@@ -789,14 +789,14 @@ where
///
/// Out of all configured STUN and TURN servers, the connection will only use the ones provided here.
/// The returned [`Offer`] must be passed to the remote via a signalling channel.
#[tracing::instrument(level = "info", skip_all, fields(%id))]
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
#[must_use]
pub fn new_connection(&mut self, id: TId, intent_sent_at: Instant, now: Instant) -> Offer {
if self.connections.initial.remove(&id).is_some() {
pub fn new_connection(&mut self, cid: TId, intent_sent_at: Instant, now: Instant) -> Offer {
if self.connections.initial.remove(&cid).is_some() {
tracing::info!("Replacing existing initial connection");
};
if self.connections.established.remove(&id).is_some() {
if self.connections.established.remove(&cid).is_some() {
tracing::info!("Replacing existing established connection");
};
@@ -825,7 +825,7 @@ where
};
let duration_since_intent = initial_connection.duration_since_intent(now);
let existing = self.connections.initial.insert(id, initial_connection);
let existing = self.connections.initial.insert(cid, initial_connection);
debug_assert!(existing.is_none());
tracing::info!(?duration_since_intent, "Establishing new connection");
@@ -839,9 +839,9 @@ where
}
/// Accept an [`Answer`] from the remote for a connection previously created via [`Node::new_connection`].
#[tracing::instrument(level = "info", skip_all, fields(%id))]
pub fn accept_answer(&mut self, id: TId, remote: PublicKey, answer: Answer, now: Instant) {
let Some(initial) = self.connections.initial.remove(&id) else {
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
pub fn accept_answer(&mut self, cid: TId, remote: PublicKey, answer: Answer, now: Instant) {
let Some(initial) = self.connections.initial.remove(&cid) else {
tracing::debug!("No initial connection state, ignoring answer"); // This can happen if the connection setup timed out.
return;
};
@@ -852,7 +852,7 @@ where
pass: answer.credentials.password,
});
self.seed_agent_with_local_candidates(id, &mut agent);
self.seed_agent_with_local_candidates(cid, &mut agent);
let connection = self.init_connection(
agent,
@@ -863,7 +863,7 @@ where
);
let duration_since_intent = connection.duration_since_intent(now);
let existing = self.connections.established.insert(id, connection);
let existing = self.connections.established.insert(cid, connection);
tracing::info!(?duration_since_intent, remote = %hex::encode(remote.as_bytes()), "Signalling protocol completed");
@@ -880,21 +880,21 @@ where
///
/// Out of all configured STUN and TURN servers, the connection will only use the ones provided here.
/// The returned [`Answer`] must be passed to the remote via a signalling channel.
#[tracing::instrument(level = "info", skip_all, fields(%id))]
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
#[must_use]
pub fn accept_connection(
&mut self,
id: TId,
cid: TId,
offer: Offer,
remote: PublicKey,
now: Instant,
) -> Answer {
debug_assert!(
!self.connections.initial.contains_key(&id),
!self.connections.initial.contains_key(&cid),
"server to not use `initial_connections`"
);
if self.connections.established.remove(&id).is_some() {
if self.connections.established.remove(&cid).is_some() {
tracing::info!("Replacing existing established connection");
};
@@ -913,7 +913,7 @@ where
},
};
self.seed_agent_with_local_candidates(id, &mut agent);
self.seed_agent_with_local_candidates(cid, &mut agent);
let connection = self.init_connection(
agent,
@@ -922,7 +922,7 @@ where
now, // Technically, this isn't fully correct because gateways don't send intents so we just use the current time.
now,
);
let existing = self.connections.established.insert(id, connection);
let existing = self.connections.established.insert(cid, connection);
debug_assert!(existing.is_none());
@@ -1099,10 +1099,10 @@ fn add_local_candidate_to_all<TId, RId>(
.iter_mut()
.map(|(id, c)| (*id, &mut c.agent));
for (id, agent) in initial_connections.chain(established_connections) {
let _span = info_span!("connection", %id).entered();
for (cid, agent) in initial_connections.chain(established_connections) {
let _span = info_span!("connection", %cid).entered();
add_local_candidate(id, agent, candidate.clone(), pending_events);
add_local_candidate(cid, agent, candidate.clone(), pending_events);
}
}
@@ -1257,8 +1257,8 @@ struct InitialConnection {
}
impl InitialConnection {
#[tracing::instrument(level = "debug", skip_all, fields(%id))]
fn handle_timeout<TId>(&mut self, id: TId, now: Instant)
#[tracing::instrument(level = "debug", skip_all, fields(%cid))]
fn handle_timeout<TId>(&mut self, cid: TId, now: Instant)
where
TId: fmt::Display,
{
@@ -1422,10 +1422,10 @@ where
self.last_incoming.max(self.last_outgoing) + MAX_IDLE
}
#[tracing::instrument(level = "info", skip_all, fields(%id))]
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
fn handle_timeout<TId>(
&mut self,
id: TId,
cid: TId,
now: Instant,
allocations: &mut HashMap<RId, Allocation>,
transmits: &mut VecDeque<Transmit<'static>>,

View File

@@ -36,6 +36,7 @@ socket2 = "0.5.7"
backoff = "0.4"
http-health-check = { workspace = true }
mio = "0.8.11"
hex-display = "0.3.0"
[dev-dependencies]
difference = "2.0.0"

View File

@@ -12,6 +12,7 @@ use crate::{ClientSocket, IpStack, PeerSocket};
use anyhow::Result;
use bytecodec::EncodeExt;
use core::fmt;
use hex_display::HexDisplayExt as _;
use opentelemetry::metrics::{Counter, Unit, UpDownCounter};
use opentelemetry::KeyValue;
use rand::Rng;
@@ -403,7 +404,7 @@ where
}
}
#[tracing::instrument(level = "info", skip_all, fields(transaction_id = ?request.transaction_id(), %sender))]
#[tracing::instrument(level = "info", skip_all, fields(tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))]
fn handle_binding_request(&mut self, request: Binding, sender: ClientSocket) {
let mut message = Message::new(
MessageClass::SuccessResponse,
@@ -420,7 +421,7 @@ where
/// Handle a TURN allocate request.
///
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-receiving-an-allocate-reque> for details.
#[tracing::instrument(level = "info", skip_all, fields(allocation, transaction_id = ?request.transaction_id(), %sender))]
#[tracing::instrument(level = "info", skip_all, fields(allocation, tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))]
fn handle_allocate_request(
&mut self,
request: Allocate,
@@ -539,7 +540,7 @@ where
/// Handle a TURN refresh request.
///
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-receiving-a-refresh-request> for details.
#[tracing::instrument(level = "info", skip_all, fields(allocation, transaction_id = ?request.transaction_id(), %sender))]
#[tracing::instrument(level = "info", skip_all, fields(allocation, tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))]
fn handle_refresh_request(
&mut self,
request: Refresh,
@@ -588,7 +589,7 @@ where
/// Handle a TURN channel bind request.
///
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-receiving-a-channelbind-req> for details.
#[tracing::instrument(level = "info", skip_all, fields(allocation, peer, channel, transaction_id = ?request.transaction_id(), %sender))]
#[tracing::instrument(level = "info", skip_all, fields(allocation, peer, channel, tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))]
fn handle_channel_bind_request(
&mut self,
request: ChannelBind,
@@ -692,7 +693,7 @@ where
///
/// This TURN server implementation does not support relaying data other than through channels.
/// Thus, creating a permission is a no-op that always succeeds.
#[tracing::instrument(level = "info", skip_all, fields(transaction_id = ?request.transaction_id(), %sender))]
#[tracing::instrument(level = "info", skip_all, fields(tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))]
fn handle_create_permission_request(
&mut self,
request: CreatePermission,