diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 48ffc653c..c0b5b1be5 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -1,5 +1,10 @@ +mod connections; + +pub use connections::UnknownConnection; + use crate::allocation::{self, Allocation, RelaySocket, Socket}; use crate::index::IndexLfsr; +use crate::node::connections::Connections; use crate::stats::{ConnectionStats, NodeStats}; use crate::utils::channel_data_packet_buffer; use anyhow::{Context, Result, anyhow}; @@ -16,7 +21,7 @@ use ip_packet::{Ecn, IpPacket, IpPacketBuf}; use itertools::Itertools; use rand::rngs::StdRng; use rand::seq::IteratorRandom; -use rand::{Rng, RngCore, SeedableRng, random}; +use rand::{RngCore, SeedableRng, random}; use ringbuffer::{AllocRingBuffer, RingBuffer as _}; use secrecy::{ExposeSecret, Secret}; use sha2::Digest; @@ -236,7 +241,7 @@ where let local_creds = local_creds.into(); let remote_creds = remote_creds.into(); - if self.connections.initial.contains_key(&cid) { + if self.connections.contains_initial(&cid) { debug_assert!( false, "The new `upsert_connection` API is incompatible with the previous `new_connection` API" @@ -256,7 +261,7 @@ where // Only if all of those things are the same, will: // - ICE be able to establish a connection // - boringtun be able to handshake a session - if let Some(c) = self.connections.get_established_mut(&cid) + if let Ok(c) = self.connections.get_established_mut(&cid, now) && c.agent.local_credentials() == &local_creds && c.agent .remote_credentials() @@ -290,7 +295,7 @@ where return Ok(()); } - let existing = self.connections.established.remove(&cid); + let existing = self.connections.remove_established(&cid, now); let index = self.index.next(); if let Some(existing) = existing { @@ -333,8 +338,8 @@ where /// Removes a connection by just clearing its local memory. #[tracing::instrument(level = "info", skip_all, fields(%cid))] - pub fn remove_connection(&mut self, cid: TId, reason: impl fmt::Display) { - let existing = self.connections.established.remove(&cid); + pub fn remove_connection(&mut self, cid: TId, reason: impl fmt::Display, now: Instant) { + let existing = self.connections.remove_established(&cid, now); if existing.is_none() { return; @@ -348,7 +353,7 @@ where /// If we are connected, sends the provided "goodbye" packet before discarding the connection. #[tracing::instrument(level = "info", skip_all, fields(%cid))] pub fn close_connection(&mut self, cid: TId, goodbye: IpPacket, now: Instant) { - let Some(mut connection) = self.connections.established.remove(&cid) else { + let Some(mut connection) = self.connections.remove_established(&cid, now) else { tracing::debug!("Cannot close unknown connection"); return; @@ -433,7 +438,7 @@ where allocation.bind_channel(candidate.addr(), now); - let Some(connection) = self.connections.get_established_mut(&cid) else { + let Ok(connection) = self.connections.get_established_mut(&cid, now) else { return; }; @@ -499,10 +504,7 @@ where packet: IpPacket, now: Instant, ) -> Result> { - let conn = self - .connections - .get_established_mut(&cid) - .context(UnknownConnection(cid.to_string()))?; + let conn = self.connections.get_established_mut(&cid, now)?; if self.mode.is_server() && !conn.state.has_nominated_socket() { tracing::debug!( @@ -589,7 +591,7 @@ where connection.handle_timeout(id, now, &mut self.allocations, &mut self.buffered_transmits); } - for (id, connection) in self.connections.initial.iter_mut() { + for (id, connection) in self.connections.iter_initial_mut() { connection.handle_timeout(id, now); } @@ -619,7 +621,8 @@ where &mut self.pending_events, &mut self.rng, ); - self.connections.gc(&mut self.pending_events); + self.connections + .handle_timeout(&mut self.pending_events, now); } /// Returns buffered data that needs to be sent on the socket. @@ -780,6 +783,7 @@ where Connection { agent, + index, tunnel, next_wg_timer_update: now, stats: Default::default(), @@ -958,7 +962,7 @@ where match self .connections - .get_established_mut_by_public_key(handshake.peer_static_public) + .get_established_mut_by_public_key(handshake.peer_static_public, now) { Ok(c) => c, Err(e) => return ControlFlow::Break(Err(e)), @@ -969,7 +973,7 @@ where | Packet::PacketCookieReply(PacketCookieReply { receiver_idx, .. }) | Packet::PacketData(PacketData { receiver_idx, .. }) => match self .connections - .get_established_mut_session_index(Index::from_peer(*receiver_idx)) + .get_established_mut_session_index(Index::from_peer(*receiver_idx), now) { Ok(c) => c, Err(e) => return ControlFlow::Break(Err(e)), @@ -1062,11 +1066,11 @@ where intent_sent_at: Instant, now: Instant, ) -> Result { - if self.connections.initial.remove(&cid).is_some() { + if self.connections.remove_initial(&cid).is_some() { tracing::info!("Replacing existing initial connection"); }; - if self.connections.established.remove(&cid).is_some() { + if self.connections.remove_established(&cid, now).is_some() { tracing::info!("Replacing existing established connection"); }; @@ -1094,7 +1098,7 @@ where }; let duration_since_intent = initial_connection.duration_since_intent(now); - let existing = self.connections.initial.insert(cid, initial_connection); + let existing = self.connections.insert_initial(cid, initial_connection); debug_assert!(existing.is_none()); tracing::info!(?duration_since_intent, "Establishing new connection"); @@ -1107,7 +1111,7 @@ where #[deprecated] #[expect(deprecated)] pub fn accept_answer(&mut self, cid: TId, remote: PublicKey, answer: Answer, now: Instant) { - let Some(initial) = self.connections.initial.remove(&cid) else { + let Some(initial) = self.connections.remove_initial(&cid) else { tracing::debug!("No initial connection state, ignoring answer"); // This can happen if the connection setup timed out. return; }; @@ -1170,11 +1174,11 @@ where now: Instant, ) -> Result { debug_assert!( - !self.connections.initial.contains_key(&cid), + !self.connections.contains_initial(&cid), "server to not use `initial_connections`" ); - if self.connections.established.remove(&cid).is_some() { + if self.connections.remove_established(&cid, now).is_some() { tracing::info!("Replacing existing established connection"); }; @@ -1313,228 +1317,6 @@ fn generate_optimistic_candidates(agent: &mut IceAgent) { } } -struct Connections { - initial: BTreeMap>, - established: BTreeMap>, - - established_by_wireguard_session_index: BTreeMap, -} - -impl Default for Connections { - fn default() -> Self { - Self { - initial: Default::default(), - established: Default::default(), - established_by_wireguard_session_index: Default::default(), - } - } -} - -impl Connections -where - TId: Eq + Hash + Copy + Ord + fmt::Display, - RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display, -{ - fn gc(&mut self, events: &mut VecDeque>) { - self.initial.retain(|id, conn| { - if conn.is_failed { - events.push_back(Event::ConnectionFailed(*id)); - return false; - } - - true - }); - - self.established.retain(|id, conn| { - if conn.is_failed() { - events.push_back(Event::ConnectionFailed(*id)); - self.established_by_wireguard_session_index - .retain(|_, c| c != id); - return false; - } - - true - }); - } - - fn check_relays_available( - &mut self, - allocations: &BTreeMap, - pending_events: &mut VecDeque>, - rng: &mut impl Rng, - ) { - for (_, c) in self.iter_initial_mut() { - if allocations.contains_key(&c.relay) { - continue; - } - - let Some(new_rid) = allocations.keys().copied().choose(rng) else { - continue; - }; - - tracing::info!(old_rid = ?c.relay, %new_rid, "Updating relay"); - c.relay = new_rid; - } - - for (cid, c) in self.iter_established_mut() { - if allocations.contains_key(&c.relay.id) { - continue; // Our relay is still there, no problems. - } - - let Some((rid, allocation)) = allocations.iter().choose(rng) else { - if !c.relay.logged_sample_failure { - tracing::debug!(%cid, "Failed to sample new relay for connection"); - } - c.relay.logged_sample_failure = true; - - continue; - }; - - tracing::info!(%cid, old = %c.relay.id, new = %rid, "Attempting to migrate connection to new relay"); - - c.relay.id = *rid; - - for candidate in allocation.current_relay_candidates() { - add_local_candidate(cid, &mut c.agent, candidate, pending_events); - } - } - } - - fn stats(&self) -> impl Iterator + '_ { - self.established.iter().map(move |(id, c)| (*id, c.stats)) - } - - fn insert_established( - &mut self, - id: TId, - index: Index, - connection: Connection, - ) -> Option> { - let existing = self.established.insert(id, connection); - - // Remove previous mappings for connection. - self.established_by_wireguard_session_index - .retain(|_, c| c != &id); - self.established_by_wireguard_session_index - .insert(index.global(), id); - - existing - } - - fn agent_mut(&mut self, id: TId) -> Option<(&mut IceAgent, RId)> { - let maybe_initial_connection = self.initial.get_mut(&id).map(|i| (&mut i.agent, i.relay)); - let maybe_established_connection = self - .established - .get_mut(&id) - .map(|c| (&mut c.agent, c.relay.id)); - - maybe_initial_connection.or(maybe_established_connection) - } - - fn agents_by_relay_mut(&mut self, id: RId) -> impl Iterator + '_ { - let initial_connections = self - .initial - .iter_mut() - .filter_map(move |(cid, i)| (i.relay == id).then_some((*cid, &mut i.agent))); - let established_connections = self - .established - .iter_mut() - .filter_map(move |(cid, c)| (c.relay.id == id).then_some((*cid, &mut c.agent))); - - initial_connections.chain(established_connections) - } - - fn agents_mut(&mut self) -> impl Iterator { - let initial_agents = self.initial.iter_mut().map(|(id, c)| (*id, &mut c.agent)); - let negotiated_agents = self - .established - .iter_mut() - .map(|(id, c)| (*id, &mut c.agent)); - - initial_agents.chain(negotiated_agents) - } - - fn get_established_mut(&mut self, id: &TId) -> Option<&mut Connection> { - self.established.get_mut(id) - } - - fn get_established_mut_session_index( - &mut self, - index: Index, - ) -> Result<(TId, &mut Connection)> { - let id = self - .established_by_wireguard_session_index - .get(&index.global()) - .with_context(|| format!("No connection for index {}", index.global()))?; - let connection = self - .established - .get_mut(id) - .with_context(|| format!("No connection for ID {id}"))?; - - Ok((*id, connection)) - } - - fn get_established_mut_by_public_key( - &mut self, - key: [u8; 32], - ) -> Result<(TId, &mut Connection)> { - let (id, conn) = self - .established - .iter_mut() - .find(|(_, c)| c.tunnel.remote_static_public().as_bytes() == &key) - .with_context(|| format!("No connection for public key {}", into_u256(key)))?; - - Ok((*id, conn)) - } - - fn iter_initial_mut(&mut self) -> impl Iterator)> { - self.initial.iter_mut().map(|(id, conn)| (*id, conn)) - } - - fn iter_established(&self) -> impl Iterator)> { - self.established.iter().map(|(id, conn)| (*id, conn)) - } - - fn iter_established_mut(&mut self) -> impl Iterator)> { - self.established.iter_mut().map(|(id, conn)| (*id, conn)) - } - - fn len(&self) -> usize { - self.initial.len() + self.established.len() - } - - fn clear(&mut self) { - self.initial.clear(); - self.established.clear(); - self.established_by_wireguard_session_index.clear(); - } - - fn iter_ids(&self) -> impl Iterator + '_ { - self.initial.keys().chain(self.established.keys()).copied() - } - - fn all_idle(&self) -> bool { - self.established.values().all(|c| c.is_idle()) - } - - fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { - iter::empty() - .chain(self.initial.values_mut().filter_map(|c| c.poll_timeout())) - .chain( - self.established - .values_mut() - .filter_map(|c| c.poll_timeout()), - ) - .min_by_key(|(instant, _)| *instant) - } -} - -fn into_u256(key: [u8; 32]) -> bnum::BUint<4> { - // Note: `parse_str_radix` panics when the number is too big. - // We are passing 32 u8's though which fits exactly into a u256. - bnum::types::U256::parse_str_radix(&hex::encode(key), 16) -} - fn add_local_candidate( id: TId, agent: &mut IceAgent, @@ -1610,10 +1392,6 @@ fn remove_local_candidate( } } -#[derive(thiserror::Error, Debug)] -#[error("Unknown connection: {0}")] -pub struct UnknownConnection(String); - #[deprecated] pub struct Offer { /// The Wireguard session key for a connection. @@ -1754,9 +1532,12 @@ impl InitialConnection { } } +#[derive(derive_more::Debug)] struct Connection { agent: IceAgent, + index: Index, + #[debug(skip)] tunnel: Tunn, remote_pub_key: PublicKey, /// When to next update the [`Tunn`]'s timers. @@ -1777,15 +1558,18 @@ struct Connection { buffer: Vec, + #[debug(skip)] buffer_pool: BufferPool>, } +#[derive(Debug)] struct SelectedRelay { id: RId, /// Whether we've already logged failure to sample a new relay. logged_sample_failure: bool, } +#[derive(Debug)] enum ConnectionState { /// We are still running ICE to figure out, which socket to use to send data. Connecting { @@ -1954,7 +1738,7 @@ fn idle_at(last_activity: Instant) -> Instant { } /// The socket of the peer we are connected to. -#[derive(PartialEq, Clone, Copy)] +#[derive(PartialEq, Clone, Copy, Debug)] enum PeerSocket { PeerToPeer { source: SocketAddr, @@ -2767,20 +2551,4 @@ mod tests { assert!(agent.remote_candidates().contains(&expected_candidate2)); assert!(!agent.remote_candidates().contains(&unexpected_candidate3)); } - - #[test] - fn can_make_u256_out_of_byte_array() { - let bytes = random(); - let _num = into_u256(bytes); - } - - #[test] - fn u256_renders_as_int() { - let num = into_u256([1; 32]); - - assert_eq!( - num.to_string(), - "454086624460063511464984254936031011189294057512315937409637584344757371137" - ); - } } diff --git a/rust/connlib/snownet/src/node/connections.rs b/rust/connlib/snownet/src/node/connections.rs new file mode 100644 index 000000000..cb84f013f --- /dev/null +++ b/rust/connlib/snownet/src/node/connections.rs @@ -0,0 +1,576 @@ +use std::{ + collections::{BTreeMap, HashMap, VecDeque}, + fmt, + hash::Hash, + iter, + time::{Duration, Instant}, +}; + +use anyhow::{Context as _, Result}; +use boringtun::noise::Index; +use rand::{Rng, seq::IteratorRandom as _}; +use str0m::ice::IceAgent; + +use crate::{ + ConnectionStats, Event, + allocation::Allocation, + node::{Connection, InitialConnection, add_local_candidate}, +}; + +pub struct Connections { + initial: BTreeMap>, + established: BTreeMap>, + + established_by_wireguard_session_index: BTreeMap, + + disconnected_ids: HashMap, + disconnected_public_keys: HashMap<[u8; 32], Instant>, + disconnected_session_indices: HashMap, +} + +impl Default for Connections { + fn default() -> Self { + Self { + initial: Default::default(), + established: Default::default(), + established_by_wireguard_session_index: Default::default(), + disconnected_ids: Default::default(), + disconnected_public_keys: Default::default(), + disconnected_session_indices: Default::default(), + } + } +} + +impl Connections +where + TId: Eq + Hash + Copy + Ord + fmt::Display, + RId: Copy + Eq + Hash + PartialEq + Ord + fmt::Debug + fmt::Display, +{ + const RECENT_DISCONNECT_TIMEOUT: Duration = Duration::from_secs(5); + + pub(crate) fn handle_timeout(&mut self, events: &mut VecDeque>, now: Instant) { + self.initial.retain(|id, conn| { + if conn.is_failed { + events.push_back(Event::ConnectionFailed(*id)); + return false; + } + + true + }); + + self.established.retain(|id, conn| { + if conn.is_failed() { + events.push_back(Event::ConnectionFailed(*id)); + self.established_by_wireguard_session_index + .retain(|index, c| { + if c == id { + self.disconnected_session_indices.insert(*index, now); + + return false; + } + + true + }); + self.disconnected_public_keys + .insert(conn.tunnel.remote_static_public().to_bytes(), now); + self.disconnected_ids.insert(*id, now); + return false; + } + + true + }); + + self.disconnected_ids + .retain(|_, v| now.duration_since(*v) < Self::RECENT_DISCONNECT_TIMEOUT); + self.disconnected_public_keys + .retain(|_, v| now.duration_since(*v) < Self::RECENT_DISCONNECT_TIMEOUT); + self.disconnected_session_indices + .retain(|_, v| now.duration_since(*v) < Self::RECENT_DISCONNECT_TIMEOUT); + } + + pub(crate) fn remove_initial(&mut self, id: &TId) -> Option> { + self.initial.remove(id) + } + + pub(crate) fn remove_established(&mut self, id: &TId, now: Instant) -> Option> { + let connection = self.established.remove(id)?; + + self.established_by_wireguard_session_index + .remove(&connection.index.global()); + + self.disconnected_ids.insert(*id, now); + self.disconnected_public_keys + .insert(connection.tunnel.remote_static_public().to_bytes(), now); + self.disconnected_session_indices + .insert(connection.index.global(), now); + + Some(connection) + } + + pub(crate) fn contains_initial(&self, id: &TId) -> bool { + self.initial.contains_key(id) + } + + pub(crate) fn check_relays_available( + &mut self, + allocations: &BTreeMap, + pending_events: &mut VecDeque>, + rng: &mut impl Rng, + ) { + for (_, c) in self.iter_initial_mut() { + if allocations.contains_key(&c.relay) { + continue; + } + + let Some(new_rid) = allocations.keys().copied().choose(rng) else { + continue; + }; + + tracing::info!(old_rid = ?c.relay, %new_rid, "Updating relay"); + c.relay = new_rid; + } + + for (cid, c) in self.iter_established_mut() { + if allocations.contains_key(&c.relay.id) { + continue; // Our relay is still there, no problems. + } + + let Some((rid, allocation)) = allocations.iter().choose(rng) else { + if !c.relay.logged_sample_failure { + tracing::debug!(%cid, "Failed to sample new relay for connection"); + } + c.relay.logged_sample_failure = true; + + continue; + }; + + tracing::info!(%cid, old = %c.relay.id, new = %rid, "Attempting to migrate connection to new relay"); + + c.relay.id = *rid; + + for candidate in allocation.current_relay_candidates() { + add_local_candidate(cid, &mut c.agent, candidate, pending_events); + } + } + } + + pub(crate) fn stats(&self) -> impl Iterator + '_ { + self.established.iter().map(move |(id, c)| (*id, c.stats)) + } + + pub(crate) fn insert_initial( + &mut self, + id: TId, + connection: InitialConnection, + ) -> Option> { + self.initial.insert(id, connection) + } + + pub(crate) fn insert_established( + &mut self, + id: TId, + index: Index, + connection: Connection, + ) -> Option> { + let existing = self.established.insert(id, connection); + + // Remove previous mappings for connection. + self.established_by_wireguard_session_index + .retain(|_, c| c != &id); + self.established_by_wireguard_session_index + .insert(index.global(), id); + + existing + } + + pub(crate) fn agent_mut(&mut self, id: TId) -> Option<(&mut IceAgent, RId)> { + let maybe_initial_connection = self.initial.get_mut(&id).map(|i| (&mut i.agent, i.relay)); + let maybe_established_connection = self + .established + .get_mut(&id) + .map(|c| (&mut c.agent, c.relay.id)); + + maybe_initial_connection.or(maybe_established_connection) + } + + pub(crate) fn agents_by_relay_mut( + &mut self, + id: RId, + ) -> impl Iterator + '_ { + let initial_connections = self + .initial + .iter_mut() + .filter_map(move |(cid, i)| (i.relay == id).then_some((*cid, &mut i.agent))); + let established_connections = self + .established + .iter_mut() + .filter_map(move |(cid, c)| (c.relay.id == id).then_some((*cid, &mut c.agent))); + + initial_connections.chain(established_connections) + } + + pub(crate) fn agents_mut(&mut self) -> impl Iterator { + let initial_agents = self.initial.iter_mut().map(|(id, c)| (*id, &mut c.agent)); + let negotiated_agents = self + .established + .iter_mut() + .map(|(id, c)| (*id, &mut c.agent)); + + initial_agents.chain(negotiated_agents) + } + + pub(crate) fn get_established_mut( + &mut self, + id: &TId, + now: Instant, + ) -> Result<&mut Connection> { + let connection = self + .established + .get_mut(id) + .context(UnknownConnection::by_id(*id, &self.disconnected_ids, now))?; + + Ok(connection) + } + + pub(crate) fn get_established_mut_session_index( + &mut self, + index: Index, + now: Instant, + ) -> Result<(TId, &mut Connection)> { + let id = self + .established_by_wireguard_session_index + .get(&index.global()) + .context(UnknownConnection::by_index( + index.global(), + &self.disconnected_session_indices, + now, + ))?; + + let connection = self + .established + .get_mut(id) + .context(UnknownConnection::by_id(*id, &self.disconnected_ids, now))?; + + Ok((*id, connection)) + } + + pub(crate) fn get_established_mut_by_public_key( + &mut self, + key: [u8; 32], + now: Instant, + ) -> Result<(TId, &mut Connection)> { + let (id, conn) = self + .established + .iter_mut() + .find(|(_, c)| c.tunnel.remote_static_public().as_bytes() == &key) + .context(UnknownConnection::by_public_key( + key, + &self.disconnected_public_keys, + now, + ))?; + + Ok((*id, conn)) + } + + pub(crate) fn iter_initial_mut( + &mut self, + ) -> impl Iterator)> { + self.initial.iter_mut().map(|(id, conn)| (*id, conn)) + } + + pub(crate) fn iter_established(&self) -> impl Iterator)> { + self.established.iter().map(|(id, conn)| (*id, conn)) + } + + pub(crate) fn iter_established_mut( + &mut self, + ) -> impl Iterator)> { + self.established.iter_mut().map(|(id, conn)| (*id, conn)) + } + + pub(crate) fn len(&self) -> usize { + self.initial.len() + self.established.len() + } + + pub(crate) fn clear(&mut self) { + self.initial.clear(); + self.established.clear(); + self.established_by_wireguard_session_index.clear(); + } + + pub(crate) fn iter_ids(&self) -> impl Iterator + '_ { + self.initial.keys().chain(self.established.keys()).copied() + } + + pub(crate) fn all_idle(&self) -> bool { + self.established.values().all(|c| c.is_idle()) + } + + pub(crate) fn poll_timeout(&mut self) -> Option<(Instant, &'static str)> { + iter::empty() + .chain(self.initial.values_mut().filter_map(|c| c.poll_timeout())) + .chain( + self.established + .values_mut() + .filter_map(|c| c.poll_timeout()), + ) + .chain( + self.disconnected_ids + .values() + .map(|t| { + ( + *t + Self::RECENT_DISCONNECT_TIMEOUT, + "recently disconnected IDs", + ) + }) + .min_by_key(|(t, _)| *t), + ) + .chain( + self.disconnected_public_keys + .values() + .map(|t| { + ( + *t + Self::RECENT_DISCONNECT_TIMEOUT, + "recently disconnected public keys", + ) + }) + .min_by_key(|(t, _)| *t), + ) + .chain( + self.disconnected_session_indices + .values() + .map(|t| { + ( + *t + Self::RECENT_DISCONNECT_TIMEOUT, + "recently disconnected session indices", + ) + }) + .min_by_key(|(t, _)| *t), + ) + .min_by_key(|(instant, _)| *instant) + } +} + +#[derive(Debug)] +pub struct UnknownConnection { + kind: &'static str, + id: String, + disconnected_for: Option, +} + +impl UnknownConnection { + fn by_id(id: TId, disconnected_ids: &HashMap, now: Instant) -> Self + where + TId: fmt::Display + Eq + Hash, + { + Self { + id: id.to_string(), + kind: "id", + disconnected_for: disconnected_ids + .get(&id) + .map(|disconnected| now.duration_since(*disconnected)), + } + } + + fn by_index(id: usize, disconnected_indices: &HashMap, now: Instant) -> Self { + Self { + id: id.to_string(), + kind: "index", + disconnected_for: disconnected_indices + .get(&id) + .map(|disconnected| now.duration_since(*disconnected)), + } + } + + fn by_public_key( + key: [u8; 32], + disconnected_public_keys: &HashMap<[u8; 32], Instant>, + now: Instant, + ) -> Self { + Self { + id: into_u256(key).to_string(), + kind: "public key", + disconnected_for: disconnected_public_keys + .get(&key) + .map(|disconnected| now.duration_since(*disconnected)), + } + } + + pub fn recently_disconnected(&self) -> bool { + self.disconnected_for.is_some() + } +} + +impl std::error::Error for UnknownConnection {} + +impl fmt::Display for UnknownConnection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "No connection for {} {}", self.kind, self.id)?; + + if let Some(disconnected_for) = self.disconnected_for { + write!(f, " (disconnected for {disconnected_for:?})")?; + } + + Ok(()) + } +} + +fn into_u256(key: [u8; 32]) -> bnum::BUint<4> { + // Note: `parse_str_radix` panics when the number is too big. + // We are passing 32 u8's though which fits exactly into a u256. + bnum::types::U256::parse_str_radix(&hex::encode(key), 16) +} + +#[cfg(test)] +mod tests { + use boringtun::{ + noise::Tunn, + x25519::{PublicKey, StaticSecret}, + }; + use bufferpool::BufferPool; + use rand::random; + use ringbuffer::AllocRingBuffer; + + use crate::node::{ConnectionState, SelectedRelay}; + + use super::*; + + #[test] + fn explicitly_removed_connection() { + let mut connections = Connections::default(); + let mut now = Instant::now(); + + let (id, idx, key) = insert_dummy_connection(&mut connections); + + connections.remove_established(&id, now); + connections.handle_timeout(&mut VecDeque::default(), now); + + now += Duration::from_secs(1); + + assert_disconnected(&mut connections, id, idx, key, now, true); + + now += Duration::from_secs(5); + connections.handle_timeout(&mut VecDeque::default(), now); + + assert_disconnected(&mut connections, id, idx, key, now, false); + } + + #[test] + fn failed_connection() { + let mut connections = Connections::default(); + let mut now = Instant::now(); + + let (id, idx, key) = insert_dummy_connection(&mut connections); + + connections.get_established_mut(&id, now).unwrap().state = ConnectionState::Failed; + connections.handle_timeout(&mut VecDeque::default(), now); + now += Duration::from_secs(1); + + assert_disconnected(&mut connections, id, idx, key, now, true); + + now += Duration::from_secs(5); + connections.handle_timeout(&mut VecDeque::default(), now); + + assert_disconnected(&mut connections, id, idx, key, now, false); + } + + fn insert_dummy_connection(connections: &mut Connections) -> (u32, Index, PublicKey) { + let conn = new_connection(12345, [1u8; 32]); + let id = 1; + let idx = conn.index; + let key = conn.tunnel.remote_static_public(); + connections.insert_established(id, conn.index, conn); + + (id, idx, key) + } + + fn assert_disconnected( + connections: &mut Connections, + id: u32, + idx: Index, + key: PublicKey, + now: Instant, + is_recently_disconnected: bool, + ) { + // Get by ID + let err = connections + .get_established_mut(&id, now) + .unwrap_err() + .downcast::() + .unwrap(); + + assert_eq!(err.recently_disconnected(), is_recently_disconnected); + + // Get by index + let err = connections + .get_established_mut_session_index(idx, now) + .unwrap_err() + .downcast::() + .unwrap(); + + assert_eq!(err.recently_disconnected(), is_recently_disconnected); + + // Get by key + let err = connections + .get_established_mut_by_public_key(key.to_bytes(), now) + .unwrap_err() + .downcast::() + .unwrap(); + + assert_eq!(err.recently_disconnected(), is_recently_disconnected); + } + + fn new_connection(idx: u32, key: [u8; 32]) -> Connection { + let private = StaticSecret::random_from_rng(rand::thread_rng()); + let new_local = Index::new_local(idx); + + Connection { + agent: IceAgent::new(), + index: new_local, + tunnel: Tunn::new_at( + private, + PublicKey::from(key), + None, + None, + new_local, + None, + 0, + Instant::now(), + ), + remote_pub_key: PublicKey::from(rand::random::<[u8; 32]>()), + next_wg_timer_update: Instant::now(), + last_proactive_handshake_sent_at: None, + relay: SelectedRelay { + id: 0, + logged_sample_failure: false, + }, + state: crate::node::ConnectionState::Connecting { + wg_buffer: AllocRingBuffer::new(1), + ip_buffer: AllocRingBuffer::new(1), + }, + disconnected_at: None, + stats: Default::default(), + intent_sent_at: Instant::now(), + signalling_completed_at: Instant::now(), + first_handshake_completed_at: None, + buffer: Default::default(), + buffer_pool: BufferPool::new(0, "test"), + } + } + + #[test] + fn can_make_u256_out_of_byte_array() { + let bytes = random(); + let _num = into_u256(bytes); + } + + #[test] + fn u256_renders_as_int() { + let num = into_u256([1; 32]); + + assert_eq!( + num.to_string(), + "454086624460063511464984254936031011189294057512315937409637584344757371137" + ); + } +} diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index ddbee2794..f63acd479 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -452,7 +452,7 @@ impl ClientState { } } p2p_control::GOODBYE_EVENT => { - self.node.remove_connection(gid, "received `goodbye`"); + self.node.remove_connection(gid, "received `goodbye`", now); self.cleanup_connected_gateway(&gid); } code => { diff --git a/rust/connlib/tunnel/src/gateway.rs b/rust/connlib/tunnel/src/gateway.rs index ddf9b0e91..cbfe5bfbd 100644 --- a/rust/connlib/tunnel/src/gateway.rs +++ b/rust/connlib/tunnel/src/gateway.rs @@ -159,7 +159,7 @@ impl GatewayState { } p2p_control::GOODBYE_EVENT => { self.peers.remove(&cid); - self.node.remove_connection(cid, "received `goodbye`"); + self.node.remove_connection(cid, "received `goodbye`", now); None } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 4652de9de..9217d156d 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -301,12 +301,6 @@ impl Eventloop { return Ok(()); } - // Unknown connection just means packets are bouncing on the TUN device because the Client disconnected. - if e.root_cause().is::() { - tracing::debug!("{e:#}"); - return Ok(()); - } - if e.root_cause() .downcast_ref::() .is_some_and(|e| e.kind() == io::ErrorKind::PermissionDenied) @@ -326,6 +320,13 @@ impl Eventloop { return Ok(()); } + if e.downcast_ref::() + .is_some_and(|e| e.recently_disconnected()) + { + tracing::debug!("{e:#}"); + continue; + } + if e.root_cause() .is::() {