build(deps): bump quinn-udp dependency (#5872)

In the new version, `quinn-udp` no longer supports sending multiple
`Transmit`s at once via `sendmmsg`. We made use of that to send all
buffered packets in one go.

In reality, these buffered packets can only be control messages like
STUN requests to relays or something like that. For the hot-patch of
routing packets, we only ever read a single IP packet from the TUN
device and attempt to send it out right away. At most, we may buffer one
packet at a time here in case the socket is busy.

Getting these wake-ups right is quite tricky. I think we should
prioritise #3837 soon. Once that is integrated, we can use `async/await`
for the high-level integration between `Io` and the state which allows
us to simply suspend until we can send the message, avoiding the need
for a dedicated buffer.

---------

Signed-off-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: Reactor Scram <ReactorScram@users.noreply.github.com>
This commit is contained in:
Thomas Eizinger
2024-07-17 08:46:18 +10:00
committed by GitHub
parent 63623346b9
commit 93651d483c
3 changed files with 62 additions and 55 deletions

5
rust/Cargo.lock generated
View File

@@ -4908,10 +4908,9 @@ dependencies = [
[[package]]
name = "quinn-udp"
version = "0.5.0"
source = "git+https://github.com/quinn-rs/quinn?branch=main#88f48b0179f358cea0b76fc550e629007bfc957d"
version = "0.5.2"
source = "git+https://github.com/quinn-rs/quinn?branch=main#3f489e2eab014ddd04de58e570ba56e9b027f0bc"
dependencies = [
"bytes",
"libc",
"once_cell",
"socket2 0.5.7",

View File

@@ -3,7 +3,6 @@ use crate::{
dns::DnsQuery,
sockets::{Received, Sockets},
};
use bytes::Bytes;
use connlib_shared::messages::DnsServer;
use futures::Future;
use futures_bounded::FuturesTupleSet;
@@ -16,7 +15,6 @@ use hickory_resolver::{
AsyncResolver, TokioHandle,
};
use ip_packet::{IpPacket, MutableIpPacket};
use quinn_udp::Transmit;
use socket_factory::SocketFactory;
use std::{
collections::HashMap,
@@ -187,13 +185,7 @@ impl Io {
}
pub fn send_network(&mut self, transmit: snownet::Transmit) -> io::Result<()> {
self.sockets.try_send(Transmit {
destination: transmit.dst,
ecn: None,
contents: Bytes::copy_from_slice(&transmit.payload),
segment_size: None,
src_ip: transmit.src.map(|s| s.ip()),
})?;
self.sockets.send(transmit)?;
Ok(())
}

View File

@@ -2,6 +2,7 @@ use core::slice;
use quinn_udp::{RecvMeta, UdpSockRef, UdpSocketState};
use socket_factory::SocketFactory;
use std::{
collections::VecDeque,
io::{self, IoSliceMut},
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
task::{ready, Context, Poll},
@@ -64,23 +65,18 @@ impl Sockets {
Poll::Ready(Ok(()))
}
pub fn try_send(&mut self, transmit: quinn_udp::Transmit) -> io::Result<()> {
match transmit.destination {
SocketAddr::V4(dst) => {
let socket = self.socket_v4.as_mut().ok_or(io::Error::new(
io::ErrorKind::NotConnected,
format!("failed send packet to {dst}: no IPv4 socket"),
))?;
socket.send(transmit);
}
SocketAddr::V6(dst) => {
let socket = self.socket_v6.as_mut().ok_or(io::Error::new(
io::ErrorKind::NotConnected,
format!("failed send packet to {dst}: no IPv6 socket"),
))?;
socket.send(transmit);
}
}
pub fn send(&mut self, transmit: snownet::Transmit) -> io::Result<()> {
let socket = match transmit.dst {
SocketAddr::V4(dst) => self.socket_v4.as_mut().ok_or(io::Error::new(
io::ErrorKind::NotConnected,
format!("failed send packet to {dst}: no IPv4 socket"),
))?,
SocketAddr::V6(dst) => self.socket_v6.as_mut().ok_or(io::Error::new(
io::ErrorKind::NotConnected,
format!("failed send packet to {dst}: no IPv6 socket"),
))?,
};
socket.send(transmit)?;
Ok(())
}
@@ -166,7 +162,7 @@ struct Socket {
port: u16,
socket: UdpSocket,
buffered_transmits: Vec<quinn_udp::Transmit>,
buffered_transmits: VecDeque<snownet::Transmit<'static>>,
}
impl Socket {
@@ -181,7 +177,7 @@ impl Socket {
state: UdpSocketState::new(UdpSockRef::from(&socket))?,
port,
socket,
buffered_transmits: Vec::new(),
buffered_transmits: VecDeque::new(),
})
}
@@ -252,42 +248,62 @@ impl Socket {
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
match self.socket.try_io(Interest::WRITABLE, || {
self.state
.send((&self.socket).into(), &self.buffered_transmits)
}) {
Ok(0) => break,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => return Poll::Ready(Err(e)),
ready!(self.socket.poll_send_ready(cx))?; // Ensure we are ready to send.
Ok(num_sent) => {
self.buffered_transmits.drain(..num_sent);
// I am not sure if we'd ever send less than what is in `buffered_transmits`.
// loop once more to be sure we `break` on either an empty buffer or on `WouldBlock`.
}
let Some(transmit) = self.buffered_transmits.pop_front() else {
break;
};
match self.try_send(&transmit) {
Ok(()) => continue, // Try to send another packet.
Err(e) => {
self.buffered_transmits.push_front(transmit); // Don't lose the packet if we fail.
if e.kind() == io::ErrorKind::WouldBlock {
continue; // False positive send-readiness: Loop to `poll_send_ready` and return `Pending`.
}
return Poll::Ready(Err(e));
}
}
}
// Ensure we are ready to send more data.
ready!(self.socket.poll_send_ready(cx)?);
assert!(
self.buffered_transmits.is_empty(),
"buffer must be empty if we are ready to send more data"
);
assert!(self.buffered_transmits.is_empty());
Poll::Ready(Ok(()))
}
fn send(&mut self, transmit: quinn_udp::Transmit) {
tracing::trace!(target: "wire::net::send", src = ?transmit.src_ip, dst = %transmit.destination, num_bytes = %transmit.contents.len());
self.buffered_transmits.push(transmit);
fn send(&mut self, transmit: snownet::Transmit) -> io::Result<()> {
tracing::trace!(target: "wire::net::send", src = ?transmit.src, dst = %transmit.dst, num_bytes = %transmit.payload.len());
debug_assert!(
self.buffered_transmits.len() < 10_000,
"We are not flushing the packets for some reason"
);
match self.try_send(&transmit) {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
tracing::trace!("Buffering packet because socket is busy");
self.buffered_transmits.push_back(transmit.into_owned());
Ok(())
}
Err(e) => Err(e),
}
}
fn try_send(&self, transmit: &snownet::Transmit) -> io::Result<()> {
let transmit = quinn_udp::Transmit {
destination: transmit.dst,
ecn: None,
contents: &transmit.payload,
segment_size: None,
src_ip: transmit.src.map(|s| s.ip()),
};
self.socket.try_io(Interest::WRITABLE, || {
self.state.send((&self.socket).into(), &transmit)
})
}
}