feat(snownet): introduce connection span to capture str0m and boringtun logs (#4144)

These new spans attach our connection ID as context to the logs emitted
from within `str0m` and `boringtun`.

Resolves: #4140.
This commit is contained in:
Thomas Eizinger
2024-03-15 07:38:34 +11:00
committed by GitHub
parent d6827c046a
commit 0dfd26a014

View File

@@ -30,7 +30,7 @@ use std::borrow::Cow;
use std::iter;
use std::ops::ControlFlow;
use stun_codec::rfc5389::attributes::{Realm, Username};
use tracing::{field, Span};
use tracing::{field, info_span, Span};
// Note: Taken from boringtun
const HANDSHAKE_RATE_LIMIT: u64 = 100;
@@ -179,7 +179,7 @@ where
Ok(())
}
#[tracing::instrument(level = "debug", skip_all, fields(%id))]
#[tracing::instrument(level = "info", skip_all, fields(%id))]
pub fn add_remote_candidate(&mut self, id: TId, candidate: String, now: Instant) {
let candidate = match Candidate::from_sdp_string(&candidate) {
Ok(c) => c,
@@ -498,13 +498,10 @@ where
return Ok(());
}
for (conn, agent) in self.connections.agents_mut() {
add_local_candidate(
conn,
agent,
host_candidate.clone(),
&mut self.pending_events,
);
for (id, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %id).entered();
add_local_candidate(id, agent, host_candidate.clone(), &mut self.pending_events);
}
Ok(())
@@ -581,7 +578,9 @@ where
return ControlFlow::Continue(());
};
for (_, agent) in self.connections.agents_mut() {
for (id, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %id).entered();
if agent.accepts_message(&message) {
agent.handle_packet(
now,
@@ -613,6 +612,8 @@ where
now: Instant,
) -> ControlFlow<Result<(), Error>, (TId, MutableIpPacket<'b>)> {
for (id, conn) in self.connections.iter_established_mut() {
let _span = info_span!("connection", %id).entered();
if !conn.accepts(from) {
continue;
}
@@ -634,7 +635,7 @@ where
// I can't think of a better way to detect this ...
if !handshake_complete_before_decapsulate && handshake_complete_after_decapsulate {
tracing::info!(%id, duration_since_intent = ?conn.duration_since_intent(now), "Completed wireguard handshake");
tracing::info!(duration_since_intent = ?conn.duration_since_intent(now), "Completed wireguard handshake");
self.pending_events
.push_back(Event::ConnectionEstablished(id))
@@ -665,7 +666,7 @@ where
for (server, event) in binding_events.chain(allocation_events) {
match event {
CandidateEvent::New(candidate) => {
add_candidates(
add_local_candidate_to_all(
server,
candidate,
&mut self.connections,
@@ -673,7 +674,8 @@ where
);
}
CandidateEvent::Invalid(candidate) => {
for (_, agent) in self.connections.agents_mut() {
for (id, agent) in self.connections.agents_mut() {
let _span = info_span!("connection", %id).entered();
agent.invalidate_candidate(&candidate);
}
}
@@ -977,7 +979,7 @@ impl<TId> Default for Connections<TId> {
impl<TId> Connections<TId>
where
TId: Eq + Hash + Copy,
TId: Eq + Hash + Copy + fmt::Display,
{
fn remove_failed(&mut self, events: &mut VecDeque<Event<TId>>) {
self.initial.retain(|id, conn| {
@@ -1084,7 +1086,7 @@ enum EncodeError {
NoChannel,
}
fn add_candidates<TId>(
fn add_local_candidate_to_all<TId>(
server: SocketAddr,
candidate: Candidate,
connections: &mut Connections<TId>,
@@ -1104,16 +1106,18 @@ fn add_candidates<TId>(
for (id, allowed_stun, allowed_turn, agent) in
initial_connections.chain(established_connections)
{
let _span = info_span!("connection", %id).entered();
match candidate.kind() {
CandidateKind::ServerReflexive => {
if (!allowed_stun.contains(&server)) && (!allowed_turn.contains(&server)) {
tracing::debug!(%id, %server, ?allowed_stun, ?allowed_turn, "Not adding srflx candidate");
tracing::debug!(%server, ?allowed_stun, ?allowed_turn, "Not adding srflx candidate");
continue;
}
}
CandidateKind::Relayed => {
if !allowed_turn.contains(&server) {
tracing::debug!(%id, %server, ?allowed_turn, "Not adding relay candidate");
tracing::debug!(%server, ?allowed_turn, "Not adding relay candidate");
continue;
}
@@ -1130,7 +1134,9 @@ fn add_local_candidate<TId>(
agent: &mut IceAgent,
candidate: Candidate,
pending_events: &mut VecDeque<Event<TId>>,
) {
) where
TId: fmt::Display,
{
let is_new = agent.add_local_candidate(candidate.clone());
if is_new {
@@ -1219,12 +1225,13 @@ struct InitialConnection {
}
impl InitialConnection {
#[tracing::instrument(level = "debug", skip_all, fields(%id))]
fn handle_timeout<TId>(&mut self, id: TId, now: Instant)
where
TId: fmt::Display,
{
if now.duration_since(self.created_at) >= HANDSHAKE_TIMEOUT {
tracing::info!(%id, "Connection setup timed out (no answer received)");
tracing::info!("Connection setup timed out (no answer received)");
self.is_failed = true;
}
}
@@ -1348,6 +1355,7 @@ impl Connection {
Some(self.signalling_completed_at + CANDIDATE_TIMEOUT)
}
#[tracing::instrument(level = "info", skip_all, fields(%id))]
fn handle_timeout<TId>(
&mut self,
id: TId,
@@ -1363,7 +1371,7 @@ impl Connection {
.candidate_timeout()
.is_some_and(|timeout| now >= timeout)
{
tracing::info!(%id, "Connection failed (no candidates received)");
tracing::info!("Connection failed (no candidates received)");
self.is_failed = true;
return;
}
@@ -1388,11 +1396,11 @@ impl Connection {
match self.tunnel.update_timers(&mut buf) {
TunnResult::Done => {}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
tracing::info!(%id, "Connection failed (wireguard tunnel expired)");
tracing::info!("Connection failed (wireguard tunnel expired)");
self.is_failed = true;
}
TunnResult::Err(e) => {
tracing::warn!(%id, ?e);
tracing::warn!(?e);
}
TunnResult::WriteToNetwork(b) => {
transmits.extend(make_owned_transmit(peer_socket, b, allocations, now));
@@ -1407,7 +1415,7 @@ impl Connection {
self.possible_sockets.insert(source);
}
IceAgentEvent::IceConnectionStateChange(IceConnectionState::Disconnected) => {
tracing::info!(%id, "Connection failed (ICE timeout)");
tracing::info!("Connection failed (ICE timeout)");
self.is_failed = true;
}
IceAgentEvent::NominatedSend {