feat(connlib): tune down logs for recently disconnected clients (#10501)

When a Client disconnects from a Gateway, we might still be receiving
packets that are either in-flight or are still being sent by the
resource. For some amount of time after a disconnect, this is expected
and not worth logging a warning for.

With this PR, we define this time to be 60s. If we cannot look up a
connection either by ID, session index or public key but the peer has
disconnected within the last 60s, we will now only print a DEBUG log
instead of a WARN.

Resolves: #10175
This commit is contained in:
Thomas Eizinger
2025-10-03 13:08:06 +00:00
committed by GitHub
parent 2cc13cea24
commit e9e8792512
5 changed files with 618 additions and 273 deletions

View File

@@ -1,5 +1,10 @@
mod connections;
pub use connections::UnknownConnection;
use crate::allocation::{self, Allocation, RelaySocket, Socket};
use crate::index::IndexLfsr;
use crate::node::connections::Connections;
use crate::stats::{ConnectionStats, NodeStats};
use crate::utils::channel_data_packet_buffer;
use anyhow::{Context, Result, anyhow};
@@ -16,7 +21,7 @@ use ip_packet::{Ecn, IpPacket, IpPacketBuf};
use itertools::Itertools;
use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::{Rng, RngCore, SeedableRng, random};
use rand::{RngCore, SeedableRng, random};
use ringbuffer::{AllocRingBuffer, RingBuffer as _};
use secrecy::{ExposeSecret, Secret};
use sha2::Digest;
@@ -236,7 +241,7 @@ where
let local_creds = local_creds.into();
let remote_creds = remote_creds.into();
if self.connections.initial.contains_key(&cid) {
if self.connections.contains_initial(&cid) {
debug_assert!(
false,
"The new `upsert_connection` API is incompatible with the previous `new_connection` API"
@@ -256,7 +261,7 @@ where
// Only if all of those things are the same, will:
// - ICE be able to establish a connection
// - boringtun be able to handshake a session
if let Some(c) = self.connections.get_established_mut(&cid)
if let Ok(c) = self.connections.get_established_mut(&cid, now)
&& c.agent.local_credentials() == &local_creds
&& c.agent
.remote_credentials()
@@ -290,7 +295,7 @@ where
return Ok(());
}
let existing = self.connections.established.remove(&cid);
let existing = self.connections.remove_established(&cid, now);
let index = self.index.next();
if let Some(existing) = existing {
@@ -333,8 +338,8 @@ where
/// Removes a connection by just clearing its local memory.
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
pub fn remove_connection(&mut self, cid: TId, reason: impl fmt::Display) {
let existing = self.connections.established.remove(&cid);
pub fn remove_connection(&mut self, cid: TId, reason: impl fmt::Display, now: Instant) {
let existing = self.connections.remove_established(&cid, now);
if existing.is_none() {
return;
@@ -348,7 +353,7 @@ where
/// If we are connected, sends the provided "goodbye" packet before discarding the connection.
#[tracing::instrument(level = "info", skip_all, fields(%cid))]
pub fn close_connection(&mut self, cid: TId, goodbye: IpPacket, now: Instant) {
let Some(mut connection) = self.connections.established.remove(&cid) else {
let Some(mut connection) = self.connections.remove_established(&cid, now) else {
tracing::debug!("Cannot close unknown connection");
return;
@@ -433,7 +438,7 @@ where
allocation.bind_channel(candidate.addr(), now);
let Some(connection) = self.connections.get_established_mut(&cid) else {
let Ok(connection) = self.connections.get_established_mut(&cid, now) else {
return;
};
@@ -499,10 +504,7 @@ where
packet: IpPacket,
now: Instant,
) -> Result<Option<Transmit>> {
let conn = self
.connections
.get_established_mut(&cid)
.context(UnknownConnection(cid.to_string()))?;
let conn = self.connections.get_established_mut(&cid, now)?;
if self.mode.is_server() && !conn.state.has_nominated_socket() {
tracing::debug!(
@@ -589,7 +591,7 @@ where
connection.handle_timeout(id, now, &mut self.allocations, &mut self.buffered_transmits);
}
for (id, connection) in self.connections.initial.iter_mut() {
for (id, connection) in self.connections.iter_initial_mut() {
connection.handle_timeout(id, now);
}
@@ -619,7 +621,8 @@ where
&mut self.pending_events,
&mut self.rng,
);
self.connections.gc(&mut self.pending_events);
self.connections
.handle_timeout(&mut self.pending_events, now);
}
/// Returns buffered data that needs to be sent on the socket.
@@ -780,6 +783,7 @@ where
Connection {
agent,
index,
tunnel,
next_wg_timer_update: now,
stats: Default::default(),
@@ -958,7 +962,7 @@ where
match self
.connections
.get_established_mut_by_public_key(handshake.peer_static_public)
.get_established_mut_by_public_key(handshake.peer_static_public, now)
{
Ok(c) => c,
Err(e) => return ControlFlow::Break(Err(e)),
@@ -969,7 +973,7 @@ where
| Packet::PacketCookieReply(PacketCookieReply { receiver_idx, .. })
| Packet::PacketData(PacketData { receiver_idx, .. }) => match self
.connections
.get_established_mut_session_index(Index::from_peer(*receiver_idx))
.get_established_mut_session_index(Index::from_peer(*receiver_idx), now)
{
Ok(c) => c,
Err(e) => return ControlFlow::Break(Err(e)),
@@ -1062,11 +1066,11 @@ where
intent_sent_at: Instant,
now: Instant,
) -> Result<Offer, NoTurnServers> {
if self.connections.initial.remove(&cid).is_some() {
if self.connections.remove_initial(&cid).is_some() {
tracing::info!("Replacing existing initial connection");
};
if self.connections.established.remove(&cid).is_some() {
if self.connections.remove_established(&cid, now).is_some() {
tracing::info!("Replacing existing established connection");
};
@@ -1094,7 +1098,7 @@ where
};
let duration_since_intent = initial_connection.duration_since_intent(now);
let existing = self.connections.initial.insert(cid, initial_connection);
let existing = self.connections.insert_initial(cid, initial_connection);
debug_assert!(existing.is_none());
tracing::info!(?duration_since_intent, "Establishing new connection");
@@ -1107,7 +1111,7 @@ where
#[deprecated]
#[expect(deprecated)]
pub fn accept_answer(&mut self, cid: TId, remote: PublicKey, answer: Answer, now: Instant) {
let Some(initial) = self.connections.initial.remove(&cid) else {
let Some(initial) = self.connections.remove_initial(&cid) else {
tracing::debug!("No initial connection state, ignoring answer"); // This can happen if the connection setup timed out.
return;
};
@@ -1170,11 +1174,11 @@ where
now: Instant,
) -> Result<Answer, NoTurnServers> {
debug_assert!(
!self.connections.initial.contains_key(&cid),
!self.connections.contains_initial(&cid),
"server to not use `initial_connections`"
);
if self.connections.established.remove(&cid).is_some() {
if self.connections.remove_established(&cid, now).is_some() {
tracing::info!("Replacing existing established connection");
};
@@ -1313,228 +1317,6 @@ fn generate_optimistic_candidates(agent: &mut IceAgent) {
}
}
struct Connections<TId, RId> {
initial: BTreeMap<TId, InitialConnection<RId>>,
established: BTreeMap<TId, Connection<RId>>,
established_by_wireguard_session_index: BTreeMap<usize, TId>,
}
impl<TId, RId> Default for Connections<TId, RId> {
fn default() -> Self {
Self {
initial: Default::default(),
established: Default::default(),
established_by_wireguard_session_index: Default::default(),
}
}
}
impl<TId, RId> Connections<TId, RId>
where
TId: Eq + Hash + Copy + Ord + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display,
{
fn gc(&mut self, events: &mut VecDeque<Event<TId>>) {
self.initial.retain(|id, conn| {
if conn.is_failed {
events.push_back(Event::ConnectionFailed(*id));
return false;
}
true
});
self.established.retain(|id, conn| {
if conn.is_failed() {
events.push_back(Event::ConnectionFailed(*id));
self.established_by_wireguard_session_index
.retain(|_, c| c != id);
return false;
}
true
});
}
fn check_relays_available(
&mut self,
allocations: &BTreeMap<RId, Allocation>,
pending_events: &mut VecDeque<Event<TId>>,
rng: &mut impl Rng,
) {
for (_, c) in self.iter_initial_mut() {
if allocations.contains_key(&c.relay) {
continue;
}
let Some(new_rid) = allocations.keys().copied().choose(rng) else {
continue;
};
tracing::info!(old_rid = ?c.relay, %new_rid, "Updating relay");
c.relay = new_rid;
}
for (cid, c) in self.iter_established_mut() {
if allocations.contains_key(&c.relay.id) {
continue; // Our relay is still there, no problems.
}
let Some((rid, allocation)) = allocations.iter().choose(rng) else {
if !c.relay.logged_sample_failure {
tracing::debug!(%cid, "Failed to sample new relay for connection");
}
c.relay.logged_sample_failure = true;
continue;
};
tracing::info!(%cid, old = %c.relay.id, new = %rid, "Attempting to migrate connection to new relay");
c.relay.id = *rid;
for candidate in allocation.current_relay_candidates() {
add_local_candidate(cid, &mut c.agent, candidate, pending_events);
}
}
}
fn stats(&self) -> impl Iterator<Item = (TId, ConnectionStats)> + '_ {
self.established.iter().map(move |(id, c)| (*id, c.stats))
}
fn insert_established(
&mut self,
id: TId,
index: Index,
connection: Connection<RId>,
) -> Option<Connection<RId>> {
let existing = self.established.insert(id, connection);
// Remove previous mappings for connection.
self.established_by_wireguard_session_index
.retain(|_, c| c != &id);
self.established_by_wireguard_session_index
.insert(index.global(), id);
existing
}
fn agent_mut(&mut self, id: TId) -> Option<(&mut IceAgent, RId)> {
let maybe_initial_connection = self.initial.get_mut(&id).map(|i| (&mut i.agent, i.relay));
let maybe_established_connection = self
.established
.get_mut(&id)
.map(|c| (&mut c.agent, c.relay.id));
maybe_initial_connection.or(maybe_established_connection)
}
fn agents_by_relay_mut(&mut self, id: RId) -> impl Iterator<Item = (TId, &mut IceAgent)> + '_ {
let initial_connections = self
.initial
.iter_mut()
.filter_map(move |(cid, i)| (i.relay == id).then_some((*cid, &mut i.agent)));
let established_connections = self
.established
.iter_mut()
.filter_map(move |(cid, c)| (c.relay.id == id).then_some((*cid, &mut c.agent)));
initial_connections.chain(established_connections)
}
fn agents_mut(&mut self) -> impl Iterator<Item = (TId, &mut IceAgent)> {
let initial_agents = self.initial.iter_mut().map(|(id, c)| (*id, &mut c.agent));
let negotiated_agents = self
.established
.iter_mut()
.map(|(id, c)| (*id, &mut c.agent));
initial_agents.chain(negotiated_agents)
}
fn get_established_mut(&mut self, id: &TId) -> Option<&mut Connection<RId>> {
self.established.get_mut(id)
}
fn get_established_mut_session_index(
&mut self,
index: Index,
) -> Result<(TId, &mut Connection<RId>)> {
let id = self
.established_by_wireguard_session_index
.get(&index.global())
.with_context(|| format!("No connection for index {}", index.global()))?;
let connection = self
.established
.get_mut(id)
.with_context(|| format!("No connection for ID {id}"))?;
Ok((*id, connection))
}
fn get_established_mut_by_public_key(
&mut self,
key: [u8; 32],
) -> Result<(TId, &mut Connection<RId>)> {
let (id, conn) = self
.established
.iter_mut()
.find(|(_, c)| c.tunnel.remote_static_public().as_bytes() == &key)
.with_context(|| format!("No connection for public key {}", into_u256(key)))?;
Ok((*id, conn))
}
fn iter_initial_mut(&mut self) -> impl Iterator<Item = (TId, &mut InitialConnection<RId>)> {
self.initial.iter_mut().map(|(id, conn)| (*id, conn))
}
fn iter_established(&self) -> impl Iterator<Item = (TId, &Connection<RId>)> {
self.established.iter().map(|(id, conn)| (*id, conn))
}
fn iter_established_mut(&mut self) -> impl Iterator<Item = (TId, &mut Connection<RId>)> {
self.established.iter_mut().map(|(id, conn)| (*id, conn))
}
fn len(&self) -> usize {
self.initial.len() + self.established.len()
}
fn clear(&mut self) {
self.initial.clear();
self.established.clear();
self.established_by_wireguard_session_index.clear();
}
fn iter_ids(&self) -> impl Iterator<Item = TId> + '_ {
self.initial.keys().chain(self.established.keys()).copied()
}
fn all_idle(&self) -> bool {
self.established.values().all(|c| c.is_idle())
}
fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
iter::empty()
.chain(self.initial.values_mut().filter_map(|c| c.poll_timeout()))
.chain(
self.established
.values_mut()
.filter_map(|c| c.poll_timeout()),
)
.min_by_key(|(instant, _)| *instant)
}
}
fn into_u256(key: [u8; 32]) -> bnum::BUint<4> {
// Note: `parse_str_radix` panics when the number is too big.
// We are passing 32 u8's though which fits exactly into a u256.
bnum::types::U256::parse_str_radix(&hex::encode(key), 16)
}
fn add_local_candidate<TId>(
id: TId,
agent: &mut IceAgent,
@@ -1610,10 +1392,6 @@ fn remove_local_candidate<TId>(
}
}
#[derive(thiserror::Error, Debug)]
#[error("Unknown connection: {0}")]
pub struct UnknownConnection(String);
#[deprecated]
pub struct Offer {
/// The Wireguard session key for a connection.
@@ -1754,9 +1532,12 @@ impl<RId> InitialConnection<RId> {
}
}
#[derive(derive_more::Debug)]
struct Connection<RId> {
agent: IceAgent,
index: Index,
#[debug(skip)]
tunnel: Tunn,
remote_pub_key: PublicKey,
/// When to next update the [`Tunn`]'s timers.
@@ -1777,15 +1558,18 @@ struct Connection<RId> {
buffer: Vec<u8>,
#[debug(skip)]
buffer_pool: BufferPool<Vec<u8>>,
}
#[derive(Debug)]
struct SelectedRelay<RId> {
id: RId,
/// Whether we've already logged failure to sample a new relay.
logged_sample_failure: bool,
}
#[derive(Debug)]
enum ConnectionState {
/// We are still running ICE to figure out, which socket to use to send data.
Connecting {
@@ -1954,7 +1738,7 @@ fn idle_at(last_activity: Instant) -> Instant {
}
/// The socket of the peer we are connected to.
#[derive(PartialEq, Clone, Copy)]
#[derive(PartialEq, Clone, Copy, Debug)]
enum PeerSocket {
PeerToPeer {
source: SocketAddr,
@@ -2767,20 +2551,4 @@ mod tests {
assert!(agent.remote_candidates().contains(&expected_candidate2));
assert!(!agent.remote_candidates().contains(&unexpected_candidate3));
}
#[test]
fn can_make_u256_out_of_byte_array() {
let bytes = random();
let _num = into_u256(bytes);
}
#[test]
fn u256_renders_as_int() {
let num = into_u256([1; 32]);
assert_eq!(
num.to_string(),
"454086624460063511464984254936031011189294057512315937409637584344757371137"
);
}
}

View File

@@ -0,0 +1,576 @@
use std::{
collections::{BTreeMap, HashMap, VecDeque},
fmt,
hash::Hash,
iter,
time::{Duration, Instant},
};
use anyhow::{Context as _, Result};
use boringtun::noise::Index;
use rand::{Rng, seq::IteratorRandom as _};
use str0m::ice::IceAgent;
use crate::{
ConnectionStats, Event,
allocation::Allocation,
node::{Connection, InitialConnection, add_local_candidate},
};
pub struct Connections<TId, RId> {
initial: BTreeMap<TId, InitialConnection<RId>>,
established: BTreeMap<TId, Connection<RId>>,
established_by_wireguard_session_index: BTreeMap<usize, TId>,
disconnected_ids: HashMap<TId, Instant>,
disconnected_public_keys: HashMap<[u8; 32], Instant>,
disconnected_session_indices: HashMap<usize, Instant>,
}
impl<TId, RId> Default for Connections<TId, RId> {
fn default() -> Self {
Self {
initial: Default::default(),
established: Default::default(),
established_by_wireguard_session_index: Default::default(),
disconnected_ids: Default::default(),
disconnected_public_keys: Default::default(),
disconnected_session_indices: Default::default(),
}
}
}
impl<TId, RId> Connections<TId, RId>
where
TId: Eq + Hash + Copy + Ord + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display,
{
const RECENT_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) fn handle_timeout(&mut self, events: &mut VecDeque<Event<TId>>, now: Instant) {
self.initial.retain(|id, conn| {
if conn.is_failed {
events.push_back(Event::ConnectionFailed(*id));
return false;
}
true
});
self.established.retain(|id, conn| {
if conn.is_failed() {
events.push_back(Event::ConnectionFailed(*id));
self.established_by_wireguard_session_index
.retain(|index, c| {
if c == id {
self.disconnected_session_indices.insert(*index, now);
return false;
}
true
});
self.disconnected_public_keys
.insert(conn.tunnel.remote_static_public().to_bytes(), now);
self.disconnected_ids.insert(*id, now);
return false;
}
true
});
self.disconnected_ids
.retain(|_, v| now.duration_since(*v) < Self::RECENT_DISCONNECT_TIMEOUT);
self.disconnected_public_keys
.retain(|_, v| now.duration_since(*v) < Self::RECENT_DISCONNECT_TIMEOUT);
self.disconnected_session_indices
.retain(|_, v| now.duration_since(*v) < Self::RECENT_DISCONNECT_TIMEOUT);
}
pub(crate) fn remove_initial(&mut self, id: &TId) -> Option<InitialConnection<RId>> {
self.initial.remove(id)
}
pub(crate) fn remove_established(&mut self, id: &TId, now: Instant) -> Option<Connection<RId>> {
let connection = self.established.remove(id)?;
self.established_by_wireguard_session_index
.remove(&connection.index.global());
self.disconnected_ids.insert(*id, now);
self.disconnected_public_keys
.insert(connection.tunnel.remote_static_public().to_bytes(), now);
self.disconnected_session_indices
.insert(connection.index.global(), now);
Some(connection)
}
pub(crate) fn contains_initial(&self, id: &TId) -> bool {
self.initial.contains_key(id)
}
pub(crate) fn check_relays_available(
&mut self,
allocations: &BTreeMap<RId, Allocation>,
pending_events: &mut VecDeque<Event<TId>>,
rng: &mut impl Rng,
) {
for (_, c) in self.iter_initial_mut() {
if allocations.contains_key(&c.relay) {
continue;
}
let Some(new_rid) = allocations.keys().copied().choose(rng) else {
continue;
};
tracing::info!(old_rid = ?c.relay, %new_rid, "Updating relay");
c.relay = new_rid;
}
for (cid, c) in self.iter_established_mut() {
if allocations.contains_key(&c.relay.id) {
continue; // Our relay is still there, no problems.
}
let Some((rid, allocation)) = allocations.iter().choose(rng) else {
if !c.relay.logged_sample_failure {
tracing::debug!(%cid, "Failed to sample new relay for connection");
}
c.relay.logged_sample_failure = true;
continue;
};
tracing::info!(%cid, old = %c.relay.id, new = %rid, "Attempting to migrate connection to new relay");
c.relay.id = *rid;
for candidate in allocation.current_relay_candidates() {
add_local_candidate(cid, &mut c.agent, candidate, pending_events);
}
}
}
pub(crate) fn stats(&self) -> impl Iterator<Item = (TId, ConnectionStats)> + '_ {
self.established.iter().map(move |(id, c)| (*id, c.stats))
}
pub(crate) fn insert_initial(
&mut self,
id: TId,
connection: InitialConnection<RId>,
) -> Option<InitialConnection<RId>> {
self.initial.insert(id, connection)
}
pub(crate) fn insert_established(
&mut self,
id: TId,
index: Index,
connection: Connection<RId>,
) -> Option<Connection<RId>> {
let existing = self.established.insert(id, connection);
// Remove previous mappings for connection.
self.established_by_wireguard_session_index
.retain(|_, c| c != &id);
self.established_by_wireguard_session_index
.insert(index.global(), id);
existing
}
pub(crate) fn agent_mut(&mut self, id: TId) -> Option<(&mut IceAgent, RId)> {
let maybe_initial_connection = self.initial.get_mut(&id).map(|i| (&mut i.agent, i.relay));
let maybe_established_connection = self
.established
.get_mut(&id)
.map(|c| (&mut c.agent, c.relay.id));
maybe_initial_connection.or(maybe_established_connection)
}
pub(crate) fn agents_by_relay_mut(
&mut self,
id: RId,
) -> impl Iterator<Item = (TId, &mut IceAgent)> + '_ {
let initial_connections = self
.initial
.iter_mut()
.filter_map(move |(cid, i)| (i.relay == id).then_some((*cid, &mut i.agent)));
let established_connections = self
.established
.iter_mut()
.filter_map(move |(cid, c)| (c.relay.id == id).then_some((*cid, &mut c.agent)));
initial_connections.chain(established_connections)
}
pub(crate) fn agents_mut(&mut self) -> impl Iterator<Item = (TId, &mut IceAgent)> {
let initial_agents = self.initial.iter_mut().map(|(id, c)| (*id, &mut c.agent));
let negotiated_agents = self
.established
.iter_mut()
.map(|(id, c)| (*id, &mut c.agent));
initial_agents.chain(negotiated_agents)
}
pub(crate) fn get_established_mut(
&mut self,
id: &TId,
now: Instant,
) -> Result<&mut Connection<RId>> {
let connection = self
.established
.get_mut(id)
.context(UnknownConnection::by_id(*id, &self.disconnected_ids, now))?;
Ok(connection)
}
pub(crate) fn get_established_mut_session_index(
&mut self,
index: Index,
now: Instant,
) -> Result<(TId, &mut Connection<RId>)> {
let id = self
.established_by_wireguard_session_index
.get(&index.global())
.context(UnknownConnection::by_index(
index.global(),
&self.disconnected_session_indices,
now,
))?;
let connection = self
.established
.get_mut(id)
.context(UnknownConnection::by_id(*id, &self.disconnected_ids, now))?;
Ok((*id, connection))
}
pub(crate) fn get_established_mut_by_public_key(
&mut self,
key: [u8; 32],
now: Instant,
) -> Result<(TId, &mut Connection<RId>)> {
let (id, conn) = self
.established
.iter_mut()
.find(|(_, c)| c.tunnel.remote_static_public().as_bytes() == &key)
.context(UnknownConnection::by_public_key(
key,
&self.disconnected_public_keys,
now,
))?;
Ok((*id, conn))
}
pub(crate) fn iter_initial_mut(
&mut self,
) -> impl Iterator<Item = (TId, &mut InitialConnection<RId>)> {
self.initial.iter_mut().map(|(id, conn)| (*id, conn))
}
pub(crate) fn iter_established(&self) -> impl Iterator<Item = (TId, &Connection<RId>)> {
self.established.iter().map(|(id, conn)| (*id, conn))
}
pub(crate) fn iter_established_mut(
&mut self,
) -> impl Iterator<Item = (TId, &mut Connection<RId>)> {
self.established.iter_mut().map(|(id, conn)| (*id, conn))
}
pub(crate) fn len(&self) -> usize {
self.initial.len() + self.established.len()
}
pub(crate) fn clear(&mut self) {
self.initial.clear();
self.established.clear();
self.established_by_wireguard_session_index.clear();
}
pub(crate) fn iter_ids(&self) -> impl Iterator<Item = TId> + '_ {
self.initial.keys().chain(self.established.keys()).copied()
}
pub(crate) fn all_idle(&self) -> bool {
self.established.values().all(|c| c.is_idle())
}
pub(crate) fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> {
iter::empty()
.chain(self.initial.values_mut().filter_map(|c| c.poll_timeout()))
.chain(
self.established
.values_mut()
.filter_map(|c| c.poll_timeout()),
)
.chain(
self.disconnected_ids
.values()
.map(|t| {
(
*t + Self::RECENT_DISCONNECT_TIMEOUT,
"recently disconnected IDs",
)
})
.min_by_key(|(t, _)| *t),
)
.chain(
self.disconnected_public_keys
.values()
.map(|t| {
(
*t + Self::RECENT_DISCONNECT_TIMEOUT,
"recently disconnected public keys",
)
})
.min_by_key(|(t, _)| *t),
)
.chain(
self.disconnected_session_indices
.values()
.map(|t| {
(
*t + Self::RECENT_DISCONNECT_TIMEOUT,
"recently disconnected session indices",
)
})
.min_by_key(|(t, _)| *t),
)
.min_by_key(|(instant, _)| *instant)
}
}
#[derive(Debug)]
pub struct UnknownConnection {
kind: &'static str,
id: String,
disconnected_for: Option<Duration>,
}
impl UnknownConnection {
fn by_id<TId>(id: TId, disconnected_ids: &HashMap<TId, Instant>, now: Instant) -> Self
where
TId: fmt::Display + Eq + Hash,
{
Self {
id: id.to_string(),
kind: "id",
disconnected_for: disconnected_ids
.get(&id)
.map(|disconnected| now.duration_since(*disconnected)),
}
}
fn by_index(id: usize, disconnected_indices: &HashMap<usize, Instant>, now: Instant) -> Self {
Self {
id: id.to_string(),
kind: "index",
disconnected_for: disconnected_indices
.get(&id)
.map(|disconnected| now.duration_since(*disconnected)),
}
}
fn by_public_key(
key: [u8; 32],
disconnected_public_keys: &HashMap<[u8; 32], Instant>,
now: Instant,
) -> Self {
Self {
id: into_u256(key).to_string(),
kind: "public key",
disconnected_for: disconnected_public_keys
.get(&key)
.map(|disconnected| now.duration_since(*disconnected)),
}
}
pub fn recently_disconnected(&self) -> bool {
self.disconnected_for.is_some()
}
}
impl std::error::Error for UnknownConnection {}
impl fmt::Display for UnknownConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "No connection for {} {}", self.kind, self.id)?;
if let Some(disconnected_for) = self.disconnected_for {
write!(f, " (disconnected for {disconnected_for:?})")?;
}
Ok(())
}
}
fn into_u256(key: [u8; 32]) -> bnum::BUint<4> {
// Note: `parse_str_radix` panics when the number is too big.
// We are passing 32 u8's though which fits exactly into a u256.
bnum::types::U256::parse_str_radix(&hex::encode(key), 16)
}
#[cfg(test)]
mod tests {
use boringtun::{
noise::Tunn,
x25519::{PublicKey, StaticSecret},
};
use bufferpool::BufferPool;
use rand::random;
use ringbuffer::AllocRingBuffer;
use crate::node::{ConnectionState, SelectedRelay};
use super::*;
#[test]
fn explicitly_removed_connection() {
let mut connections = Connections::default();
let mut now = Instant::now();
let (id, idx, key) = insert_dummy_connection(&mut connections);
connections.remove_established(&id, now);
connections.handle_timeout(&mut VecDeque::default(), now);
now += Duration::from_secs(1);
assert_disconnected(&mut connections, id, idx, key, now, true);
now += Duration::from_secs(5);
connections.handle_timeout(&mut VecDeque::default(), now);
assert_disconnected(&mut connections, id, idx, key, now, false);
}
#[test]
fn failed_connection() {
let mut connections = Connections::default();
let mut now = Instant::now();
let (id, idx, key) = insert_dummy_connection(&mut connections);
connections.get_established_mut(&id, now).unwrap().state = ConnectionState::Failed;
connections.handle_timeout(&mut VecDeque::default(), now);
now += Duration::from_secs(1);
assert_disconnected(&mut connections, id, idx, key, now, true);
now += Duration::from_secs(5);
connections.handle_timeout(&mut VecDeque::default(), now);
assert_disconnected(&mut connections, id, idx, key, now, false);
}
fn insert_dummy_connection(connections: &mut Connections<u32, u32>) -> (u32, Index, PublicKey) {
let conn = new_connection(12345, [1u8; 32]);
let id = 1;
let idx = conn.index;
let key = conn.tunnel.remote_static_public();
connections.insert_established(id, conn.index, conn);
(id, idx, key)
}
fn assert_disconnected(
connections: &mut Connections<u32, u32>,
id: u32,
idx: Index,
key: PublicKey,
now: Instant,
is_recently_disconnected: bool,
) {
// Get by ID
let err = connections
.get_established_mut(&id, now)
.unwrap_err()
.downcast::<UnknownConnection>()
.unwrap();
assert_eq!(err.recently_disconnected(), is_recently_disconnected);
// Get by index
let err = connections
.get_established_mut_session_index(idx, now)
.unwrap_err()
.downcast::<UnknownConnection>()
.unwrap();
assert_eq!(err.recently_disconnected(), is_recently_disconnected);
// Get by key
let err = connections
.get_established_mut_by_public_key(key.to_bytes(), now)
.unwrap_err()
.downcast::<UnknownConnection>()
.unwrap();
assert_eq!(err.recently_disconnected(), is_recently_disconnected);
}
fn new_connection(idx: u32, key: [u8; 32]) -> Connection<u32> {
let private = StaticSecret::random_from_rng(rand::thread_rng());
let new_local = Index::new_local(idx);
Connection {
agent: IceAgent::new(),
index: new_local,
tunnel: Tunn::new_at(
private,
PublicKey::from(key),
None,
None,
new_local,
None,
0,
Instant::now(),
),
remote_pub_key: PublicKey::from(rand::random::<[u8; 32]>()),
next_wg_timer_update: Instant::now(),
last_proactive_handshake_sent_at: None,
relay: SelectedRelay {
id: 0,
logged_sample_failure: false,
},
state: crate::node::ConnectionState::Connecting {
wg_buffer: AllocRingBuffer::new(1),
ip_buffer: AllocRingBuffer::new(1),
},
disconnected_at: None,
stats: Default::default(),
intent_sent_at: Instant::now(),
signalling_completed_at: Instant::now(),
first_handshake_completed_at: None,
buffer: Default::default(),
buffer_pool: BufferPool::new(0, "test"),
}
}
#[test]
fn can_make_u256_out_of_byte_array() {
let bytes = random();
let _num = into_u256(bytes);
}
#[test]
fn u256_renders_as_int() {
let num = into_u256([1; 32]);
assert_eq!(
num.to_string(),
"454086624460063511464984254936031011189294057512315937409637584344757371137"
);
}
}

View File

@@ -452,7 +452,7 @@ impl ClientState {
}
}
p2p_control::GOODBYE_EVENT => {
self.node.remove_connection(gid, "received `goodbye`");
self.node.remove_connection(gid, "received `goodbye`", now);
self.cleanup_connected_gateway(&gid);
}
code => {

View File

@@ -159,7 +159,7 @@ impl GatewayState {
}
p2p_control::GOODBYE_EVENT => {
self.peers.remove(&cid);
self.node.remove_connection(cid, "received `goodbye`");
self.node.remove_connection(cid, "received `goodbye`", now);
None
}

View File

@@ -301,12 +301,6 @@ impl Eventloop {
return Ok(());
}
// Unknown connection just means packets are bouncing on the TUN device because the Client disconnected.
if e.root_cause().is::<snownet::UnknownConnection>() {
tracing::debug!("{e:#}");
return Ok(());
}
if e.root_cause()
.downcast_ref::<io::Error>()
.is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied)
@@ -326,6 +320,13 @@ impl Eventloop {
return Ok(());
}
if e.downcast_ref::<snownet::UnknownConnection>()
.is_some_and(|e| e.recently_disconnected())
{
tracing::debug!("{e:#}");
continue;
}
if e.root_cause()
.is::<firezone_tunnel::UdpSocketThreadStopped>()
{