fix(connlib): send all unwritten packets before reading new ones (#7342)

With the parallelisation of TUN and UDP operations, we lost
backpressure: packets can now be read from the UDP sockets faster than
they can be written to the TUN device, causing packet loss in extremely
high-throughput situations.

To avoid this, we no longer send packets directly into the channel to
the TUN device thread. That channel is bounded, meaning sending can
fail whenever we read UDP packets faster than we write packets to the
TUN device.

Due to GRO (generic receive offload), we may read multiple UDP packets
in one go, requiring us to write multiple IP packets to the TUN device
within a single iteration of the event loop. Thus, we cannot know how
much space we need in the channel for outgoing IP packets.
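As a rough illustration (the helper below is hypothetical, not
connlib's actual API): the kernel hands us one contiguous buffer
assembled from several equal-sized datagrams, so the fan-out factor is
only known after the read:

    /// Hypothetical helper: split a GRO-coalesced buffer into its datagrams.
    /// Every chunk decrypts to one IP packet for the TUN device, so a single
    /// socket read can require N slots in the outbound channel.
    fn gro_segments(buf: &[u8], segment_size: usize) -> impl Iterator<Item = &[u8]> + '_ {
        buf.chunks(segment_size)
    }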

By introducing a dedicated buffer, we can temporarily hold on to all of
these packets and flush them into the channel on the next call to
`poll`. If the channel is full, we suspend and only continue once there
is space again. This behaviour restores backpressure because we won't
read UDP packets from the socket unless we have space to write the
corresponding packets to the TUN device.
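Condensed into a self-contained sketch (the type and field names here
are illustrative; the real change lives in `Io`, see the diff below):

    use std::{
        collections::VecDeque,
        io,
        task::{ready, Context, Poll},
    };
    use tokio_util::sync::PollSender;

    /// Illustrative stand-in for the outbound half of `Io`.
    struct OutboundBuffer<T: Send + 'static> {
        buffer: VecDeque<T>,
        tx: PollSender<T>,
    }

    impl<T: Send + 'static> OutboundBuffer<T> {
        /// Queueing never fails; the packet waits here until the next flush.
        fn send(&mut self, packet: T) {
            self.buffer.push_back(packet);
        }

        /// Flush buffered packets, suspending (`Poll::Pending`) while the
        /// channel to the TUN thread is full.
        fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            loop {
                // Reserving a slot first is what restores backpressure.
                ready!(self.tx.poll_reserve(cx)).map_err(|_| io::ErrorKind::BrokenPipe)?;

                let Some(packet) = self.buffer.pop_front() else {
                    return Poll::Ready(Ok(())); // Drained; safe to read more UDP.
                };

                self.tx
                    .send_item(packet)
                    .map_err(|_| io::ErrorKind::BrokenPipe)?;
            }
        }
    }

Because `poll_flush` must return `Ready` before the event loop reads
more UDP, the TUN writer's pace propagates all the way back to the
socket.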

UDP itself doesn't provide any backpressure; packets are simply dropped
once the receive buffer overflows. The UDP packets, however, carry
encrypted IP packets, so whatever protocol sits inside them will detect
the packet loss and should throttle its sending pace accordingly.
Thomas Eizinger
2024-11-14 06:25:03 +00:00
committed by GitHub
parent e2117dd220
commit 4e423dc51c
4 changed files with 33 additions and 22 deletions

rust/Cargo.lock (generated)

@@ -2239,6 +2239,7 @@ dependencies = [
  "test-strategy",
  "thiserror",
  "tokio",
+ "tokio-util",
  "tracing",
  "tracing-subscriber",
  "tun",

@@ -37,6 +37,7 @@ socket-factory = { workspace = true }
 socket2 = { workspace = true }
 thiserror = { version = "1.0", default-features = false }
 tokio = { workspace = true }
+tokio-util = "0.7.12"
 tracing = { workspace = true, features = ["attributes"] }
 tun = { workspace = true }
 uuid = { version = "1.10", default-features = false, features = ["std", "v4"] }

@@ -11,6 +11,7 @@ use ip_packet::{IpPacket, MAX_DATAGRAM_PAYLOAD};
 use snownet::{EncryptBuffer, EncryptedPacket};
 use socket_factory::{DatagramIn, DatagramOut, SocketFactory, TcpSocket, UdpSocket};
 use std::{
+    collections::VecDeque,
     io,
     net::{Ipv4Addr, Ipv6Addr, SocketAddr},
     pin::Pin,
@@ -22,6 +23,7 @@ use tokio::{
     io::{AsyncReadExt, AsyncWriteExt},
     sync::mpsc,
 };
+use tokio_util::sync::PollSender;
 use tracing::Instrument;
 use tun::Tun;
@@ -37,8 +39,10 @@ pub struct Io {
     dns_queries: FuturesTupleSet<io::Result<Message<Vec<u8>>>, DnsQueryMetaData>,
     timeout: Option<Pin<Box<tokio::time::Sleep>>>,
     tun_tx: mpsc::Sender<Box<dyn Tun>>,
-    outbound_packet_tx: mpsc::Sender<IpPacket>,
+    outbound_packet_buffer: VecDeque<IpPacket>,
+    outbound_packet_tx: PollSender<IpPacket>,
     inbound_packet_rx: mpsc::Receiver<IpPacket>,
 }
@@ -88,7 +92,8 @@ impl Io {
         Self {
             tun_tx,
-            outbound_packet_tx,
+            outbound_packet_buffer: VecDeque::with_capacity(10), // It is unlikely that we process more than 10 packets after 1 GRO call.
+            outbound_packet_tx: PollSender::new(outbound_packet_tx),
             inbound_packet_rx,
             timeout: None,
             sockets,
@@ -162,11 +167,27 @@ impl Io {
         ready!(self.sockets.poll_send_ready(cx))?;

         // If the `unwritten_packet` is set, `EncryptBuffer` is still holding a packet that we need to send.
-        let Some(unwritten_packet) = self.unwritten_packet.take() else {
-            return Poll::Ready(Ok(()));
+        if let Some(unwritten_packet) = self.unwritten_packet.take() {
+            self.send_encrypted_packet(unwritten_packet, buf)?;
         };

-        self.send_encrypted_packet(unwritten_packet, buf)?;
+        loop {
+            // First, acquire a slot in the channel.
+            ready!(self
+                .outbound_packet_tx
+                .poll_reserve(cx)
+                .map_err(|_| io::ErrorKind::BrokenPipe)?);
+
+            // Second, check if we have any buffered packets.
+            let Some(packet) = self.outbound_packet_buffer.pop_front() else {
+                break; // No more packets? All done.
+            };
+
+            // Third, send the packet.
+            self.outbound_packet_tx
+                .send_item(packet)
+                .map_err(|_| io::ErrorKind::BrokenPipe)?;
+        }

         Poll::Ready(Ok(()))
     }
@@ -179,20 +200,8 @@ impl Io {
             .expect("Channel to set new TUN device should always have capacity");
     }

-    pub fn send_tun(&mut self, packet: IpPacket) -> io::Result<()> {
-        let Err(e) = self.outbound_packet_tx.try_send(packet) else {
-            return Ok(());
-        };
-
-        match e {
-            mpsc::error::TrySendError::Full(_) => {
-                Err(io::Error::other("Outbound packet channel is at capacity"))
-            }
-            mpsc::error::TrySendError::Closed(_) => Err(io::Error::new(
-                io::ErrorKind::NotConnected,
-                "Outbound packet channel is disconnected",
-            )),
-        }
+    pub fn send_tun(&mut self, packet: IpPacket) {
+        self.outbound_packet_buffer.push_back(packet);
     }

     pub fn rebind_sockets(&mut self) {

@@ -120,7 +120,7 @@ impl ClientTunnel {
         }

         if let Some(packet) = self.role_state.poll_packets() {
-            self.io.send_tun(packet)?;
+            self.io.send_tun(packet);
             continue;
         }
@@ -177,7 +177,7 @@ impl ClientTunnel {
                     continue;
                 };

-                self.io.send_tun(packet)?;
+                self.io.send_tun(packet);
             }

             continue;
@@ -282,7 +282,7 @@ impl GatewayTunnel {
                     continue;
                 };

-                self.io.send_tun(packet)?;
+                self.io.send_tun(packet);
             }

             continue;