connlib: make dns request in a new task without blocking peers (#3370)

This required making `allow_access` `async` which is ugly, but we can
fix it later like we did it with `set_peer_connection_request`, but
doing this ASAP otherwise this would block the `peers_by_ip` struct and
also block the executor a bunch of times and slow everything down.

Co-authored-by: Jamil <jamilbk@users.noreply.github.com>
This commit is contained in:
Gabi
2024-01-23 21:17:50 -03:00
committed by GitHub
parent 2cc68c067f
commit 0629afce3a
3 changed files with 50 additions and 45 deletions

View File

@@ -2,6 +2,7 @@
use base64::{DecodeError, DecodeSliceError};
use boringtun::noise::errors::WireGuardError;
use thiserror::Error;
use tokio::task::JoinError;
/// Unified Result type to use across connlib.
pub type Result<T> = std::result::Result<T, ConnlibError>;
@@ -155,6 +156,8 @@ pub enum ConnlibError {
TooManyConnectionRequests,
#[error("Channel connection closed by portal")]
ClosedByPortal,
#[error(transparent)]
JoinError(#[from] JoinError),
}
impl ConnlibError {

View File

@@ -14,7 +14,7 @@ use connlib_shared::{
Callbacks, Dname, Error, Result,
};
use ip_network::IpNetwork;
use std::{net::ToSocketAddrs, sync::Arc};
use std::sync::Arc;
use webrtc::ice_transport::{
ice_role::RTCIceRole, ice_transport_state::RTCIceTransportState, RTCIceTransport,
};
@@ -101,9 +101,8 @@ where
return Err(Error::InvalidResource);
}
// TODO: we should make this async, this is acceptable for now though
// in the future we will use hickory-resolver for this anyways.
resolve_addresses(&domain.to_string())?
tokio::task::spawn_blocking(move || resolve_addresses(&domain.to_string()))
.await??
}
ResourceDescription::Cidr(ref cidr) => vec![cidr.address],
};
@@ -161,54 +160,53 @@ where
})
}
pub fn allow_access(
pub async fn allow_access(
&self,
resource: ResourceDescription,
client_id: ClientId,
expires_at: Option<DateTime<Utc>>,
domain: Option<Dname>,
) -> Option<ResourceAccepted> {
if let Some((_, peer)) = self
let Some(peer) = self
.role_state
.lock()
.peers_by_ip
.iter_mut()
.find(|(_, p)| p.inner.conn_id == client_id)
{
let addresses = match &resource {
ResourceDescription::Dns(r) => {
let Some(domain) = domain.as_ref() else {
return None;
};
.find_map(|(_, p)| (p.inner.conn_id == client_id).then_some(p.inner.clone()))
else {
return None;
};
if !is_subdomain(domain, &r.address) {
return None;
}
let addresses = match &resource {
ResourceDescription::Dns(r) => {
let Some(domain) = domain.clone() else {
return None;
};
(domain.to_string(), 0)
.to_socket_addrs()
.ok()?
.map(|a| a.ip())
.map(Into::into)
.collect()
if !is_subdomain(&domain, &r.address) {
return None;
}
ResourceDescription::Cidr(cidr) => vec![cidr.address],
};
for address in &addresses {
peer.inner
.transform
.add_resource(*address, resource.clone(), expires_at);
tokio::task::spawn_blocking(move || resolve_addresses(&domain.to_string()))
.await
.ok()?
.ok()?
}
ResourceDescription::Cidr(cidr) => vec![cidr.address],
};
if let Some(domain) = domain {
return Some(ResourceAccepted {
domain_response: DomainResponse {
domain,
address: addresses.iter().map(|i| i.network_address()).collect(),
},
});
}
for address in &addresses {
peer.transform
.add_resource(*address, resource.clone(), expires_at);
}
if let Some(domain) = domain {
return Some(ResourceAccepted {
domain_response: DomainResponse {
domain,
address: addresses.iter().map(|i| i.network_address()).collect(),
},
});
}
None

View File

@@ -169,20 +169,24 @@ impl Eventloop {
}))) => {
tracing::debug!(client = %client_id, resource = %resource.id(), expires = ?expires_at.map(|e| e.to_rfc3339()), "Allowing access to resource");
if let Some(res) = self
.tunnel
.allow_access(resource, client_id, expires_at, payload)
{
let sender = self.portal_sender.clone();
tokio::spawn(async move {
sender
let tunnel = Arc::clone(&self.tunnel);
let sender = self.portal_sender.clone();
tokio::spawn(async move {
if let Some(res) = tunnel
.allow_access(resource, client_id, expires_at, payload)
.await
{
if let Err(e) = sender
.send(EgressMessages::ConnectionReady(ConnectionReady {
reference,
gateway_payload: GatewayResponse::ResourceAccepted(res),
}))
.await
});
}
{
tracing::warn!("Error while sending gateway response: {e:#?}");
}
}
});
continue;
}
Poll::Ready(Some(IngressMessages::IceCandidates(ClientIceCandidates {