feat(snownet): remove "connection ID" span (#9949)

At present, `snownet` uses a `tracing::Span` to attach the connection ID
to various log messages. This requires the span to be entered and exited
on every packet. Whilst profiling Firezone, I noticed that is takes
between 10% and 20% of CPU time on the main thread.

Previously, this wasn't a bottleneck as other parts of Firezone were not
yet as optimised. With some changes earlier this year of a dedicated UDP
thread and better GSO, this does appear to be a bottleneck now.

On `main`, I am currently getting the following numbers on my local
machine:

```
Connecting to host 172.20.0.110, port 5201
[  5] local 100.85.16.226 port 42012 connected to 172.20.0.110 port 5201
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   251 MBytes  2.11 Gbits/sec   16    558 KBytes       
[  5]   1.00-2.00   sec   287 MBytes  2.41 Gbits/sec    6    800 KBytes       
[  5]   2.00-3.00   sec   284 MBytes  2.38 Gbits/sec    2    992 KBytes       
[  5]   3.00-4.00   sec   287 MBytes  2.41 Gbits/sec    3   1.12 MBytes       
[  5]   4.00-5.00   sec   290 MBytes  2.44 Gbits/sec    0   1.27 MBytes       
[  5]   5.00-6.00   sec   300 MBytes  2.52 Gbits/sec    2   1.40 MBytes       
[  5]   6.00-7.00   sec   295 MBytes  2.47 Gbits/sec    2   1.52 MBytes       
[  5]   7.00-8.00   sec   304 MBytes  2.55 Gbits/sec    3   1.63 MBytes       
[  5]   8.00-9.00   sec   290 MBytes  2.44 Gbits/sec   49   1.21 MBytes       
[  5]   9.00-10.00  sec   288 MBytes  2.41 Gbits/sec   24   1023 KBytes       
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  2.81 GBytes  2.41 Gbits/sec  107             sender
[  5]   0.00-10.00  sec  2.81 GBytes  2.41 Gbits/sec                  receiver
```

With this patch applied, the throughput goes up significantly:

```
Connecting to host 172.20.0.110, port 5201
[  5] local 100.85.16.226 port 41402 connected to 172.20.0.110 port 5201
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   315 MBytes  2.64 Gbits/sec    7    619 KBytes       
[  5]   1.00-2.00   sec   363 MBytes  3.05 Gbits/sec   11    847 KBytes       
[  5]   2.00-3.00   sec   379 MBytes  3.18 Gbits/sec    1   1.07 MBytes       
[  5]   3.00-4.00   sec   384 MBytes  3.22 Gbits/sec   44    981 KBytes       
[  5]   4.00-5.00   sec   377 MBytes  3.16 Gbits/sec  116    911 KBytes       
[  5]   5.00-6.00   sec   378 MBytes  3.17 Gbits/sec    3   1.10 MBytes       
[  5]   6.00-7.00   sec   377 MBytes  3.16 Gbits/sec   48    929 KBytes       
[  5]   7.00-8.00   sec   374 MBytes  3.14 Gbits/sec  151    947 KBytes       
[  5]   8.00-9.00   sec   382 MBytes  3.21 Gbits/sec   36    833 KBytes       
[  5]   9.00-10.00  sec   375 MBytes  3.14 Gbits/sec    1   1.06 MBytes       
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  3.62 GBytes  3.11 Gbits/sec  418             sender
[  5]   0.00-10.00  sec  3.61 GBytes  3.10 Gbits/sec                  receiver
```

Resolves: #9948
This commit is contained in:
Thomas Eizinger
2025-07-23 03:40:33 +10:00
committed by GitHub
parent f41a6f9e0b
commit 71e6b56654

View File

@@ -32,7 +32,6 @@ use str0m::ice::{IceAgent, IceAgentEvent, IceCreds, StunMessage, StunPacket};
use str0m::net::Protocol;
use str0m::{Candidate, CandidateKind, IceConnectionState};
use stun_codec::rfc5389::attributes::{Realm, Username};
use tracing::info_span;
// Note: Taken from boringtun
const HANDSHAKE_RATE_LIMIT: u64 = 100;
@@ -258,7 +257,7 @@ where
.is_some_and(|c| c == &remote_creds)
&& c.tunnel.remote_static_public() == remote
{
c.state.on_upsert(&mut c.agent, now);
c.state.on_upsert(cid, &mut c.agent, now);
tracing::info!(local = ?local_creds, "Reusing existing connection");
return Ok(());
@@ -443,14 +442,14 @@ where
/// Nevertheless, using [`IpPacket`] in our API has good documentation value.
pub fn encapsulate(
&mut self,
connection: TId,
cid: TId,
packet: IpPacket,
now: Instant,
) -> Result<Option<Transmit>> {
let conn = self
.connections
.get_established_mut(&connection)
.with_context(|| format!("Unknown connection {connection}"))?;
.get_established_mut(&cid)
.with_context(|| format!("Unknown connection {cid}"))?;
if self.mode.is_server() && !conn.state.has_nominated_socket() {
tracing::debug!(
@@ -466,8 +465,8 @@ where
// Encode the packet with an offset of 4 bytes, in case we need to wrap it in a channel-data message.
let Some(packet_len) = conn
.encapsulate(packet, &mut buffer[4..], now)
.with_context(|| format!("cid={connection}"))?
.encapsulate(cid, packet, &mut buffer[4..], now)
.with_context(|| format!("cid={cid}"))?
.map(|p| p.len())
// Mapping to len() here terminate the mutable borrow of buffer, allowing re-borrowing further down.
else {
@@ -482,16 +481,14 @@ where
buffered.push(buffer[packet_start..packet_end].to_vec());
let num_buffered = buffered.len();
let _guard = conn.span.enter();
tracing::debug!(%num_buffered, "ICE is still in progress, buffering WG handshake");
tracing::debug!(%num_buffered, %cid, "ICE is still in progress, buffering WG handshake");
return Ok(None);
}
ConnectionState::Connected { peer_socket, .. } => peer_socket,
ConnectionState::Idle { peer_socket } => peer_socket,
ConnectionState::Failed => {
return Err(anyhow!("Connection {connection} failed"));
return Err(anyhow!("Connection {cid} failed"));
}
};
@@ -516,7 +513,7 @@ where
PeerSocket::RelayToPeer { relay, dest: peer }
| PeerSocket::RelayToRelay { relay, dest: peer } => {
let Some(allocation) = self.allocations.get_mut(&relay) else {
tracing::warn!(%relay, "No allocation");
tracing::warn!(%relay, %cid, "No allocation");
return Ok(None);
};
let Some(encode_ok) =
@@ -750,7 +747,7 @@ where
agent.handle_timeout(now);
if self.allocations.is_empty() {
tracing::warn!("No TURN servers connected; connection may fail to establish");
tracing::warn!(%cid, "No TURN servers connected; connection may fail to establish");
}
let mut tunnel = Tunn::new_at(
@@ -790,7 +787,6 @@ where
},
disconnected_at: None,
possible_sockets: BTreeSet::default(),
span: info_span!(parent: tracing::Span::none(), "connection", %cid),
buffer_pool: self.buffer_pool.clone(),
}
}
@@ -807,7 +803,7 @@ where
return Ok(());
}
for (cid, agent, _span) in self.connections.agents_mut() {
for (cid, agent) in self.connections.agents_mut() {
add_local_candidate(cid, agent, host_candidate.clone(), &mut self.pending_events);
}
@@ -896,7 +892,7 @@ where
return ControlFlow::Continue(());
};
for (_, agent, _span) in self.connections.agents_mut() {
for (_, agent) in self.connections.agents_mut() {
if agent.accepts_message(&message) {
agent.handle_packet(
now,
@@ -931,6 +927,7 @@ where
let handshake_complete_before_decapsulate = conn.wg_handshake_complete(now);
let control_flow = conn.decapsulate(
cid,
from.ip(),
packet,
&mut self.allocations,
@@ -985,19 +982,17 @@ where
continue;
}
for (cid, agent, _span) in self.connections.connecting_agents_by_relay_mut(rid)
{
for (cid, agent) in self.connections.connecting_agents_by_relay_mut(rid) {
add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events)
}
}
allocation::Event::New(candidate) => {
for (cid, agent, _span) in self.connections.connecting_agents_by_relay_mut(rid)
{
for (cid, agent) in self.connections.connecting_agents_by_relay_mut(rid) {
add_local_candidate(cid, agent, candidate.clone(), &mut self.pending_events)
}
}
allocation::Event::Invalid(candidate) => {
for (cid, agent, _span) in self.connections.agents_mut() {
for (cid, agent) in self.connections.agents_mut() {
remove_local_candidate(cid, agent, &candidate, &mut self.pending_events);
}
}
@@ -1068,7 +1063,7 @@ where
intent_sent_at,
relay: self.sample_relay()?,
is_failed: false,
span: info_span!("connection", %cid),
span: tracing::Span::none(),
};
let duration_since_intent = initial_connection.duration_since_intent(now);
@@ -1333,8 +1328,6 @@ where
// For established connections, we check if we are currently using the relay.
for (_, c) in self.iter_established_mut() {
let _guard = c.span.enter();
use ConnectionState::*;
let peer_socket = match &mut c.state {
Connected { peer_socket, .. } | Idle { peer_socket } => peer_socket,
@@ -1400,17 +1393,18 @@ where
fn connecting_agents_by_relay_mut(
&mut self,
id: RId,
) -> impl Iterator<Item = (TId, &mut IceAgent, tracing::span::Entered<'_>)> + '_ {
let initial_connections = self.initial.iter_mut().filter_map(move |(cid, i)| {
(i.relay == id).then_some((*cid, &mut i.agent, i.span.enter()))
});
) -> impl Iterator<Item = (TId, &mut IceAgent)> + '_ {
let initial_connections = self
.initial
.iter_mut()
.filter_map(move |(cid, i)| (i.relay == id).then_some((*cid, &mut i.agent)));
let pending_connections = self.established.iter_mut().filter_map(move |(cid, c)| {
use ConnectionState::*;
match c.state {
Connecting {
relay: Some(relay), ..
} if relay == id => Some((*cid, &mut c.agent, c.span.enter())),
} if relay == id => Some((*cid, &mut c.agent)),
Failed | Idle { .. } | Connecting { .. } | Connected { .. } => None,
}
});
@@ -1418,17 +1412,12 @@ where
initial_connections.chain(pending_connections)
}
fn agents_mut(
&mut self,
) -> impl Iterator<Item = (TId, &mut IceAgent, tracing::span::Entered<'_>)> {
let initial_agents = self
.initial
.iter_mut()
.map(|(id, c)| (*id, &mut c.agent, c.span.enter()));
fn agents_mut(&mut self) -> impl Iterator<Item = (TId, &mut IceAgent)> {
let initial_agents = self.initial.iter_mut().map(|(id, c)| (*id, &mut c.agent));
let negotiated_agents = self
.established
.iter_mut()
.map(|(id, c)| (*id, &mut c.agent, c.span.enter()));
.map(|(id, c)| (*id, &mut c.agent));
initial_agents.chain(negotiated_agents)
}
@@ -1517,7 +1506,7 @@ fn invalidate_allocation_candidates<TId, RId>(
TId: Eq + Hash + Copy + Ord + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display,
{
for (cid, agent, _guard) in connections.agents_mut() {
for (cid, agent) in connections.agents_mut() {
for candidate in allocation.current_relay_candidates() {
remove_local_candidate(cid, agent, &candidate, pending_events);
}
@@ -1659,7 +1648,6 @@ struct InitialConnection<RId> {
}
impl<RId> InitialConnection<RId> {
#[tracing::instrument(level = "debug", skip_all, fields(%cid))]
fn handle_timeout<TId>(&mut self, cid: TId, now: Instant)
where
TId: fmt::Display,
@@ -1667,7 +1655,7 @@ impl<RId> InitialConnection<RId> {
self.agent.handle_timeout(now);
if now >= self.no_answer_received_timeout() {
tracing::info!("Connection setup timed out (no answer received)");
tracing::info!(%cid, "Connection setup timed out (no answer received)");
self.is_failed = true;
}
}
@@ -1715,7 +1703,6 @@ struct Connection<RId> {
buffer: Vec<u8>,
span: tracing::Span,
buffer_pool: BufferPool<Vec<u8>>,
}
@@ -1767,7 +1754,10 @@ where
}
}
fn handle_timeout(&mut self, agent: &mut IceAgent, now: Instant) {
fn handle_timeout<TId>(&mut self, cid: TId, agent: &mut IceAgent, now: Instant)
where
TId: fmt::Display,
{
let Self::Connected {
last_outgoing,
last_incoming,
@@ -1783,19 +1773,25 @@ where
let peer_socket = *peer_socket;
self.transition_to_idle(peer_socket, agent);
self.transition_to_idle(cid, peer_socket, agent);
}
fn on_upsert(&mut self, agent: &mut IceAgent, now: Instant) {
fn on_upsert<TId>(&mut self, cid: TId, agent: &mut IceAgent, now: Instant)
where
TId: fmt::Display,
{
let peer_socket = match self {
Self::Idle { peer_socket } => *peer_socket,
Self::Failed | Self::Connecting { .. } | Self::Connected { .. } => return,
};
self.transition_to_connected(peer_socket, agent, "upsert", now);
self.transition_to_connected(cid, peer_socket, agent, "upsert", now);
}
fn on_outgoing(&mut self, agent: &mut IceAgent, packet: &IpPacket, now: Instant) {
fn on_outgoing<TId>(&mut self, cid: TId, agent: &mut IceAgent, packet: &IpPacket, now: Instant)
where
TId: fmt::Display,
{
let peer_socket = match self {
Self::Idle { peer_socket } => *peer_socket,
Self::Connected { last_outgoing, .. } => {
@@ -1805,10 +1801,13 @@ where
Self::Failed | Self::Connecting { .. } => return,
};
self.transition_to_connected(peer_socket, agent, tracing::field::debug(packet), now);
self.transition_to_connected(cid, peer_socket, agent, tracing::field::debug(packet), now);
}
fn on_incoming(&mut self, agent: &mut IceAgent, packet: &IpPacket, now: Instant) {
fn on_incoming<TId>(&mut self, cid: TId, agent: &mut IceAgent, packet: &IpPacket, now: Instant)
where
TId: fmt::Display,
{
let peer_socket = match self {
Self::Idle { peer_socket } => *peer_socket,
Self::Connected { last_incoming, .. } => {
@@ -1818,23 +1817,33 @@ where
Self::Failed | Self::Connecting { .. } => return,
};
self.transition_to_connected(peer_socket, agent, tracing::field::debug(packet), now);
self.transition_to_connected(cid, peer_socket, agent, tracing::field::debug(packet), now);
}
fn transition_to_idle(&mut self, peer_socket: PeerSocket<RId>, agent: &mut IceAgent) {
tracing::debug!("Connection is idle");
fn transition_to_idle<TId>(
&mut self,
cid: TId,
peer_socket: PeerSocket<RId>,
agent: &mut IceAgent,
) where
TId: fmt::Display,
{
tracing::debug!(%cid, "Connection is idle");
*self = Self::Idle { peer_socket };
apply_idle_stun_timings(agent);
}
fn transition_to_connected(
fn transition_to_connected<TId>(
&mut self,
cid: TId,
peer_socket: PeerSocket<RId>,
agent: &mut IceAgent,
trigger: impl tracing::Value,
now: Instant,
) {
tracing::debug!(trigger, "Connection resumed");
) where
TId: fmt::Display,
{
tracing::debug!(trigger, %cid, "Connection resumed");
*self = Self::Connected {
peer_socket,
last_outgoing: now,
@@ -1943,7 +1952,6 @@ where
Some(disconnected_at + DISCONNECT_TIMEOUT)
}
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
fn handle_timeout<TId>(
&mut self,
cid: TId,
@@ -1955,13 +1963,13 @@ where
RId: Copy + Ord + fmt::Display,
{
self.agent.handle_timeout(now);
self.state.handle_timeout(&mut self.agent, now);
self.state.handle_timeout(cid, &mut self.agent, now);
if self
.candidate_timeout()
.is_some_and(|timeout| now >= timeout)
{
tracing::info!("Connection failed (no candidates received)");
tracing::info!(%cid, "Connection failed (no candidates received)");
self.state = ConnectionState::Failed;
return;
}
@@ -1970,12 +1978,12 @@ where
.disconnect_timeout()
.is_some_and(|timeout| now >= timeout)
{
tracing::info!("Connection failed (ICE timeout)");
tracing::info!(%cid, "Connection failed (ICE timeout)");
self.state = ConnectionState::Failed;
return;
}
self.handle_tunnel_timeout(now, allocations, transmits);
self.handle_tunnel_timeout(cid, now, allocations, transmits);
// If this was a scheduled update, hop to the next interval.
if now >= self.next_wg_timer_update {
@@ -2153,12 +2161,15 @@ where
}
}
fn handle_tunnel_timeout(
fn handle_tunnel_timeout<TId>(
&mut self,
cid: TId,
now: Instant,
allocations: &mut BTreeMap<RId, Allocation>,
transmits: &mut VecDeque<Transmit>,
) {
) where
TId: fmt::Display,
{
// Don't update wireguard timers until we are connected.
let Some(peer_socket) = self.socket() else {
return;
@@ -2174,11 +2185,11 @@ where
match self.tunnel.update_timers_at(&mut buf, now) {
TunnResult::Done => {}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
tracing::info!("Connection failed (wireguard tunnel expired)");
tracing::info!(%cid, "Connection failed (wireguard tunnel expired)");
self.state = ConnectionState::Failed;
}
TunnResult::Err(e) => {
tracing::warn!(?e);
tracing::warn!(%cid, "boringtun error: {e}");
}
TunnResult::WriteToNetwork(b) => {
transmits.extend(make_owned_transmit(
@@ -2195,15 +2206,17 @@ where
};
}
fn encapsulate<'b>(
fn encapsulate<'b, TId>(
&mut self,
cid: TId,
packet: IpPacket,
buffer: &'b mut [u8],
now: Instant,
) -> Result<Option<&'b [u8]>> {
let _guard = self.span.enter();
self.state.on_outgoing(&mut self.agent, &packet, now);
) -> Result<Option<&'b [u8]>>
where
TId: fmt::Display,
{
self.state.on_outgoing(cid, &mut self.agent, &packet, now);
let len = match self.tunnel.encapsulate_at(packet.packet(), buffer, now) {
TunnResult::Done => return Ok(None),
@@ -2217,15 +2230,18 @@ where
Ok(Some(&buffer[..len]))
}
fn decapsulate(
fn decapsulate<TId>(
&mut self,
cid: TId,
src: IpAddr,
packet: &[u8],
allocations: &mut BTreeMap<RId, Allocation>,
transmits: &mut VecDeque<Transmit>,
now: Instant,
) -> ControlFlow<Result<()>, IpPacket> {
let _guard = self.span.enter();
) -> ControlFlow<Result<()>, IpPacket>
where
TId: fmt::Display,
{
let mut ip_packet = IpPacketBuf::new();
let control_flow = match self
@@ -2269,7 +2285,7 @@ where
TunnResult::WriteToNetwork(bytes) => {
match &mut self.state {
ConnectionState::Connecting { buffered, .. } => {
tracing::debug!("No socket has been nominated yet, buffering WG packet");
tracing::debug!(%cid, "No socket has been nominated yet, buffering WG packet");
buffered.push(bytes.to_owned());
@@ -2311,7 +2327,7 @@ where
};
if let ControlFlow::Continue(packet) = &control_flow {
self.state.on_incoming(&mut self.agent, packet, now);
self.state.on_incoming(cid, &mut self.agent, packet, now);
}
control_flow