chore(connlib): upsert relays from "init" message (#4567)

This is another step towards #4548. The portal now includes a list of
relays as part of the "init" message. Any time we receive an "init", we
will now upsert those relays based on their ID. This requires us to
change our internal bookkeeping of relays from indexing them by address
to indexing by ID.

To ensure that this works correctly, the unit tests are rewritten to use
the new `upsert_relays` API.

---------

Signed-off-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: Jamil <jamilbk@users.noreply.github.com>
This commit is contained in:
Thomas Eizinger
2024-04-16 07:30:49 +10:00
committed by GitHub
parent 53968063a5
commit bfe07d7ebd
13 changed files with 380 additions and 185 deletions

View File

@@ -179,6 +179,7 @@ where
IngressMessages::Init(InitClient {
interface,
resources,
relays,
}) => {
if let Err(e) = self.tunnel.set_new_interface_config(interface) {
tracing::warn!("Failed to set interface on tunnel: {e}");
@@ -187,6 +188,7 @@ where
tracing::info!("Firezone Started!");
let _ = self.tunnel.set_resources(resources);
self.tunnel.upsert_relays(relays)
}
IngressMessages::ResourceCreatedOrUpdated(resource) => {
let resource_id = resource.id();

View File

@@ -10,6 +10,8 @@ pub struct InitClient {
pub interface: Interface,
#[serde(default)]
pub resources: Vec<ResourceDescription>,
#[serde(default)]
pub relays: Vec<Relay>,
}
#[derive(Debug, PartialEq, Eq, Deserialize, Clone)]
@@ -196,6 +198,7 @@ mod test {
name: "gitlab.mycorp.com".to_string(),
}),
],
relays: vec![],
}),
None,
);
@@ -254,6 +257,7 @@ mod test {
name: "gitlab.mycorp.com".to_string(),
}),
],
relays: vec![],
}),
None,
);
@@ -304,6 +308,7 @@ mod test {
upstream_dns: vec![],
},
resources: vec![],
relays: vec![],
}),
None,
);
@@ -337,6 +342,7 @@ mod test {
upstream_dns: vec![],
},
resources: vec![],
relays: vec![],
}),
None,
);
@@ -370,6 +376,7 @@ mod test {
upstream_dns: vec![],
},
resources: vec![],
relays: vec![],
}),
None,
);
@@ -403,6 +410,7 @@ mod test {
upstream_dns: vec![],
},
resources: vec![],
relays: vec![],
}),
None,
);
@@ -460,18 +468,22 @@ mod test {
resource_id: "f16ecfa0-a94f-4bfd-a2ef-1cc1f2ef3da3".parse().unwrap(),
relays: vec![
Relay::Stun(Stun {
id: "c9cb8892-e355-41e6-a882-b6d6c38beb66".parse().unwrap(),
addr: "189.172.73.111:3478".parse().unwrap(),
}),
Relay::Turn(Turn {
id: "6a7f3ba9-d9c4-4633-81ab-311276993fbd".parse().unwrap(),
expires_at: DateTime::from_timestamp(1686629954, 0).unwrap(),
addr: "189.172.73.111:3478".parse().unwrap(),
username: "1686629954:C7I74wXYFdFugMYM".to_string(),
password: "OXXRDJ7lJN1cm+4+2BWgL87CxDrvpVrn5j3fnJHye98".to_string(),
}),
Relay::Stun(Stun {
id: "1ea93681-aeda-467f-9dca-219c06c18c3d".parse().unwrap(),
addr: "[::1]:3478".parse().unwrap(),
}),
Relay::Turn(Turn {
id: "94209389-e18d-4453-a00d-2583ba857592".parse().unwrap(),
expires_at: DateTime::from_timestamp(1686629954, 0).unwrap(),
addr: "[::1]:3478".parse().unwrap(),
username: "1686629954:dpHxHfNfOhxPLfMG".to_string(),
@@ -493,10 +505,12 @@ mod test {
"gateway_remote_ip": "172.28.0.1",
"relays": [
{
"id": "c9cb8892-e355-41e6-a882-b6d6c38beb66",
"type":"stun",
"addr": "189.172.73.111:3478"
},
{
"id": "6a7f3ba9-d9c4-4633-81ab-311276993fbd",
"expires_at": 1686629954,
"password": "OXXRDJ7lJN1cm+4+2BWgL87CxDrvpVrn5j3fnJHye98",
"type": "turn",
@@ -504,10 +518,12 @@ mod test {
"username":"1686629954:C7I74wXYFdFugMYM"
},
{
"id": "1ea93681-aeda-467f-9dca-219c06c18c3d",
"type": "stun",
"addr": "[::1]:3478"
},
{
"id": "94209389-e18d-4453-a00d-2583ba857592",
"expires_at": 1686629954,
"password": "8Wtb+3YGxO6ia23JUeSEfZ2yFD6RhGLkbgZwqjebyKY",
"type": "turn",

View File

@@ -18,9 +18,21 @@ use crate::Dname;
#[derive(Hash, Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)]
pub struct GatewayId(Uuid);
#[derive(Hash, Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ResourceId(Uuid);
#[derive(Hash, Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct RelayId(Uuid);
impl FromStr for RelayId {
type Err = uuid::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(RelayId(Uuid::parse_str(s)?))
}
}
impl ResourceId {
pub fn random() -> ResourceId {
ResourceId(Uuid::new_v4())
@@ -68,6 +80,12 @@ impl fmt::Display for GatewayId {
}
}
impl fmt::Display for RelayId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
/// Represents a wireguard peer.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Peer {
@@ -365,6 +383,7 @@ pub enum Relay {
/// Represent a TURN relay
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Turn {
pub id: RelayId,
//// Expire time of the username/password in unix millisecond timestamp UTC
#[serde(with = "ts_seconds")]
pub expires_at: DateTime<Utc>,
@@ -380,6 +399,8 @@ pub struct Turn {
/// Stun kind of relay
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Stun {
pub id: RelayId,
/// Address for the relay
pub addr: SocketAddr,
}

View File

@@ -6,6 +6,7 @@ use crate::{
};
use ::backoff::backoff::Backoff;
use bytecodec::{DecodeExt as _, EncodeExt as _};
use core::fmt;
use rand::random;
use std::{
collections::{HashMap, VecDeque},
@@ -35,7 +36,9 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(1);
///
/// Allocations have a lifetime and need to be continuously refreshed to stay active.
#[derive(Debug)]
pub struct Allocation {
pub struct Allocation<RId> {
id: RId,
server: SocketAddr,
/// If present, the last address the relay observed for us.
@@ -70,16 +73,19 @@ pub struct Allocation {
/// Note that any combination of IP versions is possible here.
/// We might have allocated an IPv6 address on a TURN server that we are talking to IPv4 and vice versa.
#[derive(Debug, Clone, Copy)]
pub struct Socket {
/// The server this socket was allocated on.
server: SocketAddr,
pub struct Socket<RId> {
/// The ID of the relay.
id: RId,
/// The address of the socket that was allocated.
address: SocketAddr,
}
impl Socket {
pub fn server(&self) -> SocketAddr {
self.server
impl<RId> Socket<RId>
where
RId: Copy,
{
pub fn id(&self) -> RId {
self.id
}
pub fn address(&self) -> SocketAddr {
@@ -87,8 +93,12 @@ impl Socket {
}
}
impl Allocation {
impl<RId> Allocation<RId>
where
RId: Copy + fmt::Debug,
{
pub fn new(
id: RId,
server: SocketAddr,
username: Username,
password: String,
@@ -96,6 +106,7 @@ impl Allocation {
now: Instant,
) -> Self {
let mut allocation = Self {
id,
server,
last_srflx_candidate: Default::default(),
ip4_allocation: Default::default(),
@@ -134,11 +145,13 @@ impl Allocation {
/// This will implicitly trigger a [`refresh`](Allocation::refresh) to ensure these credentials are valid.
pub fn update_credentials(
&mut self,
socket: SocketAddr,
username: Username,
password: &str,
realm: Realm,
now: Instant,
) {
self.server = socket;
self.username = username;
self.realm = realm;
self.password = password.to_owned();
@@ -392,7 +405,7 @@ impl Allocation {
from: SocketAddr,
packet: &'p [u8],
now: Instant,
) -> Option<(SocketAddr, &'p [u8], Socket)> {
) -> Option<(SocketAddr, &'p [u8], Socket<RId>)> {
if from != self.server {
return None;
}
@@ -534,7 +547,7 @@ impl Allocation {
}
pub fn encode_to_slice(
&mut self,
&self,
peer: SocketAddr,
packet: &[u8],
header: &mut [u8],
@@ -595,24 +608,28 @@ impl Allocation {
is_ip4 || is_ip6
}
pub fn ip4_socket(&self) -> Option<Socket> {
pub fn server(&self) -> SocketAddr {
self.server
}
pub fn ip4_socket(&self) -> Option<Socket<RId>> {
let address = self.ip4_allocation.as_ref().map(|c| c.addr())?;
debug_assert!(address.is_ipv4());
Some(Socket {
server: self.server,
id: self.id,
address,
})
}
pub fn ip6_socket(&self) -> Option<Socket> {
pub fn ip6_socket(&self) -> Option<Socket<RId>> {
let address = self.ip6_allocation.as_ref().map(|c| c.addr())?;
debug_assert!(address.is_ipv6());
Some(Socket {
server: self.server,
id: self.id,
address,
})
}
@@ -1080,6 +1097,7 @@ mod tests {
const PEER2_IP6: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 20000);
const RELAY: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3478);
const RELAY2: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 3478);
const RELAY_ADDR_IP4: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9999);
const RELAY_ADDR_IP6: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9999);
@@ -1909,6 +1927,23 @@ mod tests {
)
}
#[test]
fn new_address_is_used_for_new_messages() {
let now = Instant::now();
let mut allocation = Allocation::for_test(now).with_allocate_response(&[RELAY_ADDR_IP4]);
let _drained_messages = iter::from_fn(|| allocation.poll_transmit()).collect::<Vec<_>>();
allocation.update_credentials(
RELAY2,
allocation.username.clone(),
&allocation.password.clone(),
allocation.realm.clone(),
now,
);
assert_eq!(allocation.poll_transmit().unwrap().dst, RELAY2)
}
fn ch(peer: SocketAddr, now: Instant) -> Channel {
Channel {
peer,
@@ -2007,9 +2042,10 @@ mod tests {
message.get_attribute::<XorPeerAddress>().unwrap().address()
}
impl Allocation {
fn for_test(start: Instant) -> Allocation {
impl Allocation<u64> {
fn for_test(start: Instant) -> Self {
Allocation::new(
1,
RELAY,
Username::new("foobar".to_owned()).unwrap(),
"baz".to_owned(),
@@ -2044,6 +2080,7 @@ mod tests {
fn refresh_with_same_credentials(&mut self) {
self.update_credentials(
self.server,
Username::new("foobar".to_owned()).unwrap(),
"baz",
Realm::new("firezone".to_owned()).unwrap(),

View File

@@ -43,9 +43,9 @@ const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20);
const MAX_UDP_SIZE: usize = (1 << 16) - 1;
/// Manages a set of wireguard connections for a server.
pub type ServerNode<TId> = Node<Server, TId>;
pub type ServerNode<TId, RId> = Node<Server, TId, RId>;
/// Manages a set of wireguard connections for a client.
pub type ClientNode<TId> = Node<Client, TId>;
pub type ClientNode<TId, RId> = Node<Client, TId, RId>;
pub enum Server {}
pub enum Client {}
@@ -72,7 +72,14 @@ pub enum Client {}
/// 1. Change [`Node`]'s state (either via network messages, adding a new connection, etc)
/// 2. Check [`Node::poll_timeout`] for when to wake the [`Node`]
/// 3. Call [`Node::handle_timeout`] once that time is reached
pub struct Node<T, TId> {
///
/// A [`Node`] is generic over three things:
/// - `T`: The mode it is operating in, either [`Client`] or [`Server`].
/// - `TId`: The type to use for uniquely identifying connections.
/// - `RId`: The type to use for uniquely identifying relays.
///
/// We favor these generic parameters over having our own IDs to avoid mapping back and forth in upper layers.
pub struct Node<T, TId, RId> {
private_key: StaticSecret,
index: IndexLfsr,
rate_limiter: Arc<RateLimiter>,
@@ -82,9 +89,9 @@ pub struct Node<T, TId> {
next_rate_limiter_reset: Option<Instant>,
bindings: HashMap<SocketAddr, StunBinding>,
allocations: HashMap<SocketAddr, Allocation>,
allocations: HashMap<RId, Allocation<RId>>,
connections: Connections<TId>,
connections: Connections<TId, RId>,
pending_events: VecDeque<Event<TId>>,
buffer: Box<[u8; MAX_UDP_SIZE]>,
@@ -112,9 +119,10 @@ pub enum Error {
BadLocalAddress(#[from] str0m::error::IceError),
}
impl<T, TId> Node<T, TId>
impl<T, TId, RId> Node<T, TId, RId>
where
TId: Eq + Hash + Copy + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display,
{
pub fn new(private_key: StaticSecret) -> Self {
let public_key = &(&private_key).into();
@@ -230,7 +238,7 @@ where
/// To do that, we need to check all candidates of each allocation and compare their IP.
/// The same relay might be reachable over IPv4 and IPv6.
#[must_use]
fn same_relay_as_peer(&mut self, candidate: &Candidate) -> Option<&mut Allocation> {
fn same_relay_as_peer(&mut self, candidate: &Candidate) -> Option<&mut Allocation<RId>> {
self.allocations.iter_mut().find_map(|(_, allocation)| {
allocation
.current_candidates()
@@ -322,7 +330,7 @@ where
payload: Cow::Borrowed(packet),
})),
PeerSocket::Relay { relay, dest: peer } => {
let Some(allocation) = self.allocations.get_mut(&relay) else {
let Some(allocation) = self.allocations.get(&relay) else {
tracing::warn!(%relay, "No allocation");
return Ok(None);
};
@@ -339,7 +347,7 @@ where
Ok(Some(Transmit {
src: None,
dst: relay,
dst: allocation.server(),
payload: Cow::Borrowed(channel_data_packet),
}))
}
@@ -440,6 +448,35 @@ where
self.buffered_transmits.pop_front()
}
pub fn upsert_turn_servers(
&mut self,
servers: &HashSet<(RId, SocketAddr, String, String, String)>,
now: Instant,
) {
for (id, server, username, password, realm) in servers {
let Ok(username) = Username::new(username.to_owned()) else {
tracing::debug!(%username, "Invalid TURN username");
continue;
};
let Ok(realm) = Realm::new(realm.to_owned()) else {
tracing::debug!(%realm, "Invalid TURN realm");
continue;
};
if let Some(existing) = self.allocations.get_mut(id) {
existing.update_credentials(*server, username, password, realm, now);
continue;
}
self.allocations.insert(
*id,
Allocation::new(*id, *server, username, password.clone(), realm, now),
);
tracing::info!(address = %server, "Added new TURN server");
}
}
#[must_use]
#[allow(clippy::too_many_arguments)]
fn init_connection(
@@ -449,7 +486,7 @@ where
key: [u8; 32],
intent_sent_at: Instant,
now: Instant,
) -> Connection {
) -> Connection<RId> {
agent.handle_timeout(now);
/// We set a Wireguard keep-alive to ensure the WG session doesn't timeout on an idle connection.
@@ -523,23 +560,36 @@ where
}
/// Tries to handle the packet using one of our [`Allocation`]s.
///
/// This function is in the hot-path of packet processing and thus must be as efficient as possible.
/// Even look-ups in [`HashMap`]s and linear searches across small lists are expensive at this point.
/// Thus, we use the first byte of the message as a heuristic for whether we should attempt to handle it here.
///
/// See <https://www.rfc-editor.org/rfc/rfc8656#name-channels-2> for details on de-multiplexing.
///
/// This heuristic might fail because we are also handling wireguard packets.
/// Those are fully encrypted and thus any byte pattern may appear at the front of the packet.
/// We can detect this by further checking the origin of the packet.
#[must_use]
#[allow(clippy::type_complexity)]
fn allocations_try_handle<'p>(
&mut self,
from: SocketAddr,
local: SocketAddr,
packet: &'p [u8],
now: Instant,
) -> ControlFlow<(), (SocketAddr, &'p [u8], Option<Socket>)> {
// First, check whether the packet is from a known allocation.
let Some(allocation) = self.allocations.get_mut(&from) else {
return ControlFlow::Continue((from, packet, None));
};
// See <https://www.rfc-editor.org/rfc/rfc8656#name-channels-2> for details on de-multiplexing.
match packet.first() {
) -> ControlFlow<(), (SocketAddr, &'p [u8], Option<Socket<RId>>)> {
match packet.first().copied() {
// STUN method range
Some(0..=3) => {
let Some(allocation) = self.allocations.values_mut().find(|a| a.server() == from)
else {
// False-positive, continue processing packet elsewhere
return ControlFlow::Continue((from, packet, None));
};
if allocation.handle_input(from, local, packet, now) {
// Successfully handled the packet
return ControlFlow::Break(());
}
@@ -547,8 +597,17 @@ where
ControlFlow::Break(()) // Stop processing the packet.
}
// Channel data number range
Some(64..=79) => {
let Some(allocation) = self.allocations.values_mut().find(|a| a.server() == from)
else {
// False-positive, continue processing packet elsewhere
return ControlFlow::Continue((from, packet, None));
};
if let Some((from, packet, socket)) = allocation.decapsulate(from, packet, now) {
// Successfully handled the packet and decapsulated the channel data message.
// Continue processing with the _unwrapped_ packet.
return ControlFlow::Continue((from, packet, Some(socket)));
}
@@ -556,7 +615,8 @@ where
ControlFlow::Break(()) // Stop processing the packet.
}
_ => ControlFlow::Continue((from, packet, None)),
// Byte is in a different range? Move on with processing the packet.
Some(_) | None => ControlFlow::Continue((from, packet, None)),
}
}
@@ -601,7 +661,7 @@ where
from: SocketAddr,
local: SocketAddr,
packet: &[u8],
relayed: Option<Socket>,
relayed: Option<Socket<RId>>,
buffer: &'b mut [u8],
now: Instant,
) -> ControlFlow<Result<(), Error>, (TId, MutableIpPacket<'b>)> {
@@ -676,9 +736,10 @@ where
}
}
impl<TId> Node<Client, TId>
impl<TId, RId> Node<Client, TId, RId>
where
TId: Eq + Hash + Copy + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display,
{
/// Create a new connection indexed by the given ID.
///
@@ -690,7 +751,7 @@ where
&mut self,
id: TId,
stun_servers: HashSet<SocketAddr>,
turn_servers: HashSet<(SocketAddr, String, String, String)>,
turn_servers: HashSet<(RId, SocketAddr, String, String, String)>,
intent_sent_at: Instant,
now: Instant,
) -> Offer {
@@ -775,9 +836,10 @@ where
}
}
impl<TId> Node<Server, TId>
impl<TId, RId> Node<Server, TId, RId>
where
TId: Eq + Hash + Copy + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display,
{
/// Accept a new connection indexed by the given ID.
///
@@ -791,7 +853,7 @@ where
offer: Offer,
remote: PublicKey,
stun_servers: HashSet<SocketAddr>,
turn_servers: HashSet<(SocketAddr, String, String, String)>,
turn_servers: HashSet<(RId, SocketAddr, String, String, String)>,
now: Instant,
) -> Answer {
debug_assert!(
@@ -838,9 +900,10 @@ where
}
}
impl<T, TId> Node<T, TId>
impl<T, TId, RId> Node<T, TId, RId>
where
TId: Eq + Hash + Copy + fmt::Display,
RId: Copy + Eq + Hash + PartialEq + fmt::Debug + fmt::Display,
{
fn upsert_stun_servers(&mut self, servers: &HashSet<SocketAddr>, now: Instant) {
for server in servers {
@@ -853,35 +916,6 @@ where
}
}
fn upsert_turn_servers(
&mut self,
servers: &HashSet<(SocketAddr, String, String, String)>,
now: Instant,
) {
for (server, username, password, realm) in servers {
let Ok(username) = Username::new(username.to_owned()) else {
tracing::debug!(%username, "Invalid TURN username");
continue;
};
let Ok(realm) = Realm::new(realm.to_owned()) else {
tracing::debug!(%realm, "Invalid TURN realm");
continue;
};
if let Some(existing) = self.allocations.get_mut(server) {
existing.update_credentials(username, password, realm, now);
continue;
}
self.allocations.insert(
*server,
Allocation::new(*server, username, password.clone(), realm, now),
);
tracing::info!(address = %server, "Added new TURN server");
}
}
fn seed_agent_with_local_candidates(&mut self, connection: TId, agent: &mut IceAgent) {
for candidate in self.host_candidates.iter().cloned() {
add_local_candidate(connection, agent, candidate, &mut self.pending_events);
@@ -915,12 +949,12 @@ where
}
}
struct Connections<TId> {
struct Connections<TId, RId> {
initial: HashMap<TId, InitialConnection>,
established: HashMap<TId, Connection>,
established: HashMap<TId, Connection<RId>>,
}
impl<TId> Default for Connections<TId> {
impl<TId, RId> Default for Connections<TId, RId> {
fn default() -> Self {
Self {
initial: Default::default(),
@@ -929,7 +963,7 @@ impl<TId> Default for Connections<TId> {
}
}
impl<TId> Connections<TId>
impl<TId, RId> Connections<TId, RId>
where
TId: Eq + Hash + Copy + fmt::Display,
{
@@ -974,15 +1008,15 @@ where
initial_agents.chain(negotiated_agents)
}
fn get_established_mut(&mut self, id: &TId) -> Option<&mut Connection> {
fn get_established_mut(&mut self, id: &TId) -> Option<&mut Connection<RId>> {
self.established.get_mut(id)
}
fn iter_established(&self) -> impl Iterator<Item = (TId, &Connection)> {
fn iter_established(&self) -> impl Iterator<Item = (TId, &Connection<RId>)> {
self.established.iter().map(|(id, conn)| (*id, conn))
}
fn iter_established_mut(&mut self) -> impl Iterator<Item = (TId, &mut Connection)> {
fn iter_established_mut(&mut self) -> impl Iterator<Item = (TId, &mut Connection<RId>)> {
self.established.iter_mut().map(|(id, conn)| (*id, conn))
}
@@ -996,13 +1030,16 @@ where
/// - `relay` is in fact a relay
/// - We have an allocation on the relay
/// - There is a channel bound to the provided peer
fn encode_as_channel_data(
relay: SocketAddr,
fn encode_as_channel_data<RId>(
relay: RId,
dest: SocketAddr,
contents: &[u8],
allocations: &mut HashMap<SocketAddr, Allocation>,
allocations: &mut HashMap<RId, Allocation<RId>>,
now: Instant,
) -> Result<Transmit<'static>, EncodeError> {
) -> Result<Transmit<'static>, EncodeError>
where
RId: Copy + Eq + Hash + PartialEq + fmt::Debug,
{
let allocation = allocations
.get_mut(&relay)
.ok_or(EncodeError::NoAllocation)?;
@@ -1012,7 +1049,7 @@ fn encode_as_channel_data(
Ok(Transmit {
src: None,
dst: relay,
dst: allocation.server(),
payload: Cow::Owned(payload),
})
}
@@ -1023,9 +1060,9 @@ enum EncodeError {
NoChannel,
}
fn add_local_candidate_to_all<TId>(
fn add_local_candidate_to_all<TId, RId>(
candidate: Candidate,
connections: &mut Connections<TId>,
connections: &mut Connections<TId, RId>,
pending_events: &mut VecDeque<Event<TId>>,
) where
TId: Copy + fmt::Display,
@@ -1156,7 +1193,7 @@ impl InitialConnection {
}
}
struct Connection {
struct Connection<RId> {
agent: IceAgent,
remote_pub_key: PublicKey,
@@ -1165,7 +1202,7 @@ struct Connection {
next_timer_update: Instant,
// When this is `Some`, we are connected.
peer_socket: Option<PeerSocket>,
peer_socket: Option<PeerSocket<RId>>,
// Socket addresses from which we might receive data (even before we are connected).
possible_sockets: HashSet<SocketAddr>,
@@ -1181,27 +1218,21 @@ struct Connection {
/// The socket of the peer we are connected to.
#[derive(Debug, PartialEq, Clone, Copy)]
enum PeerSocket {
enum PeerSocket<RId> {
Direct {
source: SocketAddr,
dest: SocketAddr,
},
Relay {
relay: SocketAddr,
relay: RId,
dest: SocketAddr,
},
}
impl PeerSocket {
fn our_socket(&self) -> SocketAddr {
match self {
PeerSocket::Direct { source, .. } => *source,
PeerSocket::Relay { relay, .. } => *relay,
}
}
}
impl Connection {
impl<RId> Connection<RId>
where
RId: PartialEq + Eq + Hash + fmt::Debug + Copy,
{
/// Checks if we want to accept a packet from a certain address.
///
/// Whilst we establish connections, we may see traffic from a certain address, prior to the negotiation being fully complete.
@@ -1229,11 +1260,11 @@ impl Connection {
&mut self,
local: SocketAddr,
dest: SocketAddr,
relay_socket: Option<Socket>,
) -> PeerSocket {
relay_socket: Option<Socket<RId>>,
) -> PeerSocket<RId> {
let remote_socket = match relay_socket {
Some(relay_socket) => PeerSocket::Relay {
relay: relay_socket.server(),
relay: relay_socket.id(),
dest,
},
None => PeerSocket::Direct {
@@ -1272,10 +1303,11 @@ impl Connection {
&mut self,
id: TId,
now: Instant,
allocations: &mut HashMap<SocketAddr, Allocation>,
allocations: &mut HashMap<RId, Allocation<RId>>,
transmits: &mut VecDeque<Transmit<'static>>,
) where
TId: fmt::Display + Copy,
RId: Copy + fmt::Display,
{
self.agent.handle_timeout(now);
@@ -1375,7 +1407,7 @@ impl Connection {
tracing::info!(old = ?self.peer_socket, new = ?remote_socket, duration_since_intent = ?self.duration_since_intent(now), "Updating remote socket");
self.peer_socket = Some(remote_socket);
self.invalidate_candiates();
self.invalidate_candiates(allocations);
self.force_handshake(allocations, transmits, now);
}
}
@@ -1416,7 +1448,7 @@ impl Connection {
transmits.push_back(Transmit {
src: None,
dst: *relay,
dst: allocation.server(),
payload: Cow::Owned(channel_data),
});
}
@@ -1445,9 +1477,9 @@ impl Connection {
from: SocketAddr,
local: SocketAddr,
packet: &[u8],
relayed: Option<Socket>,
relayed: Option<Socket<RId>>,
buffer: &'b mut [u8],
allocations: &mut HashMap<SocketAddr, Allocation>,
allocations: &mut HashMap<RId, Allocation<RId>>,
transmits: &mut VecDeque<Transmit<'static>>,
now: Instant,
) -> ControlFlow<Result<(), Error>, MutableIpPacket<'b>> {
@@ -1500,10 +1532,12 @@ impl Connection {
fn force_handshake(
&mut self,
allocations: &mut HashMap<SocketAddr, Allocation>,
allocations: &mut HashMap<RId, Allocation<RId>>,
transmits: &mut VecDeque<Transmit<'static>>,
now: Instant,
) {
) where
RId: Copy,
{
/// [`boringtun`] requires us to pass buffers in where it can construct its packets.
///
/// When updating the timers, the largest packet that we may have to send is `148` bytes as per `HANDSHAKE_INIT_SZ` constant in [`boringtun`].
@@ -1529,12 +1563,17 @@ impl Connection {
/// Each time we nominate a candidate pair, we don't really want to keep all the others active because it creates a lot of noise.
/// At the same time, we want to retain trickle ICE and allow the ICE agent to find a _better_ pair, hence we invalidate by priority.
#[tracing::instrument(level = "debug", skip_all, fields(nominated_prio))]
fn invalidate_candiates(&mut self) {
let Some(socket) = self.peer_socket else {
return;
fn invalidate_candiates(&mut self, allocations: &HashMap<RId, Allocation<RId>>) {
let socket = match self.peer_socket {
Some(PeerSocket::Direct { source, .. }) => source,
Some(PeerSocket::Relay { relay, .. }) => match allocations.get(&relay) {
Some(alloc) => alloc.server(),
None => return,
},
None => return,
};
let Some(nominated) = self.local_candidate(socket.our_socket()).cloned() else {
let Some(nominated) = self.local_candidate(socket).cloned() else {
return;
};
@@ -1562,12 +1601,15 @@ impl Connection {
}
#[must_use]
fn make_owned_transmit(
socket: PeerSocket,
fn make_owned_transmit<RId>(
socket: PeerSocket<RId>,
message: &[u8],
allocations: &mut HashMap<SocketAddr, Allocation>,
allocations: &mut HashMap<RId, Allocation<RId>>,
now: Instant,
) -> Option<Transmit<'static>> {
) -> Option<Transmit<'static>>
where
RId: Copy + Eq + Hash + PartialEq + fmt::Debug,
{
let transmit = match socket {
PeerSocket::Direct {
dest: remote,

View File

@@ -16,6 +16,8 @@ use tracing_subscriber::util::SubscriberInitExt;
#[test]
fn smoke_direct() {
let _guard = setup_tracing();
let firewall = Firewall::default();
let mut clock = Clock::new();
let (alice, bob) = alice_and_bob();
@@ -23,10 +25,8 @@ fn smoke_direct() {
TestNode::new(info_span!("Alice"), alice, "1.1.1.1:80").with_primary_as_host_candidate();
let mut bob =
TestNode::new(info_span!("Bob"), bob, "1.1.1.2:80").with_primary_as_host_candidate();
let firewall = Firewall::default();
let mut clock = Clock::new();
handshake(&mut alice, &mut bob, &[], &clock);
handshake(&mut alice, &mut bob, &clock);
loop {
if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) {
@@ -40,20 +40,23 @@ fn smoke_direct() {
#[test]
fn smoke_relayed() {
let _guard = setup_tracing();
let (alice, bob) = alice_and_bob();
let relay = TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger"));
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80");
let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80");
let mut clock = Clock::new();
let firewall = Firewall::default()
.with_block_rule("1.1.1.1:80", "2.2.2.2:80")
.with_block_rule("2.2.2.2:80", "1.1.1.1:80");
let mut clock = Clock::new();
let mut relays = [relay];
let (alice, bob) = alice_and_bob();
handshake(&mut alice, &mut bob, &relays, &clock);
let mut relays = [TestRelay::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
debug_span!("Roger"),
)];
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80")
.with_relays(&mut relays, clock.now);
let mut bob =
TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now);
handshake(&mut alice, &mut bob, &clock);
loop {
if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) {
@@ -67,18 +70,21 @@ fn smoke_relayed() {
#[test]
fn reconnect_discovers_new_interface() {
let _guard = setup_tracing();
let mut clock = Clock::new();
let firewall = Firewall::default();
let (alice, bob) = alice_and_bob();
let relay = TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger"));
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80");
let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80");
let firewall = Firewall::default();
let mut clock = Clock::new();
let mut relays = [TestRelay::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
debug_span!("Roger"),
)];
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80")
.with_relays(&mut relays, clock.now);
let mut bob =
TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now);
let mut relays = [relay];
handshake(&mut alice, &mut bob, &relays, &clock);
handshake(&mut alice, &mut bob, &clock);
loop {
if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) {
@@ -177,11 +183,10 @@ fn answer_after_stale_connection_does_not_panic() {
fn only_generate_candidate_event_after_answer() {
let local_candidate = SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), 10000);
let mut alice = ClientNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()));
let mut alice = ClientNode::<u64, u64>::new(StaticSecret::random_from_rng(rand::thread_rng()));
alice.add_local_host_candidate(local_candidate).unwrap();
let mut bob = ServerNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()));
let mut bob = ServerNode::<u64, u64>::new(StaticSecret::random_from_rng(rand::thread_rng()));
let offer = alice.new_connection(
1,
@@ -219,12 +224,11 @@ fn only_generate_candidate_event_after_answer() {
#[test]
fn second_connection_with_same_relay_reuses_allocation() {
let mut alice = ClientNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()));
let _ = alice.new_connection(
let mut alice = ClientNode::new(StaticSecret::random_from_rng(rand::thread_rng()));
_ = alice.new_connection(
1,
HashSet::new(),
HashSet::from([relay("user1", "pass1", "realm1")]),
HashSet::from([relay(1, "user1", "pass1", "realm1")]),
Instant::now(),
Instant::now(),
);
@@ -236,7 +240,7 @@ fn second_connection_with_same_relay_reuses_allocation() {
let _ = alice.new_connection(
2,
HashSet::new(),
HashSet::from([relay("user1", "pass1", "realm1")]),
HashSet::from([relay(1, "user1", "pass1", "realm1")]),
Instant::now(),
Instant::now(),
);
@@ -252,14 +256,18 @@ fn setup_tracing() -> tracing::subscriber::DefaultGuard {
.set_default()
}
fn alice_and_bob() -> (ClientNode<u64>, ServerNode<u64>) {
let alice = ClientNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()));
let bob = ServerNode::<u64>::new(StaticSecret::random_from_rng(rand::thread_rng()));
fn alice_and_bob() -> (ClientNode<u64, u64>, ServerNode<u64, u64>) {
let alice = ClientNode::new(StaticSecret::random_from_rng(rand::thread_rng()));
let bob = ServerNode::new(StaticSecret::random_from_rng(rand::thread_rng()));
(alice, bob)
}
fn send_offer(alice: &mut ClientNode<u64>, bob: &mut ServerNode<u64>, now: Instant) -> Answer {
fn send_offer(
alice: &mut ClientNode<u64, u64>,
bob: &mut ServerNode<u64, u64>,
now: Instant,
) -> Answer {
let offer = alice.new_connection(1, HashSet::new(), HashSet::new(), Instant::now(), now);
bob.accept_connection(
@@ -272,8 +280,14 @@ fn send_offer(alice: &mut ClientNode<u64>, bob: &mut ServerNode<u64>, now: Insta
)
}
fn relay(username: &str, pass: &str, realm: &str) -> (SocketAddr, String, String, String) {
fn relay(
id: u64,
username: &str,
pass: &str,
realm: &str,
) -> (u64, SocketAddr, String, String, String) {
(
id,
RELAY,
username.to_owned(),
pass.to_owned(),
@@ -520,18 +534,18 @@ impl TestRelay {
}
enum EitherNode {
Server(ServerNode<u64>),
Client(ClientNode<u64>),
Server(ServerNode<u64, u64>),
Client(ClientNode<u64, u64>),
}
impl From<ClientNode<u64>> for EitherNode {
fn from(value: ClientNode<u64>) -> Self {
impl From<ClientNode<u64, u64>> for EitherNode {
fn from(value: ClientNode<u64, u64>) -> Self {
Self::Client(value)
}
}
impl From<ServerNode<u64>> for EitherNode {
fn from(value: ServerNode<u64>) -> Self {
impl From<ServerNode<u64, u64>> for EitherNode {
fn from(value: ServerNode<u64, u64>) -> Self {
Self::Server(value)
}
}
@@ -586,14 +600,14 @@ impl EitherNode {
}
}
fn as_client_mut(&mut self) -> Option<&mut ClientNode<u64>> {
fn as_client_mut(&mut self) -> Option<&mut ClientNode<u64, u64>> {
match self {
EitherNode::Server(_) => None,
EitherNode::Client(c) => Some(c),
}
}
fn as_server_mut(&mut self) -> Option<&mut ServerNode<u64>> {
fn as_server_mut(&mut self) -> Option<&mut ServerNode<u64, u64>> {
match self {
EitherNode::Server(s) => Some(s),
EitherNode::Client(_) => None,
@@ -644,6 +658,36 @@ impl TestNode {
}
}
fn with_relays(mut self, relays: &mut [TestRelay], now: Instant) -> Self {
let username = match self.node {
EitherNode::Server(_) => "server",
EitherNode::Client(_) => "client",
};
let turn_servers = relays
.iter()
.enumerate()
.map(|(idx, relay)| {
let (username, password) = relay.make_credentials(username);
(
idx as u64,
relay.listen_addr,
username,
password,
"firezone".to_owned(),
)
})
.collect::<HashSet<_>>();
match &mut self.node {
EitherNode::Server(s) => s.upsert_turn_servers(&turn_servers, now),
EitherNode::Client(c) => c.upsert_turn_servers(&turn_servers, now),
}
self
}
fn switch_network(&mut self, new_primary: &str) {
self.primary = new_primary.parse().unwrap();
self.local.push(self.primary);
@@ -742,26 +786,15 @@ impl TestNode {
}
}
fn handshake(client: &mut TestNode, server: &mut TestNode, relays: &[TestRelay], clock: &Clock) {
fn handshake(client: &mut TestNode, server: &mut TestNode, clock: &Clock) {
let client_node = &mut client.node.as_client_mut().unwrap();
let server_node = &mut server.node.as_server_mut().unwrap();
let client_credentials = relays.iter().map(|relay| {
let (username, password) = relay.make_credentials("client");
(relay.listen_addr, username, password, "firezone".to_owned())
});
let server_credentials = relays.iter().map(|relay| {
let (username, password) = relay.make_credentials("client");
(relay.listen_addr, username, password, "firezone".to_owned())
});
let offer = client.span.in_scope(|| {
client_node.new_connection(
1,
HashSet::default(),
HashSet::from_iter(client_credentials),
HashSet::default(),
clock.now,
clock.now,
)
@@ -772,7 +805,7 @@ fn handshake(client: &mut TestNode, server: &mut TestNode, relays: &[TestRelay],
offer,
client_node.public_key(),
HashSet::default(),
HashSet::from_iter(server_credentials),
HashSet::default(),
clock.now,
)
});

View File

@@ -6,7 +6,7 @@ use bimap::BiMap;
use connlib_shared::error::{ConnlibError as Error, ConnlibError};
use connlib_shared::messages::{
Answer, ClientPayload, DnsServer, DomainResponse, GatewayId, Interface as InterfaceConfig,
IpDnsServer, Key, Offer, Relay, RequestConnection, ResourceDescription,
IpDnsServer, Key, Offer, Relay, RelayId, RequestConnection, ResourceDescription,
ResourceDescriptionCidr, ResourceDescriptionDns, ResourceId, ReuseConnection,
};
use connlib_shared::{Callbacks, Dname, PublicKey, StaticSecret};
@@ -73,6 +73,13 @@ where
Ok(())
}
pub fn upsert_relays(&mut self, relays: Vec<Relay>) {
self.role_state.upsert_relays(
turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)),
Instant::now(),
)
}
/// Adds a the given resource to the tunnel.
pub fn add_resources(
&mut self,
@@ -250,7 +257,7 @@ pub struct ClientState {
pub peers: PeerStore<GatewayId, PacketTransformClient, HashSet<ResourceId>>,
node: ClientNode<GatewayId>,
node: ClientNode<GatewayId, RelayId>,
pub ip_provider: IpProvider,
@@ -425,7 +432,7 @@ impl ClientState {
resource_id: ResourceId,
gateway_id: GatewayId,
allowed_stun_servers: HashSet<SocketAddr>,
allowed_turn_servers: HashSet<(SocketAddr, String, String, String)>,
allowed_turn_servers: HashSet<(RelayId, SocketAddr, String, String, String)>,
) -> connlib_shared::Result<Request> {
tracing::trace!("create_or_reuse_connection");
@@ -985,6 +992,14 @@ impl ClientState {
true
}
fn upsert_relays(
&mut self,
relays: HashSet<(RelayId, SocketAddr, String, String, String)>,
now: Instant,
) {
self.node.upsert_turn_servers(&relays, now);
}
}
fn effective_dns_servers(

View File

@@ -7,7 +7,7 @@ use boringtun::x25519::PublicKey;
use chrono::{DateTime, Utc};
use connlib_shared::messages::{
Answer, ClientId, ConnectionAccepted, DomainResponse, Interface as InterfaceConfig, Key, Offer,
Relay, ResolvedResourceDescriptionDns, ResourceDescription, ResourceId,
Relay, RelayId, ResolvedResourceDescriptionDns, ResourceDescription, ResourceId,
};
use connlib_shared::{Callbacks, Dname, Error, Result, StaticSecret};
use ip_network::IpNetwork;
@@ -189,8 +189,7 @@ where
pub struct GatewayState {
pub peers: PeerStore<ClientId, PacketTransformGateway, ()>,
node: ServerNode<ClientId>,
node: ServerNode<ClientId, RelayId>,
next_expiry_resources_check: Option<Instant>,
buffered_events: VecDeque<GatewayEvent>,
}
@@ -309,4 +308,12 @@ impl GatewayState {
pub(crate) fn poll_event(&mut self) -> Option<GatewayEvent> {
self.buffered_events.pop_front()
}
pub(crate) fn upsert_relays(
&mut self,
relays: HashSet<(RelayId, SocketAddr, String, String, String)>,
now: Instant,
) {
self.node.upsert_turn_servers(&relays, now);
}
}

View File

@@ -5,7 +5,7 @@
use boringtun::x25519::StaticSecret;
use connlib_shared::{
messages::{ClientId, GatewayId, ResourceId, ReuseConnection},
messages::{ClientId, GatewayId, Relay, ResourceId, ReuseConnection},
Callbacks, Result,
};
use io::Io;
@@ -18,6 +18,7 @@ use std::{
pub use client::{ClientState, Request};
pub use gateway::GatewayState;
pub use sockets::Sockets;
use utils::turn;
mod client;
mod device_channel;
@@ -171,6 +172,13 @@ where
})
}
pub fn upsert_relays(&mut self, relays: Vec<Relay>) {
self.role_state.upsert_relays(
turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)),
Instant::now(),
)
}
pub fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<GatewayEvent>> {
loop {
if let Some(other) = self.role_state.poll_event() {

View File

@@ -1,5 +1,5 @@
use crate::REALM;
use connlib_shared::messages::Relay;
use connlib_shared::messages::{Relay, RelayId};
use std::{collections::HashSet, net::SocketAddr, time::Instant};
pub fn stun(relays: &[Relay], predicate: impl Fn(&SocketAddr) -> bool) -> HashSet<SocketAddr> {
@@ -19,12 +19,13 @@ pub fn stun(relays: &[Relay], predicate: impl Fn(&SocketAddr) -> bool) -> HashSe
pub fn turn(
relays: &[Relay],
predicate: impl Fn(&SocketAddr) -> bool,
) -> HashSet<(SocketAddr, String, String, String)> {
) -> HashSet<(RelayId, SocketAddr, String, String, String)> {
relays
.iter()
.filter_map(|r| {
if let Relay::Turn(r) = r {
Some((
r.id,
r.addr,
r.username.clone(),
r.password.clone(),
@@ -34,7 +35,7 @@ pub fn turn(
None
}
})
.filter(|(socket, _, _, _)| predicate(socket))
.filter(|(_, socket, _, _, _)| predicate(socket))
.collect()
}

View File

@@ -110,6 +110,7 @@ async fn run(login: LoginUrl, private_key: StaticSecret) -> Result<Infallible> {
tunnel
.set_interface(&init.interface)
.context("Failed to set interface")?;
tunnel.upsert_relays(init.relays);
let mut eventloop = Eventloop::new(tunnel, portal);

View File

@@ -13,6 +13,8 @@ use serde::{Deserialize, Serialize};
pub struct InitGateway {
pub interface: Interface,
pub config: Config,
#[serde(default)]
pub relays: Vec<Relay>,
}
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
@@ -149,10 +151,12 @@ mod test {
},
"relays": [
{
"id": "0bfc5e02-a093-423b-827b-002d7d2bb407",
"type": "stun",
"addr": "172.28.0.101:3478"
},
{
"id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8",
"type": "turn",
"username": "1719367575:ZQHcVGkdnfgGmcP1",
"password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg",
@@ -178,6 +182,7 @@ mod test {
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true,
},
relays: vec![],
});
let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#;
@@ -197,6 +202,7 @@ mod test {
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true,
},
relays: vec![],
});
let message = r#"{"event":"init","ref":null,"topic":"gateway","irrelevant":"field","payload":{"more":"info","interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true,"ignored":"field"}}}"#;
@@ -216,6 +222,7 @@ mod test {
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true,
},
relays: vec![],
});
let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":null,"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#;
@@ -235,6 +242,7 @@ mod test {
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true,
},
relays: vec![],
});
let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":0.3,"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#;
@@ -254,6 +262,7 @@ mod test {
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true,
},
relays: vec![],
});
let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":true,"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#;
@@ -273,6 +282,7 @@ mod test {
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true,
},
relays: vec![],
});
let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":{"ignored":"field"},"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#;
@@ -292,6 +302,7 @@ mod test {
ipv4_masquerade_enabled: true,
ipv6_masquerade_enabled: true,
},
relays: vec![],
});
let message = r#"{"event":"init","ref":null,"topic":"gateway","payload":{"additional":[true,false],"interface":{"ipv6":"fd00:2021:1111::2c:f6ab","ipv4":"100.115.164.78"},"config":{"ipv4_masquerade_enabled":true,"ipv6_masquerade_enabled":true}}}"#;

View File

@@ -52,6 +52,7 @@ async fn main() -> Result<()> {
.context("Failed to parse `TURNERVER`")?
.map(|ip| {
(
1,
SocketAddr::new(ip, 3478),
"2000000000:client".to_owned(), // TODO: Use different credentials per role.
"+Qou8TSjw9q3JMnWET7MbFsQh/agwz/LURhpfX7a0hE".to_owned(),
@@ -76,7 +77,7 @@ async fn main() -> Result<()> {
match role {
Role::Dialer => {
let mut pool = ClientNode::<u64>::new(private_key);
let mut pool = ClientNode::<u64, u64>::new(private_key);
let offer = pool.new_connection(
1,
@@ -161,7 +162,7 @@ async fn main() -> Result<()> {
}
}
Role::Listener => {
let mut pool = ServerNode::<u64>::new(private_key);
let mut pool = ServerNode::<u64, u64>::new(private_key);
let offer = redis_connection
.blpop::<_, (String, wire::Offer)>("offers", 10.0)
@@ -334,7 +335,7 @@ impl FromStr for Role {
struct Eventloop<T> {
socket: UdpSocket,
pool: Node<T, u64>,
pool: Node<T, u64, u64>,
timeout: BoxFuture<'static, Instant>,
candidate_rx: mpsc::Receiver<wire::Candidate>,
read_buffer: Box<[u8; MAX_UDP_SIZE]>,
@@ -344,7 +345,7 @@ struct Eventloop<T> {
impl<T> Eventloop<T> {
fn new(
socket: UdpSocket,
pool: Node<T, u64>,
pool: Node<T, u64, u64>,
candidate_rx: mpsc::Receiver<wire::Candidate>,
) -> Self {
Self {