diff --git a/rust/connlib/shared/src/error.rs b/rust/connlib/shared/src/error.rs index cdf0e26f8..86d8bfecd 100644 --- a/rust/connlib/shared/src/error.rs +++ b/rust/connlib/shared/src/error.rs @@ -148,6 +148,8 @@ pub enum ConnlibError { Wintun(#[from] wintun::Error), #[error("Token has expired")] TokenExpired, + #[error("Too many concurrent gateway connection requests")] + TooManyConnectionRequests, } impl ConnlibError { diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index aa8f51218..2d474ddbb 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -16,7 +16,7 @@ use connlib_shared::{Callbacks, Dname, DNS_SENTINEL}; use domain::base::Rtype; use futures::channel::mpsc::Receiver; use futures::stream; -use futures_bounded::{PushError, StreamMap}; +use futures_bounded::{FuturesMap, PushError, StreamMap}; use hickory_resolver::lookup::Lookup; use ip_network::{IpNetwork, Ipv4Network, Ipv6Network}; use ip_network_table::IpNetworkTable; @@ -35,6 +35,7 @@ use webrtc::ice_transport::ice_candidate::RTCIceCandidate; // Using str here because Ipv4/6Network doesn't support `const` 🙃 const IPV4_RESOURCES: &str = "100.96.0.0/11"; const IPV6_RESOURCES: &str = "fd00:2021:1111:8000::/107"; +const MAX_CONNECTION_REQUEST_DELAY: Duration = Duration::from_secs(10); #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct DnsResource { @@ -167,10 +168,19 @@ pub struct ClientState { // TODO: Make private pub awaiting_connection: HashMap, - pub gateway_awaiting_connection: HashSet, - awaiting_connection_timers: StreamMap, + pub gateway_awaiting_connection: HashSet, + // This timer exist for an unlikely case, on unreliable connections where the RequestConnection message + // or the response is lost: + // This would remove the "PendingConnection" message and be able to try the connection again. + // There are some edge cases that come with this: + // * a gateway in a VERY unlikely case could receive the connection request twice. This will stop any connection attempt and make the whole thing start again. + // if this would happen often the UX would be awful but this is only in cases where messages are delayed for more than 10 seconds, it's enough that it doesn't break correctness. + // * even more unlikely a tunnel could be established in a sort of race condition when this timer goes off. Again a similar behavior to the one above will happen, the webrtc connection will be forcefully terminated from the gateway. + // then the old peer will expire, this might take ~180 seconds. This is an even worse experience but the likelihood of this happen is infinitesimaly small, again correctness is the only important part. + gateway_awaiting_connection_timers: FuturesMap, + pub gateway_public_keys: HashMap, pub gateway_preshared_keys: HashMap, resources_gateways: HashMap, @@ -281,6 +291,22 @@ impl ClientState { channel: p.channel.clone(), }) }) else { + match self + .gateway_awaiting_connection_timers + // Note: we don't need to set a timer here because + // the FutureMap already expires things, it seems redundant + // to also have timer that expires. + .try_push(gateway, std::future::pending()) + { + Ok(_) => {} + Err(PushError::BeyondCapacity(_)) => { + tracing::warn!(%gateway, "Too many concurrent connection attempts"); + return Err(Error::TooManyConnectionRequests); + } + Err(PushError::Replaced(_)) => { + // The timers are equivalent for our purpose so we don't really care about this one. + } + }; self.gateway_awaiting_connection.insert(gateway); return Ok(None); }; @@ -314,6 +340,7 @@ impl ClientState { }; self.gateway_awaiting_connection.remove(&gateway); + self.gateway_awaiting_connection_timers.remove(gateway); } fn is_awaiting_connection_to_dns(&self, resource: &DnsResource) -> bool { @@ -457,6 +484,7 @@ impl ClientState { // Tidy up state once everything succeeded. self.gateway_awaiting_connection.remove(&gateway); + self.gateway_awaiting_connection_timers.remove(gateway); self.awaiting_connection.remove(&resource); Ok(config) @@ -612,9 +640,13 @@ impl Default for ClientState { MAX_CONCURRENT_ICE_GATHERING, ), waiting_for_sdp_from_gateway: Default::default(), + awaiting_connection: Default::default(), - gateway_awaiting_connection: Default::default(), awaiting_connection_timers: StreamMap::new(Duration::from_secs(60), 100), + + gateway_awaiting_connection: Default::default(), + gateway_awaiting_connection_timers: FuturesMap::new(MAX_CONNECTION_REQUEST_DELAY, 100), + gateway_public_keys: Default::default(), resources_gateways: Default::default(), forwarded_dns_queries: BoundedQueue::with_capacity(DNS_QUERIES_QUEUE_SIZE), @@ -654,6 +686,12 @@ impl RoleState for ClientState { Poll::Pending => {} } + if let Poll::Ready((gateway_id, _)) = + self.gateway_awaiting_connection_timers.poll_unpin(cx) + { + self.gateway_awaiting_connection.remove(&gateway_id); + } + match self.awaiting_connection_timers.poll_next_unpin(cx) { Poll::Ready((resource, Some(Ok(_)))) => { let Entry::Occupied(mut entry) = self.awaiting_connection.entry(resource) diff --git a/rust/connlib/tunnel/src/control_protocol.rs b/rust/connlib/tunnel/src/control_protocol.rs index 29858ad27..0181ecdb5 100644 --- a/rust/connlib/tunnel/src/control_protocol.rs +++ b/rust/connlib/tunnel/src/control_protocol.rs @@ -126,7 +126,7 @@ pub(crate) async fn new_ice_connection( } if ice_candidate_tx.send(candidate).await.is_err() { - debug_assert!(false, "receiver was dropped before sender"); + tracing::warn!("ice gatherer receiver was dropped before sender"); } }) }) @@ -172,6 +172,7 @@ fn start_handlers( } }) })); + tokio::spawn({ async move { // If this fails receiver will be dropped and the connection will expire at some point