diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 3b85d0920..db3b4eb9b 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -12,7 +12,6 @@ use device_channel::Device; use futures_util::{future::BoxFuture, task::AtomicWaker, FutureExt}; use peer::PacketTransform; use peer_store::PeerStore; -use pnet_packet::Packet; use snownet::{Node, Server}; use sockets::{Received, Sockets}; use std::{ @@ -325,8 +324,6 @@ where } }; - tracing::trace!(target: "wire", %local, %from, bytes = %packet.packet().len(), "read new packet"); - let Some(peer) = peer_store.get_mut(&conn_id) else { tracing::error!(%conn_id, %local, %from, "Couldn't find connection"); diff --git a/rust/connlib/tunnel/src/sockets.rs b/rust/connlib/tunnel/src/sockets.rs index a9b1e3448..726c5f9e7 100644 --- a/rust/connlib/tunnel/src/sockets.rs +++ b/rust/connlib/tunnel/src/sockets.rs @@ -81,8 +81,6 @@ impl Sockets { } pub fn try_send(&mut self, transmit: &Transmit) -> Result { - tracing::trace!(target: "wire", action = "write", to = %transmit.dst, src = ?transmit.src, bytes = %transmit.payload.len()); - match transmit.dst { SocketAddr::V4(_) => { let socket = self.socket_v4.as_ref().ok_or(Error::NoIpv4)?; @@ -99,15 +97,59 @@ impl Sockets { &'a mut self, cx: &mut Context<'_>, ) -> Poll>>> { - if let Some(Poll::Ready(packet)) = self.socket_v4.as_mut().map(|s| s.poll_recv_from(cx)) { - return Poll::Ready(packet); + let mut iter = PacketIter::new(); + + if let Some(Poll::Ready(packets)) = self.socket_v4.as_mut().map(|s| s.poll_recv_from(cx)) { + iter.ip4 = Some(packets?); } - if let Some(Poll::Ready(packet)) = self.socket_v6.as_mut().map(|s| s.poll_recv_from(cx)) { - return Poll::Ready(packet); + if let Some(Poll::Ready(packets)) = self.socket_v6.as_mut().map(|s| s.poll_recv_from(cx)) { + iter.ip6 = Some(packets?); } - Poll::Pending + if iter.is_empty() { + return Poll::Pending; + } + + Poll::Ready(Ok(iter)) + } +} + +struct PacketIter { + ip4: Option, + ip6: Option, +} + +impl PacketIter { + fn new() -> Self { + Self { + ip4: None, + ip6: None, + } + } + + fn is_empty(&self) -> bool { + self.ip4.is_none() && self.ip6.is_none() + } +} + +impl<'a, T4, T6> Iterator for PacketIter +where + T4: Iterator>, + T6: Iterator>, +{ + type Item = Received<'a>; + + fn next(&mut self) -> Option { + if let Some(packet) = self.ip4.as_mut().and_then(|i| i.next()) { + return Some(packet); + } + + if let Some(packet) = self.ip6.as_mut().and_then(|i| i.next()) { + return Some(packet); + } + + None } } @@ -183,13 +225,18 @@ impl Socket { let local = SocketAddr::new(local_ip, *port); - return Poll::Ready(Ok(buffer[..meta.len].chunks(meta.stride).map( - move |packet| Received { + let iter = buffer[..meta.len] + .chunks(meta.stride) + .map(move |packet| Received { local, from: meta.addr, packet, - }, - ))); + }) + .inspect(|r| { + tracing::trace!(target: "wire", from = "network", src = %r.from, dst = %r.local, num_bytes = %r.packet.len()); + }); + + return Poll::Ready(Ok(iter)); } } } @@ -204,6 +251,8 @@ impl Socket { dest: SocketAddr, buf: &[u8], ) -> io::Result { + tracing::trace!(target: "wire", to = "network", src = ?local, dst = %dest, num_bytes = %buf.len()); + self.state.send( (&self.socket).into(), &[quinn_udp::Transmit {