mirror of
https://github.com/outbackdingo/firezone.git
synced 2026-01-27 10:18:54 +00:00
feat(connlib): smoothly migrate relayed connections (#4568)
Whenever we receive a `relays_presence` message from the portal, we invalidate the candidates of all now disconnected relays and make allocations on the new ones. This triggers signalling of new candidates to the remote party and migrates the connection to the newly nominated socket. This still relies on #4613 until we have #4634. Resolves: #4548. --------- Co-authored-by: Jamil <jamilbk@users.noreply.github.com>
This commit is contained in:
10
.github/workflows/_integration_tests.yml
vendored
10
.github/workflows/_integration_tests.yml
vendored
@@ -128,7 +128,8 @@ jobs:
|
||||
# Start one-by-one to avoid variability in service startup order
|
||||
docker compose up -d dns.httpbin httpbin download.httpbin
|
||||
docker compose up -d api web domain --no-build
|
||||
docker compose up -d relay --no-build
|
||||
docker compose up -d relay-1 --no-build
|
||||
docker compose up -d relay-2 --no-build
|
||||
docker compose up -d gateway --no-build
|
||||
docker compose up -d client --no-build
|
||||
|
||||
@@ -137,9 +138,12 @@ jobs:
|
||||
- name: Show Client logs
|
||||
if: "!cancelled()"
|
||||
run: docker compose logs client
|
||||
- name: Show Relay logs
|
||||
- name: Show Relay-1 logs
|
||||
if: "!cancelled()"
|
||||
run: docker compose logs relay
|
||||
run: docker compose logs relay-1
|
||||
- name: Show Relay-2 logs
|
||||
if: "!cancelled()"
|
||||
run: docker compose logs relay-2
|
||||
- name: Show Gateway logs
|
||||
if: "!cancelled()"
|
||||
run: docker compose logs gateway
|
||||
|
||||
9
.github/workflows/ci.yml
vendored
9
.github/workflows/ci.yml
vendored
@@ -223,7 +223,7 @@ jobs:
|
||||
# Start services in the same order each time for the tests
|
||||
docker compose up -d iperf3
|
||||
docker compose up -d api web domain --no-build
|
||||
docker compose up -d relay --no-build
|
||||
docker compose up -d relay-1 relay-2 --no-build
|
||||
docker compose up -d gateway --no-build
|
||||
docker compose up -d client --no-build
|
||||
- name: 'Performance test: ${{ matrix.test_name }}'
|
||||
@@ -243,9 +243,12 @@ jobs:
|
||||
- name: Show Client UDP stats
|
||||
if: "!cancelled()"
|
||||
run: docker compose exec client cat /proc/net/udp
|
||||
- name: Show Relay logs
|
||||
- name: Show Relay-1 logs
|
||||
if: "!cancelled()"
|
||||
run: docker compose logs relay
|
||||
run: docker compose logs relay-1
|
||||
- name: Show Relay-2 logs
|
||||
if: "!cancelled()"
|
||||
run: docker compose logs relay-2
|
||||
- name: Show Gateway logs
|
||||
if: "!cancelled()"
|
||||
run: docker compose logs gateway
|
||||
|
||||
@@ -389,9 +389,9 @@ services:
|
||||
resources:
|
||||
ipv4_address: 172.20.0.110
|
||||
|
||||
relay:
|
||||
relay-1:
|
||||
environment:
|
||||
PUBLIC_IP4_ADDR: ${PUBLIC_IP4_ADDR:-172.28.0.101}
|
||||
PUBLIC_IP4_ADDR: ${RELAY_1_PUBLIC_IP4_ADDR:-172.28.0.101}
|
||||
# PUBLIC_IP6_ADDR: fcff:3990:3990::101
|
||||
LOWEST_PORT: 55555
|
||||
HIGHEST_PORT: 55666
|
||||
@@ -429,7 +429,40 @@ services:
|
||||
- 3478:3478/udp
|
||||
networks:
|
||||
app:
|
||||
ipv4_address: ${PUBLIC_IP4_ADDR:-172.28.0.101}
|
||||
ipv4_address: ${RELAY_1_PUBLIC_IP4_ADDR:-172.28.0.101}
|
||||
|
||||
relay-2:
|
||||
environment:
|
||||
PUBLIC_IP4_ADDR: ${RELAY_2_PUBLIC_IP4_ADDR:-172.28.0.201}
|
||||
# PUBLIC_IP6_ADDR: fcff:3990:3990::101
|
||||
# Token for self-hosted Relay
|
||||
# FIREZONE_TOKEN: ".SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkNTQ5YzQxMDctMTQ5Mi00ZjhmLWE0ZWMtYTlkMmE2NmQ4YWE5bQAAADhQVTVBSVRFMU84VkRWTk1ITU9BQzc3RElLTU9HVERJQTY3MlM2RzFBQjAyT1MzNEg1TUUwPT09PW4GAEngLBONAWIAAVGA.E-f2MFdGMX7JTL2jwoHBdWcUd2G3UNz2JRZLbQrlf0k"
|
||||
# Token for global Relay
|
||||
FIREZONE_TOKEN: ".SFMyNTY.g2gDaAN3A25pbG0AAAAkZTgyZmNkYzEtMDU3YS00MDE1LWI5MGItM2IxOGYwZjI4MDUzbQAAADhDMTROR0E4N0VKUlIwM0c0UVBSMDdBOUM2Rzc4NFRTU1RIU0Y0VEk1VDBHRDhENkwwVlJHPT09PW4GADXgLBONAWIAAVGA.dShU17FgnvO2GLcTSnBBTDoqQ2tScuG7qjiyKhhlq8s"
|
||||
RUST_LOG: "debug"
|
||||
RUST_BACKTRACE: 1
|
||||
FIREZONE_API_URL: ws://api:8081
|
||||
build:
|
||||
target: dev
|
||||
context: rust
|
||||
dockerfile: Dockerfile
|
||||
cache_from:
|
||||
- type=registry,ref=us-east1-docker.pkg.dev/firezone-staging/cache/relay:main
|
||||
args:
|
||||
PACKAGE: firezone-relay
|
||||
image: ${RELAY_IMAGE:-us-east1-docker.pkg.dev/firezone-staging/firezone/dev/relay}:${RELAY_TAG:-main}
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "lsof -i UDP | grep firezone-relay"]
|
||||
start_period: 10s
|
||||
interval: 30s
|
||||
retries: 5
|
||||
timeout: 5s
|
||||
depends_on:
|
||||
api:
|
||||
condition: "service_healthy"
|
||||
networks:
|
||||
app:
|
||||
ipv4_address: ${RELAY_2_PUBLIC_IP4_ADDR:-172.28.0.201}
|
||||
|
||||
|
||||
# IPv6 is currently causing flakiness with GH actions and on our testbed.
|
||||
|
||||
2
rust/Cargo.lock
generated
2
rust/Cargo.lock
generated
@@ -3388,7 +3388,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"windows-targets 0.48.5",
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -7,13 +7,13 @@ use crate::{
|
||||
};
|
||||
use anyhow::Result;
|
||||
use connlib_shared::{
|
||||
messages::{ConnectionAccepted, GatewayResponse, ResourceAccepted, ResourceId},
|
||||
messages::{ConnectionAccepted, GatewayResponse, RelaysPresence, ResourceAccepted, ResourceId},
|
||||
Callbacks,
|
||||
};
|
||||
use firezone_tunnel::ClientTunnel;
|
||||
use phoenix_channel::{ErrorReply, OutboundRequestId, PhoenixChannel};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
collections::{HashMap, HashSet},
|
||||
net::IpAddr,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
@@ -202,7 +202,7 @@ where
|
||||
|
||||
tracing::info!("Firezone Started!");
|
||||
let _ = self.tunnel.set_resources(resources);
|
||||
self.tunnel.upsert_relays(relays)
|
||||
self.tunnel.update_relays(HashSet::default(), relays)
|
||||
}
|
||||
IngressMessages::ResourceCreatedOrUpdated(resource) => {
|
||||
let resource_id = resource.id();
|
||||
@@ -214,6 +214,12 @@ where
|
||||
IngressMessages::ResourceDeleted(resource) => {
|
||||
self.tunnel.remove_resources(&[resource]);
|
||||
}
|
||||
IngressMessages::RelaysPresence(RelaysPresence {
|
||||
disconnected_ids,
|
||||
connected,
|
||||
}) => self
|
||||
.tunnel
|
||||
.update_relays(HashSet::from_iter(disconnected_ids), connected),
|
||||
IngressMessages::InvalidateIceCandidates(GatewayIceCandidates {
|
||||
gateway_id,
|
||||
candidates,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use connlib_shared::messages::{
|
||||
GatewayId, GatewayResponse, Interface, Key, Relay, RequestConnection, ResourceDescription,
|
||||
ResourceId, ReuseConnection,
|
||||
GatewayId, GatewayResponse, Interface, Key, Relay, RelaysPresence, RequestConnection,
|
||||
ResourceDescription, ResourceId, ReuseConnection,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::HashSet, net::IpAddr};
|
||||
@@ -50,6 +50,8 @@ pub enum IngressMessages {
|
||||
InvalidateIceCandidates(GatewayIceCandidates),
|
||||
|
||||
ConfigChanged(ConfigUpdate),
|
||||
|
||||
RelaysPresence(RelaysPresence),
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
|
||||
@@ -575,4 +577,48 @@ mod test {
|
||||
let reply_message = serde_json::from_str(message).unwrap();
|
||||
assert_eq!(m, reply_message);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn relays_presence() {
|
||||
let message = r#"
|
||||
{
|
||||
"event": "relays_presence",
|
||||
"ref": null,
|
||||
"topic": "client",
|
||||
"payload": {
|
||||
"disconnected_ids": [
|
||||
"e95f9517-2152-4677-a16a-fbb2687050a3",
|
||||
"b0724bd1-a8cc-4faf-88cd-f21159cfec47"
|
||||
],
|
||||
"connected": [
|
||||
{
|
||||
"id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8",
|
||||
"type": "turn",
|
||||
"username": "1719367575:ZQHcVGkdnfgGmcP1",
|
||||
"password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg",
|
||||
"addr": "172.28.0.101:3478",
|
||||
"expires_at": 1719367575
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
"#;
|
||||
let expected = IngressMessages::RelaysPresence(RelaysPresence {
|
||||
disconnected_ids: vec![
|
||||
"e95f9517-2152-4677-a16a-fbb2687050a3".parse().unwrap(),
|
||||
"b0724bd1-a8cc-4faf-88cd-f21159cfec47".parse().unwrap(),
|
||||
],
|
||||
connected: vec![Relay::Turn(Turn {
|
||||
id: "0a133356-7a9e-4b9a-b413-0d95a5720fd8".parse().unwrap(),
|
||||
expires_at: DateTime::from_timestamp(1719367575, 0).unwrap(),
|
||||
addr: "172.28.0.101:3478".parse().unwrap(),
|
||||
username: "1719367575:ZQHcVGkdnfgGmcP1".to_owned(),
|
||||
password: "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg".to_owned(),
|
||||
})],
|
||||
});
|
||||
|
||||
let ingress_message = serde_json::from_str::<IngressMessages>(message).unwrap();
|
||||
|
||||
assert_eq!(ingress_message, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -413,6 +413,15 @@ pub struct Stun {
|
||||
pub addr: SocketAddr,
|
||||
}
|
||||
|
||||
/// A update to the presence of several relays.
|
||||
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub struct RelaysPresence {
|
||||
/// These relays have disconnected from the portal. We need to stop using them.
|
||||
pub disconnected_ids: Vec<RelayId>,
|
||||
/// These relays are still online. We can/should use these.
|
||||
pub connected: Vec<Relay>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -469,12 +469,32 @@ where
|
||||
self.buffered_transmits.pop_front()
|
||||
}
|
||||
|
||||
pub fn upsert_turn_servers(
|
||||
pub fn update_relays(
|
||||
&mut self,
|
||||
servers: &HashSet<(RId, SocketAddr, String, String, String)>,
|
||||
to_remove: HashSet<RId>,
|
||||
to_add: &HashSet<(RId, SocketAddr, String, String, String)>,
|
||||
now: Instant,
|
||||
) {
|
||||
for (id, server, username, password, realm) in servers {
|
||||
// First, invalidate all candidates from relays that we should stop using.
|
||||
for id in to_remove {
|
||||
let Some(allocation) = self.allocations.remove(&id) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
for (id, agent) in self.connections.agents_mut() {
|
||||
let _span = info_span!("connection", %id).entered();
|
||||
|
||||
for candidate in allocation
|
||||
.current_candidates()
|
||||
.filter(|c| c.kind() == CandidateKind::Relayed)
|
||||
{
|
||||
agent.invalidate_candidate(&candidate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Second, upsert all new relays.
|
||||
for (id, server, username, password, realm) in to_add {
|
||||
let Ok(username) = Username::new(username.to_owned()) else {
|
||||
tracing::debug!(%username, "Invalid TURN username");
|
||||
continue;
|
||||
@@ -494,7 +514,7 @@ where
|
||||
Allocation::new(*server, username, password.clone(), realm, now),
|
||||
);
|
||||
|
||||
tracing::info!(address = %server, "Added new TURN server");
|
||||
tracing::info!(%id, address = %server, "Added new TURN server");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -782,7 +802,7 @@ where
|
||||
};
|
||||
|
||||
self.upsert_stun_servers(&stun_servers, now);
|
||||
self.upsert_turn_servers(&turn_servers, now);
|
||||
self.update_relays(HashSet::default(), &turn_servers, now);
|
||||
|
||||
let mut agent = IceAgent::new();
|
||||
agent.set_controlling(true);
|
||||
@@ -884,7 +904,7 @@ where
|
||||
};
|
||||
|
||||
self.upsert_stun_servers(&stun_servers, now);
|
||||
self.upsert_turn_servers(&turn_servers, now);
|
||||
self.update_relays(HashSet::default(), &turn_servers, now);
|
||||
|
||||
let mut agent = IceAgent::new();
|
||||
agent.set_controlling(false);
|
||||
@@ -1723,6 +1743,7 @@ where
|
||||
self.agent
|
||||
.local_candidates()
|
||||
.iter()
|
||||
.filter(|c| !c.discarded())
|
||||
.find(|c| c.addr() == source)
|
||||
}
|
||||
|
||||
|
||||
@@ -57,10 +57,16 @@ fn smoke_relayed() {
|
||||
1,
|
||||
TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")),
|
||||
)];
|
||||
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80")
|
||||
.with_relays(&mut relays, clock.now);
|
||||
let mut bob =
|
||||
TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now);
|
||||
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays(
|
||||
HashSet::default(),
|
||||
&mut relays,
|
||||
clock.now,
|
||||
);
|
||||
let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(
|
||||
HashSet::default(),
|
||||
&mut relays,
|
||||
clock.now,
|
||||
);
|
||||
let firewall = Firewall::default()
|
||||
.with_block_rule(&alice, &bob)
|
||||
.with_block_rule(&bob, &alice);
|
||||
@@ -96,10 +102,16 @@ fn reconnect_discovers_new_interface() {
|
||||
1,
|
||||
TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")),
|
||||
)];
|
||||
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80")
|
||||
.with_relays(&mut relays, clock.now);
|
||||
let mut bob =
|
||||
TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now);
|
||||
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays(
|
||||
HashSet::default(),
|
||||
&mut relays,
|
||||
clock.now,
|
||||
);
|
||||
let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(
|
||||
HashSet::default(),
|
||||
&mut relays,
|
||||
clock.now,
|
||||
);
|
||||
|
||||
handshake(&mut alice, &mut bob, &clock);
|
||||
|
||||
@@ -139,6 +151,59 @@ fn reconnect_discovers_new_interface() {
|
||||
assert_eq!(bob.failed_connections().count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn migrate_connection_to_new_relay() {
|
||||
let _guard = setup_tracing();
|
||||
let mut clock = Clock::new();
|
||||
|
||||
let (alice, bob) = alice_and_bob();
|
||||
|
||||
let mut relays = [(
|
||||
1,
|
||||
TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")),
|
||||
)];
|
||||
let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays(
|
||||
HashSet::default(),
|
||||
&mut relays,
|
||||
clock.now,
|
||||
);
|
||||
let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(
|
||||
HashSet::default(),
|
||||
&mut relays,
|
||||
clock.now,
|
||||
);
|
||||
let firewall = Firewall::default()
|
||||
.with_block_rule(&alice, &bob)
|
||||
.with_block_rule(&bob, &alice);
|
||||
|
||||
handshake(&mut alice, &mut bob, &clock);
|
||||
|
||||
loop {
|
||||
if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) {
|
||||
break;
|
||||
}
|
||||
|
||||
progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock);
|
||||
}
|
||||
|
||||
// Swap out the relays. "Roger" is being removed (ID 1) and "Robert" is being added (ID 2).
|
||||
let mut relays = [(2, TestRelay::new(ip("10.0.0.1"), debug_span!("Robert")))];
|
||||
alice = alice.with_relays(HashSet::from([1]), &mut relays, clock.now);
|
||||
|
||||
// Make some progress. (the fact that we only need 5 clock ticks means we are no relying on timeouts here)
|
||||
for _ in 0..5 {
|
||||
progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock);
|
||||
}
|
||||
|
||||
alice.ping(ip("9.9.9.9"), ip("8.8.8.8"), &bob, clock.now);
|
||||
progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock);
|
||||
assert_eq!(bob.packets_from(ip("9.9.9.9")).count(), 1);
|
||||
|
||||
bob.ping(ip("8.8.8.8"), ip("9.9.9.9"), &alice, clock.now);
|
||||
progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock);
|
||||
assert_eq!(alice.packets_from(ip("8.8.8.8")).count(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_times_out_after_20_seconds() {
|
||||
let (mut alice, _) = alice_and_bob();
|
||||
@@ -695,6 +760,18 @@ impl EitherNode {
|
||||
EitherNode::Server(n) => n.reconnect(now),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_relays(
|
||||
&mut self,
|
||||
to_remove: HashSet<u64>,
|
||||
to_add: &HashSet<(u64, SocketAddr, String, String, String)>,
|
||||
now: Instant,
|
||||
) {
|
||||
match self {
|
||||
EitherNode::Server(s) => s.update_relays(to_remove, to_add, now),
|
||||
EitherNode::Client(c) => c.update_relays(to_remove, to_add, now),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TestNode {
|
||||
@@ -713,7 +790,12 @@ impl TestNode {
|
||||
}
|
||||
}
|
||||
|
||||
fn with_relays(mut self, relays: &mut [(u64, TestRelay)], now: Instant) -> Self {
|
||||
fn with_relays(
|
||||
mut self,
|
||||
to_remove: HashSet<u64>,
|
||||
relays: &mut [(u64, TestRelay)],
|
||||
now: Instant,
|
||||
) -> Self {
|
||||
let username = match self.node {
|
||||
EitherNode::Server(_) => "server",
|
||||
EitherNode::Client(_) => "client",
|
||||
@@ -734,10 +816,8 @@ impl TestNode {
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
self.span.in_scope(|| match &mut self.node {
|
||||
EitherNode::Server(s) => s.upsert_turn_servers(&turn_servers, now),
|
||||
EitherNode::Client(c) => c.upsert_turn_servers(&turn_servers, now),
|
||||
});
|
||||
self.span
|
||||
.in_scope(|| self.node.update_relays(to_remove, &turn_servers, now));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
@@ -73,9 +73,10 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn upsert_relays(&mut self, relays: Vec<Relay>) {
|
||||
self.role_state.upsert_relays(
|
||||
turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)),
|
||||
pub fn update_relays(&mut self, to_remove: HashSet<RelayId>, to_add: Vec<Relay>) {
|
||||
self.role_state.update_relays(
|
||||
to_remove,
|
||||
turn(&to_add, |addr| self.io.sockets_ref().can_handle(addr)),
|
||||
Instant::now(),
|
||||
)
|
||||
}
|
||||
@@ -1002,12 +1003,13 @@ impl ClientState {
|
||||
true
|
||||
}
|
||||
|
||||
fn upsert_relays(
|
||||
fn update_relays(
|
||||
&mut self,
|
||||
relays: HashSet<(RelayId, SocketAddr, String, String, String)>,
|
||||
to_remove: HashSet<RelayId>,
|
||||
to_add: HashSet<(RelayId, SocketAddr, String, String, String)>,
|
||||
now: Instant,
|
||||
) {
|
||||
self.node.upsert_turn_servers(&relays, now);
|
||||
self.node.update_relays(to_remove, &to_add, now);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -318,11 +318,12 @@ impl GatewayState {
|
||||
self.buffered_events.pop_front()
|
||||
}
|
||||
|
||||
pub(crate) fn upsert_relays(
|
||||
pub(crate) fn update_relays(
|
||||
&mut self,
|
||||
relays: HashSet<(RelayId, SocketAddr, String, String, String)>,
|
||||
to_remove: HashSet<RelayId>,
|
||||
to_add: HashSet<(RelayId, SocketAddr, String, String, String)>,
|
||||
now: Instant,
|
||||
) {
|
||||
self.node.upsert_turn_servers(&relays, now);
|
||||
self.node.update_relays(to_remove, &to_add, now);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
|
||||
use boringtun::x25519::StaticSecret;
|
||||
use connlib_shared::{
|
||||
messages::{ClientId, GatewayId, Relay, ResourceId, ReuseConnection},
|
||||
messages::{ClientId, GatewayId, Relay, RelayId, ResourceId, ReuseConnection},
|
||||
Callbacks, Result,
|
||||
};
|
||||
use io::Io;
|
||||
@@ -174,9 +174,10 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
pub fn upsert_relays(&mut self, relays: Vec<Relay>) {
|
||||
self.role_state.upsert_relays(
|
||||
turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)),
|
||||
pub fn update_relays(&mut self, to_remove: HashSet<RelayId>, to_add: Vec<Relay>) {
|
||||
self.role_state.update_relays(
|
||||
to_remove,
|
||||
turn(&to_add, |addr| self.io.sockets_ref().can_handle(addr)),
|
||||
Instant::now(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::messages::{
|
||||
use crate::CallbackHandler;
|
||||
use anyhow::Result;
|
||||
use boringtun::x25519::PublicKey;
|
||||
use connlib_shared::messages::RelaysPresence;
|
||||
use connlib_shared::{
|
||||
messages::{GatewayResponse, ResourceAccepted},
|
||||
Dname,
|
||||
@@ -16,6 +17,7 @@ use firezone_tunnel::GatewayTunnel;
|
||||
use futures_bounded::Timeout;
|
||||
use ip_network::IpNetwork;
|
||||
use phoenix_channel::PhoenixChannel;
|
||||
use std::collections::HashSet;
|
||||
use std::convert::Infallible;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
@@ -174,6 +176,16 @@ impl Eventloop {
|
||||
} => {
|
||||
self.tunnel.remove_access(&client_id, &resource_id);
|
||||
}
|
||||
phoenix_channel::Event::InboundMessage {
|
||||
msg:
|
||||
IngressMessages::RelaysPresence(RelaysPresence {
|
||||
disconnected_ids,
|
||||
connected,
|
||||
}),
|
||||
..
|
||||
} => self
|
||||
.tunnel
|
||||
.update_relays(HashSet::from_iter(disconnected_ids), connected),
|
||||
phoenix_channel::Event::InboundMessage {
|
||||
msg: IngressMessages::Init(_),
|
||||
..
|
||||
|
||||
@@ -8,6 +8,7 @@ use firezone_cli_utils::{setup_global_subscriber, CommonArgs};
|
||||
use firezone_tunnel::{GatewayTunnel, Sockets};
|
||||
use futures::{future, TryFutureExt};
|
||||
use secrecy::{Secret, SecretString};
|
||||
use std::collections::HashSet;
|
||||
use std::convert::Infallible;
|
||||
use std::path::Path;
|
||||
use std::pin::pin;
|
||||
@@ -110,7 +111,7 @@ async fn run(login: LoginUrl, private_key: StaticSecret) -> Result<Infallible> {
|
||||
tunnel
|
||||
.set_interface(&init.interface)
|
||||
.context("Failed to set interface")?;
|
||||
tunnel.upsert_relays(init.relays);
|
||||
tunnel.update_relays(HashSet::default(), init.relays);
|
||||
|
||||
let mut eventloop = Eventloop::new(tunnel, portal);
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use chrono::{serde::ts_seconds_option, DateTime, Utc};
|
||||
use connlib_shared::{
|
||||
messages::{
|
||||
ClientId, ClientPayload, GatewayResponse, Interface, Peer, Relay, ResourceDescription,
|
||||
ResourceId,
|
||||
ClientId, ClientPayload, GatewayResponse, Interface, Peer, Relay, RelaysPresence,
|
||||
ResourceDescription, ResourceId,
|
||||
},
|
||||
Dname,
|
||||
};
|
||||
@@ -74,6 +74,7 @@ pub enum IngressMessages {
|
||||
IceCandidates(ClientIceCandidates),
|
||||
InvalidateIceCandidates(ClientIceCandidates),
|
||||
Init(InitGateway),
|
||||
RelaysPresence(RelaysPresence),
|
||||
}
|
||||
|
||||
/// A client's ice candidate message.
|
||||
@@ -114,6 +115,7 @@ pub struct ConnectionReady {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use connlib_shared::messages::Turn;
|
||||
use phoenix_channel::InitMessage;
|
||||
use phoenix_channel::PhoenixMessage;
|
||||
|
||||
@@ -327,4 +329,48 @@ mod test {
|
||||
let ingress_message = serde_json::from_str::<InitMessage<InitGateway>>(message).unwrap();
|
||||
assert_eq!(m, ingress_message);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn relays_presence() {
|
||||
let message = r#"
|
||||
{
|
||||
"event": "relays_presence",
|
||||
"ref": null,
|
||||
"topic": "gateway",
|
||||
"payload": {
|
||||
"disconnected_ids": [
|
||||
"e95f9517-2152-4677-a16a-fbb2687050a3",
|
||||
"b0724bd1-a8cc-4faf-88cd-f21159cfec47"
|
||||
],
|
||||
"connected": [
|
||||
{
|
||||
"id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8",
|
||||
"type": "turn",
|
||||
"username": "1719367575:ZQHcVGkdnfgGmcP1",
|
||||
"password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg",
|
||||
"addr": "172.28.0.101:3478",
|
||||
"expires_at": 1719367575
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
"#;
|
||||
let expected = IngressMessages::RelaysPresence(RelaysPresence {
|
||||
disconnected_ids: vec![
|
||||
"e95f9517-2152-4677-a16a-fbb2687050a3".parse().unwrap(),
|
||||
"b0724bd1-a8cc-4faf-88cd-f21159cfec47".parse().unwrap(),
|
||||
],
|
||||
connected: vec![Relay::Turn(Turn {
|
||||
id: "0a133356-7a9e-4b9a-b413-0d95a5720fd8".parse().unwrap(),
|
||||
expires_at: DateTime::from_timestamp(1719367575, 0).unwrap(),
|
||||
addr: "172.28.0.101:3478".parse().unwrap(),
|
||||
username: "1719367575:ZQHcVGkdnfgGmcP1".to_owned(),
|
||||
password: "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg".to_owned(),
|
||||
})],
|
||||
});
|
||||
|
||||
let ingress_message = serde_json::from_str::<IngressMessages>(message).unwrap();
|
||||
|
||||
assert_eq!(ingress_message, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,6 @@ source "./scripts/tests/lib.sh"
|
||||
|
||||
client_curl_resource "172.20.0.100/get"
|
||||
|
||||
docker compose stop api relay # Stop portal & relay
|
||||
docker compose stop api relay-1 relay-2 # Stop portal & relays
|
||||
|
||||
client_curl_resource "172.20.0.100/get"
|
||||
|
||||
@@ -14,6 +14,6 @@ function run_test() {
|
||||
|
||||
run_test
|
||||
|
||||
docker compose stop relay
|
||||
docker compose stop relay-1 relay-2
|
||||
|
||||
run_test
|
||||
|
||||
@@ -10,8 +10,12 @@ function gateway() {
|
||||
docker compose exec -it gateway "$@"
|
||||
}
|
||||
|
||||
function relay() {
|
||||
docker compose exec -it relay "$@"
|
||||
function relay1() {
|
||||
docker compose exec -it relay-1 "$@"
|
||||
}
|
||||
|
||||
function relay2() {
|
||||
docker compose exec -it relay-2 "$@"
|
||||
}
|
||||
|
||||
function install_iptables_drop_rules() {
|
||||
@@ -41,8 +45,8 @@ function client_nslookup() {
|
||||
}
|
||||
|
||||
function assert_equals() {
|
||||
local expected="$1"
|
||||
local actual="$2"
|
||||
local actual="$1"
|
||||
local expected="$2"
|
||||
|
||||
if [[ "$expected" != "$actual" ]]; then
|
||||
echo "Expected $expected but got $actual"
|
||||
|
||||
@@ -9,6 +9,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \
|
||||
--client 172.20.0.110 \
|
||||
--json' >>"${TEST_NAME}.json"
|
||||
|
||||
assert_process_state "relay" "S"
|
||||
assert_process_state "relay-1" "S"
|
||||
assert_process_state "relay-2" "S"
|
||||
assert_process_state "gateway" "S"
|
||||
assert_process_state "client" "S"
|
||||
|
||||
@@ -10,6 +10,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \
|
||||
--client 172.20.0.110 \
|
||||
--json' >>"${TEST_NAME}.json"
|
||||
|
||||
assert_process_state "relay" "S"
|
||||
assert_process_state "relay-1" "S"
|
||||
assert_process_state "relay-2" "S"
|
||||
assert_process_state "gateway" "S"
|
||||
assert_process_state "client" "S"
|
||||
|
||||
@@ -11,6 +11,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \
|
||||
--client 172.20.0.110 \
|
||||
--json' >>"${TEST_NAME}.json"
|
||||
|
||||
assert_process_state "relay" "S"
|
||||
assert_process_state "relay-1" "S"
|
||||
assert_process_state "relay-2" "S"
|
||||
assert_process_state "gateway" "S"
|
||||
assert_process_state "client" "S"
|
||||
|
||||
@@ -12,6 +12,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \
|
||||
--client 172.20.0.110 \
|
||||
--json' >>"${TEST_NAME}.json"
|
||||
|
||||
assert_process_state "relay" "S"
|
||||
assert_process_state "relay-1" "S"
|
||||
assert_process_state "relay-2" "S"
|
||||
assert_process_state "gateway" "S"
|
||||
assert_process_state "client" "S"
|
||||
|
||||
@@ -7,7 +7,7 @@ install_iptables_drop_rules
|
||||
client_curl_resource "172.20.0.100/get"
|
||||
|
||||
# Act: Send SIGTERM
|
||||
docker compose kill relay --signal SIGTERM
|
||||
docker compose kill relay-1 --signal SIGTERM
|
||||
|
||||
sleep 2 # Closing websocket isn't instant.
|
||||
|
||||
@@ -15,18 +15,18 @@ sleep 2 # Closing websocket isn't instant.
|
||||
client_curl_resource "172.20.0.100/get"
|
||||
|
||||
# Assert: Websocket connection is cut
|
||||
OPEN_SOCKETS=$(relay netstat -tn | grep "ESTABLISHED" | grep 8081 || true) # Portal listens on port 8081
|
||||
OPEN_SOCKETS=$(relay1 netstat -tn | grep "ESTABLISHED" | grep 8081 || true) # Portal listens on port 8081
|
||||
test -z "$OPEN_SOCKETS"
|
||||
|
||||
# Act: Send 2nd SIGTERM
|
||||
docker compose kill relay --signal SIGTERM
|
||||
docker compose kill relay-1 --signal SIGTERM
|
||||
|
||||
sleep 5 # Wait for container to be fully exited
|
||||
|
||||
# Seems to be necessary to return the correct state
|
||||
docker compose ps relay --all
|
||||
docker compose ps relay-1 --all
|
||||
sleep 1
|
||||
|
||||
# Assert: Container exited
|
||||
container_state=$(docker compose ps relay --all --format json | jq --raw-output '.State')
|
||||
container_state=$(docker compose ps relay-1 --all --format json | jq --raw-output '.State')
|
||||
assert_equals "$container_state" "exited"
|
||||
|
||||
@@ -6,7 +6,8 @@ install_iptables_drop_rules
|
||||
|
||||
client_curl_resource "172.20.0.100/get"
|
||||
|
||||
# Restart relay with new IP
|
||||
PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay
|
||||
# Restart relays with new IPs
|
||||
RELAY_1_PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay-1
|
||||
RELAY_2_PUBLIC_IP4_ADDR="172.28.0.202" docker compose up -d relay-2
|
||||
|
||||
client_curl_resource "172.20.0.100/get"
|
||||
|
||||
@@ -16,7 +16,8 @@ install_iptables_drop_rules
|
||||
|
||||
run_test
|
||||
|
||||
# Restart relay with new IP
|
||||
PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay
|
||||
# Restart relays with new IP
|
||||
RELAY_1_PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay-1
|
||||
RELAY_2_PUBLIC_IP4_ADDR="172.28.0.202" docker compose up -d relay-2
|
||||
|
||||
run_test
|
||||
|
||||
Reference in New Issue
Block a user