connlib: improve logging and detect channel close earlier (#1964)

This detects earlier when a channel disconnects and closes the peer but
more importantly it prints periodically some stats about the tunnel that
will make debugging easier.
This commit is contained in:
Gabi
2023-08-29 14:17:53 -03:00
committed by GitHub
parent 90ac18c273
commit ff7f2de7d9
7 changed files with 190 additions and 15 deletions

View File

@@ -230,9 +230,8 @@ impl<CB: Callbacks + 'static> ControlPlane<CB> {
}
}
#[tracing::instrument(level = "trace", skip(self))]
pub(super) async fn stats_event(&mut self) {
// TODO
tracing::debug!(target: "tunnel_state", "{:#?}", self.tunnel.stats());
}
}

View File

@@ -1,4 +1,5 @@
use base64::{display::Base64Display, engine::general_purpose::STANDARD, Engine};
use boringtun::x25519::PublicKey;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::{fmt, str::FromStr};
@@ -37,6 +38,12 @@ impl FromStr for Key {
}
}
impl From<PublicKey> for Key {
fn from(value: PublicKey) -> Self {
Self(value.to_bytes())
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", Base64Display::new(&self.0, &STANDARD))

View File

@@ -134,9 +134,8 @@ impl<CB: Callbacks + 'static> ControlPlane<CB> {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
pub(super) async fn stats_event(&mut self) {
tracing::debug!("TODO: STATS EVENT");
tracing::debug!(target: "tunnel_state", "{:#?}", self.tunnel.stats());
}
}

View File

@@ -84,6 +84,20 @@ where
}
}
if let Some(conn) = self.peer_connections.lock().get(&conn_id) {
self.set_connection_state_with_peer(conn, index, conn_id)
}
data_channel.on_close({
let tunnel = Arc::clone(self);
Box::new(move || {
tracing::debug!("channel closed");
let tunnel = tunnel.clone();
Box::pin(async move {
tunnel.stop_peer(index, conn_id).await;
})
})
});
self.start_peer_handler(peer)?;
Ok(())
}
@@ -163,7 +177,7 @@ where
state: RTCPeerConnectionState,
client_id: Id,
) {
tracing::trace!("Peer Connection State has changed: {state}");
tracing::trace!(?state, "Peer Connection State has changed");
if state == RTCPeerConnectionState::Failed {
self.peer_connections.lock().remove(&client_id);
tracing::warn!("Peer Connection has gone to failed exiting");
@@ -187,6 +201,40 @@ where
));
}
#[tracing::instrument(level = "trace", skip(self))]
async fn handle_connection_state_update_with_peer(
self: &Arc<Self>,
state: RTCPeerConnectionState,
index: u32,
conn_id: Id,
) {
tracing::trace!(?state, "Peer Connection State has changed");
if state == RTCPeerConnectionState::Failed {
self.stop_peer(index, conn_id).await;
tracing::warn!("Peer Connection has gone to failed exiting");
}
}
#[tracing::instrument(level = "trace", skip(self))]
fn set_connection_state_with_peer(
self: &Arc<Self>,
peer_connection: &Arc<RTCPeerConnection>,
index: u32,
conn_id: Id,
) {
let tunnel = Arc::clone(self);
peer_connection.on_peer_connection_state_change(Box::new(
move |state: RTCPeerConnectionState| {
let tunnel = Arc::clone(&tunnel);
Box::pin(async move {
tunnel
.handle_connection_state_update_with_peer(state, index, conn_id)
.await
})
},
));
}
/// Initiate an ice connection request.
///
/// Given a resource id and a list of relay creates a [RequestConnection]
@@ -258,13 +306,20 @@ where
let p_key = preshared_key.clone();
data_channel.on_open(Box::new(move || {
Box::pin(async move {
tracing::trace!("new data channel opened!");
tracing::trace!("new data channel opened!");
let index = tunnel.next_index();
let Some(gateway_public_key) = tunnel.gateway_public_keys.lock().remove(&gateway_id) else {
let Some(gateway_public_key) =
tunnel.gateway_public_keys.lock().remove(&gateway_id)
else {
tunnel.awaiting_connection.lock().remove(&resource_id);
tunnel.peer_connections.lock().remove(&gateway_id);
tunnel.gateway_awaiting_connection.lock().remove(&gateway_id);
tracing::warn!("Opened ICE channel with gateway without ever receiving public key");
tunnel
.gateway_awaiting_connection
.lock()
.remove(&gateway_id);
tracing::warn!(
"Opened ICE channel with gateway without ever receiving public key"
);
let _ = tunnel.callbacks.on_error(&Error::ControlProtocolError);
return;
};
@@ -275,11 +330,19 @@ where
preshared_key: p_key,
};
if let Err(e) = tunnel.handle_channel_open(d, index, peer_config, gateway_id, None).await {
tracing::error!("Couldn't establish wireguard link after channel was opened: {e}");
if let Err(e) = tunnel
.handle_channel_open(d, index, peer_config, gateway_id, None)
.await
{
tracing::error!(
"Couldn't establish wireguard link after channel was opened: {e}"
);
let _ = tunnel.callbacks.on_error(&e);
tunnel.peer_connections.lock().remove(&gateway_id);
tunnel.gateway_awaiting_connection.lock().remove(&gateway_id);
tunnel
.gateway_awaiting_connection
.lock()
.remove(&gateway_id);
}
tunnel.awaiting_connection.lock().remove(&resource_id);
})

View File

@@ -11,13 +11,13 @@ use boringtun::{
};
use ip_network::IpNetwork;
use ip_network_table::IpNetworkTable;
use libs_common::{Callbacks, Error, DNS_SENTINEL};
use libs_common::{messages::Key, Callbacks, Error, DNS_SENTINEL};
use async_trait::async_trait;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::{Mutex, RwLock};
use peer::Peer;
use peer::{Peer, PeerStats};
use resource_table::ResourceTable;
use tokio::time::MissedTickBehavior;
use webrtc::{
@@ -153,6 +153,61 @@ pub struct Tunnel<C: ControlSignal, CB: Callbacks> {
callbacks: CallbackErrorFacade<CB>,
}
// TODO: For now we only use these fields with debug
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct TunnelStats {
public_key: String,
peers_by_ip: HashMap<IpNetwork, PeerStats>,
peer_connections: Vec<Id>,
awaiting_connection: HashSet<Id>,
gateway_awaiting_connection: HashMap<Id, Vec<IpNetwork>>,
resource_gateways: HashMap<Id, Id>,
dns_resources: HashMap<String, ResourceDescription>,
network_resources: HashMap<IpNetwork, ResourceDescription>,
gateway_public_keys: HashMap<Id, String>,
}
impl<C, CB> Tunnel<C, CB>
where
C: ControlSignal + Send + Sync + 'static,
CB: Callbacks + 'static,
{
pub fn stats(&self) -> TunnelStats {
let peers_by_ip = self
.peers_by_ip
.read()
.iter()
.map(|(ip, peer)| (ip, peer.stats()))
.collect();
let peer_connections = self.peer_connections.lock().keys().cloned().collect();
let awaiting_connection = self.awaiting_connection.lock().clone();
let gateway_awaiting_connection = self.gateway_awaiting_connection.lock().clone();
let resource_gateways = self.resources_gateways.lock().clone();
let (network_resources, dns_resources) = {
let resources = self.resources.read();
(resources.network_resources(), resources.dns_resources())
};
let gateway_public_keys = self
.gateway_public_keys
.lock()
.iter()
.map(|(&id, &k)| (id, Key::from(k).to_string()))
.collect();
TunnelStats {
public_key: Key::from(self.public_key).to_string(),
peers_by_ip,
peer_connections,
awaiting_connection,
gateway_awaiting_connection,
resource_gateways,
dns_resources,
network_resources,
gateway_public_keys,
}
}
}
impl<C, CB> Tunnel<C, CB>
where
C: ControlSignal + Send + Sync + 'static,
@@ -275,12 +330,13 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn stop_peer(&self, index: u32, conn_id: Id) {
self.peers_by_ip.write().retain(|_, p| p.index != index);
let conn = self.peer_connections.lock().remove(&conn_id);
if let Some(conn) = conn {
if let Err(e) = conn.close().await {
tracing::error!("Problem while trying to close channel: {e:?}");
tracing::warn!(error = ?e, "Can't close peer");
let _ = self.callbacks().on_error(&e.into());
}
}
@@ -478,6 +534,9 @@ where
}
}
}
let peer_stats = peer.stats();
tracing::debug!(peer = ?peer_stats, "Peer stopped");
tunnel.stop_peer(peer.index, peer.conn_id).await;
});
Ok(())

View File

@@ -37,7 +37,38 @@ pub(crate) struct Peer {
pub translated_resource_addresses: RwLock<HashMap<IpAddr, Id>>,
}
// TODO: For now we only use these fields with debug
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct PeerStats {
pub index: u32,
pub allowed_ips: Vec<IpNetwork>,
pub conn_id: Id,
pub dns_resources: HashMap<String, ExpiryingResource>,
pub network_resources: HashMap<IpNetwork, ExpiryingResource>,
pub translated_resource_addresses: HashMap<IpAddr, Id>,
}
impl Peer {
pub(crate) fn stats(&self) -> PeerStats {
let (network_resources, dns_resources) = self.resources.as_ref().map_or_else(
|| (HashMap::new(), HashMap::new()),
|resources| {
let resources = resources.read();
(resources.network_resources(), resources.dns_resources())
},
);
let allowed_ips = self.allowed_ips.read().iter().map(|(ip, _)| ip).collect();
let translated_resource_addresses = self.translated_resource_addresses.read().clone();
PeerStats {
index: self.index,
allowed_ips,
conn_id: self.conn_id,
dns_resources,
network_resources,
translated_resource_addresses,
}
}
pub(crate) async fn send_infallible<CB: Callbacks>(&self, data: &[u8], callbacks: &CB) {
if let Err(e) = self.channel.write(&Bytes::copy_from_slice(data)).await {
tracing::error!("Couldn't send packet to connected peer: {e}");

View File

@@ -2,6 +2,7 @@
use std::{collections::HashMap, net::IpAddr, ptr::NonNull};
use chrono::{DateTime, Utc};
use ip_network::IpNetwork;
use ip_network_table::IpNetworkTable;
use libs_common::messages::{Id, ResourceDescription};
@@ -62,6 +63,22 @@ where
self.id_table.values()
}
pub fn network_resources(&self) -> HashMap<IpNetwork, T> {
// Safety: Due to internal consistency, since the value is stored the reference should be valid
self.network_table
.iter()
.map(|(wg_ip, res)| (wg_ip, unsafe { res.as_ref() }.clone()))
.collect()
}
pub fn dns_resources(&self) -> HashMap<String, T> {
// Safety: Due to internal consistency, since the value is stored the reference should be valid
self.dns_name
.iter()
.map(|(name, res)| (name.clone(), unsafe { res.as_ref() }.clone()))
.collect()
}
/// Tells you if it's empty
pub fn is_empty(&self) -> bool {
self.id_table.is_empty()