Fix/lost connection request (#2976)

Should fix #2880

The way I do it is: after ~10 seconds we drop the
`gateway_awaiting_connection` entry and let the client try the connection
again (depending on the upper layer). I think this is fine since the cases
where this happens are unlikely.
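
Roughly, the idea looks like this (a minimal sketch using the same `futures_bounded::FuturesMap` API the diff below relies on; `PendingGateways` and the `u64` gateway id are illustrative stand-ins, not real connlib types):

```rust
use std::collections::HashSet;
use std::task::{Context, Poll};
use std::time::Duration;

use futures_bounded::{FuturesMap, PushError};

// Illustrative stand-in for connlib's GatewayId.
type GatewayId = u64;

struct PendingGateways {
    // Gateways we have sent a connection request to and are still waiting on.
    awaiting: HashSet<GatewayId>,
    // One `pending()` future per gateway; `FuturesMap` resolves it with a
    // timeout error once the deadline passes, which is all we need here.
    timers: FuturesMap<GatewayId, ()>,
}

impl PendingGateways {
    fn new() -> Self {
        Self {
            awaiting: HashSet::new(),
            // 10 s deadline, at most 100 requests in flight (values from this PR).
            timers: FuturesMap::new(Duration::from_secs(10), 100),
        }
    }

    /// Track a new outgoing connection request.
    fn insert(&mut self, gateway: GatewayId) -> Result<(), &'static str> {
        match self.timers.try_push(gateway, std::future::pending()) {
            Ok(_) => {}
            Err(PushError::BeyondCapacity(_)) => return Err("too many concurrent requests"),
            Err(PushError::Replaced(_)) => {} // an equivalent timer was already running
        }
        self.awaiting.insert(gateway);
        Ok(())
    }

    /// Called from the state machine's poll loop: once a timer fires, forget the
    /// gateway so the client is free to try the connection again.
    fn poll_expired(&mut self, cx: &mut Context<'_>) {
        if let Poll::Ready((gateway, _)) = self.timers.poll_unpin(cx) {
            self.awaiting.remove(&gateway);
        }
    }
}
```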

It's hard to test thoroughly, but I'll test with bad-condition
simulators; [pumba](https://github.com/alexei-led/pumba) seems
promising. In the meantime I'm still creating the PR so that I can have
it reviewed.

Edit: Using Pumba with different percentages of packet loss, things seem to go
well, and connections are actually established even when packets are
lost. (Making a note that we should integrate pumba with our CI.)
Gabi authored 2023-12-21 17:01:52 -03:00, committed by GitHub
parent 5edfe80eb0
commit 6e77978da7
3 changed files with 46 additions and 5 deletions

@@ -148,6 +148,8 @@ pub enum ConnlibError {
Wintun(#[from] wintun::Error),
#[error("Token has expired")]
TokenExpired,
#[error("Too many concurrent gateway connection requests")]
TooManyConnectionRequests,
}
impl ConnlibError {

@@ -16,7 +16,7 @@ use connlib_shared::{Callbacks, Dname, DNS_SENTINEL};
use domain::base::Rtype;
use futures::channel::mpsc::Receiver;
use futures::stream;
use futures_bounded::{PushError, StreamMap};
use futures_bounded::{FuturesMap, PushError, StreamMap};
use hickory_resolver::lookup::Lookup;
use ip_network::{IpNetwork, Ipv4Network, Ipv6Network};
use ip_network_table::IpNetworkTable;
@@ -35,6 +35,7 @@ use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
// Using str here because Ipv4/6Network doesn't support `const` 🙃
const IPV4_RESOURCES: &str = "100.96.0.0/11";
const IPV6_RESOURCES: &str = "fd00:2021:1111:8000::/107";
const MAX_CONNECTION_REQUEST_DELAY: Duration = Duration::from_secs(10);
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct DnsResource {
@@ -167,10 +168,19 @@ pub struct ClientState {
// TODO: Make private
pub awaiting_connection: HashMap<ResourceId, AwaitingConnectionDetails>,
pub gateway_awaiting_connection: HashSet<GatewayId>,
awaiting_connection_timers: StreamMap<ResourceId, Instant>,
pub gateway_awaiting_connection: HashSet<GatewayId>,
// These timers exist for an unlikely case: on unreliable connections, the RequestConnection message
// or its response can be lost.
// When such a timer fires we remove the "PendingConnection" state so the connection can be tried again.
// There are some edge cases that come with this:
// * In a VERY unlikely case, a gateway could receive the connection request twice. This stops any connection attempt and makes the whole thing start again.
//   If this happened often the UX would be awful, but it only occurs when messages are delayed for more than 10 seconds; it's enough that it doesn't break correctness.
// * Even more unlikely, a tunnel could be established in a sort of race condition when this timer goes off. A behavior similar to the one above will happen: the webrtc connection will be forcefully terminated from the gateway,
//   and then the old peer will expire, which might take ~180 seconds. This is an even worse experience, but the likelihood of this happening is infinitesimally small; again, correctness is the only important part.
gateway_awaiting_connection_timers: FuturesMap<GatewayId, ()>,
pub gateway_public_keys: HashMap<GatewayId, PublicKey>,
pub gateway_preshared_keys: HashMap<GatewayId, StaticSecret>,
resources_gateways: HashMap<ResourceId, GatewayId>,
@@ -281,6 +291,22 @@ impl ClientState {
channel: p.channel.clone(),
})
}) else {
match self
.gateway_awaiting_connection_timers
// Note: we don't need to set a timer here because
// the FuturesMap already expires entries; it seems redundant
// to also push a timer that expires.
.try_push(gateway, std::future::pending())
{
Ok(_) => {}
Err(PushError::BeyondCapacity(_)) => {
tracing::warn!(%gateway, "Too many concurrent connection attempts");
return Err(Error::TooManyConnectionRequests);
}
Err(PushError::Replaced(_)) => {
// The timers are equivalent for our purposes, so we don't really care about this one.
}
};
self.gateway_awaiting_connection.insert(gateway);
return Ok(None);
};
@@ -314,6 +340,7 @@ impl ClientState {
};
self.gateway_awaiting_connection.remove(&gateway);
self.gateway_awaiting_connection_timers.remove(gateway);
}
fn is_awaiting_connection_to_dns(&self, resource: &DnsResource) -> bool {
@@ -457,6 +484,7 @@ impl ClientState {
// Tidy up state once everything succeeded.
self.gateway_awaiting_connection.remove(&gateway);
self.gateway_awaiting_connection_timers.remove(gateway);
self.awaiting_connection.remove(&resource);
Ok(config)
@@ -612,9 +640,13 @@ impl Default for ClientState {
MAX_CONCURRENT_ICE_GATHERING,
),
waiting_for_sdp_from_gateway: Default::default(),
awaiting_connection: Default::default(),
gateway_awaiting_connection: Default::default(),
awaiting_connection_timers: StreamMap::new(Duration::from_secs(60), 100),
gateway_awaiting_connection: Default::default(),
gateway_awaiting_connection_timers: FuturesMap::new(MAX_CONNECTION_REQUEST_DELAY, 100),
gateway_public_keys: Default::default(),
resources_gateways: Default::default(),
forwarded_dns_queries: BoundedQueue::with_capacity(DNS_QUERIES_QUEUE_SIZE),
@@ -654,6 +686,12 @@ impl RoleState for ClientState {
Poll::Pending => {}
}
if let Poll::Ready((gateway_id, _)) =
self.gateway_awaiting_connection_timers.poll_unpin(cx)
{
self.gateway_awaiting_connection.remove(&gateway_id);
}
match self.awaiting_connection_timers.poll_next_unpin(cx) {
Poll::Ready((resource, Some(Ok(_)))) => {
let Entry::Occupied(mut entry) = self.awaiting_connection.entry(resource)


@@ -126,7 +126,7 @@ pub(crate) async fn new_ice_connection(
}
if ice_candidate_tx.send(candidate).await.is_err() {
debug_assert!(false, "receiver was dropped before sender");
tracing::warn!("ice gatherer receiver was dropped before sender");
}
})
})
@@ -172,6 +172,7 @@ fn start_handlers<TId, TTransform, TRoleState>(
}
})
}));
tokio::spawn({
async move {
// If this fails receiver will be dropped and the connection will expire at some point