From 4e423dc51cbdf9700d6bda9c4538aab5a2fc7b36 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 14 Nov 2024 06:25:03 +0000 Subject: [PATCH] fix(connlib): send all unwritten packets before reading new ones (#7342) With the parallelisation of TUN and UDP operations, we lost backpressure: Packets can now be read from the UDP sockets more quickly than they can be sent out of the TUN device, causing packet loss in extremely high-throughput situations. To avoid this, we no longer send packets directly into the channel to the TUN device thread. This channel is bounded, meaning sending can fail if reading UDP packets is faster than writing packets to the TUN device. Due to GRO, we may read multiple UDP packets in one go, requiring us to write multiple IP packets to the TUN device as part of a single iteration of the event loop. Thus, we cannot know how much space we need in the channel for outgoing IP packets. By introducing a dedicated buffer, we can temporarily hold on to all of these packets and, on the next call to `poll`, flush them out into the channel. If the channel is full, we will suspend and only continue once there is space in the channel. This behaviour restores backpressure because we won't read UDP packets from the socket unless we have space to write the corresponding packet to the TUN device. UDP itself actually doesn't have any backpressure; instead, the packets will simply get dropped once the receive buffer overflows. The UDP packets, however, carry encrypted IP packets, meaning whatever protocol sits inside these packets will detect the packet loss and should throttle its sending pace accordingly. 
--- rust/Cargo.lock | 1 + rust/connlib/tunnel/Cargo.toml | 1 + rust/connlib/tunnel/src/io.rs | 47 ++++++++++++++++++++-------------- rust/connlib/tunnel/src/lib.rs | 6 ++--- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d12030c04..1e065d97b 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2239,6 +2239,7 @@ dependencies = [ "test-strategy", "thiserror", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "tun", diff --git a/rust/connlib/tunnel/Cargo.toml b/rust/connlib/tunnel/Cargo.toml index d8c16f2dc..c37532989 100644 --- a/rust/connlib/tunnel/Cargo.toml +++ b/rust/connlib/tunnel/Cargo.toml @@ -37,6 +37,7 @@ socket-factory = { workspace = true } socket2 = { workspace = true } thiserror = { version = "1.0", default-features = false } tokio = { workspace = true } +tokio-util = "0.7.12" tracing = { workspace = true, features = ["attributes"] } tun = { workspace = true } uuid = { version = "1.10", default-features = false, features = ["std", "v4"] } diff --git a/rust/connlib/tunnel/src/io.rs b/rust/connlib/tunnel/src/io.rs index bd5845393..59f068dba 100644 --- a/rust/connlib/tunnel/src/io.rs +++ b/rust/connlib/tunnel/src/io.rs @@ -11,6 +11,7 @@ use ip_packet::{IpPacket, MAX_DATAGRAM_PAYLOAD}; use snownet::{EncryptBuffer, EncryptedPacket}; use socket_factory::{DatagramIn, DatagramOut, SocketFactory, TcpSocket, UdpSocket}; use std::{ + collections::VecDeque, io, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, pin::Pin, @@ -22,6 +23,7 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, sync::mpsc, }; +use tokio_util::sync::PollSender; use tracing::Instrument; use tun::Tun; @@ -37,8 +39,10 @@ pub struct Io { dns_queries: FuturesTupleSet>>, DnsQueryMetaData>, timeout: Option>>, + tun_tx: mpsc::Sender>, - outbound_packet_tx: mpsc::Sender, + outbound_packet_buffer: VecDeque, + outbound_packet_tx: PollSender, inbound_packet_rx: mpsc::Receiver, } @@ -88,7 +92,8 @@ impl Io { Self { tun_tx, - outbound_packet_tx, + 
outbound_packet_buffer: VecDeque::with_capacity(10), // It is unlikely that we process more than 10 packets after 1 GRO call. + outbound_packet_tx: PollSender::new(outbound_packet_tx), inbound_packet_rx, timeout: None, sockets, @@ -162,11 +167,27 @@ impl Io { ready!(self.sockets.poll_send_ready(cx))?; // If the `unwritten_packet` is set, `EncryptBuffer` is still holding a packet that we need so send. - let Some(unwritten_packet) = self.unwritten_packet.take() else { - return Poll::Ready(Ok(())); + if let Some(unwritten_packet) = self.unwritten_packet.take() { + self.send_encrypted_packet(unwritten_packet, buf)?; }; - self.send_encrypted_packet(unwritten_packet, buf)?; + loop { + // First, acquire a slot in the channel. + ready!(self + .outbound_packet_tx + .poll_reserve(cx) + .map_err(|_| io::ErrorKind::BrokenPipe)?); + + // Second, check if we have any buffer packets. + let Some(packet) = self.outbound_packet_buffer.pop_front() else { + break; // No more packets? All done. + }; + + // Third, send the packet. 
+ self.outbound_packet_tx + .send_item(packet) + .map_err(|_| io::ErrorKind::BrokenPipe)?; + } Poll::Ready(Ok(())) } @@ -179,20 +200,8 @@ impl Io { .expect("Channel to set new TUN device should always have capacity"); } - pub fn send_tun(&mut self, packet: IpPacket) -> io::Result<()> { - let Err(e) = self.outbound_packet_tx.try_send(packet) else { - return Ok(()); - }; - - match e { - mpsc::error::TrySendError::Full(_) => { - Err(io::Error::other("Outbound packet channel is at capacity")) - } - mpsc::error::TrySendError::Closed(_) => Err(io::Error::new( - io::ErrorKind::NotConnected, - "Outbound packet channel is disconnected", - )), - } + pub fn send_tun(&mut self, packet: IpPacket) { + self.outbound_packet_buffer.push_back(packet); } pub fn rebind_sockets(&mut self) { diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 0e2fbe613..6f6f45c4f 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -120,7 +120,7 @@ impl ClientTunnel { } if let Some(packet) = self.role_state.poll_packets() { - self.io.send_tun(packet)?; + self.io.send_tun(packet); continue; } @@ -177,7 +177,7 @@ impl ClientTunnel { continue; }; - self.io.send_tun(packet)?; + self.io.send_tun(packet); } continue; @@ -282,7 +282,7 @@ impl GatewayTunnel { continue; }; - self.io.send_tun(packet)?; + self.io.send_tun(packet); } continue;