From 71e6b56654d64da9a834e62eacce1716f0603993 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 23 Jul 2025 03:40:33 +1000 Subject: [PATCH] feat(snownet): remove "connection ID" span (#9949) At present, `snownet` uses a `tracing::Span` to attach the connection ID to various log messages. This requires the span to be entered and exited on every packet. Whilst profiling Firezone, I noticed that it takes between 10% and 20% of CPU time on the main thread. Previously, this wasn't a bottleneck as other parts of Firezone were not yet as optimised. With some changes earlier this year of a dedicated UDP thread and better GSO, this does appear to be a bottleneck now. On `main`, I am currently getting the following numbers on my local machine: ``` Connecting to host 172.20.0.110, port 5201 [ 5] local 100.85.16.226 port 42012 connected to 172.20.0.110 port 5201 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 251 MBytes 2.11 Gbits/sec 16 558 KBytes [ 5] 1.00-2.00 sec 287 MBytes 2.41 Gbits/sec 6 800 KBytes [ 5] 2.00-3.00 sec 284 MBytes 2.38 Gbits/sec 2 992 KBytes [ 5] 3.00-4.00 sec 287 MBytes 2.41 Gbits/sec 3 1.12 MBytes [ 5] 4.00-5.00 sec 290 MBytes 2.44 Gbits/sec 0 1.27 MBytes [ 5] 5.00-6.00 sec 300 MBytes 2.52 Gbits/sec 2 1.40 MBytes [ 5] 6.00-7.00 sec 295 MBytes 2.47 Gbits/sec 2 1.52 MBytes [ 5] 7.00-8.00 sec 304 MBytes 2.55 Gbits/sec 3 1.63 MBytes [ 5] 8.00-9.00 sec 290 MBytes 2.44 Gbits/sec 49 1.21 MBytes [ 5] 9.00-10.00 sec 288 MBytes 2.41 Gbits/sec 24 1023 KBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 2.81 GBytes 2.41 Gbits/sec 107 sender [ 5] 0.00-10.00 sec 2.81 GBytes 2.41 Gbits/sec receiver ``` With this patch applied, the throughput goes up significantly: ``` Connecting to host 172.20.0.110, port 5201 [ 5] local 100.85.16.226 port 41402 connected to 172.20.0.110 port 5201 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 315 MBytes 2.64 Gbits/sec 7 619 KBytes [ 5] 
1.00-2.00 sec 363 MBytes 3.05 Gbits/sec 11 847 KBytes [ 5] 2.00-3.00 sec 379 MBytes 3.18 Gbits/sec 1 1.07 MBytes [ 5] 3.00-4.00 sec 384 MBytes 3.22 Gbits/sec 44 981 KBytes [ 5] 4.00-5.00 sec 377 MBytes 3.16 Gbits/sec 116 911 KBytes [ 5] 5.00-6.00 sec 378 MBytes 3.17 Gbits/sec 3 1.10 MBytes [ 5] 6.00-7.00 sec 377 MBytes 3.16 Gbits/sec 48 929 KBytes [ 5] 7.00-8.00 sec 374 MBytes 3.14 Gbits/sec 151 947 KBytes [ 5] 8.00-9.00 sec 382 MBytes 3.21 Gbits/sec 36 833 KBytes [ 5] 9.00-10.00 sec 375 MBytes 3.14 Gbits/sec 1 1.06 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 3.62 GBytes 3.11 Gbits/sec 418 sender [ 5] 0.00-10.00 sec 3.61 GBytes 3.10 Gbits/sec receiver ``` Resolves: #9948 --- rust/connlib/snownet/src/node.rs | 162 +++++++++++++++++-------------- 1 file changed, 89 insertions(+), 73 deletions(-) diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index f9afda371..3f8e78471 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -32,7 +32,6 @@ use str0m::ice::{IceAgent, IceAgentEvent, IceCreds, StunMessage, StunPacket}; use str0m::net::Protocol; use str0m::{Candidate, CandidateKind, IceConnectionState}; use stun_codec::rfc5389::attributes::{Realm, Username}; -use tracing::info_span; // Note: Taken from boringtun const HANDSHAKE_RATE_LIMIT: u64 = 100; @@ -258,7 +257,7 @@ where .is_some_and(|c| c == &remote_creds) && c.tunnel.remote_static_public() == remote { - c.state.on_upsert(&mut c.agent, now); + c.state.on_upsert(cid, &mut c.agent, now); tracing::info!(local = ?local_creds, "Reusing existing connection"); return Ok(()); @@ -443,14 +442,14 @@ where /// Nevertheless, using [`IpPacket`] in our API has good documentation value. 
pub fn encapsulate( &mut self, - connection: TId, + cid: TId, packet: IpPacket, now: Instant, ) -> Result> { let conn = self .connections - .get_established_mut(&connection) - .with_context(|| format!("Unknown connection {connection}"))?; + .get_established_mut(&cid) + .with_context(|| format!("Unknown connection {cid}"))?; if self.mode.is_server() && !conn.state.has_nominated_socket() { tracing::debug!( @@ -466,8 +465,8 @@ where // Encode the packet with an offset of 4 bytes, in case we need to wrap it in a channel-data message. let Some(packet_len) = conn - .encapsulate(packet, &mut buffer[4..], now) - .with_context(|| format!("cid={connection}"))? + .encapsulate(cid, packet, &mut buffer[4..], now) + .with_context(|| format!("cid={cid}"))? .map(|p| p.len()) // Mapping to len() here terminate the mutable borrow of buffer, allowing re-borrowing further down. else { @@ -482,16 +481,14 @@ where buffered.push(buffer[packet_start..packet_end].to_vec()); let num_buffered = buffered.len(); - let _guard = conn.span.enter(); - - tracing::debug!(%num_buffered, "ICE is still in progress, buffering WG handshake"); + tracing::debug!(%num_buffered, %cid, "ICE is still in progress, buffering WG handshake"); return Ok(None); } ConnectionState::Connected { peer_socket, .. 
} => peer_socket, ConnectionState::Idle { peer_socket } => peer_socket, ConnectionState::Failed => { - return Err(anyhow!("Connection {connection} failed")); + return Err(anyhow!("Connection {cid} failed")); } }; @@ -516,7 +513,7 @@ where PeerSocket::RelayToPeer { relay, dest: peer } | PeerSocket::RelayToRelay { relay, dest: peer } => { let Some(allocation) = self.allocations.get_mut(&relay) else { - tracing::warn!(%relay, "No allocation"); + tracing::warn!(%relay, %cid, "No allocation"); return Ok(None); }; let Some(encode_ok) = @@ -750,7 +747,7 @@ where agent.handle_timeout(now); if self.allocations.is_empty() { - tracing::warn!("No TURN servers connected; connection may fail to establish"); + tracing::warn!(%cid, "No TURN servers connected; connection may fail to establish"); } let mut tunnel = Tunn::new_at( @@ -790,7 +787,6 @@ where }, disconnected_at: None, possible_sockets: BTreeSet::default(), - span: info_span!(parent: tracing::Span::none(), "connection", %cid), buffer_pool: self.buffer_pool.clone(), } } @@ -807,7 +803,7 @@ where return Ok(()); } - for (cid, agent, _span) in self.connections.agents_mut() { + for (cid, agent) in self.connections.agents_mut() { add_local_candidate(cid, agent, host_candidate.clone(), &mut self.pending_events); } @@ -896,7 +892,7 @@ where return ControlFlow::Continue(()); }; - for (_, agent, _span) in self.connections.agents_mut() { + for (_, agent) in self.connections.agents_mut() { if agent.accepts_message(&message) { agent.handle_packet( now, @@ -931,6 +927,7 @@ where let handshake_complete_before_decapsulate = conn.wg_handshake_complete(now); let control_flow = conn.decapsulate( + cid, from.ip(), packet, &mut self.allocations, @@ -985,19 +982,17 @@ where continue; } - for (cid, agent, _span) in self.connections.connecting_agents_by_relay_mut(rid) - { + for (cid, agent) in self.connections.connecting_agents_by_relay_mut(rid) { add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) } } 
allocation::Event::New(candidate) => { - for (cid, agent, _span) in self.connections.connecting_agents_by_relay_mut(rid) - { + for (cid, agent) in self.connections.connecting_agents_by_relay_mut(rid) { add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events) } } allocation::Event::Invalid(candidate) => { - for (cid, agent, _span) in self.connections.agents_mut() { + for (cid, agent) in self.connections.agents_mut() { remove_local_candidate(cid, agent, &candidate, &mut self.pending_events); } } @@ -1068,7 +1063,7 @@ where intent_sent_at, relay: self.sample_relay()?, is_failed: false, - span: info_span!("connection", %cid), + span: tracing::Span::none(), }; let duration_since_intent = initial_connection.duration_since_intent(now); @@ -1333,8 +1328,6 @@ where // For established connections, we check if we are currently using the relay. for (_, c) in self.iter_established_mut() { - let _guard = c.span.enter(); - use ConnectionState::*; let peer_socket = match &mut c.state { Connected { peer_socket, .. } | Idle { peer_socket } => peer_socket, @@ -1400,17 +1393,18 @@ where fn connecting_agents_by_relay_mut( &mut self, id: RId, - ) -> impl Iterator)> + '_ { - let initial_connections = self.initial.iter_mut().filter_map(move |(cid, i)| { - (i.relay == id).then_some((*cid, &mut i.agent, i.span.enter())) - }); + ) -> impl Iterator + '_ { + let initial_connections = self + .initial + .iter_mut() + .filter_map(move |(cid, i)| (i.relay == id).then_some((*cid, &mut i.agent))); let pending_connections = self.established.iter_mut().filter_map(move |(cid, c)| { use ConnectionState::*; match c.state { Connecting { relay: Some(relay), .. - } if relay == id => Some((*cid, &mut c.agent, c.span.enter())), + } if relay == id => Some((*cid, &mut c.agent)), Failed | Idle { .. } | Connecting { .. } | Connected { .. 
} => None, } }); @@ -1418,17 +1412,12 @@ where initial_connections.chain(pending_connections) } - fn agents_mut( - &mut self, - ) -> impl Iterator)> { - let initial_agents = self - .initial - .iter_mut() - .map(|(id, c)| (*id, &mut c.agent, c.span.enter())); + fn agents_mut(&mut self) -> impl Iterator { + let initial_agents = self.initial.iter_mut().map(|(id, c)| (*id, &mut c.agent)); let negotiated_agents = self .established .iter_mut() - .map(|(id, c)| (*id, &mut c.agent, c.span.enter())); + .map(|(id, c)| (*id, &mut c.agent)); initial_agents.chain(negotiated_agents) } @@ -1517,7 +1506,7 @@ fn invalidate_allocation_candidates( TId: Eq + Hash + Copy + Ord + fmt::Display, RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display, { - for (cid, agent, _guard) in connections.agents_mut() { + for (cid, agent) in connections.agents_mut() { for candidate in allocation.current_relay_candidates() { remove_local_candidate(cid, agent, &candidate, pending_events); } @@ -1659,7 +1648,6 @@ struct InitialConnection { } impl InitialConnection { - #[tracing::instrument(level = "debug", skip_all, fields(%cid))] fn handle_timeout(&mut self, cid: TId, now: Instant) where TId: fmt::Display, @@ -1667,7 +1655,7 @@ impl InitialConnection { self.agent.handle_timeout(now); if now >= self.no_answer_received_timeout() { - tracing::info!("Connection setup timed out (no answer received)"); + tracing::info!(%cid, "Connection setup timed out (no answer received)"); self.is_failed = true; } } @@ -1715,7 +1703,6 @@ struct Connection { buffer: Vec, - span: tracing::Span, buffer_pool: BufferPool>, } @@ -1767,7 +1754,10 @@ where } } - fn handle_timeout(&mut self, agent: &mut IceAgent, now: Instant) { + fn handle_timeout(&mut self, cid: TId, agent: &mut IceAgent, now: Instant) + where + TId: fmt::Display, + { let Self::Connected { last_outgoing, last_incoming, @@ -1783,19 +1773,25 @@ where let peer_socket = *peer_socket; - self.transition_to_idle(peer_socket, agent); + 
self.transition_to_idle(cid, peer_socket, agent); } - fn on_upsert(&mut self, agent: &mut IceAgent, now: Instant) { + fn on_upsert(&mut self, cid: TId, agent: &mut IceAgent, now: Instant) + where + TId: fmt::Display, + { let peer_socket = match self { Self::Idle { peer_socket } => *peer_socket, Self::Failed | Self::Connecting { .. } | Self::Connected { .. } => return, }; - self.transition_to_connected(peer_socket, agent, "upsert", now); + self.transition_to_connected(cid, peer_socket, agent, "upsert", now); } - fn on_outgoing(&mut self, agent: &mut IceAgent, packet: &IpPacket, now: Instant) { + fn on_outgoing(&mut self, cid: TId, agent: &mut IceAgent, packet: &IpPacket, now: Instant) + where + TId: fmt::Display, + { let peer_socket = match self { Self::Idle { peer_socket } => *peer_socket, Self::Connected { last_outgoing, .. } => { @@ -1805,10 +1801,13 @@ where Self::Failed | Self::Connecting { .. } => return, }; - self.transition_to_connected(peer_socket, agent, tracing::field::debug(packet), now); + self.transition_to_connected(cid, peer_socket, agent, tracing::field::debug(packet), now); } - fn on_incoming(&mut self, agent: &mut IceAgent, packet: &IpPacket, now: Instant) { + fn on_incoming(&mut self, cid: TId, agent: &mut IceAgent, packet: &IpPacket, now: Instant) + where + TId: fmt::Display, + { let peer_socket = match self { Self::Idle { peer_socket } => *peer_socket, Self::Connected { last_incoming, .. } => { @@ -1818,23 +1817,33 @@ where Self::Failed | Self::Connecting { .. 
} => return, }; - self.transition_to_connected(peer_socket, agent, tracing::field::debug(packet), now); + self.transition_to_connected(cid, peer_socket, agent, tracing::field::debug(packet), now); } - fn transition_to_idle(&mut self, peer_socket: PeerSocket, agent: &mut IceAgent) { - tracing::debug!("Connection is idle"); + fn transition_to_idle( + &mut self, + cid: TId, + peer_socket: PeerSocket, + agent: &mut IceAgent, + ) where + TId: fmt::Display, + { + tracing::debug!(%cid, "Connection is idle"); *self = Self::Idle { peer_socket }; apply_idle_stun_timings(agent); } - fn transition_to_connected( + fn transition_to_connected( &mut self, + cid: TId, peer_socket: PeerSocket, agent: &mut IceAgent, trigger: impl tracing::Value, now: Instant, - ) { - tracing::debug!(trigger, "Connection resumed"); + ) where + TId: fmt::Display, + { + tracing::debug!(trigger, %cid, "Connection resumed"); *self = Self::Connected { peer_socket, last_outgoing: now, @@ -1943,7 +1952,6 @@ where Some(disconnected_at + DISCONNECT_TIMEOUT) } - #[tracing::instrument(level = "info", skip_all, fields(%cid))] fn handle_timeout( &mut self, cid: TId, @@ -1955,13 +1963,13 @@ where RId: Copy + Ord + fmt::Display, { self.agent.handle_timeout(now); - self.state.handle_timeout(&mut self.agent, now); + self.state.handle_timeout(cid, &mut self.agent, now); if self .candidate_timeout() .is_some_and(|timeout| now >= timeout) { - tracing::info!("Connection failed (no candidates received)"); + tracing::info!(%cid, "Connection failed (no candidates received)"); self.state = ConnectionState::Failed; return; } @@ -1970,12 +1978,12 @@ where .disconnect_timeout() .is_some_and(|timeout| now >= timeout) { - tracing::info!("Connection failed (ICE timeout)"); + tracing::info!(%cid, "Connection failed (ICE timeout)"); self.state = ConnectionState::Failed; return; } - self.handle_tunnel_timeout(now, allocations, transmits); + self.handle_tunnel_timeout(cid, now, allocations, transmits); // If this was a scheduled 
update, hop to the next interval. if now >= self.next_wg_timer_update { @@ -2153,12 +2161,15 @@ where } } - fn handle_tunnel_timeout( + fn handle_tunnel_timeout( &mut self, + cid: TId, now: Instant, allocations: &mut BTreeMap, transmits: &mut VecDeque, - ) { + ) where + TId: fmt::Display, + { // Don't update wireguard timers until we are connected. let Some(peer_socket) = self.socket() else { return; @@ -2174,11 +2185,11 @@ where match self.tunnel.update_timers_at(&mut buf, now) { TunnResult::Done => {} TunnResult::Err(WireGuardError::ConnectionExpired) => { - tracing::info!("Connection failed (wireguard tunnel expired)"); + tracing::info!(%cid, "Connection failed (wireguard tunnel expired)"); self.state = ConnectionState::Failed; } TunnResult::Err(e) => { - tracing::warn!(?e); + tracing::warn!(%cid, "boringtun error: {e}"); } TunnResult::WriteToNetwork(b) => { transmits.extend(make_owned_transmit( @@ -2195,15 +2206,17 @@ where }; } - fn encapsulate<'b>( + fn encapsulate<'b, TId>( &mut self, + cid: TId, packet: IpPacket, buffer: &'b mut [u8], now: Instant, - ) -> Result> { - let _guard = self.span.enter(); - - self.state.on_outgoing(&mut self.agent, &packet, now); + ) -> Result> + where + TId: fmt::Display, + { + self.state.on_outgoing(cid, &mut self.agent, &packet, now); let len = match self.tunnel.encapsulate_at(packet.packet(), buffer, now) { TunnResult::Done => return Ok(None), @@ -2217,15 +2230,18 @@ where Ok(Some(&buffer[..len])) } - fn decapsulate( + fn decapsulate( &mut self, + cid: TId, src: IpAddr, packet: &[u8], allocations: &mut BTreeMap, transmits: &mut VecDeque, now: Instant, - ) -> ControlFlow, IpPacket> { - let _guard = self.span.enter(); + ) -> ControlFlow, IpPacket> + where + TId: fmt::Display, + { let mut ip_packet = IpPacketBuf::new(); let control_flow = match self @@ -2269,7 +2285,7 @@ where TunnResult::WriteToNetwork(bytes) => { match &mut self.state { ConnectionState::Connecting { buffered, .. 
} => { - tracing::debug!("No socket has been nominated yet, buffering WG packet"); + tracing::debug!(%cid, "No socket has been nominated yet, buffering WG packet"); buffered.push(bytes.to_owned()); @@ -2311,7 +2327,7 @@ where }; if let ControlFlow::Continue(packet) = &control_flow { - self.state.on_incoming(&mut self.agent, packet, now); + self.state.on_incoming(cid, &mut self.agent, packet, now); } control_flow