refactor(rust): use a buffer pool for network packets (#7489)

In order to achieve concurrency within `connlib`, we needed to create a
way for IP packets to own the piece of memory they are sitting in. This
allows us to concurrently read IP packets and them batch-process them
(as opposed to have a dedicated buffer and reference it). At the moment,
those IP packets are defined on the stack. With a size of ~1300 bytes
that isn't very large but still causes _some_ amount of copying.

We can avoid this copying by relying on a buffer pool:

1. When reading a new IP packet, we request a new buffer from the pool.
2. When the IP packet gets dropped, the buffer gets returned to the
pool.

This allows us to reuse an allocation for a packet once it finished
processing, resulting in less CPU time spent on copying around memory.

This causes us to make more _individual_ heap-allocations in the
beginning: Each packet is being processed by `connlib` is allocated on
the heap somewhere. At some point during the lifetime of the tunnel,
this will settle in an ideal state where we have allocated enough slots
to cover new packets whilst also reusing memory from packets that
finished processing already.

The actual `IpPacket` data type is now just a pointer. As a result, the
channels to and from the TUN thread (where we were holding multiple of
these packets) are now significantly smaller, leading to roughly the
same memory usage overall.

In my local testing on Linux, the client still only uses about ~15MB of
RAM even with multiple concurrent speedtests running.
This commit is contained in:
Thomas Eizinger
2024-12-16 09:02:17 +08:00
committed by GitHub
parent 8cecdc6906
commit aa8c53a20d
8 changed files with 123 additions and 42 deletions

2
rust/Cargo.lock generated
View File

@@ -3182,6 +3182,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"etherparse",
"lockfree-object-pool",
"proptest",
"test-strategy",
"thiserror",
@@ -5907,6 +5908,7 @@ dependencies = [
"hex-display",
"ip-packet",
"itertools 0.13.0",
"lockfree-object-pool",
"once_cell",
"rand 0.8.5",
"ringbuffer",

View File

@@ -109,6 +109,7 @@ subprocess = "0.2.9"
subtle = "2.5.0"
swift-bridge = "0.1.57"
swift-bridge-build = "0.1.57"
lockfree-object-pool = "0.1.6"
tauri = "2.0.3"
tauri-build = "2.0.1"
tauri-plugin-dialog = "2.0.1"

View File

@@ -14,6 +14,7 @@ hex = { workspace = true }
hex-display = { workspace = true }
ip-packet = { workspace = true }
itertools = { workspace = true }
lockfree-object-pool = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
ringbuffer = { workspace = true }

View File

@@ -8,7 +8,6 @@ use ::backoff::backoff::Backoff;
use bytecodec::{DecodeExt as _, EncodeExt as _};
use firezone_logging::{err_with_src, std_dyn_err};
use hex_display::HexDisplayExt as _;
use ip_packet::MAX_DATAGRAM_PAYLOAD;
use rand::random;
use ringbuffer::{AllocRingBuffer, RingBuffer as _};
use std::{
@@ -767,7 +766,7 @@ impl Allocation {
pub fn encode_to_encrypted_packet(
&self,
peer: SocketAddr,
mut buffer: [u8; MAX_DATAGRAM_PAYLOAD],
mut buffer: lockfree_object_pool::SpinLockOwnedReusable<Vec<u8>>,
buffer_len: usize,
now: Instant,
) -> Option<EncryptedPacket> {

View File

@@ -10,9 +10,7 @@ use boringtun::{noise::rate_limiter::RateLimiter, x25519::StaticSecret};
use core::fmt;
use firezone_logging::err_with_src;
use hex_display::HexDisplayExt;
use ip_packet::{
ConvertibleIpv4Packet, ConvertibleIpv6Packet, IpPacket, IpPacketBuf, MAX_DATAGRAM_PAYLOAD,
};
use ip_packet::{ConvertibleIpv4Packet, ConvertibleIpv6Packet, IpPacket, IpPacketBuf};
use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::{random, Rng, SeedableRng};
@@ -126,6 +124,9 @@ pub struct Node<T, TId, RId> {
pending_events: VecDeque<Event<TId>>,
stats: NodeStats,
// All access to [`Node`] happens in the same thread, so we should never get contention which makes a spinlock ideal.
// This is wrapped in an `Arc` so we can use `pull_owned`.
buffer_pool: Arc<lockfree_object_pool::SpinLockObjectPool<Vec<u8>>>,
mode: T,
rng: StdRng,
@@ -180,6 +181,10 @@ where
allocations: Default::default(),
connections: Default::default(),
stats: Default::default(),
buffer_pool: Arc::new(lockfree_object_pool::SpinLockObjectPool::new(
|| vec![0; ip_packet::MAX_DATAGRAM_PAYLOAD],
|v| v.fill(0),
)),
}
}
@@ -438,11 +443,11 @@ where
.get_established_mut(&connection)
.ok_or(Error::NotConnected)?;
let mut buffer = EncryptBuffer::default();
let mut buffer = self.buffer_pool.pull_owned();
// Encode the packet with an offset of 4 bytes, in case we need to wrap it in a channel-data message.
let Some(packet_len) = conn
.encapsulate(packet.packet(), &mut buffer.inner[4..], now)?
.encapsulate(packet.packet(), &mut buffer[4..], now)?
.map(|p| p.len())
// Mapping to len() here terminate the mutable borrow of buffer, allowing re-borrowing further down.
else {
@@ -454,7 +459,7 @@ where
let socket = match &mut conn.state {
ConnectionState::Connecting { buffered, .. } => {
buffered.push(buffer.inner[packet_start..packet_end].to_vec());
buffered.push(buffer[packet_start..packet_end].to_vec());
let num_buffered = buffered.len();
let _guard = conn.span.enter();
@@ -477,7 +482,7 @@ where
dst: remote,
packet_start,
packet_len,
buffer: buffer.inner,
buffer,
})),
PeerSocket::Relay { relay, dest: peer } => {
let Some(allocation) = self.allocations.get(&relay) else {
@@ -485,7 +490,7 @@ where
return Ok(None);
};
let Some(enc_packet) =
allocation.encode_to_encrypted_packet(peer, buffer.inner, packet_end, now)
allocation.encode_to_encrypted_packet(peer, buffer, packet_end, now)
else {
tracing::warn!(%peer, "No channel");
return Ok(None);
@@ -1473,31 +1478,12 @@ pub enum Event<TId> {
ConnectionClosed(TId),
}
struct EncryptBuffer {
inner: [u8; MAX_DATAGRAM_PAYLOAD],
}
impl EncryptBuffer {
fn new() -> Self {
Self {
inner: [0u8; MAX_DATAGRAM_PAYLOAD],
}
}
}
impl Default for EncryptBuffer {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct EncryptedPacket {
pub(crate) src: Option<SocketAddr>,
pub(crate) dst: SocketAddr,
pub(crate) packet_start: usize,
pub(crate) packet_len: usize,
pub(crate) buffer: [u8; MAX_DATAGRAM_PAYLOAD],
pub(crate) buffer: lockfree_object_pool::SpinLockOwnedReusable<Vec<u8>>,
}
impl EncryptedPacket {

View File

@@ -13,6 +13,7 @@ proptest = ["dep:proptest"]
[dependencies]
anyhow = { workspace = true }
etherparse = { workspace = true }
lockfree-object-pool = { workspace = true }
proptest = { workspace = true, optional = true }
thiserror = { workspace = true }
tracing = { workspace = true }

View File

@@ -0,0 +1,96 @@
use std::{
ops::{Deref, DerefMut},
sync::{Arc, LazyLock},
};
use crate::MAX_DATAGRAM_PAYLOAD;
type BufferPool = Arc<lockfree_object_pool::MutexObjectPool<Vec<u8>>>;
static BUFFER_POOL: LazyLock<BufferPool> = LazyLock::new(|| {
Arc::new(lockfree_object_pool::MutexObjectPool::new(
|| vec![0; MAX_DATAGRAM_PAYLOAD],
|v| v.fill(0),
))
});
pub struct Buffer(lockfree_object_pool::MutexOwnedReusable<Vec<u8>>);
impl Clone for Buffer {
fn clone(&self) -> Self {
let mut copy = Buffer::default();
copy.0.resize(self.len(), 0);
copy.copy_from_slice(self);
copy
}
}
impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
self.as_ref() == other.as_ref()
}
}
impl std::fmt::Debug for Buffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("Buffer").finish()
}
}
impl Deref for Buffer {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0[..]
}
}
impl DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0[..]
}
}
impl Default for Buffer {
fn default() -> Self {
Self(BUFFER_POOL.pull_owned())
}
}
impl Drop for Buffer {
fn drop(&mut self) {
debug_assert_eq!(
self.0.capacity(),
MAX_DATAGRAM_PAYLOAD,
"Buffer should never re-allocate"
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn buffer_can_be_cloned() {
let mut buffer = Buffer::default();
buffer[..11].copy_from_slice(b"hello world");
let buffer2 = buffer.clone();
assert_eq!(&buffer2[..], &buffer[..]);
}
#[test]
fn cloned_buffer_owns_its_own_memory() {
let mut buffer = Buffer::default();
buffer[..11].copy_from_slice(b"hello world");
let buffer2 = buffer.clone();
drop(buffer);
assert_eq!(&buffer2[..11], b"hello world");
}
}

View File

@@ -2,6 +2,7 @@
pub mod make;
mod buffer_pool;
mod fz_p2p_control;
mod fz_p2p_control_slice;
mod icmp_dest_unreachable;
@@ -18,6 +19,7 @@ mod slice_utils;
mod tcp_header_slice_mut;
mod udp_header_slice_mut;
use buffer_pool::Buffer;
pub use etherparse::*;
pub use fz_p2p_control::EventType as FzP2pEventType;
pub use fz_p2p_control_slice::FzP2pControlSlice;
@@ -101,15 +103,14 @@ pub enum Layer4Protocol {
}
/// A buffer for reading a new [`IpPacket`] from the network.
#[derive(Default)]
pub struct IpPacketBuf {
inner: [u8; MAX_DATAGRAM_PAYLOAD],
inner: Buffer,
}
impl IpPacketBuf {
pub fn new() -> Self {
Self {
inner: [0u8; MAX_DATAGRAM_PAYLOAD],
}
Self::default()
}
pub fn buf(&mut self) -> &mut [u8] {
@@ -117,12 +118,6 @@ impl IpPacketBuf {
}
}
impl Default for IpPacketBuf {
fn default() -> Self {
Self::new()
}
}
#[derive(PartialEq, Clone)]
pub enum IpPacket {
Ipv4(ConvertibleIpv4Packet),
@@ -168,7 +163,7 @@ impl std::fmt::Debug for IpPacket {
#[derive(Debug, PartialEq, Clone)]
pub struct ConvertibleIpv4Packet {
buf: [u8; MAX_DATAGRAM_PAYLOAD],
buf: Buffer,
start: usize,
len: usize,
}
@@ -248,7 +243,7 @@ impl ConvertibleIpv4Packet {
#[derive(Debug, PartialEq, Clone)]
pub struct ConvertibleIpv6Packet {
buf: [u8; MAX_DATAGRAM_PAYLOAD],
buf: Buffer,
start: usize,
len: usize,
}