mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 18:18:55 +00:00
feat(connlib): buffer packets during connection and NAT setup (#7477)
At present, `connlib` will always drop all IP packets until a connection is established and the DNS resource NAT is created. This causes an unnecessary delay until the connection is working because we need to wait for retransmission timers of the host's network stack to resend those packets. With the new idempotent control protocol, it is now much easier to buffer these packets and send them to the gateway once the connection is established. The buffer sizes are chosen somewhat conservatively to ensure we don't consume a lot of memory. The hypothesis here is that every protocol - even if the transport layer is unreliable like UDP - will start with a handshake involving only one or at most a few packets and waiting for a reply before sending more. Thus, as long as we can set up a connection quicker than the re-transmit timer in the host's network stack, buffering those packets should result in no packet loss. Typically, setting up a new connection takes at most 500ms which should be fast enough to not trigger any re-transmits. Resolves: #3246.
This commit is contained in:
8
rust/Cargo.lock
generated
8
rust/Cargo.lock
generated
@@ -2220,6 +2220,7 @@ dependencies = [
|
||||
"proptest-state-machine",
|
||||
"rand 0.8.5",
|
||||
"rangemap",
|
||||
"ringbuffer",
|
||||
"secrecy",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -5242,6 +5243,12 @@ dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ringbuffer"
|
||||
version = "0.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3df6368f71f205ff9c33c076d170dd56ebf68e8161c733c0caa07a7a5509ed53"
|
||||
|
||||
[[package]]
|
||||
name = "rtnetlink"
|
||||
version = "0.14.1"
|
||||
@@ -5900,6 +5907,7 @@ dependencies = [
|
||||
"itertools 0.13.0",
|
||||
"once_cell",
|
||||
"rand 0.8.5",
|
||||
"ringbuffer",
|
||||
"secrecy",
|
||||
"sha2",
|
||||
"str0m",
|
||||
|
||||
@@ -74,6 +74,7 @@ native-dialog = "0.7.0"
|
||||
nix = "0.29.0"
|
||||
nu-ansi-term = "0.50"
|
||||
once_cell = "1.17.1"
|
||||
ringbuffer = "0.15.0"
|
||||
opentelemetry = "0.26.0"
|
||||
opentelemetry-otlp = "0.26.0"
|
||||
opentelemetry_sdk = "0.26.0"
|
||||
|
||||
@@ -16,6 +16,7 @@ ip-packet = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
ringbuffer = { workspace = true }
|
||||
secrecy = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
str0m = { workspace = true }
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::{
|
||||
backoff::{self, ExponentialBackoff},
|
||||
node::{SessionId, Transmit},
|
||||
ringbuffer::RingBuffer,
|
||||
utils::earliest,
|
||||
EncryptedPacket,
|
||||
};
|
||||
@@ -11,6 +10,7 @@ use firezone_logging::{err_with_src, std_dyn_err};
|
||||
use hex_display::HexDisplayExt as _;
|
||||
use ip_packet::MAX_DATAGRAM_PAYLOAD;
|
||||
use rand::random;
|
||||
use ringbuffer::{AllocRingBuffer, RingBuffer as _};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{BTreeMap, VecDeque},
|
||||
@@ -85,7 +85,7 @@ pub struct Allocation {
|
||||
>,
|
||||
|
||||
channel_bindings: ChannelBindings,
|
||||
buffered_channel_bindings: RingBuffer<SocketAddr>,
|
||||
buffered_channel_bindings: AllocRingBuffer<SocketAddr>,
|
||||
|
||||
last_now: Instant,
|
||||
|
||||
@@ -229,7 +229,7 @@ impl Allocation {
|
||||
allocation_lifetime: Default::default(),
|
||||
channel_bindings: Default::default(),
|
||||
last_now: now,
|
||||
buffered_channel_bindings: RingBuffer::new(100),
|
||||
buffered_channel_bindings: AllocRingBuffer::new(100),
|
||||
software: Software::new(format!("snownet; session={session_id}"))
|
||||
.expect("description has less then 128 chars"),
|
||||
explicit_failure: Default::default(),
|
||||
@@ -549,7 +549,7 @@ impl Allocation {
|
||||
|
||||
self.log_update(now);
|
||||
|
||||
while let Some(peer) = self.buffered_channel_bindings.pop() {
|
||||
while let Some(peer) = self.buffered_channel_bindings.dequeue() {
|
||||
debug_assert!(
|
||||
self.has_allocation(),
|
||||
"We just received a successful allocation response"
|
||||
|
||||
@@ -9,7 +9,6 @@ mod candidate_set;
|
||||
mod channel_data;
|
||||
mod index;
|
||||
mod node;
|
||||
mod ringbuffer;
|
||||
mod stats;
|
||||
mod utils;
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::allocation::{self, Allocation, RelaySocket, Socket};
|
||||
use crate::candidate_set::CandidateSet;
|
||||
use crate::index::IndexLfsr;
|
||||
use crate::ringbuffer::RingBuffer;
|
||||
use crate::stats::{ConnectionStats, NodeStats};
|
||||
use crate::utils::earliest;
|
||||
use boringtun::noise::errors::WireGuardError;
|
||||
@@ -17,6 +16,7 @@ use ip_packet::{
|
||||
use rand::rngs::StdRng;
|
||||
use rand::seq::IteratorRandom;
|
||||
use rand::{random, Rng, SeedableRng};
|
||||
use ringbuffer::{AllocRingBuffer, RingBuffer as _};
|
||||
use secrecy::{ExposeSecret, Secret};
|
||||
use sha2::Digest;
|
||||
use std::borrow::Cow;
|
||||
@@ -731,7 +731,7 @@ where
|
||||
remote_pub_key: remote,
|
||||
state: ConnectionState::Connecting {
|
||||
relay,
|
||||
buffered: RingBuffer::new(10),
|
||||
buffered: AllocRingBuffer::new(128),
|
||||
},
|
||||
possible_sockets: BTreeSet::default(),
|
||||
span: info_span!(parent: tracing::Span::none(), "connection", %cid),
|
||||
@@ -1638,7 +1638,7 @@ enum ConnectionState<RId> {
|
||||
/// A session initiation requires a response that we must not drop, otherwise the connection setup experiences unnecessary delays.
|
||||
///
|
||||
/// It can also happen if we attempt to encapsulate a packet prior to the WireGuard handshake which triggers the creation of a WireGuard handshake initiation packet.
|
||||
buffered: RingBuffer<Vec<u8>>,
|
||||
buffered: AllocRingBuffer<Vec<u8>>,
|
||||
},
|
||||
/// A socket has been nominated.
|
||||
Connected {
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RingBuffer<T> {
|
||||
buffer: VecDeque<T>,
|
||||
}
|
||||
|
||||
impl<T: PartialEq> RingBuffer<T> {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
RingBuffer {
|
||||
buffer: VecDeque::with_capacity(capacity),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&mut self, item: T) {
|
||||
if self.buffer.len() == self.buffer.capacity() {
|
||||
// Remove the oldest element (at the beginning) if at capacity
|
||||
self.buffer.remove(0);
|
||||
}
|
||||
self.buffer.push_back(item);
|
||||
}
|
||||
|
||||
pub fn pop(&mut self) -> Option<T> {
|
||||
self.buffer.pop_front()
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.buffer.clear();
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
|
||||
self.buffer.iter()
|
||||
}
|
||||
|
||||
pub fn into_iter(self) -> impl Iterator<Item = T> {
|
||||
self.buffer.into_iter()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.buffer.len()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn inner(&self) -> (&[T], &[T]) {
|
||||
self.buffer.as_slices()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Extend<T> for RingBuffer<T>
|
||||
where
|
||||
T: PartialEq,
|
||||
{
|
||||
fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
|
||||
for val in iter.into_iter() {
|
||||
self.push(val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_push_within_capacity() {
|
||||
let mut buffer = RingBuffer::new(3);
|
||||
|
||||
buffer.push(1);
|
||||
buffer.push(2);
|
||||
buffer.push(3);
|
||||
|
||||
assert_eq!(buffer.inner().0, &[1, 2, 3]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_exceeds_capacity() {
|
||||
let mut buffer = RingBuffer::new(2);
|
||||
|
||||
buffer.push(1);
|
||||
buffer.push(2);
|
||||
buffer.push(3);
|
||||
|
||||
assert_eq!(buffer.inner().0, &[2]);
|
||||
assert_eq!(buffer.inner().1, &[3]);
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,7 @@ lru = { workspace = true }
|
||||
proptest = { workspace = true, optional = true }
|
||||
rand = { workspace = true }
|
||||
rangemap = { workspace = true }
|
||||
ringbuffer = { workspace = true }
|
||||
secrecy = { workspace = true, features = ["serde"] }
|
||||
serde = { workspace = true, features = ["derive", "std"] }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
@@ -135,3 +135,5 @@ cc 9ee5fca999d50cf96107b1b275a88af1b921f16bd487387a6389552dc1c5b5b4
|
||||
cc 1a01e04e8dbd861878590647c9d5a6b777a86dd36e299d4519a092f94a928dcd
|
||||
cc d188ee5433064634b07dff90aeb3c26bd3698310bcc4d27f15f35a6296eb8687
|
||||
cc e60fe97280614a96052e1af1c8d3e4661ec2fead4d22106edf6fb5caf330b6a5
|
||||
cc c50c783f60cd7126453f38d2ae37bea3120ebb588c68f99ad5ad4a659f331338
|
||||
cc 606dacf44d60870087c7c65fb404f090c52762167e01e534dee1df0557d70304
|
||||
|
||||
@@ -3,6 +3,7 @@ mod resource;
|
||||
pub(crate) use resource::{CidrResource, Resource};
|
||||
#[cfg(all(feature = "proptest", test))]
|
||||
pub(crate) use resource::{DnsResource, InternetResource};
|
||||
use ringbuffer::{AllocRingBuffer, RingBuffer};
|
||||
|
||||
use crate::dns::StubResolver;
|
||||
use crate::messages::{DnsServer, Interface as InterfaceConfig, IpDnsServer};
|
||||
@@ -151,18 +152,56 @@ pub struct ClientState {
|
||||
}
|
||||
|
||||
enum DnsResourceNatState {
|
||||
Pending { sent_at: Instant },
|
||||
Pending {
|
||||
sent_at: Instant,
|
||||
buffered_packets: AllocRingBuffer<IpPacket>,
|
||||
},
|
||||
Confirmed,
|
||||
}
|
||||
|
||||
impl DnsResourceNatState {
|
||||
fn confirm(&mut self) {
|
||||
*self = Self::Confirmed;
|
||||
fn num_buffered_packets(&self) -> usize {
|
||||
match self {
|
||||
DnsResourceNatState::Pending {
|
||||
buffered_packets, ..
|
||||
} => buffered_packets.len(),
|
||||
DnsResourceNatState::Confirmed => 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn confirm(&mut self) -> impl Iterator<Item = IpPacket> {
|
||||
let buffered_packets = match std::mem::replace(self, DnsResourceNatState::Confirmed) {
|
||||
DnsResourceNatState::Pending {
|
||||
buffered_packets, ..
|
||||
} => Some(buffered_packets.into_iter()),
|
||||
DnsResourceNatState::Confirmed => None,
|
||||
};
|
||||
|
||||
buffered_packets.into_iter().flatten()
|
||||
}
|
||||
}
|
||||
|
||||
struct PendingFlow {
|
||||
last_intent_sent_at: Instant,
|
||||
packets: AllocRingBuffer<IpPacket>,
|
||||
}
|
||||
|
||||
impl PendingFlow {
|
||||
/// How many packets we will at most buffer in a [`PendingFlow`].
|
||||
///
|
||||
/// `PendingFlow`s are per _resource_ (which could be Internet Resource or wildcard DNS resources).
|
||||
/// Thus, we may receive a fair few packets before we can send them.
|
||||
const CAPACITY_POW_2: usize = 7; // 2^7 = 128
|
||||
|
||||
fn new(now: Instant, packet: IpPacket) -> Self {
|
||||
let mut packets = AllocRingBuffer::with_capacity_power_of_2(Self::CAPACITY_POW_2);
|
||||
packets.push(packet);
|
||||
|
||||
Self {
|
||||
last_intent_sent_at: now,
|
||||
packets,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientState {
|
||||
@@ -285,7 +324,38 @@ impl ClientState {
|
||||
/// 3. If we don't receive a response within 2s and this function is called again, we send another message.
|
||||
///
|
||||
/// The complexity of this function is O(N) with the number of resolved DNS resources.
|
||||
fn update_dns_resource_nat(&mut self, now: Instant) {
|
||||
fn update_dns_resource_nat(
|
||||
&mut self,
|
||||
now: Instant,
|
||||
buffered_packets: impl Iterator<Item = IpPacket>,
|
||||
) {
|
||||
// Organise all buffered packets by gateway + domain.
|
||||
let mut buffered_packets_by_gateway_and_domain = buffered_packets
|
||||
.map(|packet| {
|
||||
let (domain, resource) = self
|
||||
.stub_resolver
|
||||
.resolve_resource_by_ip(&packet.destination())
|
||||
.context("IP is not associated with a DNS resource domain")?;
|
||||
let gateway_id = self
|
||||
.resources_gateways
|
||||
.get(resource)
|
||||
.context("No gateway for resource")?;
|
||||
|
||||
anyhow::Ok((*gateway_id, domain, packet))
|
||||
})
|
||||
.filter_map(|res| {
|
||||
res.inspect_err(|e| tracing::debug!("Dropping buffered packet: {e}"))
|
||||
.ok()
|
||||
})
|
||||
.fold(
|
||||
BTreeMap::<_, VecDeque<IpPacket>>::new(),
|
||||
|mut map, (gid, domain, packet)| {
|
||||
map.entry((gid, domain)).or_default().push_back(packet);
|
||||
|
||||
map
|
||||
},
|
||||
);
|
||||
|
||||
use std::collections::btree_map::Entry;
|
||||
|
||||
for (domain, rid, proxy_ips, gid) in
|
||||
@@ -305,6 +375,10 @@ impl ClientState {
|
||||
continue;
|
||||
};
|
||||
|
||||
let packets_for_domain = buffered_packets_by_gateway_and_domain
|
||||
.remove(&(*gid, domain))
|
||||
.unwrap_or_default();
|
||||
|
||||
match self
|
||||
.dns_resource_nat_by_gateway
|
||||
.entry((*gid, domain.clone()))
|
||||
@@ -312,13 +386,22 @@ impl ClientState {
|
||||
Entry::Vacant(v) => {
|
||||
self.peers
|
||||
.add_ips_with_resource(gid, proxy_ips.iter().copied(), rid);
|
||||
let mut buffered_packets = AllocRingBuffer::with_capacity_power_of_2(5); // 2^5 = 32
|
||||
buffered_packets.extend(packets_for_domain);
|
||||
|
||||
v.insert(DnsResourceNatState::Pending { sent_at: now });
|
||||
v.insert(DnsResourceNatState::Pending {
|
||||
sent_at: now,
|
||||
buffered_packets,
|
||||
});
|
||||
}
|
||||
Entry::Occupied(mut o) => match o.get_mut() {
|
||||
DnsResourceNatState::Confirmed => continue,
|
||||
DnsResourceNatState::Pending { sent_at } => {
|
||||
DnsResourceNatState::Pending {
|
||||
sent_at,
|
||||
buffered_packets,
|
||||
} => {
|
||||
let time_since_last_attempt = now.duration_since(*sent_at);
|
||||
buffered_packets.extend(packets_for_domain);
|
||||
|
||||
if time_since_last_attempt < Duration::from_secs(2) {
|
||||
continue;
|
||||
@@ -346,18 +429,13 @@ impl ClientState {
|
||||
|
||||
tracing::debug!(%gid, %domain, "Setting up DNS resource NAT");
|
||||
|
||||
let Some(transmit) = self
|
||||
.node
|
||||
.encapsulate(*gid, packet, now)
|
||||
.inspect_err(|e| tracing::debug!(%gid, "Failed to encapsulate: {e}"))
|
||||
.ok()
|
||||
.flatten()
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
self.buffered_transmits
|
||||
.push_back(transmit.to_transmit().into_owned());
|
||||
encapsulate_and_buffer(
|
||||
packet,
|
||||
*gid,
|
||||
now,
|
||||
&mut self.node,
|
||||
&mut self.buffered_transmits,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -435,7 +513,14 @@ impl ClientState {
|
||||
}
|
||||
|
||||
if let Some(fz_p2p_control) = packet.as_fz_p2p_control() {
|
||||
handle_p2p_control_packet(gid, fz_p2p_control, &mut self.dns_resource_nat_by_gateway);
|
||||
handle_p2p_control_packet(
|
||||
gid,
|
||||
fz_p2p_control,
|
||||
&mut self.dns_resource_nat_by_gateway,
|
||||
&mut self.node,
|
||||
&mut self.buffered_transmits,
|
||||
now,
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -528,24 +613,24 @@ impl ClientState {
|
||||
|
||||
let Some(peer) = peer_by_resource_mut(&self.resources_gateways, &mut self.peers, resource)
|
||||
else {
|
||||
self.on_not_connected_resource(resource, now);
|
||||
self.on_not_connected_resource(resource, packet, now);
|
||||
return None;
|
||||
};
|
||||
|
||||
// TODO: Don't send packets unless we have a positive response for the DNS resource NAT.
|
||||
|
||||
// TODO: Check DNS resource NAT state for the domain that the destination IP belongs to.
|
||||
// Re-send if older than X.
|
||||
|
||||
// if let Some((domain, _)) = self.stub_resolver.resolve_resource_by_ip(&dst) {
|
||||
// if self
|
||||
// .dns_resource_nat_by_gateway
|
||||
// .get(&(peer.id(), domain.clone()))
|
||||
// .is_some_and(|s| s.is_pending())
|
||||
// {
|
||||
// self.update_dns_resource_nat(now);
|
||||
// }
|
||||
// }
|
||||
if let Some((domain, _)) = self.stub_resolver.resolve_resource_by_ip(&dst) {
|
||||
if let Some(DnsResourceNatState::Pending {
|
||||
buffered_packets, ..
|
||||
}) = self
|
||||
.dns_resource_nat_by_gateway
|
||||
.get_mut(&(peer.id(), domain.clone()))
|
||||
{
|
||||
buffered_packets.push(packet);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
let gid = peer.id();
|
||||
|
||||
@@ -620,9 +705,11 @@ impl ClientState {
|
||||
.get(&resource_id)
|
||||
.context("Unknown resource")?;
|
||||
|
||||
self.pending_flows
|
||||
let buffered_packets = self
|
||||
.pending_flows
|
||||
.remove(&resource_id)
|
||||
.context("No pending flow for resource")?;
|
||||
.context("No pending flow for resource")?
|
||||
.packets;
|
||||
|
||||
if let Some(old_gateway_id) = self.resources_gateways.insert(resource_id, gateway_id) {
|
||||
if self.peers.get(&old_gateway_id).is_some() {
|
||||
@@ -655,14 +742,27 @@ impl ClientState {
|
||||
self.peers.insert(GatewayOnClient::new(gateway_id), &[]);
|
||||
};
|
||||
|
||||
// This only works for CIDR & Internet Resource.
|
||||
self.peers.add_ips_with_resource(
|
||||
&gateway_id,
|
||||
resource.addresses().into_iter(),
|
||||
&resource_id,
|
||||
);
|
||||
match resource {
|
||||
Resource::Cidr(_) | Resource::Internet(_) => {
|
||||
self.peers.add_ips_with_resource(
|
||||
&gateway_id,
|
||||
resource.addresses().into_iter(),
|
||||
&resource_id,
|
||||
);
|
||||
|
||||
self.update_dns_resource_nat(now);
|
||||
// For CIDR and Internet resources, we can directly queue the buffered packets.
|
||||
for packet in buffered_packets {
|
||||
encapsulate_and_buffer(
|
||||
packet,
|
||||
gateway_id,
|
||||
now,
|
||||
&mut self.node,
|
||||
&mut self.buffered_transmits,
|
||||
);
|
||||
}
|
||||
}
|
||||
Resource::Dns(_) => self.update_dns_resource_nat(now, buffered_packets.into_iter()),
|
||||
}
|
||||
|
||||
Ok(Ok(()))
|
||||
}
|
||||
@@ -711,17 +811,19 @@ impl ClientState {
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(%resource))]
|
||||
fn on_not_connected_resource(&mut self, resource: ResourceId, now: Instant) {
|
||||
fn on_not_connected_resource(&mut self, resource: ResourceId, packet: IpPacket, now: Instant) {
|
||||
debug_assert!(self.resources_by_id.contains_key(&resource));
|
||||
|
||||
match self.pending_flows.entry(resource) {
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(PendingFlow {
|
||||
last_intent_sent_at: now,
|
||||
});
|
||||
v.insert(PendingFlow::new(now, packet));
|
||||
}
|
||||
Entry::Occupied(mut o) => {
|
||||
let pending_flow = o.get_mut();
|
||||
pending_flow.packets.push(packet);
|
||||
let num_buffered = pending_flow.packets.len();
|
||||
|
||||
tracing::debug!(%num_buffered, "Buffering packet in `PendingFlow`");
|
||||
|
||||
let time_since_last_intent = now.duration_since(pending_flow.last_intent_sent_at);
|
||||
|
||||
@@ -1032,7 +1134,7 @@ impl ClientState {
|
||||
match self.stub_resolver.handle(message) {
|
||||
dns::ResolveStrategy::LocalResponse(response) => {
|
||||
self.clear_dns_resource_nat_for_domain(response.for_slice_ref());
|
||||
self.update_dns_resource_nat(now);
|
||||
self.update_dns_resource_nat(now, iter::empty());
|
||||
|
||||
unwrap_or_debug!(
|
||||
self.try_queue_udp_dns_response(upstream, source, &response),
|
||||
@@ -1077,7 +1179,7 @@ impl ClientState {
|
||||
match self.stub_resolver.handle(message.for_slice_ref()) {
|
||||
dns::ResolveStrategy::LocalResponse(response) => {
|
||||
self.clear_dns_resource_nat_for_domain(response.for_slice_ref());
|
||||
self.update_dns_resource_nat(now);
|
||||
self.update_dns_resource_nat(now, iter::empty());
|
||||
|
||||
unwrap_or_debug!(
|
||||
self.tcp_dns_server.send_message(query.socket, response),
|
||||
@@ -1512,6 +1614,25 @@ impl ClientState {
|
||||
}
|
||||
}
|
||||
|
||||
fn encapsulate_and_buffer(
|
||||
packet: IpPacket,
|
||||
gid: GatewayId,
|
||||
now: Instant,
|
||||
node: &mut ClientNode<GatewayId, RelayId>,
|
||||
buffered_transmits: &mut VecDeque<Transmit<'static>>,
|
||||
) {
|
||||
let Some(enc_packet) = node
|
||||
.encapsulate(gid, packet, now)
|
||||
.inspect_err(|e| tracing::debug!(%gid, "Failed to encapsulate: {e}"))
|
||||
.ok()
|
||||
.flatten()
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
buffered_transmits.push_back(enc_packet.to_transmit().into_owned());
|
||||
}
|
||||
|
||||
fn parse_udp_dns_message<'b>(datagram: &UdpSlice<'b>) -> anyhow::Result<Message<&'b [u8]>> {
|
||||
let port = datagram.destination_port();
|
||||
|
||||
@@ -1530,6 +1651,9 @@ fn handle_p2p_control_packet(
|
||||
gid: GatewayId,
|
||||
fz_p2p_control: ip_packet::FzP2pControlSlice,
|
||||
dns_resource_nat_by_gateway: &mut BTreeMap<(GatewayId, DomainName), DnsResourceNatState>,
|
||||
node: &mut ClientNode<GatewayId, RelayId>,
|
||||
buffered_transmits: &mut VecDeque<Transmit<'static>>,
|
||||
now: Instant,
|
||||
) {
|
||||
use p2p_control::dns_resource_nat;
|
||||
|
||||
@@ -1552,9 +1676,13 @@ fn handle_p2p_control_packet(
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::debug!(%gid, domain = %res.domain, "DNS resource NAT is active");
|
||||
tracing::debug!(%gid, domain = %res.domain, num_buffered_packets = %nat_state.num_buffered_packets(), "DNS resource NAT is active");
|
||||
|
||||
nat_state.confirm();
|
||||
let buffered_packets = nat_state.confirm();
|
||||
|
||||
for packet in buffered_packets {
|
||||
encapsulate_and_buffer(packet, gid, now, node, buffered_transmits);
|
||||
}
|
||||
}
|
||||
code => {
|
||||
tracing::debug!(code = %code.into_u8(), "Unknown control protocol");
|
||||
|
||||
@@ -361,55 +361,9 @@ impl ReferenceState {
|
||||
}
|
||||
}),
|
||||
Transition::SendDnsQueries(queries) => {
|
||||
let mut new_connections = BTreeSet::new();
|
||||
|
||||
for query in queries {
|
||||
// Some queries get answered locally.
|
||||
if state
|
||||
.client
|
||||
.inner()
|
||||
.is_locally_answered_query(&query.domain, query.r_type)
|
||||
{
|
||||
tracing::debug!("Expecting locally answered query");
|
||||
|
||||
state.client.exec_mut(|client| client.on_dns_query(query));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if the DNS server is defined as a resource.
|
||||
let Some(resource) = state.client.inner().dns_query_via_resource(query) else {
|
||||
// Not a resource, process normally.
|
||||
state.client.exec_mut(|client| client.on_dns_query(query));
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(gateway) = state.portal.gateway_for_resource(resource).copied() else {
|
||||
tracing::error!("Unknown gateway for resource");
|
||||
continue;
|
||||
};
|
||||
|
||||
tracing::debug!(%resource, %gateway, "Expecting DNS query via resource");
|
||||
|
||||
if !state
|
||||
.client
|
||||
.inner()
|
||||
.is_connected_to_internet_or_cidr(resource)
|
||||
{
|
||||
tracing::debug!(%resource, %gateway, "Not connected yet, dropping packet");
|
||||
|
||||
new_connections.insert((resource, gateway));
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
state.client.exec_mut(|client| client.on_dns_query(query));
|
||||
}
|
||||
|
||||
for (resource, gateway) in new_connections.into_iter() {
|
||||
state.client.exec_mut(|client| {
|
||||
client.connect_to_internet_or_cidr_resource(resource, gateway)
|
||||
});
|
||||
}
|
||||
}
|
||||
Transition::SendIcmpPacket {
|
||||
src,
|
||||
|
||||
@@ -431,13 +431,6 @@ pub struct RefClient {
|
||||
#[debug(skip)]
|
||||
pub(crate) connected_cidr_resources: HashSet<ResourceId>,
|
||||
|
||||
/// The DNS resources the client is connected to.
|
||||
#[debug(skip)]
|
||||
pub(crate) connected_dns_resources: HashSet<ResourceId>,
|
||||
|
||||
#[debug(skip)]
|
||||
pub(crate) connected_gateways: BTreeSet<GatewayId>,
|
||||
|
||||
/// Actively disabled resources by the UI
|
||||
#[debug(skip)]
|
||||
pub(crate) disabled_resources: BTreeSet<ResourceId>,
|
||||
@@ -486,7 +479,6 @@ impl RefClient {
|
||||
self.ipv6_routes.remove(resource);
|
||||
|
||||
self.connected_cidr_resources.remove(resource);
|
||||
self.connected_dns_resources.remove(resource);
|
||||
|
||||
if self.internet_resource.is_some_and(|r| &r == resource) {
|
||||
self.connected_internet_resource = false;
|
||||
@@ -530,9 +522,7 @@ impl RefClient {
|
||||
|
||||
pub(crate) fn reset_connections(&mut self) {
|
||||
self.connected_cidr_resources.clear();
|
||||
self.connected_dns_resources.clear();
|
||||
self.connected_internet_resource = false;
|
||||
self.connected_gateways.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn add_internet_resource(&mut self, r: InternetResource) {
|
||||
@@ -675,41 +665,22 @@ impl RefClient {
|
||||
return;
|
||||
};
|
||||
|
||||
if self.is_connected_to_resource(resource) && self.is_tunnel_ip(src) {
|
||||
tracing::debug!("Connected to resource, expecting packet to be routed");
|
||||
map(self)
|
||||
.entry(gateway)
|
||||
.or_default()
|
||||
.insert(payload, packet_id);
|
||||
self.connect_to_resource(resource, dst);
|
||||
|
||||
if !self.is_tunnel_ip(src) {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Destination::DomainName { name: dst, .. } = &dst {
|
||||
debug_assert!(
|
||||
self.dns_records.iter().any(|(name, _)| name == dst),
|
||||
"Should only sample domains that we resolved"
|
||||
);
|
||||
}
|
||||
|
||||
tracing::debug!("Not connected to resource, expecting to trigger connection intent");
|
||||
|
||||
self.connect_to_resource(resource, dst, gateway);
|
||||
map(self)
|
||||
.entry(gateway)
|
||||
.or_default()
|
||||
.insert(payload, packet_id);
|
||||
}
|
||||
|
||||
fn connect_to_resource(
|
||||
&mut self,
|
||||
resource: ResourceId,
|
||||
destination: Destination,
|
||||
gateway: GatewayId,
|
||||
) {
|
||||
fn connect_to_resource(&mut self, resource: ResourceId, destination: Destination) {
|
||||
match destination {
|
||||
Destination::DomainName { .. } => {
|
||||
if !self.disabled_resources.contains(&resource) {
|
||||
self.connected_dns_resources.insert(resource);
|
||||
self.connected_gateways.insert(gateway);
|
||||
}
|
||||
}
|
||||
Destination::IpAddr(_) => self.connect_to_internet_or_cidr_resource(resource, gateway),
|
||||
Destination::DomainName { .. } => {}
|
||||
Destination::IpAddr(_) => self.connect_to_internet_or_cidr_resource(resource),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -717,20 +688,14 @@ impl RefClient {
|
||||
self.is_connected_to_cidr(resource) || self.is_connected_to_internet(resource)
|
||||
}
|
||||
|
||||
pub(crate) fn connect_to_internet_or_cidr_resource(
|
||||
&mut self,
|
||||
resource: ResourceId,
|
||||
gateway: GatewayId,
|
||||
) {
|
||||
pub(crate) fn connect_to_internet_or_cidr_resource(&mut self, resource: ResourceId) {
|
||||
if self.internet_resource.is_some_and(|r| r == resource) {
|
||||
self.connected_internet_resource = true;
|
||||
self.connected_gateways.insert(gateway);
|
||||
return;
|
||||
}
|
||||
|
||||
if self.cidr_resources.iter().any(|(_, r)| *r == resource) {
|
||||
self.connected_cidr_resources.insert(resource);
|
||||
self.connected_gateways.insert(gateway);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -766,14 +731,6 @@ impl RefClient {
|
||||
.collect_vec()
|
||||
}
|
||||
|
||||
fn is_connected_to_resource(&self, resource: ResourceId) -> bool {
|
||||
if self.is_connected_to_internet_or_cidr(resource) {
|
||||
return true;
|
||||
}
|
||||
|
||||
self.connected_dns_resources.contains(&resource)
|
||||
}
|
||||
|
||||
fn is_connected_to_internet(&self, id: ResourceId) -> bool {
|
||||
self.active_internet_resource() == Some(id) && self.connected_internet_resource
|
||||
}
|
||||
@@ -787,21 +744,6 @@ impl RefClient {
|
||||
.filter(|r| !self.disabled_resources.contains(r))
|
||||
}
|
||||
|
||||
pub(crate) fn is_locally_answered_query(&self, domain: &DomainName, rtype: Rtype) -> bool {
|
||||
let is_known_host = self.known_hosts.contains_key(&domain.to_string());
|
||||
let is_dns_resource = self.dns_resource_by_domain(domain).is_some();
|
||||
let is_suppported_type = matches!(rtype, Rtype::A | Rtype::AAAA | Rtype::PTR);
|
||||
|
||||
if matches!(rtype, Rtype::PTR)
|
||||
&& crate::dns::reverse_dns_addr(&domain.to_string())
|
||||
.is_some_and(|ip| self.known_hosts.values().flatten().any(|c| c == &ip))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
(is_known_host || is_dns_resource) && is_suppported_type
|
||||
}
|
||||
|
||||
fn resource_by_dst(&self, destination: &Destination) -> Option<ResourceId> {
|
||||
match destination {
|
||||
Destination::DomainName { name, .. } => {
|
||||
@@ -1097,7 +1039,6 @@ fn ref_client(
|
||||
cidr_resources: IpNetworkTable::new(),
|
||||
dns_records: Default::default(),
|
||||
connected_cidr_resources: Default::default(),
|
||||
connected_dns_resources: Default::default(),
|
||||
connected_internet_resource: Default::default(),
|
||||
expected_icmp_handshakes: Default::default(),
|
||||
expected_udp_handshakes: Default::default(),
|
||||
@@ -1108,7 +1049,6 @@ fn ref_client(
|
||||
resources: Default::default(),
|
||||
ipv4_routes: Default::default(),
|
||||
ipv6_routes: Default::default(),
|
||||
connected_gateways: Default::default(),
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
@@ -24,6 +24,9 @@ export default function Android() {
|
||||
Makes use of the new control protocol, delivering faster and more
|
||||
robust connection establishment.
|
||||
</ChangeItem>
|
||||
<ChangeItem pull="7477">
|
||||
Improves connection setup latency by buffering initial packets.
|
||||
</ChangeItem>
|
||||
</Unreleased>
|
||||
<Entry version="1.4.7" date={new Date("2024-11-08")}>
|
||||
<ChangeItem pull="7263">
|
||||
|
||||
@@ -24,6 +24,9 @@ export default function Apple() {
|
||||
Makes use of the new control protocol, delivering faster and more
|
||||
robust connection establishment.
|
||||
</ChangeItem>
|
||||
<ChangeItem pull="7477">
|
||||
Improves connection setup latency by buffering initial packets.
|
||||
</ChangeItem>
|
||||
</Unreleased>
|
||||
<Entry version="1.3.9" date={new Date("2024-11-08")}>
|
||||
<ChangeItem pull="7288">
|
||||
|
||||
@@ -29,6 +29,9 @@ export default function GUI({ title }: { title: string }) {
|
||||
improving performance.
|
||||
</ChangeItem>
|
||||
)}
|
||||
<ChangeItem pull="7477">
|
||||
Improves connection setup latency by buffering initial packets.
|
||||
</ChangeItem>
|
||||
</Unreleased>
|
||||
<Entry version="1.3.13" date={new Date("2024-11-15")}>
|
||||
<ChangeItem pull="7334">
|
||||
|
||||
@@ -27,6 +27,9 @@ export default function Headless() {
|
||||
Uses multiple threads to read & write to the TUN device, greatly
|
||||
improving performance.
|
||||
</ChangeItem>
|
||||
<ChangeItem pull="7477">
|
||||
Improves connection setup latency by buffering initial packets.
|
||||
</ChangeItem>
|
||||
</Unreleased>
|
||||
<Entry version="1.3.7" date={new Date("2024-11-15")}>
|
||||
<ChangeItem pull="7334">
|
||||
|
||||
Reference in New Issue
Block a user