diff --git a/rust/Cargo.lock b/rust/Cargo.lock index a18d1ec28..11ac0b40f 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1989,6 +1989,7 @@ dependencies = [ "env_logger", "futures", "hex", + "hex-display", "http-health-check", "mio", "once_cell", @@ -2698,6 +2699,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-display" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b53d6a634507c5d9fdee77261ae54a8d1ff7887f5304389025b03c3292a1756" + [[package]] name = "hickory-proto" version = "0.24.0" @@ -5764,6 +5771,7 @@ dependencies = [ "bytes", "firezone-relay", "hex", + "hex-display", "ip-packet", "once_cell", "rand 0.8.5", diff --git a/rust/connlib/snownet/Cargo.toml b/rust/connlib/snownet/Cargo.toml index 5f217d14a..8f6a02875 100644 --- a/rust/connlib/snownet/Cargo.toml +++ b/rust/connlib/snownet/Cargo.toml @@ -17,6 +17,7 @@ bytes = "1.4.0" once_cell = "1.17.1" backoff = "0.4.0" hex = "0.4.0" +hex-display = "0.3.0" [dev-dependencies] tracing-subscriber = {version = "0.3", features = ["env-filter"]} diff --git a/rust/connlib/snownet/src/allocation.rs b/rust/connlib/snownet/src/allocation.rs index 0ff76252f..fde3bff74 100644 --- a/rust/connlib/snownet/src/allocation.rs +++ b/rust/connlib/snownet/src/allocation.rs @@ -6,6 +6,7 @@ use crate::{ }; use ::backoff::backoff::Backoff; use bytecodec::{DecodeExt as _, EncodeExt as _}; +use hex_display::HexDisplayExt as _; use rand::random; use std::{ borrow::Cow, @@ -262,7 +263,7 @@ impl Allocation { /// Refresh this allocation. /// /// In case refreshing the allocation fails, we will attempt to make a new one. - #[tracing::instrument(level = "debug", skip_all, fields(relay = ?self.active_socket))] + #[tracing::instrument(level = "debug", skip_all, fields(active_socket = ?self.active_socket))] pub fn refresh(&mut self, now: Instant) { self.update_now(now); @@ -286,7 +287,7 @@ impl Allocation { self.send_binding_requests(); } - #[tracing::instrument(level = "debug", skip_all, fields(id, method, class, rtt))] + #[tracing::instrument(level = "debug", skip_all, fields(tid, method, class, rtt))] pub fn handle_input( &mut self, from: SocketAddr, @@ -312,7 +313,10 @@ impl Allocation { let transaction_id = message.transaction_id(); - Span::current().record("id", field::debug(transaction_id)); + Span::current().record( + "tid", + field::display(format_args!("{:X}", transaction_id.as_bytes().hex())), + ); Span::current().record("method", field::display(message.method())); Span::current().record("class", field::display(message.class())); @@ -433,13 +437,17 @@ impl Allocation { // We send 2 BINDING requests to start with (one for each IP version) and the first one coming back wins. // Thus, if we already have a socket set, we are done with processing this binding request. - if self.active_socket.is_some() { + if let Some(active_socket) = self.active_socket { + tracing::debug!(%active_socket, additional_socket = %original_dst, "Relay supports dual-stack but we've already picked a socket"); + return true; } // If the socket isn't set yet, use the `original_dst` as the primary socket. self.active_socket = Some(original_dst); + tracing::debug!(active_socket = %original_dst, "Updating active socket"); + if self.has_allocation() { self.authenticate_and_queue(make_refresh_request(), None); } else { @@ -549,7 +557,7 @@ impl Allocation { Some((peer, payload, socket)) } - #[tracing::instrument(level = "debug", skip_all, fields(relay = ?self.active_socket))] + #[tracing::instrument(level = "debug", skip_all, fields(active_socket = ?self.active_socket))] pub fn handle_timeout(&mut self, now: Instant) { self.update_now(now); @@ -619,11 +627,7 @@ impl Allocation { } pub fn poll_event(&mut self) -> Option { - let next_event = self.events.pop_front()?; - - tracing::debug!(?next_event); - - Some(next_event) + self.events.pop_front() } pub fn poll_transmit(&mut self) -> Option> { @@ -644,7 +648,7 @@ impl Allocation { earliest_timeout } - #[tracing::instrument(level = "debug", skip(self, now), fields(relay = ?self.active_socket))] + #[tracing::instrument(level = "debug", skip(self, now), fields(active_socket = ?self.active_socket))] pub fn bind_channel(&mut self, peer: SocketAddr, now: Instant) { if self.is_suspended() { tracing::debug!("Allocation is suspended"); @@ -757,10 +761,10 @@ impl Allocation { fn log_update(&self) { tracing::info!( - srflx_ip4 = ?self.ip4_srflx_candidate, - srflx_ip6 = ?self.ip6_srflx_candidate, - relay_ip4 = ?self.ip4_allocation, - relay_ip6 = ?self.ip6_allocation, + srflx_ip4 = ?self.ip4_srflx_candidate.as_ref().map(|c| c.addr()), + srflx_ip6 = ?self.ip6_srflx_candidate.as_ref().map(|c| c.addr()), + relay_ip4 = ?self.ip4_allocation.as_ref().map(|c| c.addr()), + relay_ip6 = ?self.ip6_allocation.as_ref().map(|c| c.addr()), lifetime = ?self.allocation_lifetime, "Updated allocation" ); diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index b8d7cf17d..0c937fd03 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -218,8 +218,8 @@ where Ok(()) } - #[tracing::instrument(level = "info", skip_all, fields(%id))] - pub fn add_remote_candidate(&mut self, id: TId, candidate: String, now: Instant) { + #[tracing::instrument(level = "info", skip_all, fields(%cid))] + pub fn add_remote_candidate(&mut self, cid: TId, candidate: String, now: Instant) { let candidate = match Candidate::from_sdp_string(&candidate) { Ok(c) => c, Err(e) => { @@ -228,7 +228,7 @@ where } }; - if let Some(agent) = self.connections.agent_mut(id) { + if let Some(agent) = self.connections.agent_mut(cid) { agent.add_remote_candidate(candidate.clone()); } @@ -255,8 +255,8 @@ where } } - #[tracing::instrument(level = "info", skip_all, fields(%id))] - pub fn remove_remote_candidate(&mut self, id: TId, candidate: String) { + #[tracing::instrument(level = "info", skip_all, fields(%cid))] + pub fn remove_remote_candidate(&mut self, cid: TId, candidate: String) { let candidate = match Candidate::from_sdp_string(&candidate) { Ok(c) => c, Err(e) => { @@ -265,7 +265,7 @@ where } }; - if let Some(agent) = self.connections.agent_mut(id) { + if let Some(agent) = self.connections.agent_mut(cid) { agent.invalidate_candidate(&candidate); } } @@ -291,7 +291,7 @@ where /// - `Ok(None)` if the packet was handled internally, for example, a response from a TURN server. /// - `Ok(Some)` if the packet was an encrypted wireguard packet from a peer. /// The `Option` contains the connection on which the packet was decrypted. - #[tracing::instrument(level = "debug", skip_all, fields(%from, num_bytes = %packet.len()))] + #[tracing::instrument(level = "debug", skip_all, fields(%from))] pub fn decapsulate<'s>( &mut self, local: SocketAddr, @@ -455,9 +455,9 @@ where } self.allocations - .retain(|id, allocation| match allocation.can_be_freed() { + .retain(|rid, allocation| match allocation.can_be_freed() { Some(e) => { - tracing::info!(%id, "Disconnecting from relay; {e}"); + tracing::info!(%rid, "Disconnecting from relay; {e}"); false } @@ -495,29 +495,29 @@ where now: Instant, ) { // First, invalidate all candidates from relays that we should stop using. - for id in to_remove { - let Some(allocation) = self.allocations.remove(&id) else { - tracing::debug!(%id, "Cannot delete unknown allocation"); + for rid in to_remove { + let Some(allocation) = self.allocations.remove(&rid) else { + tracing::debug!(%rid, "Cannot delete unknown allocation"); continue; }; - for (id, agent) in self.connections.agents_mut() { - let _span = info_span!("connection", %id).entered(); + for (cid, agent) in self.connections.agents_mut() { + let _span = info_span!("connection", %cid).entered(); for candidate in allocation .current_candidates() .filter(|c| c.kind() == CandidateKind::Relayed) { - remove_local_candidate(id, agent, &candidate, &mut self.pending_events); + remove_local_candidate(cid, agent, &candidate, &mut self.pending_events); } } - tracing::info!(%id, address = ?allocation.server(), "Removed TURN server"); + tracing::info!(%rid, address = ?allocation.server(), "Removed TURN server"); } // Second, upsert all new relays. - for (id, server, username, password, realm) in to_add { + for (rid, server, username, password, realm) in to_add { let Ok(username) = Username::new(username.to_owned()) else { tracing::debug!(%username, "Invalid TURN username"); continue; @@ -527,17 +527,17 @@ where continue; }; - if let Some(existing) = self.allocations.get_mut(id) { + if let Some(existing) = self.allocations.get_mut(rid) { existing.update_credentials(*server, username, password, realm, now); continue; } self.allocations.insert( - *id, + *rid, Allocation::new(*server, username, password.clone(), realm, now), ); - tracing::info!(%id, address = ?server, "Added new TURN server"); + tracing::info!(%rid, address = ?server, "Added new TURN server"); } } @@ -596,10 +596,10 @@ where return Ok(()); } - for (id, agent) in self.connections.agents_mut() { - let _span = info_span!("connection", %id).entered(); + for (cid, agent) in self.connections.agents_mut() { + let _span = info_span!("connection", %cid).entered(); - add_local_candidate(id, agent, host_candidate.clone(), &mut self.pending_events); + add_local_candidate(cid, agent, host_candidate.clone(), &mut self.pending_events); } Ok(()) @@ -684,8 +684,8 @@ where return ControlFlow::Continue(()); }; - for (id, agent) in self.connections.agents_mut() { - let _span = info_span!("connection", %id).entered(); + for (cid, agent) in self.connections.agents_mut() { + let _span = info_span!("connection", %cid).entered(); if agent.accepts_message(&message) { agent.handle_packet( @@ -715,8 +715,8 @@ where buffer: &'b mut [u8], now: Instant, ) -> ControlFlow, (TId, MutableIpPacket<'b>)> { - for (id, conn) in self.connections.iter_established_mut() { - let _span = info_span!("connection", %id).entered(); + for (cid, conn) in self.connections.iter_established_mut() { + let _span = info_span!("connection", %cid).entered(); if !conn.accepts(&from) { continue; @@ -739,11 +739,11 @@ where tracing::info!(duration_since_intent = ?conn.duration_since_intent(now), "Completed wireguard handshake"); self.pending_events - .push_back(Event::ConnectionEstablished(id)) + .push_back(Event::ConnectionEstablished(cid)) } return match control_flow { - ControlFlow::Continue(c) => ControlFlow::Continue((id, c)), + ControlFlow::Continue(c) => ControlFlow::Continue((cid, c)), ControlFlow::Break(b) => ControlFlow::Break(b), }; } @@ -769,10 +769,10 @@ where ); } CandidateEvent::Invalid(candidate) => { - for (id, agent) in self.connections.agents_mut() { - let _span = info_span!("connection", %id).entered(); + for (cid, agent) in self.connections.agents_mut() { + let _span = info_span!("connection", %cid).entered(); - remove_local_candidate(id, agent, &candidate, &mut self.pending_events); + remove_local_candidate(cid, agent, &candidate, &mut self.pending_events); } } } @@ -789,14 +789,14 @@ where /// /// Out of all configured STUN and TURN servers, the connection will only use the ones provided here. /// The returned [`Offer`] must be passed to the remote via a signalling channel. - #[tracing::instrument(level = "info", skip_all, fields(%id))] + #[tracing::instrument(level = "info", skip_all, fields(%cid))] #[must_use] - pub fn new_connection(&mut self, id: TId, intent_sent_at: Instant, now: Instant) -> Offer { - if self.connections.initial.remove(&id).is_some() { + pub fn new_connection(&mut self, cid: TId, intent_sent_at: Instant, now: Instant) -> Offer { + if self.connections.initial.remove(&cid).is_some() { tracing::info!("Replacing existing initial connection"); }; - if self.connections.established.remove(&id).is_some() { + if self.connections.established.remove(&cid).is_some() { tracing::info!("Replacing existing established connection"); }; @@ -825,7 +825,7 @@ where }; let duration_since_intent = initial_connection.duration_since_intent(now); - let existing = self.connections.initial.insert(id, initial_connection); + let existing = self.connections.initial.insert(cid, initial_connection); debug_assert!(existing.is_none()); tracing::info!(?duration_since_intent, "Establishing new connection"); @@ -839,9 +839,9 @@ where } /// Accept an [`Answer`] from the remote for a connection previously created via [`Node::new_connection`]. - #[tracing::instrument(level = "info", skip_all, fields(%id))] - pub fn accept_answer(&mut self, id: TId, remote: PublicKey, answer: Answer, now: Instant) { - let Some(initial) = self.connections.initial.remove(&id) else { + #[tracing::instrument(level = "info", skip_all, fields(%cid))] + pub fn accept_answer(&mut self, cid: TId, remote: PublicKey, answer: Answer, now: Instant) { + let Some(initial) = self.connections.initial.remove(&cid) else { tracing::debug!("No initial connection state, ignoring answer"); // This can happen if the connection setup timed out. return; }; @@ -852,7 +852,7 @@ where pass: answer.credentials.password, }); - self.seed_agent_with_local_candidates(id, &mut agent); + self.seed_agent_with_local_candidates(cid, &mut agent); let connection = self.init_connection( agent, @@ -863,7 +863,7 @@ where ); let duration_since_intent = connection.duration_since_intent(now); - let existing = self.connections.established.insert(id, connection); + let existing = self.connections.established.insert(cid, connection); tracing::info!(?duration_since_intent, remote = %hex::encode(remote.as_bytes()), "Signalling protocol completed"); @@ -880,21 +880,21 @@ where /// /// Out of all configured STUN and TURN servers, the connection will only use the ones provided here. /// The returned [`Answer`] must be passed to the remote via a signalling channel. - #[tracing::instrument(level = "info", skip_all, fields(%id))] + #[tracing::instrument(level = "info", skip_all, fields(%cid))] #[must_use] pub fn accept_connection( &mut self, - id: TId, + cid: TId, offer: Offer, remote: PublicKey, now: Instant, ) -> Answer { debug_assert!( - !self.connections.initial.contains_key(&id), + !self.connections.initial.contains_key(&cid), "server to not use `initial_connections`" ); - if self.connections.established.remove(&id).is_some() { + if self.connections.established.remove(&cid).is_some() { tracing::info!("Replacing existing established connection"); }; @@ -913,7 +913,7 @@ where }, }; - self.seed_agent_with_local_candidates(id, &mut agent); + self.seed_agent_with_local_candidates(cid, &mut agent); let connection = self.init_connection( agent, @@ -922,7 +922,7 @@ where now, // Technically, this isn't fully correct because gateways don't send intents so we just use the current time. now, ); - let existing = self.connections.established.insert(id, connection); + let existing = self.connections.established.insert(cid, connection); debug_assert!(existing.is_none()); @@ -1099,10 +1099,10 @@ fn add_local_candidate_to_all( .iter_mut() .map(|(id, c)| (*id, &mut c.agent)); - for (id, agent) in initial_connections.chain(established_connections) { - let _span = info_span!("connection", %id).entered(); + for (cid, agent) in initial_connections.chain(established_connections) { + let _span = info_span!("connection", %cid).entered(); - add_local_candidate(id, agent, candidate.clone(), pending_events); + add_local_candidate(cid, agent, candidate.clone(), pending_events); } } @@ -1257,8 +1257,8 @@ struct InitialConnection { } impl InitialConnection { - #[tracing::instrument(level = "debug", skip_all, fields(%id))] - fn handle_timeout(&mut self, id: TId, now: Instant) + #[tracing::instrument(level = "debug", skip_all, fields(%cid))] + fn handle_timeout(&mut self, cid: TId, now: Instant) where TId: fmt::Display, { @@ -1422,10 +1422,10 @@ where self.last_incoming.max(self.last_outgoing) + MAX_IDLE } - #[tracing::instrument(level = "info", skip_all, fields(%id))] + #[tracing::instrument(level = "info", skip_all, fields(%cid))] fn handle_timeout( &mut self, - id: TId, + cid: TId, now: Instant, allocations: &mut HashMap, transmits: &mut VecDeque>, diff --git a/rust/relay/Cargo.toml b/rust/relay/Cargo.toml index aeb4b7525..4afbd4e2b 100644 --- a/rust/relay/Cargo.toml +++ b/rust/relay/Cargo.toml @@ -36,6 +36,7 @@ socket2 = "0.5.7" backoff = "0.4" http-health-check = { workspace = true } mio = "0.8.11" +hex-display = "0.3.0" [dev-dependencies] difference = "2.0.0" diff --git a/rust/relay/src/server.rs b/rust/relay/src/server.rs index 70bc2ce33..1f4bcc541 100644 --- a/rust/relay/src/server.rs +++ b/rust/relay/src/server.rs @@ -12,6 +12,7 @@ use crate::{ClientSocket, IpStack, PeerSocket}; use anyhow::Result; use bytecodec::EncodeExt; use core::fmt; +use hex_display::HexDisplayExt as _; use opentelemetry::metrics::{Counter, Unit, UpDownCounter}; use opentelemetry::KeyValue; use rand::Rng; @@ -403,7 +404,7 @@ where } } - #[tracing::instrument(level = "info", skip_all, fields(transaction_id = ?request.transaction_id(), %sender))] + #[tracing::instrument(level = "info", skip_all, fields(tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))] fn handle_binding_request(&mut self, request: Binding, sender: ClientSocket) { let mut message = Message::new( MessageClass::SuccessResponse, @@ -420,7 +421,7 @@ where /// Handle a TURN allocate request. /// /// See for details. - #[tracing::instrument(level = "info", skip_all, fields(allocation, transaction_id = ?request.transaction_id(), %sender))] + #[tracing::instrument(level = "info", skip_all, fields(allocation, tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))] fn handle_allocate_request( &mut self, request: Allocate, @@ -539,7 +540,7 @@ where /// Handle a TURN refresh request. /// /// See for details. - #[tracing::instrument(level = "info", skip_all, fields(allocation, transaction_id = ?request.transaction_id(), %sender))] + #[tracing::instrument(level = "info", skip_all, fields(allocation, tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))] fn handle_refresh_request( &mut self, request: Refresh, @@ -588,7 +589,7 @@ where /// Handle a TURN channel bind request. /// /// See for details. - #[tracing::instrument(level = "info", skip_all, fields(allocation, peer, channel, transaction_id = ?request.transaction_id(), %sender))] + #[tracing::instrument(level = "info", skip_all, fields(allocation, peer, channel, tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))] fn handle_channel_bind_request( &mut self, request: ChannelBind, @@ -692,7 +693,7 @@ where /// /// This TURN server implementation does not support relaying data other than through channels. /// Thus, creating a permission is a no-op that always succeeds. - #[tracing::instrument(level = "info", skip_all, fields(transaction_id = ?request.transaction_id(), %sender))] + #[tracing::instrument(level = "info", skip_all, fields(tid = %format_args!("{:X}", request.transaction_id().as_bytes().hex()), %sender))] fn handle_create_permission_request( &mut self, request: CreatePermission,