From 0629afce3ae29b95635814ee2d2decd32cd1c565 Mon Sep 17 00:00:00 2001 From: Gabi Date: Tue, 23 Jan 2024 21:17:50 -0300 Subject: [PATCH] connlib: make dns request in a new task without blocking peers (#3370) This required making `allow_access` `async`, which is ugly, but we can clean that up later the same way we did for `set_peer_connection_request`. We are landing this ASAP because otherwise the DNS lookup would keep `peers_by_ip` locked and repeatedly block the executor, slowing everything down. Co-authored-by: Jamil --- rust/connlib/shared/src/error.rs | 3 + .../tunnel/src/control_protocol/gateway.rs | 70 +++++++++---------- rust/gateway/src/eventloop.rs | 22 +++--- 3 files changed, 50 insertions(+), 45 deletions(-) diff --git a/rust/connlib/shared/src/error.rs b/rust/connlib/shared/src/error.rs index 5a2aeffcd..d31dc42a9 100644 --- a/rust/connlib/shared/src/error.rs +++ b/rust/connlib/shared/src/error.rs @@ -2,6 +2,7 @@ use base64::{DecodeError, DecodeSliceError}; use boringtun::noise::errors::WireGuardError; use thiserror::Error; +use tokio::task::JoinError; /// Unified Result type to use across connlib. 
pub type Result = std::result::Result; @@ -155,6 +156,8 @@ pub enum ConnlibError { TooManyConnectionRequests, #[error("Channel connection closed by portal")] ClosedByPortal, + #[error(transparent)] + JoinError(#[from] JoinError), } impl ConnlibError { diff --git a/rust/connlib/tunnel/src/control_protocol/gateway.rs b/rust/connlib/tunnel/src/control_protocol/gateway.rs index 50386b414..acff30b7b 100644 --- a/rust/connlib/tunnel/src/control_protocol/gateway.rs +++ b/rust/connlib/tunnel/src/control_protocol/gateway.rs @@ -14,7 +14,7 @@ use connlib_shared::{ Callbacks, Dname, Error, Result, }; use ip_network::IpNetwork; -use std::{net::ToSocketAddrs, sync::Arc}; +use std::sync::Arc; use webrtc::ice_transport::{ ice_role::RTCIceRole, ice_transport_state::RTCIceTransportState, RTCIceTransport, }; @@ -101,9 +101,8 @@ where return Err(Error::InvalidResource); } - // TODO: we should make this async, this is acceptable for now though - // in the future we will use hickory-resolver for this anyways. - resolve_addresses(&domain.to_string())? + tokio::task::spawn_blocking(move || resolve_addresses(&domain.to_string())) + .await?? 
} ResourceDescription::Cidr(ref cidr) => vec![cidr.address], }; @@ -161,54 +160,53 @@ where }) } - pub fn allow_access( + pub async fn allow_access( &self, resource: ResourceDescription, client_id: ClientId, expires_at: Option>, domain: Option, ) -> Option { - if let Some((_, peer)) = self + let Some(peer) = self .role_state .lock() .peers_by_ip .iter_mut() - .find(|(_, p)| p.inner.conn_id == client_id) - { - let addresses = match &resource { - ResourceDescription::Dns(r) => { - let Some(domain) = domain.as_ref() else { - return None; - }; + .find_map(|(_, p)| (p.inner.conn_id == client_id).then_some(p.inner.clone())) + else { + return None; + }; - if !is_subdomain(domain, &r.address) { - return None; - } + let addresses = match &resource { + ResourceDescription::Dns(r) => { + let Some(domain) = domain.clone() else { + return None; + }; - (domain.to_string(), 0) - .to_socket_addrs() - .ok()? - .map(|a| a.ip()) - .map(Into::into) - .collect() + if !is_subdomain(&domain, &r.address) { + return None; } - ResourceDescription::Cidr(cidr) => vec![cidr.address], - }; - for address in &addresses { - peer.inner - .transform - .add_resource(*address, resource.clone(), expires_at); + tokio::task::spawn_blocking(move || resolve_addresses(&domain.to_string())) + .await + .ok()? + .ok()? 
} + ResourceDescription::Cidr(cidr) => vec![cidr.address], + }; - if let Some(domain) = domain { - return Some(ResourceAccepted { - domain_response: DomainResponse { - domain, - address: addresses.iter().map(|i| i.network_address()).collect(), - }, - }); - } + for address in &addresses { + peer.transform + .add_resource(*address, resource.clone(), expires_at); + } + + if let Some(domain) = domain { + return Some(ResourceAccepted { + domain_response: DomainResponse { + domain, + address: addresses.iter().map(|i| i.network_address()).collect(), + }, + }); } None diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 7f852818e..89ba1513e 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -169,20 +169,24 @@ impl Eventloop { }))) => { tracing::debug!(client = %client_id, resource = %resource.id(), expires = ?expires_at.map(|e| e.to_rfc3339()), "Allowing access to resource"); - if let Some(res) = self - .tunnel - .allow_access(resource, client_id, expires_at, payload) - { - let sender = self.portal_sender.clone(); - tokio::spawn(async move { - sender + let tunnel = Arc::clone(&self.tunnel); + let sender = self.portal_sender.clone(); + tokio::spawn(async move { + if let Some(res) = tunnel + .allow_access(resource, client_id, expires_at, payload) + .await + { + if let Err(e) = sender .send(EgressMessages::ConnectionReady(ConnectionReady { reference, gateway_payload: GatewayResponse::ResourceAccepted(res), })) .await - }); - } + { + tracing::warn!("Error while sending gateway response: {e:#?}"); + } + } + }); continue; } Poll::Ready(Some(IngressMessages::IceCandidates(ClientIceCandidates {