From ff7f2de7d97ff3cf799e40d2521f30cec0717c8a Mon Sep 17 00:00:00 2001 From: Gabi Date: Tue, 29 Aug 2023 14:17:53 -0300 Subject: [PATCH] connlib: improve logging and detect channel close earlier (#1964) This detects earlier when a channel disconnects and closes the peer but more importantly it prints periodically some stats about the tunnel that will make debugging easier. --- rust/connlib/libs/client/src/control.rs | 3 +- rust/connlib/libs/common/src/messages/key.rs | 7 ++ rust/connlib/libs/gateway/src/control.rs | 3 +- .../libs/tunnel/src/control_protocol.rs | 79 +++++++++++++++++-- rust/connlib/libs/tunnel/src/lib.rs | 65 ++++++++++++++- rust/connlib/libs/tunnel/src/peer.rs | 31 ++++++++ .../connlib/libs/tunnel/src/resource_table.rs | 17 ++++ 7 files changed, 190 insertions(+), 15 deletions(-) diff --git a/rust/connlib/libs/client/src/control.rs b/rust/connlib/libs/client/src/control.rs index f60b6465a..857322993 100644 --- a/rust/connlib/libs/client/src/control.rs +++ b/rust/connlib/libs/client/src/control.rs @@ -230,9 +230,8 @@ impl ControlPlane { } } - #[tracing::instrument(level = "trace", skip(self))] pub(super) async fn stats_event(&mut self) { - // TODO + tracing::debug!(target: "tunnel_state", "{:#?}", self.tunnel.stats()); } } diff --git a/rust/connlib/libs/common/src/messages/key.rs b/rust/connlib/libs/common/src/messages/key.rs index 93275c8f9..873ab45d0 100644 --- a/rust/connlib/libs/common/src/messages/key.rs +++ b/rust/connlib/libs/common/src/messages/key.rs @@ -1,4 +1,5 @@ use base64::{display::Base64Display, engine::general_purpose::STANDARD, Engine}; +use boringtun::x25519::PublicKey; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use std::{fmt, str::FromStr}; @@ -37,6 +38,12 @@ impl FromStr for Key { } } +impl From for Key { + fn from(value: PublicKey) -> Self { + Self(value.to_bytes()) + } +} + impl fmt::Display for Key { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", Base64Display::new(&self.0, &STANDARD)) diff --git a/rust/connlib/libs/gateway/src/control.rs b/rust/connlib/libs/gateway/src/control.rs index 343c72185..05bd1f9a5 100644 --- a/rust/connlib/libs/gateway/src/control.rs +++ b/rust/connlib/libs/gateway/src/control.rs @@ -134,9 +134,8 @@ impl ControlPlane { Ok(()) } - #[tracing::instrument(level = "trace", skip(self))] pub(super) async fn stats_event(&mut self) { - tracing::debug!("TODO: STATS EVENT"); + tracing::debug!(target: "tunnel_state", "{:#?}", self.tunnel.stats()); } } diff --git a/rust/connlib/libs/tunnel/src/control_protocol.rs b/rust/connlib/libs/tunnel/src/control_protocol.rs index dda282d01..dd03003fe 100644 --- a/rust/connlib/libs/tunnel/src/control_protocol.rs +++ b/rust/connlib/libs/tunnel/src/control_protocol.rs @@ -84,6 +84,20 @@ where } } + if let Some(conn) = self.peer_connections.lock().get(&conn_id) { + self.set_connection_state_with_peer(conn, index, conn_id) + } + + data_channel.on_close({ + let tunnel = Arc::clone(self); + Box::new(move || { + tracing::debug!("channel closed"); + let tunnel = tunnel.clone(); + Box::pin(async move { + tunnel.stop_peer(index, conn_id).await; + }) + }) + }); self.start_peer_handler(peer)?; Ok(()) } @@ -163,7 +177,7 @@ where state: RTCPeerConnectionState, client_id: Id, ) { - tracing::trace!("Peer Connection State has changed: {state}"); + tracing::trace!(?state, "Peer Connection State has changed"); if state == RTCPeerConnectionState::Failed { self.peer_connections.lock().remove(&client_id); tracing::warn!("Peer Connection has gone to failed exiting"); @@ -187,6 +201,40 @@ where )); } + #[tracing::instrument(level = "trace", skip(self))] + async fn handle_connection_state_update_with_peer( + self: &Arc, + state: RTCPeerConnectionState, + index: u32, + conn_id: Id, + ) { + tracing::trace!(?state, "Peer Connection State has changed"); + if state == RTCPeerConnectionState::Failed { + self.stop_peer(index, conn_id).await; + tracing::warn!("Peer Connection has gone to failed exiting"); + } + } + + #[tracing::instrument(level = "trace", skip(self))] + fn set_connection_state_with_peer( + self: &Arc, + peer_connection: &Arc, + index: u32, + conn_id: Id, + ) { + let tunnel = Arc::clone(self); + peer_connection.on_peer_connection_state_change(Box::new( + move |state: RTCPeerConnectionState| { + let tunnel = Arc::clone(&tunnel); + Box::pin(async move { + tunnel + .handle_connection_state_update_with_peer(state, index, conn_id) + .await + }) + }, + )); + } + /// Initiate an ice connection request. /// /// Given a resource id and a list of relay creates a [RequestConnection] @@ -258,13 +306,20 @@ where let p_key = preshared_key.clone(); data_channel.on_open(Box::new(move || { Box::pin(async move { - tracing::trace!("new data channel opened!"); + tracing::trace!("new data channel opened!"); let index = tunnel.next_index(); - let Some(gateway_public_key) = tunnel.gateway_public_keys.lock().remove(&gateway_id) else { + let Some(gateway_public_key) = + tunnel.gateway_public_keys.lock().remove(&gateway_id) + else { tunnel.awaiting_connection.lock().remove(&resource_id); tunnel.peer_connections.lock().remove(&gateway_id); - tunnel.gateway_awaiting_connection.lock().remove(&gateway_id); - tracing::warn!("Opened ICE channel with gateway without ever receiving public key"); + tunnel + .gateway_awaiting_connection + .lock() + .remove(&gateway_id); + tracing::warn!( + "Opened ICE channel with gateway without ever receiving public key" + ); let _ = tunnel.callbacks.on_error(&Error::ControlProtocolError); return; }; @@ -275,11 +330,19 @@ where preshared_key: p_key, }; - if let Err(e) = tunnel.handle_channel_open(d, index, peer_config, gateway_id, None).await { - tracing::error!("Couldn't establish wireguard link after channel was opened: {e}"); + if let Err(e) = tunnel + .handle_channel_open(d, index, peer_config, gateway_id, None) + .await + { + tracing::error!( + "Couldn't establish wireguard link after channel was opened: {e}" + ); let _ = tunnel.callbacks.on_error(&e); tunnel.peer_connections.lock().remove(&gateway_id); - tunnel.gateway_awaiting_connection.lock().remove(&gateway_id); + tunnel + .gateway_awaiting_connection + .lock() + .remove(&gateway_id); } tunnel.awaiting_connection.lock().remove(&resource_id); }) diff --git a/rust/connlib/libs/tunnel/src/lib.rs b/rust/connlib/libs/tunnel/src/lib.rs index 60067e2d8..742663780 100644 --- a/rust/connlib/libs/tunnel/src/lib.rs +++ b/rust/connlib/libs/tunnel/src/lib.rs @@ -11,13 +11,13 @@ use boringtun::{ }; use ip_network::IpNetwork; use ip_network_table::IpNetworkTable; -use libs_common::{Callbacks, Error, DNS_SENTINEL}; +use libs_common::{messages::Key, Callbacks, Error, DNS_SENTINEL}; use async_trait::async_trait; use bytes::Bytes; use itertools::Itertools; use parking_lot::{Mutex, RwLock}; -use peer::Peer; +use peer::{Peer, PeerStats}; use resource_table::ResourceTable; use tokio::time::MissedTickBehavior; use webrtc::{ @@ -153,6 +153,61 @@ pub struct Tunnel { callbacks: CallbackErrorFacade, } +// TODO: For now we only use these fields with debug +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct TunnelStats { + public_key: String, + peers_by_ip: HashMap, + peer_connections: Vec, + awaiting_connection: HashSet, + gateway_awaiting_connection: HashMap>, + resource_gateways: HashMap, + dns_resources: HashMap, + network_resources: HashMap, + gateway_public_keys: HashMap, +} + +impl Tunnel +where + C: ControlSignal + Send + Sync + 'static, + CB: Callbacks + 'static, +{ + pub fn stats(&self) -> TunnelStats { + let peers_by_ip = self + .peers_by_ip + .read() + .iter() + .map(|(ip, peer)| (ip, peer.stats())) + .collect(); + let peer_connections = self.peer_connections.lock().keys().cloned().collect(); + let awaiting_connection = self.awaiting_connection.lock().clone(); + let gateway_awaiting_connection = self.gateway_awaiting_connection.lock().clone(); + let resource_gateways = self.resources_gateways.lock().clone(); + let (network_resources, dns_resources) = { + let resources = self.resources.read(); + (resources.network_resources(), resources.dns_resources()) + }; + let gateway_public_keys = self + .gateway_public_keys + .lock() + .iter() + .map(|(&id, &k)| (id, Key::from(k).to_string())) + .collect(); + TunnelStats { + public_key: Key::from(self.public_key).to_string(), + peers_by_ip, + peer_connections, + awaiting_connection, + gateway_awaiting_connection, + resource_gateways, + dns_resources, + network_resources, + gateway_public_keys, + } + } +} + impl Tunnel where C: ControlSignal + Send + Sync + 'static, @@ -275,12 +330,13 @@ where Ok(()) } + #[tracing::instrument(level = "trace", skip(self))] async fn stop_peer(&self, index: u32, conn_id: Id) { self.peers_by_ip.write().retain(|_, p| p.index != index); let conn = self.peer_connections.lock().remove(&conn_id); if let Some(conn) = conn { if let Err(e) = conn.close().await { - tracing::error!("Problem while trying to close channel: {e:?}"); + tracing::warn!(error = ?e, "Can't close peer"); let _ = self.callbacks().on_error(&e.into()); } } @@ -478,6 +534,9 @@ where } } } + let peer_stats = peer.stats(); + tracing::debug!(peer = ?peer_stats, "Peer stopped"); + tunnel.stop_peer(peer.index, peer.conn_id).await; }); Ok(()) diff --git a/rust/connlib/libs/tunnel/src/peer.rs b/rust/connlib/libs/tunnel/src/peer.rs index ffe107c30..6022e0cd1 100644 --- a/rust/connlib/libs/tunnel/src/peer.rs +++ b/rust/connlib/libs/tunnel/src/peer.rs @@ -37,7 +37,38 @@ pub(crate) struct Peer { pub translated_resource_addresses: RwLock>, } +// TODO: For now we only use these fields with debug +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub(crate) struct PeerStats { + pub index: u32, + pub allowed_ips: Vec, + pub conn_id: Id, + pub dns_resources: HashMap, + pub network_resources: HashMap, + pub translated_resource_addresses: HashMap, +} + impl Peer { + pub(crate) fn stats(&self) -> PeerStats { + let (network_resources, dns_resources) = self.resources.as_ref().map_or_else( + || (HashMap::new(), HashMap::new()), + |resources| { + let resources = resources.read(); + (resources.network_resources(), resources.dns_resources()) + }, + ); + let allowed_ips = self.allowed_ips.read().iter().map(|(ip, _)| ip).collect(); + let translated_resource_addresses = self.translated_resource_addresses.read().clone(); + PeerStats { + index: self.index, + allowed_ips, + conn_id: self.conn_id, + dns_resources, + network_resources, + translated_resource_addresses, + } + } pub(crate) async fn send_infallible(&self, data: &[u8], callbacks: &CB) { if let Err(e) = self.channel.write(&Bytes::copy_from_slice(data)).await { tracing::error!("Couldn't send packet to connected peer: {e}"); diff --git a/rust/connlib/libs/tunnel/src/resource_table.rs b/rust/connlib/libs/tunnel/src/resource_table.rs index 969448dc9..a0f4847b9 100644 --- a/rust/connlib/libs/tunnel/src/resource_table.rs +++ b/rust/connlib/libs/tunnel/src/resource_table.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, net::IpAddr, ptr::NonNull}; use chrono::{DateTime, Utc}; +use ip_network::IpNetwork; use ip_network_table::IpNetworkTable; use libs_common::messages::{Id, ResourceDescription}; @@ -62,6 +63,22 @@ where self.id_table.values() } + pub fn network_resources(&self) -> HashMap { + // Safety: Due to internal consistency, since the value is stored the reference should be valid + self.network_table + .iter() + .map(|(wg_ip, res)| (wg_ip, unsafe { res.as_ref() }.clone())) + .collect() + } + + pub fn dns_resources(&self) -> HashMap { + // Safety: Due to internal consistency, since the value is stored the reference should be valid + self.dns_name + .iter() + .map(|(name, res)| (name.clone(), unsafe { res.as_ref() }.clone())) + .collect() + } + /// Tells you if it's empty pub fn is_empty(&self) -> bool { self.id_table.is_empty()