fix(gateway): re-implement resource address resolution in eventloop (#3656)

Reimplements what #3654 reverted with a fix

---------

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
Gabi
2024-02-15 17:51:59 -03:00
committed by GitHub
parent 0fbd40fcb2
commit 55e4fb100f
9 changed files with 157 additions and 96 deletions

4
rust/Cargo.lock generated
View File

@@ -2018,11 +2018,14 @@ dependencies = [
"chrono",
"clap",
"connlib-shared",
"dns-lookup",
"domain",
"firezone-cli-utils",
"firezone-tunnel",
"futures",
"futures-bounded",
"ip_network",
"libc",
"phoenix-channel",
"secrecy",
"serde",
@@ -2108,7 +2111,6 @@ dependencies = [
"bytes",
"chrono",
"connlib-shared",
"dns-lookup",
"domain",
"futures",
"futures-bounded",

View File

@@ -131,8 +131,8 @@ impl Eq for RequestConnection {}
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResourceDescription {
Dns(ResourceDescriptionDns),
pub enum ResourceDescription<TDNS = ResourceDescriptionDns> {
Dns(TDNS),
Cidr(ResourceDescriptionCidr),
}

View File

@@ -29,7 +29,6 @@ futures-bounded = { workspace = true }
hickory-resolver = { workspace = true }
arc-swap = "1.6.0"
bimap = "0.6"
dns-lookup = { workspace = true }
resolv-conf = "0.7.0"
# TODO: research replacing for https://github.com/algesten/str0m

View File

@@ -24,7 +24,7 @@ use crate::{
};
mod client;
mod gateway;
pub mod gateway;
const ICE_CANDIDATE_BUFFER: usize = 100;
// We should use not more than 1-2 relays (WebRTC in Firefox breaks at 5) due to combinatoric

View File

@@ -2,23 +2,38 @@ use crate::{
control_protocol::{insert_peers, start_handlers},
dns::is_subdomain,
peer::{PacketTransformGateway, Peer},
ConnectedPeer, GatewayState, PeerConfig, Tunnel, PEER_QUEUE_SIZE,
ConnectedPeer, Error, GatewayState, PeerConfig, Tunnel, PEER_QUEUE_SIZE,
};
use chrono::{DateTime, Utc};
use connlib_shared::{
messages::{
ClientId, ClientPayload, ConnectionAccepted, DomainResponse, Relay, ResourceAccepted,
ResourceDescription,
},
Callbacks, Dname, Error, Result,
messages::{ClientId, ConnectionAccepted, DomainResponse, Relay, ResourceId},
Callbacks, Dname, Result,
};
use ip_network::IpNetwork;
use std::sync::Arc;
use webrtc::ice_transport::{
ice_role::RTCIceRole, ice_transport_state::RTCIceTransportState, RTCIceTransport,
ice_parameters::RTCIceParameters, ice_role::RTCIceRole,
ice_transport_state::RTCIceTransportState, RTCIceTransport,
};
/// Description of a resource that maps to a DNS record which had its domain already resolved.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ResolvedResourceDescriptionDns {
pub id: ResourceId,
/// Internal resource's domain name.
pub domain: String,
/// Name of the resource.
///
/// Used only for display.
pub name: String,
pub addresses: Vec<IpNetwork>,
}
pub type ResourceDescription =
connlib_shared::messages::ResourceDescription<ResolvedResourceDescriptionDns>;
use super::{new_ice_connection, IceConnection};
#[tracing::instrument(level = "trace", skip(ice))]
@@ -57,12 +72,14 @@ where
///
/// # Returns
/// The connection details
#[allow(clippy::too_many_arguments)]
pub async fn set_peer_connection_request(
self: &Arc<Self>,
client_payload: ClientPayload,
ice_parameters: RTCIceParameters,
peer: PeerConfig,
relays: Vec<Relay>,
client_id: ClientId,
domain: Option<Dname>,
expires_at: Option<DateTime<Utc>>,
resource: ResourceDescription,
) -> Result<ConnectionAccepted> {
@@ -92,17 +109,16 @@ where
let resource_addresses = match &resource {
ResourceDescription::Dns(r) => {
let Some(domain) = client_payload.domain.clone() else {
let Some(domain) = domain.clone() else {
return Err(Error::ControlProtocolError);
};
if !is_subdomain(&domain, &r.address) {
if !is_subdomain(&domain, &r.domain) {
let _ = ice.stop().await;
return Err(Error::InvalidResource);
}
tokio::task::spawn_blocking(move || resolve_addresses(&domain.to_string()))
.await??
r.addresses.clone()
}
ResourceDescription::Cidr(ref cidr) => vec![cidr.address],
};
@@ -112,7 +128,7 @@ where
let tunnel = self.clone();
tokio::spawn(async move {
if let Err(e) = ice
.start(&client_payload.ice_parameters, Some(RTCIceRole::Controlled))
.start(&ice_parameters, Some(RTCIceRole::Controlled))
.await
.map_err(Into::into)
.and_then(|_| {
@@ -150,7 +166,7 @@ where
Ok(ConnectionAccepted {
ice_parameters: local_params,
domain_response: client_payload.domain.map(|domain| DomainResponse {
domain_response: domain.map(|domain| DomainResponse {
domain,
address: resource_addresses
.into_iter()
@@ -160,13 +176,13 @@ where
})
}
pub async fn allow_access(
pub fn allow_access(
&self,
resource: ResourceDescription,
client_id: ClientId,
expires_at: Option<DateTime<Utc>>,
domain: Option<Dname>,
) -> Option<ResourceAccepted> {
) -> Option<DomainResponse> {
let Some(peer) = self
.role_state
.lock()
@@ -183,14 +199,11 @@ where
return None;
};
if !is_subdomain(&domain, &r.address) {
if !is_subdomain(&domain, &r.domain) {
return None;
}
tokio::task::spawn_blocking(move || resolve_addresses(&domain.to_string()))
.await
.ok()?
.ok()?
r.addresses.clone()
}
ResourceDescription::Cidr(cidr) => vec![cidr.address],
};
@@ -201,11 +214,9 @@ where
}
if let Some(domain) = domain {
return Some(ResourceAccepted {
domain_response: DomainResponse {
domain,
address: addresses.iter().map(|i| i.network_address()).collect(),
},
return Some(DomainResponse {
domain,
address: addresses.iter().map(|i| i.network_address()).collect(),
});
}
@@ -259,50 +270,3 @@ where
Ok(())
}
}
#[cfg(target_os = "windows")]
fn resolve_addresses(_: &str) -> std::io::Result<Vec<IpNetwork>> {
unimplemented!()
}
#[cfg(not(target_os = "windows"))]
fn resolve_addresses(addr: &str) -> std::io::Result<Vec<IpNetwork>> {
use libc::{AF_INET, AF_INET6};
let addr_v4: std::io::Result<Vec<_>> = resolve_address_family(addr, AF_INET)
.map_err(|e| e.into())
.and_then(|a| a.collect());
let addr_v6: std::io::Result<Vec<_>> = resolve_address_family(addr, AF_INET6)
.map_err(|e| e.into())
.and_then(|a| a.collect());
match (addr_v4, addr_v6) {
(Ok(v4), Ok(v6)) => Ok(v6
.iter()
.map(|a| a.sockaddr.ip().into())
.chain(v4.iter().map(|a| a.sockaddr.ip().into()))
.collect()),
(Ok(v4), Err(_)) => Ok(v4.iter().map(|a| a.sockaddr.ip().into()).collect()),
(Err(_), Ok(v6)) => Ok(v6.iter().map(|a| a.sockaddr.ip().into()).collect()),
(Err(e), Err(_)) => Err(e),
}
}
#[cfg(not(target_os = "windows"))]
use dns_lookup::{AddrInfoHints, AddrInfoIter, LookupError};
#[cfg(not(target_os = "windows"))]
fn resolve_address_family(
addr: &str,
family: i32,
) -> std::result::Result<AddrInfoIter, LookupError> {
use libc::SOCK_STREAM;
dns_lookup::getaddrinfo(
Some(addr),
None,
Some(AddrInfoHints {
socktype: SOCK_STREAM,
address: family,
..Default::default()
}),
)
}

View File

@@ -48,7 +48,7 @@ use connlib_shared::{
pub use client::ClientState;
use connlib_shared::error::ConnlibError;
pub use control_protocol::Request;
pub use control_protocol::{gateway::ResolvedResourceDescriptionDns, Request};
pub use gateway::GatewayState;
use crate::ip_packet::MutableIpPacket;

View File

@@ -13,13 +13,14 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use connlib_shared::messages::DnsServer;
use connlib_shared::IpProvider;
use connlib_shared::{messages::ResourceDescription, Error, Result};
use connlib_shared::{Error, Result};
use ip_network::IpNetwork;
use ip_network_table::IpNetworkTable;
use parking_lot::{Mutex, RwLock};
use pnet_packet::Packet;
use secrecy::ExposeSecret;
use crate::control_protocol::gateway::ResourceDescription;
use crate::MAX_UDP_SIZE;
use crate::{device_channel, ip_packet::MutableIpPacket, PeerConfig};

View File

@@ -29,6 +29,9 @@ url = { version = "2.4.1", default-features = false }
webrtc = { workspace = true }
domain = { workspace = true }
uuid = { version = "1.7.0", features = ["v4"] }
ip_network = { version = "0.4", default-features = false }
dns-lookup = { workspace = true }
libc = { version = "0.2", default-features = false, features = ["std", "const-extern-fn", "extra_traits"] }
[dev-dependencies]
serde_json = { version = "1.0", default-features = false, features = ["std"] }

View File

@@ -3,10 +3,13 @@ use crate::messages::{
EgressMessages, IngressMessages,
};
use crate::CallbackHandler;
use anyhow::{anyhow, Result};
use connlib_shared::messages::{ClientId, GatewayResponse, ResourceAccepted};
use connlib_shared::Error;
use firezone_tunnel::{Event, GatewayState, Tunnel};
use anyhow::{anyhow, bail, Result};
use connlib_shared::messages::{
ClientId, DomainResponse, GatewayResponse, ResourceAccepted, ResourceDescription,
};
use connlib_shared::{Dname, Error};
use firezone_tunnel::{Event, GatewayState, ResolvedResourceDescriptionDns, Tunnel};
use ip_network::IpNetwork;
use phoenix_channel::PhoenixChannel;
use std::convert::Infallible;
use std::sync::Arc;
@@ -21,9 +24,9 @@ pub struct Eventloop {
// TODO: Strongly type request reference (currently `String`)
connection_request_tasks:
futures_bounded::FuturesMap<(ClientId, String), Result<GatewayResponse, Error>>,
futures_bounded::FuturesMap<(ClientId, String), Result<GatewayResponse>>,
add_ice_candidate_tasks: futures_bounded::FuturesSet<Result<(), Error>>,
allow_access_tasks: futures_bounded::FuturesMap<String, Option<ResourceAccepted>>,
allow_access_tasks: futures_bounded::FuturesMap<String, Option<DomainResponse>>,
print_stats_timer: tokio::time::Interval,
}
@@ -94,7 +97,7 @@ impl Eventloop {
}
Poll::Ready(((client, _), Ok(Err(e)))) => {
self.tunnel.cleanup_connection(client);
tracing::debug!(%client, "Connection request failed: {:#}", anyhow::Error::new(e));
tracing::debug!(%client, "Connection request failed: {:#}", e);
continue;
}
@@ -110,12 +113,14 @@ impl Eventloop {
}
match self.allow_access_tasks.poll_unpin(cx) {
Poll::Ready((reference, Ok(Some(res)))) => {
Poll::Ready((reference, Ok(Some(domain_response)))) => {
self.portal.send(
PHOENIX_TOPIC,
EgressMessages::ConnectionReady(ConnectionReady {
reference,
gateway_payload: GatewayResponse::ResourceAccepted(res),
gateway_payload: GatewayResponse::ResourceAccepted(ResourceAccepted {
domain_response,
}),
}),
);
continue;
@@ -157,18 +162,24 @@ impl Eventloop {
let tunnel = Arc::clone(&self.tunnel);
let connection_request = async move {
let resource =
resolve_resource_description(req.resource, &req.client.payload.domain)
.await?;
let conn = tunnel
.set_peer_connection_request(
req.client.payload,
req.client.payload.ice_parameters,
req.client.peer.into(),
req.relays,
req.client.id,
req.client.payload.domain,
req.expires_at,
req.resource,
resource,
)
.await?;
Ok(GatewayResponse::ConnectionAccepted(conn))
};
match self
.connection_request_tasks
.try_push((req.client.id, req.reference.clone()), connection_request)
@@ -181,6 +192,7 @@ impl Eventloop {
}
Ok(()) => {}
};
continue;
}
Poll::Ready(phoenix_channel::Event::InboundMessage {
@@ -198,13 +210,17 @@ impl Eventloop {
let tunnel = Arc::clone(&self.tunnel);
let allow_access = async move {
let resource = resolve_resource_description(resource, &payload)
.await
.ok()?;
tunnel.allow_access(resource, client_id, expires_at, payload)
};
if self
.allow_access_tasks
.try_push(reference, async move {
tunnel
.allow_access(resource, client_id, expires_at, payload)
.await
})
.try_push(reference, allow_access)
.is_err()
{
tracing::warn!("Too many allow access requests, dropping existing one");
@@ -257,3 +273,79 @@ impl Eventloop {
}
}
}
async fn resolve_resource_description(
resource: ResourceDescription,
domain: &Option<Dname>,
) -> Result<ResourceDescription<ResolvedResourceDescriptionDns>> {
match resource {
ResourceDescription::Dns(dns) => {
let Some(domain) = domain.clone() else {
debug_assert!(
false,
"We should never get a DNS resource access request without the subdomain"
);
bail!("Protocol error: Request for DNS resource without the subdomain being tried to access.")
};
let addresses =
tokio::task::spawn_blocking(move || resolve_addresses(&domain.to_string()))
.await??;
Ok(ResourceDescription::Dns(ResolvedResourceDescriptionDns {
id: dns.id,
domain: dns.address,
name: dns.name,
addresses,
}))
}
ResourceDescription::Cidr(cdir) => Ok(ResourceDescription::Cidr(cdir)),
}
}
#[cfg(target_os = "windows")]
fn resolve_addresses(_: &str) -> std::io::Result<Vec<IpNetwork>> {
unimplemented!()
}
#[cfg(not(target_os = "windows"))]
fn resolve_addresses(addr: &str) -> std::io::Result<Vec<IpNetwork>> {
use libc::{AF_INET, AF_INET6};
let addr_v4: std::io::Result<Vec<_>> = resolve_address_family(addr, AF_INET)
.map_err(|e| e.into())
.and_then(|a| a.collect());
let addr_v6: std::io::Result<Vec<_>> = resolve_address_family(addr, AF_INET6)
.map_err(|e| e.into())
.and_then(|a| a.collect());
match (addr_v4, addr_v6) {
(Ok(v4), Ok(v6)) => Ok(v6
.iter()
.map(|a| a.sockaddr.ip().into())
.chain(v4.iter().map(|a| a.sockaddr.ip().into()))
.collect()),
(Ok(v4), Err(_)) => Ok(v4.iter().map(|a| a.sockaddr.ip().into()).collect()),
(Err(_), Ok(v6)) => Ok(v6.iter().map(|a| a.sockaddr.ip().into()).collect()),
(Err(e), Err(_)) => Err(e),
}
}
#[cfg(not(target_os = "windows"))]
use dns_lookup::{AddrInfoHints, AddrInfoIter, LookupError};
#[cfg(not(target_os = "windows"))]
fn resolve_address_family(
addr: &str,
family: i32,
) -> std::result::Result<AddrInfoIter, LookupError> {
use libc::SOCK_STREAM;
dns_lookup::getaddrinfo(
Some(addr),
None,
Some(AddrInfoHints {
socktype: SOCK_STREAM,
address: family,
..Default::default()
}),
)
}