diff --git a/rust/connlib/clients/shared/src/eventloop.rs b/rust/connlib/clients/shared/src/eventloop.rs index d9cd5ccbb..fc821164f 100644 --- a/rust/connlib/clients/shared/src/eventloop.rs +++ b/rust/connlib/clients/shared/src/eventloop.rs @@ -179,6 +179,7 @@ where IngressMessages::Init(InitClient { interface, resources, + relays, }) => { if let Err(e) = self.tunnel.set_new_interface_config(interface) { tracing::warn!("Failed to set interface on tunnel: {e}"); @@ -187,6 +188,7 @@ where tracing::info!("Firezone Started!"); let _ = self.tunnel.set_resources(resources); + self.tunnel.upsert_relays(relays) } IngressMessages::ResourceCreatedOrUpdated(resource) => { let resource_id = resource.id(); diff --git a/rust/connlib/clients/shared/src/messages.rs b/rust/connlib/clients/shared/src/messages.rs index 5c78b6dd0..146ee917e 100644 --- a/rust/connlib/clients/shared/src/messages.rs +++ b/rust/connlib/clients/shared/src/messages.rs @@ -10,6 +10,8 @@ pub struct InitClient { pub interface: Interface, #[serde(default)] pub resources: Vec, + #[serde(default)] + pub relays: Vec, } #[derive(Debug, PartialEq, Eq, Deserialize, Clone)] @@ -196,6 +198,7 @@ mod test { name: "gitlab.mycorp.com".to_string(), }), ], + relays: vec![], }), None, ); @@ -254,6 +257,7 @@ mod test { name: "gitlab.mycorp.com".to_string(), }), ], + relays: vec![], }), None, ); @@ -304,6 +308,7 @@ mod test { upstream_dns: vec![], }, resources: vec![], + relays: vec![], }), None, ); @@ -337,6 +342,7 @@ mod test { upstream_dns: vec![], }, resources: vec![], + relays: vec![], }), None, ); @@ -370,6 +376,7 @@ mod test { upstream_dns: vec![], }, resources: vec![], + relays: vec![], }), None, ); @@ -403,6 +410,7 @@ mod test { upstream_dns: vec![], }, resources: vec![], + relays: vec![], }), None, ); @@ -460,18 +468,22 @@ mod test { resource_id: "f16ecfa0-a94f-4bfd-a2ef-1cc1f2ef3da3".parse().unwrap(), relays: vec![ Relay::Stun(Stun { + id: "c9cb8892-e355-41e6-a882-b6d6c38beb66".parse().unwrap(), addr: "189.172.73.111:3478".parse().unwrap(), }), Relay::Turn(Turn { + id: "6a7f3ba9-d9c4-4633-81ab-311276993fbd".parse().unwrap(), expires_at: DateTime::from_timestamp(1686629954, 0).unwrap(), addr: "189.172.73.111:3478".parse().unwrap(), username: "1686629954:C7I74wXYFdFugMYM".to_string(), password: "OXXRDJ7lJN1cm+4+2BWgL87CxDrvpVrn5j3fnJHye98".to_string(), }), Relay::Stun(Stun { + id: "1ea93681-aeda-467f-9dca-219c06c18c3d".parse().unwrap(), addr: "[::1]:3478".parse().unwrap(), }), Relay::Turn(Turn { + id: "94209389-e18d-4453-a00d-2583ba857592".parse().unwrap(), expires_at: DateTime::from_timestamp(1686629954, 0).unwrap(), addr: "[::1]:3478".parse().unwrap(), username: "1686629954:dpHxHfNfOhxPLfMG".to_string(), @@ -493,10 +505,12 @@ mod test { "gateway_remote_ip": "172.28.0.1", "relays": [ { + "id": "c9cb8892-e355-41e6-a882-b6d6c38beb66", "type":"stun", "addr": "189.172.73.111:3478" }, { + "id": "6a7f3ba9-d9c4-4633-81ab-311276993fbd", "expires_at": 1686629954, "password": "OXXRDJ7lJN1cm+4+2BWgL87CxDrvpVrn5j3fnJHye98", "type": "turn", @@ -504,10 +518,12 @@ mod test { "username":"1686629954:C7I74wXYFdFugMYM" }, { + "id": "1ea93681-aeda-467f-9dca-219c06c18c3d", "type": "stun", "addr": "[::1]:3478" }, { + "id": "94209389-e18d-4453-a00d-2583ba857592", "expires_at": 1686629954, "password": "8Wtb+3YGxO6ia23JUeSEfZ2yFD6RhGLkbgZwqjebyKY", "type": "turn", diff --git a/rust/connlib/shared/src/messages.rs b/rust/connlib/shared/src/messages.rs index 9cb9d36f5..c2496c54d 100644 --- a/rust/connlib/shared/src/messages.rs +++ b/rust/connlib/shared/src/messages.rs @@ -18,9 +18,21 @@ use crate::Dname; #[derive(Hash, Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)] pub struct GatewayId(Uuid); + #[derive(Hash, Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct ResourceId(Uuid); +#[derive(Hash, Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct RelayId(Uuid); + +impl FromStr for RelayId { + type Err = uuid::Error; + + fn from_str(s: &str) -> Result { + Ok(RelayId(Uuid::parse_str(s)?)) + } +} + impl ResourceId { pub fn random() -> ResourceId { ResourceId(Uuid::new_v4()) @@ -68,6 +80,12 @@ impl fmt::Display for GatewayId { } } +impl fmt::Display for RelayId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + /// Represents a wireguard peer. #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Peer { @@ -365,6 +383,7 @@ pub enum Relay { /// Represent a TURN relay #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] pub struct Turn { + pub id: RelayId, //// Expire time of the username/password in unix millisecond timestamp UTC #[serde(with = "ts_seconds")] pub expires_at: DateTime, @@ -380,6 +399,8 @@ pub struct Turn { /// Stun kind of relay #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] pub struct Stun { + pub id: RelayId, + /// Address for the relay pub addr: SocketAddr, } diff --git a/rust/connlib/snownet/src/allocation.rs b/rust/connlib/snownet/src/allocation.rs index 60aa77786..3dceba1a0 100644 --- a/rust/connlib/snownet/src/allocation.rs +++ b/rust/connlib/snownet/src/allocation.rs @@ -6,6 +6,7 @@ use crate::{ }; use ::backoff::backoff::Backoff; use bytecodec::{DecodeExt as _, EncodeExt as _}; +use core::fmt; use rand::random; use std::{ collections::{HashMap, VecDeque}, @@ -35,7 +36,9 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(1); /// /// Allocations have a lifetime and need to be continuously refreshed to stay active. #[derive(Debug)] -pub struct Allocation { +pub struct Allocation { + id: RId, + server: SocketAddr, /// If present, the last address the relay observed for us. @@ -70,16 +73,19 @@ pub struct Allocation { /// Note that any combination of IP versions is possible here. /// We might have allocated an IPv6 address on a TURN server that we are talking to IPv4 and vice versa. #[derive(Debug, Clone, Copy)] -pub struct Socket { - /// The server this socket was allocated on. - server: SocketAddr, +pub struct Socket { + /// The ID of the relay. + id: RId, /// The address of the socket that was allocated. address: SocketAddr, } -impl Socket { - pub fn server(&self) -> SocketAddr { - self.server +impl Socket +where + RId: Copy, +{ + pub fn id(&self) -> RId { + self.id } pub fn address(&self) -> SocketAddr { @@ -87,8 +93,12 @@ impl Socket { } } -impl Allocation { +impl Allocation +where + RId: Copy + fmt::Debug, +{ pub fn new( + id: RId, server: SocketAddr, username: Username, password: String, @@ -96,6 +106,7 @@ impl Allocation { now: Instant, ) -> Self { let mut allocation = Self { + id, server, last_srflx_candidate: Default::default(), ip4_allocation: Default::default(), @@ -134,11 +145,13 @@ impl Allocation { /// This will implicitly trigger a [`refresh`](Allocation::refresh) to ensure these credentials are valid. pub fn update_credentials( &mut self, + socket: SocketAddr, username: Username, password: &str, realm: Realm, now: Instant, ) { + self.server = socket; self.username = username; self.realm = realm; self.password = password.to_owned(); @@ -392,7 +405,7 @@ impl Allocation { from: SocketAddr, packet: &'p [u8], now: Instant, - ) -> Option<(SocketAddr, &'p [u8], Socket)> { + ) -> Option<(SocketAddr, &'p [u8], Socket)> { if from != self.server { return None; } @@ -534,7 +547,7 @@ impl Allocation { } pub fn encode_to_slice( - &mut self, + &self, peer: SocketAddr, packet: &[u8], header: &mut [u8], @@ -595,24 +608,28 @@ impl Allocation { is_ip4 || is_ip6 } - pub fn ip4_socket(&self) -> Option { + pub fn server(&self) -> SocketAddr { + self.server + } + + pub fn ip4_socket(&self) -> Option> { let address = self.ip4_allocation.as_ref().map(|c| c.addr())?; debug_assert!(address.is_ipv4()); Some(Socket { - server: self.server, + id: self.id, address, }) } - pub fn ip6_socket(&self) -> Option { + pub fn ip6_socket(&self) -> Option> { let address = self.ip6_allocation.as_ref().map(|c| c.addr())?; debug_assert!(address.is_ipv6()); Some(Socket { - server: self.server, + id: self.id, address, }) } @@ -1080,6 +1097,7 @@ mod tests { const PEER2_IP6: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 20000); const RELAY: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3478); + const RELAY2: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 3478); const RELAY_ADDR_IP4: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9999); const RELAY_ADDR_IP6: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9999); @@ -1909,6 +1927,23 @@ mod tests { ) } + #[test] + fn new_address_is_used_for_new_messages() { + let now = Instant::now(); + let mut allocation = Allocation::for_test(now).with_allocate_response(&[RELAY_ADDR_IP4]); + let _drained_messages = iter::from_fn(|| allocation.poll_transmit()).collect::>(); + + allocation.update_credentials( + RELAY2, + allocation.username.clone(), + &allocation.password.clone(), + allocation.realm.clone(), + now, + ); + + assert_eq!(allocation.poll_transmit().unwrap().dst, RELAY2) + } + fn ch(peer: SocketAddr, now: Instant) -> Channel { Channel { peer, @@ -2007,9 +2042,10 @@ mod tests { message.get_attribute::().unwrap().address() } - impl Allocation { - fn for_test(start: Instant) -> Allocation { + impl Allocation { + fn for_test(start: Instant) -> Self { Allocation::new( + 1, RELAY, Username::new("foobar".to_owned()).unwrap(), "baz".to_owned(), @@ -2044,6 +2080,7 @@ mod tests { fn refresh_with_same_credentials(&mut self) { self.update_credentials( + self.server, Username::new("foobar".to_owned()).unwrap(), "baz", Realm::new("firezone".to_owned()).unwrap(), diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 2ce363a23..4ce77a5f2 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -43,9 +43,9 @@ const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20); const MAX_UDP_SIZE: usize = (1 << 16) - 1; /// Manages a set of wireguard connections for a server. -pub type ServerNode = Node; +pub type ServerNode = Node; /// Manages a set of wireguard connections for a client. -pub type ClientNode = Node; +pub type ClientNode = Node; pub enum Server {} pub enum Client {} @@ -72,7 +72,14 @@ pub enum Client {} /// 1. Change [`Node`]'s state (either via network messages, adding a new connection, etc) /// 2. Check [`Node::poll_timeout`] for when to wake the [`Node`] /// 3. Call [`Node::handle_timeout`] once that time is reached -pub struct Node { +/// +/// A [`Node`] is generic over three things: +/// - `T`: The mode it is operating in, either [`Client`] or [`Server`]. +/// - `TId`: The type to use for uniquely identifying connections. +/// - `RId`: The type to use for uniquely identifying relays. +/// +/// We favor these generic parameters over having our own IDs to avoid mapping back and forth in upper layers. +pub struct Node { private_key: StaticSecret, index: IndexLfsr, rate_limiter: Arc, @@ -82,9 +89,9 @@ pub struct Node { next_rate_limiter_reset: Option, bindings: HashMap, - allocations: HashMap, + allocations: HashMap>, - connections: Connections, + connections: Connections, pending_events: VecDeque>, buffer: Box<[u8; MAX_UDP_SIZE]>, @@ -112,9 +119,10 @@ pub enum Error { BadLocalAddress(#[from] str0m::error::IceError), } -impl Node +impl Node where TId: Eq + Hash + Copy + fmt::Display, + RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display, { pub fn new(private_key: StaticSecret) -> Self { let public_key = &(&private_key).into(); @@ -230,7 +238,7 @@ where /// To do that, we need to check all candidates of each allocation and compare their IP. /// The same relay might be reachable over IPv4 and IPv6. #[must_use] - fn same_relay_as_peer(&mut self, candidate: &Candidate) -> Option<&mut Allocation> { + fn same_relay_as_peer(&mut self, candidate: &Candidate) -> Option<&mut Allocation> { self.allocations.iter_mut().find_map(|(_, allocation)| { allocation .current_candidates() @@ -322,7 +330,7 @@ where payload: Cow::Borrowed(packet), })), PeerSocket::Relay { relay, dest: peer } => { - let Some(allocation) = self.allocations.get_mut(&relay) else { + let Some(allocation) = self.allocations.get(&relay) else { tracing::warn!(%relay, "No allocation"); return Ok(None); }; @@ -339,7 +347,7 @@ where Ok(Some(Transmit { src: None, - dst: relay, + dst: allocation.server(), payload: Cow::Borrowed(channel_data_packet), })) } @@ -440,6 +448,35 @@ where self.buffered_transmits.pop_front() } + pub fn upsert_turn_servers( + &mut self, + servers: &HashSet<(RId, SocketAddr, String, String, String)>, + now: Instant, + ) { + for (id, server, username, password, realm) in servers { + let Ok(username) = Username::new(username.to_owned()) else { + tracing::debug!(%username, "Invalid TURN username"); + continue; + }; + let Ok(realm) = Realm::new(realm.to_owned()) else { + tracing::debug!(%realm, "Invalid TURN realm"); + continue; + }; + + if let Some(existing) = self.allocations.get_mut(id) { + existing.update_credentials(*server, username, password, realm, now); + continue; + } + + self.allocations.insert( + *id, + Allocation::new(*id, *server, username, password.clone(), realm, now), + ); + + tracing::info!(address = %server, "Added new TURN server"); + } + } + #[must_use] #[allow(clippy::too_many_arguments)] fn init_connection( @@ -449,7 +486,7 @@ where key: [u8; 32], intent_sent_at: Instant, now: Instant, - ) -> Connection { + ) -> Connection { agent.handle_timeout(now); /// We set a Wireguard keep-alive to ensure the WG session doesn't timeout on an idle connection. @@ -523,23 +560,36 @@ where } /// Tries to handle the packet using one of our [`Allocation`]s. + /// + /// This function is in the hot-path of packet processing and thus must be as efficient as possible. + /// Even look-ups in [`HashMap`]s and linear searches across small lists are expensive at this point. + /// Thus, we use the first byte of the message as a heuristic for whether we should attempt to handle it here. + /// + /// See for details on de-multiplexing. + /// + /// This heuristic might fail because we are also handling wireguard packets. + /// Those are fully encrypted and thus any byte pattern may appear at the front of the packet. + /// We can detect this by further checking the origin of the packet. #[must_use] + #[allow(clippy::type_complexity)] fn allocations_try_handle<'p>( &mut self, from: SocketAddr, local: SocketAddr, packet: &'p [u8], now: Instant, - ) -> ControlFlow<(), (SocketAddr, &'p [u8], Option)> { - // First, check whether the packet is from a known allocation. - let Some(allocation) = self.allocations.get_mut(&from) else { - return ControlFlow::Continue((from, packet, None)); - }; - - // See for details on de-multiplexing. - match packet.first() { + ) -> ControlFlow<(), (SocketAddr, &'p [u8], Option>)> { + match packet.first().copied() { + // STUN method range Some(0..=3) => { + let Some(allocation) = self.allocations.values_mut().find(|a| a.server() == from) + else { + // False-positive, continue processing packet elsewhere + return ControlFlow::Continue((from, packet, None)); + }; + if allocation.handle_input(from, local, packet, now) { + // Successfully handled the packet return ControlFlow::Break(()); } @@ -547,8 +597,17 @@ where ControlFlow::Break(()) // Stop processing the packet. } + // Channel data number range Some(64..=79) => { + let Some(allocation) = self.allocations.values_mut().find(|a| a.server() == from) + else { + // False-positive, continue processing packet elsewhere + return ControlFlow::Continue((from, packet, None)); + }; + if let Some((from, packet, socket)) = allocation.decapsulate(from, packet, now) { + // Successfully handled the packet and decapsulated the channel data message. + // Continue processing with the _unwrapped_ packet. return ControlFlow::Continue((from, packet, Some(socket))); } @@ -556,7 +615,8 @@ where ControlFlow::Break(()) // Stop processing the packet. } - _ => ControlFlow::Continue((from, packet, None)), + // Byte is in a different range? Move on with processing the packet. + Some(_) | None => ControlFlow::Continue((from, packet, None)), } } @@ -601,7 +661,7 @@ where from: SocketAddr, local: SocketAddr, packet: &[u8], - relayed: Option, + relayed: Option>, buffer: &'b mut [u8], now: Instant, ) -> ControlFlow, (TId, MutableIpPacket<'b>)> { @@ -676,9 +736,10 @@ where } } -impl Node +impl Node where TId: Eq + Hash + Copy + fmt::Display, + RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display, { /// Create a new connection indexed by the given ID. /// @@ -690,7 +751,7 @@ where &mut self, id: TId, stun_servers: HashSet, - turn_servers: HashSet<(SocketAddr, String, String, String)>, + turn_servers: HashSet<(RId, SocketAddr, String, String, String)>, intent_sent_at: Instant, now: Instant, ) -> Offer { @@ -775,9 +836,10 @@ where } } -impl Node +impl Node where TId: Eq + Hash + Copy + fmt::Display, + RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display, { /// Accept a new connection indexed by the given ID. /// @@ -791,7 +853,7 @@ where offer: Offer, remote: PublicKey, stun_servers: HashSet, - turn_servers: HashSet<(SocketAddr, String, String, String)>, + turn_servers: HashSet<(RId, SocketAddr, String, String, String)>, now: Instant, ) -> Answer { debug_assert!( @@ -838,9 +900,10 @@ where } } -impl Node +impl Node where TId: Eq + Hash + Copy + fmt::Display, + RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display, { fn upsert_stun_servers(&mut self, servers: &HashSet, now: Instant) { for server in servers { @@ -853,35 +916,6 @@ where } } - fn upsert_turn_servers( - &mut self, - servers: &HashSet<(SocketAddr, String, String, String)>, - now: Instant, - ) { - for (server, username, password, realm) in servers { - let Ok(username) = Username::new(username.to_owned()) else { - tracing::debug!(%username, "Invalid TURN username"); - continue; - }; - let Ok(realm) = Realm::new(realm.to_owned()) else { - tracing::debug!(%realm, "Invalid TURN realm"); - continue; - }; - - if let Some(existing) = self.allocations.get_mut(server) { - existing.update_credentials(username, password, realm, now); - continue; - } - - self.allocations.insert( - *server, - Allocation::new(*server, username, password.clone(), realm, now), - ); - - tracing::info!(address = %server, "Added new TURN server"); - } - } - fn seed_agent_with_local_candidates(&mut self, connection: TId, agent: &mut IceAgent) { for candidate in self.host_candidates.iter().cloned() { add_local_candidate(connection, agent, candidate, &mut self.pending_events); @@ -915,12 +949,12 @@ where } } -struct Connections { +struct Connections { initial: HashMap, - established: HashMap, + established: HashMap>, } -impl Default for Connections { +impl Default for Connections { fn default() -> Self { Self { initial: Default::default(), @@ -929,7 +963,7 @@ impl Default for Connections { } } -impl Connections +impl Connections where TId: Eq + Hash + Copy + fmt::Display, { @@ -974,15 +1008,15 @@ where initial_agents.chain(negotiated_agents) } - fn get_established_mut(&mut self, id: &TId) -> Option<&mut Connection> { + fn get_established_mut(&mut self, id: &TId) -> Option<&mut Connection> { self.established.get_mut(id) } - fn iter_established(&self) -> impl Iterator { + fn iter_established(&self) -> impl Iterator)> { self.established.iter().map(|(id, conn)| (*id, conn)) } - fn iter_established_mut(&mut self) -> impl Iterator { + fn iter_established_mut(&mut self) -> impl Iterator)> { self.established.iter_mut().map(|(id, conn)| (*id, conn)) } @@ -996,13 +1030,16 @@ where /// - `relay` is in fact a relay /// - We have an allocation on the relay /// - There is a channel bound to the provided peer -fn encode_as_channel_data( - relay: SocketAddr, +fn encode_as_channel_data( + relay: RId, dest: SocketAddr, contents: &[u8], - allocations: &mut HashMap, + allocations: &mut HashMap>, now: Instant, -) -> Result, EncodeError> { +) -> Result, EncodeError> +where + RId: Copy + Eq + Hash + PartialEq + fmt::Debug, +{ let allocation = allocations .get_mut(&relay) .ok_or(EncodeError::NoAllocation)?; @@ -1012,7 +1049,7 @@ fn encode_as_channel_data( Ok(Transmit { src: None, - dst: relay, + dst: allocation.server(), payload: Cow::Owned(payload), }) } @@ -1023,9 +1060,9 @@ enum EncodeError { NoChannel, } -fn add_local_candidate_to_all( +fn add_local_candidate_to_all( candidate: Candidate, - connections: &mut Connections, + connections: &mut Connections, pending_events: &mut VecDeque>, ) where TId: Copy + fmt::Display, @@ -1156,7 +1193,7 @@ impl InitialConnection { } } -struct Connection { +struct Connection { agent: IceAgent, remote_pub_key: PublicKey, @@ -1165,7 +1202,7 @@ struct Connection { next_timer_update: Instant, // When this is `Some`, we are connected. - peer_socket: Option, + peer_socket: Option>, // Socket addresses from which we might receive data (even before we are connected). possible_sockets: HashSet, @@ -1181,27 +1218,21 @@ struct Connection { /// The socket of the peer we are connected to. #[derive(Debug, PartialEq, Clone, Copy)] -enum PeerSocket { +enum PeerSocket { Direct { source: SocketAddr, dest: SocketAddr, }, Relay { - relay: SocketAddr, + relay: RId, dest: SocketAddr, }, } -impl PeerSocket { - fn our_socket(&self) -> SocketAddr { - match self { - PeerSocket::Direct { source, .. } => *source, - PeerSocket::Relay { relay, .. } => *relay, - } - } -} - -impl Connection { +impl Connection +where + RId: PartialEq + Eq + Hash + fmt::Debug + Copy, +{ /// Checks if we want to accept a packet from a certain address. /// /// Whilst we establish connections, we may see traffic from a certain address, prior to the negotiation being fully complete. @@ -1229,11 +1260,11 @@ impl Connection { &mut self, local: SocketAddr, dest: SocketAddr, - relay_socket: Option, - ) -> PeerSocket { + relay_socket: Option>, + ) -> PeerSocket { let remote_socket = match relay_socket { Some(relay_socket) => PeerSocket::Relay { - relay: relay_socket.server(), + relay: relay_socket.id(), dest, }, None => PeerSocket::Direct { @@ -1272,10 +1303,11 @@ impl Connection { &mut self, id: TId, now: Instant, - allocations: &mut HashMap, + allocations: &mut HashMap>, transmits: &mut VecDeque>, ) where TId: fmt::Display + Copy, + RId: Copy + fmt::Display, { self.agent.handle_timeout(now); @@ -1375,7 +1407,7 @@ impl Connection { tracing::info!(old = ?self.peer_socket, new = ?remote_socket, duration_since_intent = ?self.duration_since_intent(now), "Updating remote socket"); self.peer_socket = Some(remote_socket); - self.invalidate_candiates(); + self.invalidate_candiates(allocations); self.force_handshake(allocations, transmits, now); } } @@ -1416,7 +1448,7 @@ impl Connection { transmits.push_back(Transmit { src: None, - dst: *relay, + dst: allocation.server(), payload: Cow::Owned(channel_data), }); } @@ -1445,9 +1477,9 @@ impl Connection { from: SocketAddr, local: SocketAddr, packet: &[u8], - relayed: Option, + relayed: Option>, buffer: &'b mut [u8], - allocations: &mut HashMap, + allocations: &mut HashMap>, transmits: &mut VecDeque>, now: Instant, ) -> ControlFlow, MutableIpPacket<'b>> { @@ -1500,10 +1532,12 @@ impl Connection { fn force_handshake( &mut self, - allocations: &mut HashMap, + allocations: &mut HashMap>, transmits: &mut VecDeque>, now: Instant, - ) { + ) where + RId: Copy, + { /// [`boringtun`] requires us to pass buffers in where it can construct its packets. /// /// When updating the timers, the largest packet that we may have to send is `148` bytes as per `HANDSHAKE_INIT_SZ` constant in [`boringtun`]. @@ -1529,12 +1563,17 @@ impl Connection { /// Each time we nominate a candidate pair, we don't really want to keep all the others active because it creates a lot of noise. /// At the same time, we want to retain trickle ICE and allow the ICE agent to find a _better_ pair, hence we invalidate by priority. #[tracing::instrument(level = "debug", skip_all, fields(nominated_prio))] - fn invalidate_candiates(&mut self) { - let Some(socket) = self.peer_socket else { - return; + fn invalidate_candiates(&mut self, allocations: &HashMap>) { + let socket = match self.peer_socket { + Some(PeerSocket::Direct { source, .. }) => source, + Some(PeerSocket::Relay { relay, .. }) => match allocations.get(&relay) { + Some(alloc) => alloc.server(), + None => return, + }, + None => return, }; - let Some(nominated) = self.local_candidate(socket.our_socket()).cloned() else { + let Some(nominated) = self.local_candidate(socket).cloned() else { return; }; @@ -1562,12 +1601,15 @@ impl Connection { } #[must_use] -fn make_owned_transmit( - socket: PeerSocket, +fn make_owned_transmit( + socket: PeerSocket, message: &[u8], - allocations: &mut HashMap, + allocations: &mut HashMap>, now: Instant, -) -> Option> { +) -> Option> +where + RId: Copy + Eq + Hash + PartialEq + fmt::Debug, +{ let transmit = match socket { PeerSocket::Direct { dest: remote, diff --git a/rust/connlib/snownet/tests/lib.rs b/rust/connlib/snownet/tests/lib.rs index 865165b9b..386c8f885 100644 --- a/rust/connlib/snownet/tests/lib.rs +++ b/rust/connlib/snownet/tests/lib.rs @@ -16,6 +16,8 @@ use tracing_subscriber::util::SubscriberInitExt; #[test] fn smoke_direct() { let _guard = setup_tracing(); + let firewall = Firewall::default(); + let mut clock = Clock::new(); let (alice, bob) = alice_and_bob(); @@ -23,10 +25,8 @@ fn smoke_direct() { TestNode::new(info_span!("Alice"), alice, "1.1.1.1:80").with_primary_as_host_candidate(); let mut bob = TestNode::new(info_span!("Bob"), bob, "1.1.1.2:80").with_primary_as_host_candidate(); - let firewall = Firewall::default(); - let mut clock = Clock::new(); - handshake(&mut alice, &mut bob, &[], &clock); + handshake(&mut alice, &mut bob, &clock); loop { if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) { @@ -40,20 +40,23 @@ fn smoke_direct() { #[test] fn smoke_relayed() { let _guard = setup_tracing(); - - let (alice, bob) = alice_and_bob(); - - let relay = TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")); - let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80"); - let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80"); + let mut clock = Clock::new(); let firewall = Firewall::default() .with_block_rule("1.1.1.1:80", "2.2.2.2:80") .with_block_rule("2.2.2.2:80", "1.1.1.1:80"); - let mut clock = Clock::new(); - let mut relays = [relay]; + let (alice, bob) = alice_and_bob(); - handshake(&mut alice, &mut bob, &relays, &clock); + let mut relays = [TestRelay::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + debug_span!("Roger"), + )]; + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80") + .with_relays(&mut relays, clock.now); + let mut bob = + TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now); + + handshake(&mut alice, &mut bob, &clock); loop { if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) { @@ -67,18 +70,21 @@ fn smoke_relayed() { #[test] fn reconnect_discovers_new_interface() { let _guard = setup_tracing(); + let mut clock = Clock::new(); + let firewall = Firewall::default(); let (alice, bob) = alice_and_bob(); - let relay = TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")); - let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80"); - let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80"); - let firewall = Firewall::default(); - let mut clock = Clock::new(); + let mut relays = [TestRelay::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + debug_span!("Roger"), + )]; + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80") + .with_relays(&mut relays, clock.now); + let mut bob = + TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now); - let mut relays = [relay]; - - handshake(&mut alice, &mut bob, &relays, &clock); + handshake(&mut alice, &mut bob, &clock); loop { if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) { @@ -177,11 +183,10 @@ fn answer_after_stale_connection_does_not_panic() { fn only_generate_candidate_event_after_answer() { let local_candidate = SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), 10000); - let mut alice = ClientNode::::new(StaticSecret::random_from_rng(rand::thread_rng())); - + let mut alice = ClientNode::::new(StaticSecret::random_from_rng(rand::thread_rng())); alice.add_local_host_candidate(local_candidate).unwrap(); - let mut bob = ServerNode::::new(StaticSecret::random_from_rng(rand::thread_rng())); + let mut bob = ServerNode::::new(StaticSecret::random_from_rng(rand::thread_rng())); let offer = alice.new_connection( 1, @@ -219,12 +224,11 @@ fn only_generate_candidate_event_after_answer() { #[test] fn second_connection_with_same_relay_reuses_allocation() { - let mut alice = ClientNode::::new(StaticSecret::random_from_rng(rand::thread_rng())); - - let _ = alice.new_connection( + let mut alice = ClientNode::new(StaticSecret::random_from_rng(rand::thread_rng())); + _ = alice.new_connection( 1, HashSet::new(), - HashSet::from([relay("user1", "pass1", "realm1")]), + HashSet::from([relay(1, "user1", "pass1", "realm1")]), Instant::now(), Instant::now(), ); @@ -236,7 +240,7 @@ fn second_connection_with_same_relay_reuses_allocation() { let _ = alice.new_connection( 2, HashSet::new(), - HashSet::from([relay("user1", "pass1", "realm1")]), + HashSet::from([relay(1, "user1", "pass1", "realm1")]), Instant::now(), Instant::now(), ); @@ -252,14 +256,18 @@ fn setup_tracing() -> tracing::subscriber::DefaultGuard { .set_default() } -fn alice_and_bob() -> (ClientNode, ServerNode) { - let alice = ClientNode::::new(StaticSecret::random_from_rng(rand::thread_rng())); - let bob = ServerNode::::new(StaticSecret::random_from_rng(rand::thread_rng())); +fn alice_and_bob() -> (ClientNode, ServerNode) { + let alice = ClientNode::new(StaticSecret::random_from_rng(rand::thread_rng())); + let bob = ServerNode::new(StaticSecret::random_from_rng(rand::thread_rng())); (alice, bob) } -fn send_offer(alice: &mut ClientNode, bob: &mut ServerNode, now: Instant) -> Answer { +fn send_offer( + alice: &mut ClientNode, + bob: &mut ServerNode, + now: Instant, +) -> Answer { let offer = alice.new_connection(1, HashSet::new(), HashSet::new(), Instant::now(), now); bob.accept_connection( @@ -272,8 +280,14 @@ fn send_offer(alice: &mut ClientNode, bob: &mut ServerNode, now: Insta ) } -fn relay(username: &str, pass: &str, realm: &str) -> (SocketAddr, String, String, String) { +fn relay( + id: u64, + username: &str, + pass: &str, + realm: &str, +) -> (u64, SocketAddr, String, String, String) { ( + id, RELAY, username.to_owned(), pass.to_owned(), @@ -520,18 +534,18 @@ impl TestRelay { } enum EitherNode { - Server(ServerNode), - Client(ClientNode), + Server(ServerNode), + Client(ClientNode), } -impl From> for EitherNode { - fn from(value: ClientNode) -> Self { +impl From> for EitherNode { + fn from(value: ClientNode) -> Self { Self::Client(value) } } -impl From> for EitherNode { - fn from(value: ServerNode) -> Self { +impl From> for EitherNode { + fn from(value: ServerNode) -> Self { Self::Server(value) } } @@ -586,14 +600,14 @@ impl EitherNode { } } - fn as_client_mut(&mut self) -> Option<&mut ClientNode> { + fn as_client_mut(&mut self) -> Option<&mut ClientNode> { match self { EitherNode::Server(_) => None, EitherNode::Client(c) => Some(c), } } - fn as_server_mut(&mut self) -> Option<&mut ServerNode> { + fn as_server_mut(&mut self) -> Option<&mut ServerNode> { match self { EitherNode::Server(s) => Some(s), EitherNode::Client(_) => None, @@ -644,6 +658,36 @@ impl TestNode { } } + fn with_relays(mut self, relays: &mut [TestRelay], now: Instant) -> Self { + let username = match self.node { + EitherNode::Server(_) => "server", + EitherNode::Client(_) => "client", + }; + + let turn_servers = relays + .iter() + .enumerate() + .map(|(idx, relay)| { + let (username, password) = relay.make_credentials(username); + + ( + idx as u64, + relay.listen_addr, + username, + password, + "firezone".to_owned(), + ) + }) + .collect::>(); + + match &mut self.node { + EitherNode::Server(s) => s.upsert_turn_servers(&turn_servers, now), + EitherNode::Client(c) => c.upsert_turn_servers(&turn_servers, now), + } + + self + } + fn switch_network(&mut self, new_primary: &str) { self.primary = new_primary.parse().unwrap(); self.local.push(self.primary); @@ -742,26 +786,15 @@ impl TestNode { } } -fn handshake(client: &mut TestNode, server: &mut TestNode, relays: &[TestRelay], clock: &Clock) { +fn handshake(client: &mut TestNode, server: &mut TestNode, clock: &Clock) { let client_node = &mut client.node.as_client_mut().unwrap(); let server_node = &mut server.node.as_server_mut().unwrap(); - let client_credentials = relays.iter().map(|relay| { - let (username, password) = relay.make_credentials("client"); - - (relay.listen_addr, username, password, "firezone".to_owned()) - }); - let server_credentials = relays.iter().map(|relay| { - let (username, password) = relay.make_credentials("client"); - - (relay.listen_addr, username, password, "firezone".to_owned()) - }); - let offer = client.span.in_scope(|| { client_node.new_connection( 1, HashSet::default(), - HashSet::from_iter(client_credentials), + HashSet::default(), clock.now, clock.now, ) @@ -772,7 +805,7 @@ fn handshake(client: &mut TestNode, server: &mut TestNode, relays: &[TestRelay], offer, client_node.public_key(), HashSet::default(), - HashSet::from_iter(server_credentials), + HashSet::default(), clock.now, ) }); diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index ed234c2c0..3636072ee 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -6,7 +6,7 @@ use bimap::BiMap; use connlib_shared::error::{ConnlibError as Error, ConnlibError}; use connlib_shared::messages::{ Answer, ClientPayload, DnsServer, DomainResponse, GatewayId, Interface as InterfaceConfig, - IpDnsServer, Key, Offer, Relay, RequestConnection, ResourceDescription, + IpDnsServer, Key, Offer, Relay, RelayId, RequestConnection, ResourceDescription, ResourceDescriptionCidr, ResourceDescriptionDns, ResourceId, ReuseConnection, }; use connlib_shared::{Callbacks, Dname, PublicKey, StaticSecret}; @@ -73,6 +73,13 @@ where Ok(()) } + pub fn upsert_relays(&mut self, relays: Vec) { + self.role_state.upsert_relays( + turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)), + Instant::now(), + ) + } + /// Adds a the given resource to the tunnel. pub fn add_resources( &mut self, @@ -250,7 +257,7 @@ pub struct ClientState { pub peers: PeerStore>, - node: ClientNode, + node: ClientNode, pub ip_provider: IpProvider, @@ -425,7 +432,7 @@ impl ClientState { resource_id: ResourceId, gateway_id: GatewayId, allowed_stun_servers: HashSet, - allowed_turn_servers: HashSet<(SocketAddr, String, String, String)>, + allowed_turn_servers: HashSet<(RelayId, SocketAddr, String, String, String)>, ) -> connlib_shared::Result { tracing::trace!("create_or_reuse_connection"); @@ -985,6 +992,14 @@ impl ClientState { true } + + fn upsert_relays( + &mut self, + relays: HashSet<(RelayId, SocketAddr, String, String, String)>, + now: Instant, + ) { + self.node.upsert_turn_servers(&relays, now); + } } fn effective_dns_servers( diff --git a/rust/connlib/tunnel/src/gateway.rs b/rust/connlib/tunnel/src/gateway.rs index b2bfd0922..9c64467fa 100644 --- a/rust/connlib/tunnel/src/gateway.rs +++ b/rust/connlib/tunnel/src/gateway.rs @@ -7,7 +7,7 @@ use boringtun::x25519::PublicKey; use chrono::{DateTime, Utc}; use connlib_shared::messages::{ Answer, ClientId, ConnectionAccepted, DomainResponse, Interface as InterfaceConfig, Key, Offer, - Relay, ResolvedResourceDescriptionDns, ResourceDescription, ResourceId, + Relay, RelayId, ResolvedResourceDescriptionDns, ResourceDescription, ResourceId, }; use connlib_shared::{Callbacks, Dname, Error, Result, StaticSecret}; use ip_network::IpNetwork; @@ -189,8 +189,7 @@ where pub struct GatewayState { pub peers: PeerStore, - node: ServerNode, - + node: ServerNode, next_expiry_resources_check: Option, buffered_events: VecDeque, } @@ -309,4 +308,12 @@ impl GatewayState { pub(crate) fn poll_event(&mut self) -> Option { self.buffered_events.pop_front() } + + pub(crate) fn upsert_relays( + &mut self, + relays: HashSet<(RelayId, SocketAddr, String, String, String)>, + now: Instant, + ) { + self.node.upsert_turn_servers(&relays, now); + } } diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index e85d27e10..eea8b49b5 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -5,7 +5,7 @@ use boringtun::x25519::StaticSecret; use connlib_shared::{ - messages::{ClientId, GatewayId, ResourceId, ReuseConnection}, + messages::{ClientId, GatewayId, Relay, ResourceId, ReuseConnection}, Callbacks, Result, }; use io::Io; @@ -18,6 +18,7 @@ use std::{ pub use client::{ClientState, Request}; pub use gateway::GatewayState; pub use sockets::Sockets; +use utils::turn; mod client; mod device_channel; @@ -171,6 +172,13 @@ where }) } + pub fn upsert_relays(&mut self, relays: Vec) { + self.role_state.upsert_relays( + turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)), + Instant::now(), + ) + } + pub fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll> { loop { if let Some(other) = self.role_state.poll_event() { diff --git a/rust/connlib/tunnel/src/utils.rs b/rust/connlib/tunnel/src/utils.rs index 9904ab514..4ef1430da 100644 --- a/rust/connlib/tunnel/src/utils.rs +++ b/rust/connlib/tunnel/src/utils.rs @@ -1,5 +1,5 @@ use crate::REALM; -use connlib_shared::messages::Relay; +use connlib_shared::messages::{Relay, RelayId}; use std::{collections::HashSet, net::SocketAddr, time::Instant}; pub fn stun(relays: &[Relay], predicate: impl Fn(&SocketAddr) -> bool) -> HashSet { @@ -19,12 +19,13 @@ pub fn stun(relays: &[Relay], predicate: impl Fn(&SocketAddr) -> bool) -> HashSe pub fn turn( relays: &[Relay], predicate: impl Fn(&SocketAddr) -> bool, -) -> HashSet<(SocketAddr, String, String, String)> { +) -> HashSet<(RelayId, SocketAddr, String, String, String)> { relays .iter() .filter_map(|r| { if let Relay::Turn(r) = r { Some(( + r.id, r.addr, r.username.clone(), r.password.clone(), @@ -34,7 +35,7 @@ pub fn turn( None } }) - .filter(|(socket, _, _, _)| predicate(socket)) + .filter(|(_, socket, _, _, _)| predicate(socket)) .collect() } diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index b4f516206..18922f798 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -110,6 +110,7 @@ async fn run(login: LoginUrl, private_key: StaticSecret) -> Result { tunnel .set_interface(&init.interface) .context("Failed to set interface")?; + tunnel.upsert_relays(init.relays); let mut eventloop = Eventloop::new(tunnel, portal); diff --git a/rust/gateway/src/messages.rs b/rust/gateway/src/messages.rs index c684f4c85..d5c32518b 100644 --- a/rust/gateway/src/messages.rs +++ b/rust/gateway/src/messages.rs @@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize}; pub struct InitGateway { pub interface: Interface, pub config: Config, + #[serde(default)] + pub relays: Vec, } #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] @@ -149,10 +151,12 @@ mod test { }, "relays": [ { + "id": "0bfc5e02-a093-423b-827b-002d7d2bb407", "type": "stun", "addr": "172.28.0.101:3478" }, { + "id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8", "type": "turn", "username": "1719367575:ZQHcVGkdnfgGmcP1", "password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg", @@ -178,6 +182,7 @@ mod test { ipv4_masquerade_enabled: true, ipv6_masquerade_enabled: true, }, + relays: vec![], }); let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#; @@ -197,6 +202,7 @@ mod test { ipv4_masquerade_enabled: true, ipv6_masquerade_enabled: true, }, + relays: vec![], }); let message = r#"{"event":"init","ref":null,"topic":"gateway","irrelevant":"field","payload":{"more":"info","interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true,"ignored":"field"}}}"#; @@ -216,6 +222,7 @@ mod test { ipv4_masquerade_enabled: true, ipv6_masquerade_enabled: true, }, + relays: vec![], }); let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":null,"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#; @@ -235,6 +242,7 @@ mod test { ipv4_masquerade_enabled: true, ipv6_masquerade_enabled: true, }, + relays: vec![], }); let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":0.3,"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#; @@ -254,6 +262,7 @@ mod test { ipv4_masquerade_enabled: true, ipv6_masquerade_enabled: true, }, + relays: vec![], }); let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":true,"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#; @@ -273,6 +282,7 @@ mod test { ipv4_masquerade_enabled: true, ipv6_masquerade_enabled: true, }, + relays: vec![], }); let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":{"ignored":"field"},"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#; @@ -292,6 +302,7 @@ mod test { ipv4_masquerade_enabled: true, ipv6_masquerade_enabled: true, }, + relays: vec![], }); let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":[true,false],"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#; diff --git a/rust/snownet-tests/src/main.rs b/rust/snownet-tests/src/main.rs index fe1f0b157..bb617ff23 100644 --- a/rust/snownet-tests/src/main.rs +++ b/rust/snownet-tests/src/main.rs @@ -52,6 +52,7 @@ async fn main() -> Result<()> { .context("Failed to parse `TURNERVER`")? .map(|ip| { ( + 1, SocketAddr::new(ip, 3478), "2000000000:client".to_owned(), // TODO: Use different credentials per role. "+Qou8TSjw9q3JMnWET7MbFsQh/agwz/LURhpfX7a0hE".to_owned(), @@ -76,7 +77,7 @@ async fn main() -> Result<()> { match role { Role::Dialer => { - let mut pool = ClientNode::::new(private_key); + let mut pool = ClientNode::::new(private_key); let offer = pool.new_connection( 1, @@ -161,7 +162,7 @@ async fn main() -> Result<()> { } } Role::Listener => { - let mut pool = ServerNode::::new(private_key); + let mut pool = ServerNode::::new(private_key); let offer = redis_connection .blpop::<_, (String, wire::Offer)>("offers", 10.0) @@ -334,7 +335,7 @@ impl FromStr for Role { struct Eventloop { socket: UdpSocket, - pool: Node, + pool: Node, timeout: BoxFuture<'static, Instant>, candidate_rx: mpsc::Receiver, read_buffer: Box<[u8; MAX_UDP_SIZE]>, @@ -344,7 +345,7 @@ struct Eventloop { impl Eventloop { fn new( socket: UdpSocket, - pool: Node, + pool: Node, candidate_rx: mpsc::Receiver, ) -> Self { Self {