diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 418b29e0f..91555ac40 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -998,7 +998,7 @@ name = "bufferpool" version = "0.1.0" dependencies = [ "bytes", - "lockfree-object-pool", + "crossbeam-queue", "opentelemetry", "opentelemetry_sdk", "tokio", @@ -1589,6 +1589,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -4173,12 +4182,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "lockfree-object-pool" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9374ef4228402d4b7e403e5838cb880d9ee663314b0a900d5a6aabf0c213552e" - [[package]] name = "log" version = "0.4.27" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index cca6f9f5a..ba6863ea8 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -63,6 +63,7 @@ chrono = { version = "0.4", default-features = false, features = ["std", "clock" clap = "4.5.41" client-shared = { path = "client-shared" } connlib-model = { path = "connlib/model" } +crossbeam-queue = "0.3.12" dashmap = "6.1.0" derive_more = "2.0.1" difference = "2.0.0" @@ -101,7 +102,6 @@ l3-tcp = { path = "connlib/l3-tcp" } l4-tcp-dns-server = { path = "connlib/l4-tcp-dns-server" } l4-udp-dns-server = { path = "connlib/l4-udp-dns-server" } libc = "0.2.174" -lockfree-object-pool = "0.1.6" log = "0.4" lru = "0.12.5" mio = "1.0.4" diff --git a/rust/connlib/bufferpool/Cargo.toml b/rust/connlib/bufferpool/Cargo.toml index e9ec33581..2a86299c8 100644 --- a/rust/connlib/bufferpool/Cargo.toml +++ b/rust/connlib/bufferpool/Cargo.toml @@ -9,7 +9,7 @@ path = "lib.rs" [dependencies] bytes = { workspace = true } -lockfree-object-pool = { workspace = true } +crossbeam-queue = { workspace = true } opentelemetry = { workspace = true, features = ["metrics"] } tracing = { workspace = true } diff --git a/rust/connlib/bufferpool/lib.rs b/rust/connlib/bufferpool/lib.rs index f7e09c7cd..ebf1ce290 100644 --- a/rust/connlib/bufferpool/lib.rs +++ b/rust/connlib/bufferpool/lib.rs @@ -6,11 +6,26 @@ use std::{ }; use bytes::BytesMut; +use crossbeam_queue::SegQueue; use opentelemetry::{KeyValue, metrics::UpDownCounter}; -#[derive(Clone)] +/// A lock-free pool of buffers that are all equal in size. +/// +/// The buffers are stored in a queue ([`SegQueue`]) and taken from the front and push to the back. +/// This minimizes contention even under high load where buffers are constantly needed and returned. pub struct BufferPool { - inner: Arc>>, + inner: Arc>>, + + new_buffer_fn: Arc BufferStorage + Send + Sync>, +} + +impl Clone for BufferPool { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + new_buffer_fn: self.new_buffer_fn.clone(), + } + } } impl BufferPool @@ -25,26 +40,28 @@ where .build(); Self { - inner: Arc::new(lockfree_object_pool::SpinLockObjectPool::new( - move || { - BufferStorage::new( - B::with_capacity(capacity), - buffer_counter.clone(), - [ - KeyValue::new("system.buffer.pool.name", tag), - KeyValue::new("system.buffer.pool.buffer_size", capacity as i64), - ], - ) - }, - |_| {}, - )), + inner: Arc::new(SegQueue::new()), + + // TODO: It would be nice to eventually create a fixed amount of buffers upfront. + // This however means that getting a buffer can fail which would require us to implement back-pressure. + new_buffer_fn: Arc::new(move || { + BufferStorage::new( + B::with_capacity(capacity), + buffer_counter.clone(), + [ + KeyValue::new("system.buffer.pool.name", tag), + KeyValue::new("system.buffer.pool.buffer_size", capacity as i64), + ], + ) + }), } } pub fn pull(&self) -> Buffer { Buffer { - inner: self.inner.pull_owned(), + inner: Some(self.inner.pop().unwrap_or_else(|| (self.new_buffer_fn)())), pool: self.inner.clone(), + new_buffer_fn: self.new_buffer_fn.clone(), } } } @@ -65,8 +82,10 @@ where } pub struct Buffer { - inner: lockfree_object_pool::SpinLockOwnedReusable>, - pool: Arc>>, + inner: Option>, + + pool: Arc>>, + new_buffer_fn: Arc BufferStorage + Send + Sync>, } impl Buffer> { @@ -74,7 +93,7 @@ impl Buffer> { pub fn shift_start_right(&mut self, num: usize) -> Vec { let num_to_end = self.split_off(num); - std::mem::replace(&mut self.inner.inner, num_to_end) + std::mem::replace(self.storage_mut(), num_to_end) } /// Shifts the start of the buffer to the left by N bytes, returning a slice to the added bytes at the front of the buffer. @@ -88,18 +107,33 @@ impl Buffer> { } } +impl Buffer { + fn storage(&self) -> &BufferStorage { + self.inner + .as_ref() + .expect("should always have buffer storage until dropped") + } + + fn storage_mut(&mut self) -> &mut BufferStorage { + self.inner + .as_mut() + .expect("should always have buffer storage until dropped") + } +} + impl Clone for Buffer where B: Buf, { fn clone(&self) -> Self { - let mut copy = self.pool.pull_owned(); + let mut copy = self.pool.pop().unwrap_or_else(|| (self.new_buffer_fn)()); - self.inner.inner.clone(&mut copy); + self.storage().inner.clone(&mut copy); Self { - inner: copy, + inner: Some(copy), pool: self.pool.clone(), + new_buffer_fn: self.new_buffer_fn.clone(), } } } @@ -143,13 +177,21 @@ impl Deref for Buffer { type Target = B; fn deref(&self) -> &Self::Target { - self.inner.deref() + self.storage().deref() } } impl DerefMut for Buffer { fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.deref_mut() + self.storage_mut().deref_mut() + } +} + +impl Drop for Buffer { + fn drop(&mut self) { + let buffer_storage = self.inner.take().expect("should have storage in `Drop`"); + + self.pool.push(buffer_storage); } }