diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index 2b146b110..e2f59d431 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -344,29 +344,23 @@ impl ClientState { .remove(&(*gid, domain)) .unwrap_or_default(); - let Some(intent) = self.dns_resource_nat.update( + match self.dns_resource_nat.update( domain.clone(), *gid, *rid, proxy_ips, packets_for_domain, now, - ) else { - continue; - }; + ) { + Ok(()) => {} + Err(e) => { + tracing::warn!("Failed to update DNS resource NAT state: {e:#}"); + continue; + } + } self.peers .add_ips_with_resource(gid, proxy_ips.iter().copied(), rid); - - tracing::debug!(%gid, %domain, "Setting up DNS resource NAT"); - - encapsulate_and_buffer( - intent, - *gid, - now, - &mut self.node, - &mut self.buffered_transmits, - ); } } @@ -539,7 +533,7 @@ impl ClientState { if let Some((domain, _)) = self.stub_resolver.resolve_resource_by_ip(&dst) { packet = self .dns_resource_nat - .handle_outgoing(peer.id(), domain, packet)?; + .handle_outgoing(peer.id(), domain, packet, now)?; } let gid = peer.id(); @@ -1028,6 +1022,7 @@ impl ClientState { .handle_timeout(now); self.advance_dns_tcp_sockets(now); + self.send_dns_resource_nat_packets(now); } /// Advance the TCP DNS server and client state machines. @@ -1083,6 +1078,20 @@ impl ClientState { } } + fn send_dns_resource_nat_packets(&mut self, now: Instant) { + while let Some((gid, domain, packet)) = self.dns_resource_nat.poll_packet() { + tracing::debug!(%gid, %domain, "Setting up DNS resource NAT"); + + encapsulate_and_buffer( + packet, + gid, + now, + &mut self.node, + &mut self.buffered_transmits, + ); + } + } + fn handle_udp_dns_query( &mut self, upstream: SocketAddr, diff --git a/rust/connlib/tunnel/src/client/dns_resource_nat.rs b/rust/connlib/tunnel/src/client/dns_resource_nat.rs index eb5195602..ef58188e0 100644 --- a/rust/connlib/tunnel/src/client/dns_resource_nat.rs +++ b/rust/connlib/tunnel/src/client/dns_resource_nat.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::Result; use connlib_model::{GatewayId, ResourceId}; use dns_types::DomainName; use ip_packet::IpPacket; @@ -17,7 +18,9 @@ use crate::{p2p_control, unique_packet_buffer::UniquePacketBuffer}; /// Until the NAT is set up, packets sent to these resources are effectively black-holed. #[derive(Default)] pub struct DnsResourceNat { - inner: BTreeMap<(GatewayId, DomainName), State>, + inner: BTreeMap<(GatewayId, DomainName), (State, IpPacket)>, + + assigned_ips_packets: VecDeque<(GatewayId, DomainName, IpPacket)>, } impl DnsResourceNat { @@ -29,26 +32,37 @@ impl DnsResourceNat { proxy_ips: &[IpAddr], packets_for_domain: VecDeque, now: Instant, - ) -> Option { + ) -> Result<()> { match self.inner.entry((gid, domain.clone())) { Entry::Vacant(v) => { let mut buffered_packets = UniquePacketBuffer::with_capacity_power_of_2(5, "dns-resource-nat-initial"); // 2^5 = 32 buffered_packets.extend(packets_for_domain); - v.insert(State::Pending { - sent_at: now, - buffered_packets, + let assigned_ips = p2p_control::dns_resource_nat::assigned_ips( + rid, + domain.clone(), + proxy_ips.to_vec(), + )?; - should_buffer: true, - }); + v.insert(( + State::Pending { + sent_at: now, + buffered_packets, + + should_buffer: true, + }, + assigned_ips.clone(), + )); + + self.assigned_ips_packets + .push_back((gid, domain, assigned_ips)); } Entry::Occupied(mut o) => { - let state = o.get_mut(); + let (state, assigned_ips) = o.get_mut(); match state { - State::Confirmed => return None, - State::Failed => return None, + State::Failed | State::Confirmed => {} State::Recreating { should_buffer } => { let mut buffered_packets = UniquePacketBuffer::with_capacity_power_of_2( 5, // 2^5 = 32 @@ -61,35 +75,31 @@ impl DnsResourceNat { buffered_packets, should_buffer: *should_buffer, }; + + self.assigned_ips_packets + .push_back((gid, domain, assigned_ips.clone())); } State::Pending { sent_at, buffered_packets, .. } => { - let time_since_last_attempt = now.duration_since(*sent_at); buffered_packets.extend(packets_for_domain); - if time_since_last_attempt < Duration::from_secs(2) { - return None; + if should_send_assigned_ips_packet(now, *sent_at) { + *sent_at = now; + self.assigned_ips_packets.push_back(( + gid, + domain, + assigned_ips.clone(), + )); } - - *sent_at = now; } } } } - let packet = - match p2p_control::dns_resource_nat::assigned_ips(rid, domain, proxy_ips.to_vec()) { - Ok(packet) => packet, - Err(e) => { - tracing::warn!("Failed to create IP packet for `AssignedIp`s event: {e:#}"); - return None; - } - }; - - Some(packet) + Ok(()) } /// Recreate the DNS resource NAT state for a given domain. @@ -105,7 +115,7 @@ impl DnsResourceNat { /// to continue flowing to the Gateway while the DNS resource NAT is being recreated. /// In most cases, the DNS records will not change and as such, performing this will not interrupt the flow of packets. pub fn recreate(&mut self, domain: DomainName) { - for state in self + for (state, _) in self .inner .iter_mut() .filter_map(|((_, candidate), b)| (candidate == &domain).then_some(b)) @@ -130,8 +140,9 @@ impl DnsResourceNat { gid: GatewayId, domain: &DomainName, packet: IpPacket, + now: Instant, ) -> Option { - let Some(state) = self.inner.get_mut(&(gid, domain.clone())) else { + let Some((state, assigned_ips)) = self.inner.get_mut(&(gid, domain.clone())) else { tracing::debug!(%gid, %domain, "No DNS resource NAT entry"); return Some(packet); // Pass-through packet. @@ -141,12 +152,38 @@ impl DnsResourceNat { State::Pending { should_buffer: true, buffered_packets, - .. + sent_at, } => { buffered_packets.push(packet); + + if should_send_assigned_ips_packet(now, *sent_at) { + *sent_at = now; + self.assigned_ips_packets.push_back(( + gid, + domain.clone(), + assigned_ips.clone(), + )); + } + None } - State::Pending { .. } | State::Recreating { .. } | State::Confirmed | State::Failed => { + State::Pending { + should_buffer: false, + sent_at, + .. + } => { + if should_send_assigned_ips_packet(now, *sent_at) { + *sent_at = now; + self.assigned_ips_packets.push_back(( + gid, + domain.clone(), + assigned_ips.clone(), + )); + } + + Some(packet) + } + State::Recreating { .. } | State::Confirmed | State::Failed => { // Some of these might be black-holed on the Gateway (i.e. in `Failed`). // But there isn't much we can do ... Some(packet) @@ -176,7 +213,7 @@ impl DnsResourceNat { return into_iter(None); }; - let nat_state = nat_entry.get_mut(); + let (nat_state, _) = nat_entry.get_mut(); if res.status != p2p_control::dns_resource_nat::NatStatus::Active { tracing::debug!(%gid, domain = %res.domain, "DNS resource NAT is not active"); @@ -188,6 +225,14 @@ impl DnsResourceNat { into_iter(Some(nat_state.confirm())) } + + pub fn poll_packet(&mut self) -> Option<(GatewayId, DomainName, IpPacket)> { + self.assigned_ips_packets.pop_front() + } +} + +fn should_send_assigned_ips_packet(now: Instant, sent_at: Instant) -> bool { + now.duration_since(sent_at) >= Duration::from_secs(2) } fn into_iter(option: Option) -> impl IntoIterator @@ -254,15 +299,17 @@ mod tests { fn no_recreate_nat_for_failed_response() { let mut dns_resource_nat = DnsResourceNat::default(); - let intent = dns_resource_nat.update( - EXAMPLE_COM.to_vec(), - GID, - RID, - PROXY_IPS, - VecDeque::default(), - Instant::now(), - ); - assert!(intent.is_some()); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + Instant::now(), + ) + .unwrap(); + assert!(dns_resource_nat.poll_packet().is_some()); dns_resource_nat.on_domain_status( GID, @@ -273,29 +320,33 @@ mod tests { }, ); - let intent = dns_resource_nat.update( - EXAMPLE_COM.to_vec(), - GID, - RID, - PROXY_IPS, - VecDeque::default(), - Instant::now(), - ); - assert!(intent.is_none()); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + Instant::now(), + ) + .unwrap(); + assert!(dns_resource_nat.poll_packet().is_none()); } #[test] fn recreate_failed_nat() { let mut dns_resource_nat = DnsResourceNat::default(); - dns_resource_nat.update( - EXAMPLE_COM.to_vec(), - GID, - RID, - PROXY_IPS, - VecDeque::default(), - Instant::now(), - ); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + Instant::now(), + ) + .unwrap(); dns_resource_nat.on_domain_status( GID, p2p_control::dns_resource_nat::DomainStatus { @@ -307,22 +358,25 @@ mod tests { dns_resource_nat.recreate(EXAMPLE_COM.to_vec()); - let intent = dns_resource_nat.update( - EXAMPLE_COM.to_vec(), - GID, - RID, - PROXY_IPS, - VecDeque::default(), - Instant::now(), - ); - assert!(intent.is_some()); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + Instant::now(), + ) + .unwrap(); + assert!(dns_resource_nat.poll_packet().is_some()); // Should buffer packets if we are coming from `Failed`. let packet = ip_packet::make::udp_packet(Ipv4Addr::LOCALHOST, Ipv4Addr::LOCALHOST, 0, 0, vec![]) .unwrap(); - let maybe_packet = dns_resource_nat.handle_outgoing(GID, &EXAMPLE_COM.to_vec(), packet); + let maybe_packet = + dns_resource_nat.handle_outgoing(GID, &EXAMPLE_COM.to_vec(), packet, Instant::now()); assert!(maybe_packet.is_none()); } @@ -331,21 +385,27 @@ mod tests { fn buffer_packets_until_nat_is_active() { let mut dns_resource_nat = DnsResourceNat::default(); - dns_resource_nat.update( - EXAMPLE_COM.to_vec(), - GID, - RID, - PROXY_IPS, - VecDeque::default(), - Instant::now(), - ); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + Instant::now(), + ) + .unwrap(); let packet = ip_packet::make::udp_packet(Ipv4Addr::LOCALHOST, Ipv4Addr::LOCALHOST, 0, 0, vec![]) .unwrap(); - let maybe_packet = - dns_resource_nat.handle_outgoing(GID, &EXAMPLE_COM.to_vec(), packet.clone()); + let maybe_packet = dns_resource_nat.handle_outgoing( + GID, + &EXAMPLE_COM.to_vec(), + packet.clone(), + Instant::now(), + ); assert!(maybe_packet.is_none()); @@ -365,14 +425,16 @@ mod tests { fn dont_buffer_packets_upon_recreate() { let mut dns_resource_nat = DnsResourceNat::default(); - dns_resource_nat.update( - EXAMPLE_COM.to_vec(), - GID, - RID, - PROXY_IPS, - VecDeque::default(), - Instant::now(), - ); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + Instant::now(), + ) + .unwrap(); dns_resource_nat.on_domain_status( GID, p2p_control::dns_resource_nat::DomainStatus { @@ -383,22 +445,30 @@ mod tests { ); dns_resource_nat.recreate(EXAMPLE_COM.to_vec()); - dns_resource_nat.update( - EXAMPLE_COM.to_vec(), - GID, - RID, - PROXY_IPS, - VecDeque::default(), - Instant::now(), - ); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + Instant::now(), + ) + .unwrap(); - let packet = + let app_packet = ip_packet::make::udp_packet(Ipv4Addr::LOCALHOST, Ipv4Addr::LOCALHOST, 0, 0, vec![]) .unwrap(); - let maybe_packet = dns_resource_nat.handle_outgoing(GID, &EXAMPLE_COM.to_vec(), packet); + let maybe_packet = dns_resource_nat.handle_outgoing( + GID, + &EXAMPLE_COM.to_vec(), + app_packet.clone(), + Instant::now(), + ); - assert!(maybe_packet.is_some()); + assert!(maybe_packet.is_some_and(|p| p == app_packet)); + assert!(dns_resource_nat.poll_packet().is_some()); } #[test] @@ -406,8 +476,8 @@ mod tests { let mut dns_resource_nat = DnsResourceNat::default(); let mut now = Instant::now(); - let mut update_fn = |now| { - dns_resource_nat.update( + dns_resource_nat + .update( EXAMPLE_COM.to_vec(), GID, RID, @@ -415,14 +485,64 @@ mod tests { VecDeque::default(), now, ) - }; + .unwrap(); + assert!(dns_resource_nat.poll_packet().is_some()); - assert!(update_fn(now).is_some()); - assert!(update_fn(now).is_none()); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + now, + ) + .unwrap(); + assert!(dns_resource_nat.poll_packet().is_none()); now += Duration::from_secs(2); - assert!(update_fn(now).is_some()); + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + now, + ) + .unwrap(); + assert!(dns_resource_nat.poll_packet().is_some()); + } + + #[test] + fn resend_intent_on_outgoing_packet_after_2s() { + let mut dns_resource_nat = DnsResourceNat::default(); + let mut now = Instant::now(); + + dns_resource_nat + .update( + EXAMPLE_COM.to_vec(), + GID, + RID, + PROXY_IPS, + VecDeque::default(), + now, + ) + .unwrap(); + assert!(dns_resource_nat.poll_packet().is_some()); + + now += Duration::from_secs(2); + + let app_packet = + ip_packet::make::udp_packet(Ipv4Addr::LOCALHOST, Ipv4Addr::LOCALHOST, 0, 0, vec![]) + .unwrap(); + + let maybe_packet = + dns_resource_nat.handle_outgoing(GID, &EXAMPLE_COM.to_vec(), app_packet, now); + + assert!(maybe_packet.is_none()); + assert!(dns_resource_nat.poll_packet().is_some()); } const EXAMPLE_COM: DomainNameRef =