mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
fix(connlib): prioritise GSO batches with smaller segments (#8772)
In order to implement GSO in `connlib`, we opted for an approach where packets of the same length are being appended to a buffer. Each of these buffers is the sent to the kernel in a single syscall, which drastically decreases the per-packet overhead of syscalls and therefore improves performance. Within `connlib` itself, we prioritise control-protocol associated packets over tunnel traffic. The idea here is that even under high-load, we want to ensure that STUN probes between the peers and to the relays are sent in a timely manner. Failing to send these probes results in a false-positive detection of a lost connection because the `connlib`'s internal state uses timeouts to detect such situations. Despite processing the packets itself in a timely manner, it is still possible that they get delayed depending on which order the get flushed to the socket. This order is currently non-deterministic because `GsoQueue` uses a `HashMap` internally and when accessing the batched-together datagrams, we just access it via `iter_mut`. To fix this, we use a `BTreeMap` instead and explicitly define the `Key` to start with the `segment_size` field. As a result, entries within the `BTreeMap` will be sorted ascending by `segment_size` (i.e. the size of individual packets within the batch). Packets of smaller size are more likely to be control messages like STUN binding requests or TURN messages to the relays for managing allocations. By sorting the map explicitly, we ensure that if the UDP socket is ready to send, we flush out these messages first before moving on to bigger packets such as the ones containing (more likely) WireGuard data messages.
This commit is contained in:
@@ -2,6 +2,8 @@ avoid-breaking-exported-api = false # We don't publish anything to crates.io, he
|
||||
disallowed-methods = [
|
||||
{ path = "std::collections::HashMap::iter", reason = "HashMap has non-deterministic iteration order, use BTreeMap instead" },
|
||||
{ path = "std::collections::HashSet::iter", reason = "HashSet has non-deterministic iteration order, use BTreeSet instead" },
|
||||
{ path = "std::collections::HashMap::iter_mut", reason = "HashMap has non-deterministic iteration order, use BTreeMap instead" },
|
||||
{ path = "std::collections::HashSet::iter_mut", reason = "HashSet has non-deterministic iteration order, use BTreeSet instead" },
|
||||
{ path = "std::collections::HashMap::into_iter", reason = "HashMap has non-deterministic iteration order, use BTreeMap instead" },
|
||||
{ path = "std::collections::HashSet::into_iter", reason = "HashSet has non-deterministic iteration order, use BTreeSet instead" },
|
||||
{ path = "tracing::subscriber::set_global_default", reason = "Does not init `LogTracer`, use `firezone_logging::init` instead." },
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
collections::BTreeMap,
|
||||
net::SocketAddr,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
@@ -19,7 +19,7 @@ const MAX_SEGMENT_SIZE: usize =
|
||||
/// Calling [`Io::send_network`](super::Io::send_network) will copy the provided payload into this buffer.
|
||||
/// The buffer is then flushed using GSO in a single syscall.
|
||||
pub struct GsoQueue {
|
||||
inner: HashMap<Key, DatagramBuffer>,
|
||||
inner: BTreeMap<Key, DatagramBuffer>,
|
||||
buffer_pool: Arc<lockfree_object_pool::SpinLockObjectPool<BytesMut>>,
|
||||
}
|
||||
|
||||
@@ -117,9 +117,9 @@ impl GsoQueue {
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Copy)]
|
||||
struct Key {
|
||||
segment_size: usize, // `segment_size` comes first to ensure that the datagrams are flushed to the socket in ascending order.
|
||||
src: Option<SocketAddr>,
|
||||
dst: SocketAddr,
|
||||
segment_size: usize,
|
||||
}
|
||||
|
||||
struct DatagramBuffer {
|
||||
@@ -139,7 +139,7 @@ mod tests {
|
||||
let now = Instant::now();
|
||||
let mut send_queue = GsoQueue::new();
|
||||
|
||||
send_queue.enqueue(None, DST, b"foobar", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_1, b"foobar", Ecn::NonEct, now);
|
||||
for _entry in send_queue.datagrams() {}
|
||||
|
||||
send_queue.handle_timeout(now + Duration::from_secs(60));
|
||||
@@ -152,7 +152,7 @@ mod tests {
|
||||
let now = Instant::now();
|
||||
let mut send_queue = GsoQueue::new();
|
||||
|
||||
send_queue.enqueue(None, DST, b"foobar", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_1, b"foobar", Ecn::NonEct, now);
|
||||
|
||||
send_queue.handle_timeout(now + Duration::from_secs(60));
|
||||
|
||||
@@ -164,7 +164,7 @@ mod tests {
|
||||
let now = Instant::now();
|
||||
let mut send_queue = GsoQueue::new();
|
||||
|
||||
send_queue.enqueue(None, DST, b"foobar", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_1, b"foobar", Ecn::NonEct, now);
|
||||
|
||||
let datagrams = send_queue.datagrams();
|
||||
drop(datagrams);
|
||||
@@ -172,7 +172,7 @@ mod tests {
|
||||
let datagrams = send_queue.datagrams().collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(datagrams.len(), 1);
|
||||
assert_eq!(datagrams[0].dst, DST);
|
||||
assert_eq!(datagrams[0].dst, DST_1);
|
||||
assert_eq!(datagrams[0].packet.as_ref(), b"foobar");
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ mod tests {
|
||||
let now = Instant::now();
|
||||
let mut send_queue = GsoQueue::new();
|
||||
|
||||
send_queue.enqueue(None, DST, b"foobar", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_1, b"foobar", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_2, b"bar", Ecn::NonEct, now);
|
||||
|
||||
// Taking it from the iterator is "sending" ...
|
||||
@@ -192,6 +192,35 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
const DST: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1234));
|
||||
const DST_2: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5678));
|
||||
#[test]
|
||||
fn prioritises_small_packets() {
|
||||
let now = Instant::now();
|
||||
let mut send_queue = GsoQueue::new();
|
||||
|
||||
send_queue.enqueue(
|
||||
None,
|
||||
DST_1,
|
||||
b"foobarfoobarfoobarfoobarfoobarfoobarfoobarfoobar",
|
||||
Ecn::NonEct,
|
||||
now,
|
||||
);
|
||||
send_queue.enqueue(None, DST_2, b"barbaz", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_3, b"barbaz1234", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_4, b"b", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_5, b"barbazfoobafoobarfoobar", Ecn::NonEct, now);
|
||||
send_queue.enqueue(None, DST_2, b"baz", Ecn::NonEct, now);
|
||||
|
||||
let datagrams = send_queue.datagrams().collect::<Vec<_>>();
|
||||
|
||||
let is_sorted = datagrams.is_sorted_by_key(|datagram| datagram.segment_size);
|
||||
|
||||
assert!(is_sorted);
|
||||
assert_eq!(datagrams[0].segment_size, Some(1));
|
||||
}
|
||||
|
||||
const DST_1: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1111));
|
||||
const DST_2: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 2222));
|
||||
const DST_3: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3333));
|
||||
const DST_4: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 4444));
|
||||
const DST_5: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5555));
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ use ip_packet::{IcmpEchoHeader, Icmpv4Type, Icmpv6Type, IpPacket};
|
||||
use proptest::prelude::*;
|
||||
use snownet::Transmit;
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
collections::BTreeMap,
|
||||
iter,
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
|
||||
time::Instant,
|
||||
@@ -36,8 +36,8 @@ pub(crate) struct SimGateway {
|
||||
pub(crate) received_tcp_requests: BTreeMap<u64, IpPacket>,
|
||||
|
||||
site_specific_dns_records: DnsRecords,
|
||||
udp_dns_server_resources: HashMap<SocketAddr, UdpDnsServerResource>,
|
||||
tcp_dns_server_resources: HashMap<SocketAddr, TcpDnsServerResource>,
|
||||
udp_dns_server_resources: BTreeMap<SocketAddr, UdpDnsServerResource>,
|
||||
tcp_dns_server_resources: BTreeMap<SocketAddr, TcpDnsServerResource>,
|
||||
}
|
||||
|
||||
impl SimGateway {
|
||||
|
||||
Reference in New Issue
Block a user