From 51089b89e78fb30e95b0b427c0520ff14cf64414 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sat, 20 Apr 2024 16:16:35 +1000 Subject: [PATCH] feat(connlib): smoothly migrate relayed connections (#4568) Whenever we receive a `relays_presence` message from the portal, we invalidate the candidates of all now disconnected relays and make allocations on the new ones. This triggers signalling of new candidates to the remote party and migrates the connection to the newly nominated socket. This still relies on #4613 until we have #4634. Resolves: #4548. --------- Co-authored-by: Jamil --- .github/workflows/_integration_tests.yml | 10 +- .github/workflows/ci.yml | 9 +- docker-compose.yml | 39 ++++++- rust/Cargo.lock | 2 +- rust/connlib/clients/shared/src/eventloop.rs | 12 +- rust/connlib/clients/shared/src/messages.rs | 50 ++++++++- rust/connlib/shared/src/messages.rs | 9 ++ rust/connlib/snownet/src/node.rs | 33 +++++- rust/connlib/snownet/tests/lib.rs | 106 +++++++++++++++--- rust/connlib/tunnel/src/client.rs | 14 ++- rust/connlib/tunnel/src/gateway.rs | 7 +- rust/connlib/tunnel/src/lib.rs | 9 +- rust/gateway/src/eventloop.rs | 12 ++ rust/gateway/src/main.rs | 3 +- rust/gateway/src/messages.rs | 50 ++++++++- scripts/tests/direct-curl-api-relay-down.sh | 2 +- scripts/tests/direct-dns-relay-down.sh | 2 +- scripts/tests/lib.sh | 12 +- .../tests/perf/relayed-tcp-client2server.sh | 3 +- .../tests/perf/relayed-tcp-server2client.sh | 3 +- .../tests/perf/relayed-udp-client2server.sh | 3 +- .../tests/perf/relayed-udp-server2client.sh | 3 +- scripts/tests/relay-graceful-shutdown.sh | 10 +- scripts/tests/relayed-curl-relay-restart.sh | 5 +- scripts/tests/relayed-dns-relay-restart.sh | 5 +- 25 files changed, 344 insertions(+), 69 deletions(-) diff --git a/.github/workflows/_integration_tests.yml b/.github/workflows/_integration_tests.yml index 10c9f2cfc..ad60735c0 100644 --- a/.github/workflows/_integration_tests.yml +++ b/.github/workflows/_integration_tests.yml @@ -128,7 
+128,8 @@ jobs: # Start one-by-one to avoid variability in service startup order docker compose up -d dns.httpbin httpbin download.httpbin docker compose up -d api web domain --no-build - docker compose up -d relay --no-build + docker compose up -d relay-1 --no-build + docker compose up -d relay-2 --no-build docker compose up -d gateway --no-build docker compose up -d client --no-build @@ -137,9 +138,12 @@ jobs: - name: Show Client logs if: "!cancelled()" run: docker compose logs client - - name: Show Relay logs + - name: Show Relay-1 logs if: "!cancelled()" - run: docker compose logs relay + run: docker compose logs relay-1 + - name: Show Relay-2 logs + if: "!cancelled()" + run: docker compose logs relay-2 - name: Show Gateway logs if: "!cancelled()" run: docker compose logs gateway diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d14545a9c..9a0715d66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -223,7 +223,7 @@ jobs: # Start services in the same order each time for the tests docker compose up -d iperf3 docker compose up -d api web domain --no-build - docker compose up -d relay --no-build + docker compose up -d relay-1 relay-2 --no-build docker compose up -d gateway --no-build docker compose up -d client --no-build - name: 'Performance test: ${{ matrix.test_name }}' @@ -243,9 +243,12 @@ jobs: - name: Show Client UDP stats if: "!cancelled()" run: docker compose exec client cat /proc/net/udp - - name: Show Relay logs + - name: Show Relay-1 logs if: "!cancelled()" - run: docker compose logs relay + run: docker compose logs relay-1 + - name: Show Relay-2 logs + if: "!cancelled()" + run: docker compose logs relay-2 - name: Show Gateway logs if: "!cancelled()" run: docker compose logs gateway diff --git a/docker-compose.yml b/docker-compose.yml index 181b1085a..b4389cb4b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -389,9 +389,9 @@ services: resources: ipv4_address: 172.20.0.110 - relay: + relay-1: 
environment: - PUBLIC_IP4_ADDR: ${PUBLIC_IP4_ADDR:-172.28.0.101} + PUBLIC_IP4_ADDR: ${RELAY_1_PUBLIC_IP4_ADDR:-172.28.0.101} # PUBLIC_IP6_ADDR: fcff:3990:3990::101 LOWEST_PORT: 55555 HIGHEST_PORT: 55666 @@ -429,7 +429,40 @@ services: - 3478:3478/udp networks: app: - ipv4_address: ${PUBLIC_IP4_ADDR:-172.28.0.101} + ipv4_address: ${RELAY_1_PUBLIC_IP4_ADDR:-172.28.0.101} + + relay-2: + environment: + PUBLIC_IP4_ADDR: ${RELAY_2_PUBLIC_IP4_ADDR:-172.28.0.201} + # PUBLIC_IP6_ADDR: fcff:3990:3990::101 + # Token for self-hosted Relay + # FIREZONE_TOKEN: ".SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkNTQ5YzQxMDctMTQ5Mi00ZjhmLWE0ZWMtYTlkMmE2NmQ4YWE5bQAAADhQVTVBSVRFMU84VkRWTk1ITU9BQzc3RElLTU9HVERJQTY3MlM2RzFBQjAyT1MzNEg1TUUwPT09PW4GAEngLBONAWIAAVGA.E-f2MFdGMX7JTL2jwoHBdWcUd2G3UNz2JRZLbQrlf0k" + # Token for global Relay + FIREZONE_TOKEN: ".SFMyNTY.g2gDaAN3A25pbG0AAAAkZTgyZmNkYzEtMDU3YS00MDE1LWI5MGItM2IxOGYwZjI4MDUzbQAAADhDMTROR0E4N0VKUlIwM0c0UVBSMDdBOUM2Rzc4NFRTU1RIU0Y0VEk1VDBHRDhENkwwVlJHPT09PW4GADXgLBONAWIAAVGA.dShU17FgnvO2GLcTSnBBTDoqQ2tScuG7qjiyKhhlq8s" + RUST_LOG: "debug" + RUST_BACKTRACE: 1 + FIREZONE_API_URL: ws://api:8081 + build: + target: dev + context: rust + dockerfile: Dockerfile + cache_from: + - type=registry,ref=us-east1-docker.pkg.dev/firezone-staging/cache/relay:main + args: + PACKAGE: firezone-relay + image: ${RELAY_IMAGE:-us-east1-docker.pkg.dev/firezone-staging/firezone/dev/relay}:${RELAY_TAG:-main} + healthcheck: + test: ["CMD-SHELL", "lsof -i UDP | grep firezone-relay"] + start_period: 10s + interval: 30s + retries: 5 + timeout: 5s + depends_on: + api: + condition: "service_healthy" + networks: + app: + ipv4_address: ${RELAY_2_PUBLIC_IP4_ADDR:-172.28.0.201} # IPv6 is currently causing flakiness with GH actions and on our testbed. 
diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 7db900c0a..755e2390b 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -3388,7 +3388,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] diff --git a/rust/connlib/clients/shared/src/eventloop.rs b/rust/connlib/clients/shared/src/eventloop.rs index 6a7b1b18c..fb91ac8e4 100644 --- a/rust/connlib/clients/shared/src/eventloop.rs +++ b/rust/connlib/clients/shared/src/eventloop.rs @@ -7,13 +7,13 @@ use crate::{ }; use anyhow::Result; use connlib_shared::{ - messages::{ConnectionAccepted, GatewayResponse, ResourceAccepted, ResourceId}, + messages::{ConnectionAccepted, GatewayResponse, RelaysPresence, ResourceAccepted, ResourceId}, Callbacks, }; use firezone_tunnel::ClientTunnel; use phoenix_channel::{ErrorReply, OutboundRequestId, PhoenixChannel}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, net::IpAddr, task::{Context, Poll}, }; @@ -202,7 +202,7 @@ where tracing::info!("Firezone Started!"); let _ = self.tunnel.set_resources(resources); - self.tunnel.upsert_relays(relays) + self.tunnel.update_relays(HashSet::default(), relays) } IngressMessages::ResourceCreatedOrUpdated(resource) => { let resource_id = resource.id(); @@ -214,6 +214,12 @@ where IngressMessages::ResourceDeleted(resource) => { self.tunnel.remove_resources(&[resource]); } + IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids, + connected, + }) => self + .tunnel + .update_relays(HashSet::from_iter(disconnected_ids), connected), IngressMessages::InvalidateIceCandidates(GatewayIceCandidates { gateway_id, candidates, diff --git a/rust/connlib/clients/shared/src/messages.rs b/rust/connlib/clients/shared/src/messages.rs index 8b05b41e1..4227133a7 100644 --- a/rust/connlib/clients/shared/src/messages.rs +++ 
b/rust/connlib/clients/shared/src/messages.rs @@ -1,6 +1,6 @@ use connlib_shared::messages::{ - GatewayId, GatewayResponse, Interface, Key, Relay, RequestConnection, ResourceDescription, - ResourceId, ReuseConnection, + GatewayId, GatewayResponse, Interface, Key, Relay, RelaysPresence, RequestConnection, + ResourceDescription, ResourceId, ReuseConnection, }; use serde::{Deserialize, Serialize}; use std::{collections::HashSet, net::IpAddr}; @@ -50,6 +50,8 @@ pub enum IngressMessages { InvalidateIceCandidates(GatewayIceCandidates), ConfigChanged(ConfigUpdate), + + RelaysPresence(RelaysPresence), } #[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] @@ -575,4 +577,48 @@ mod test { let reply_message = serde_json::from_str(message).unwrap(); assert_eq!(m, reply_message); } + + #[test] + fn relays_presence() { + let message = r#" + { + "event": "relays_presence", + "ref": null, + "topic": "client", + "payload": { + "disconnected_ids": [ + "e95f9517-2152-4677-a16a-fbb2687050a3", + "b0724bd1-a8cc-4faf-88cd-f21159cfec47" + ], + "connected": [ + { + "id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8", + "type": "turn", + "username": "1719367575:ZQHcVGkdnfgGmcP1", + "password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg", + "addr": "172.28.0.101:3478", + "expires_at": 1719367575 + } + ] + } + } + "#; + let expected = IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids: vec![ + "e95f9517-2152-4677-a16a-fbb2687050a3".parse().unwrap(), + "b0724bd1-a8cc-4faf-88cd-f21159cfec47".parse().unwrap(), + ], + connected: vec![Relay::Turn(Turn { + id: "0a133356-7a9e-4b9a-b413-0d95a5720fd8".parse().unwrap(), + expires_at: DateTime::from_timestamp(1719367575, 0).unwrap(), + addr: "172.28.0.101:3478".parse().unwrap(), + username: "1719367575:ZQHcVGkdnfgGmcP1".to_owned(), + password: "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg".to_owned(), + })], + }); + + let ingress_message = serde_json::from_str::(message).unwrap(); + + assert_eq!(ingress_message, expected); + } } diff 
--git a/rust/connlib/shared/src/messages.rs b/rust/connlib/shared/src/messages.rs index c92993d9d..7a30a0da2 100644 --- a/rust/connlib/shared/src/messages.rs +++ b/rust/connlib/shared/src/messages.rs @@ -413,6 +413,15 @@ pub struct Stun { pub addr: SocketAddr, } +/// An update to the presence of several relays. +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +pub struct RelaysPresence { + /// These relays have disconnected from the portal. We need to stop using them. + pub disconnected_ids: Vec<RelayId>, + /// These relays are still online. We can/should use these. + pub connected: Vec<Relay>, +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 58d52d9bc..26cffc85c 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -469,12 +469,32 @@ where self.buffered_transmits.pop_front() } - pub fn upsert_turn_servers( + pub fn update_relays( &mut self, - servers: &HashSet<(RId, SocketAddr, String, String, String)>, + to_remove: HashSet<RId>, + to_add: &HashSet<(RId, SocketAddr, String, String, String)>, now: Instant, ) { - for (id, server, username, password, realm) in servers { + // First, invalidate all candidates from relays that we should stop using. + for id in to_remove { + let Some(allocation) = self.allocations.remove(&id) else { + continue; + }; + + for (id, agent) in self.connections.agents_mut() { + let _span = info_span!("connection", %id).entered(); + + for candidate in allocation + .current_candidates() + .filter(|c| c.kind() == CandidateKind::Relayed) + { + agent.invalidate_candidate(&candidate); + } + } + } + + // Second, upsert all new relays. 
+ for (id, server, username, password, realm) in to_add { let Ok(username) = Username::new(username.to_owned()) else { tracing::debug!(%username, "Invalid TURN username"); continue; @@ -494,7 +514,7 @@ where Allocation::new(*server, username, password.clone(), realm, now), ); - tracing::info!(address = %server, "Added new TURN server"); + tracing::info!(%id, address = %server, "Added new TURN server"); } } @@ -782,7 +802,7 @@ where }; self.upsert_stun_servers(&stun_servers, now); - self.upsert_turn_servers(&turn_servers, now); + self.update_relays(HashSet::default(), &turn_servers, now); let mut agent = IceAgent::new(); agent.set_controlling(true); @@ -884,7 +904,7 @@ where }; self.upsert_stun_servers(&stun_servers, now); - self.upsert_turn_servers(&turn_servers, now); + self.update_relays(HashSet::default(), &turn_servers, now); let mut agent = IceAgent::new(); agent.set_controlling(false); @@ -1723,6 +1743,7 @@ where self.agent .local_candidates() .iter() + .filter(|c| !c.discarded()) .find(|c| c.addr() == source) } diff --git a/rust/connlib/snownet/tests/lib.rs b/rust/connlib/snownet/tests/lib.rs index 3deb6039f..243bc76fa 100644 --- a/rust/connlib/snownet/tests/lib.rs +++ b/rust/connlib/snownet/tests/lib.rs @@ -57,10 +57,16 @@ fn smoke_relayed() { 1, TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")), )]; - let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80") - .with_relays(&mut relays, clock.now); - let mut bob = - TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now); + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); let firewall = Firewall::default() .with_block_rule(&alice, &bob) .with_block_rule(&bob, &alice); @@ -96,10 +102,16 @@ fn 
reconnect_discovers_new_interface() { 1, TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")), )]; - let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80") - .with_relays(&mut relays, clock.now); - let mut bob = - TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now); + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); handshake(&mut alice, &mut bob, &clock); @@ -139,6 +151,59 @@ fn reconnect_discovers_new_interface() { assert_eq!(bob.failed_connections().count(), 0); } +#[test] +fn migrate_connection_to_new_relay() { + let _guard = setup_tracing(); + let mut clock = Clock::new(); + + let (alice, bob) = alice_and_bob(); + + let mut relays = [( + 1, + TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")), + )]; + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let firewall = Firewall::default() + .with_block_rule(&alice, &bob) + .with_block_rule(&bob, &alice); + + handshake(&mut alice, &mut bob, &clock); + + loop { + if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) { + break; + } + + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + } + + // Swap out the relays. "Roger" is being removed (ID 1) and "Robert" is being added (ID 2). + let mut relays = [(2, TestRelay::new(ip("10.0.0.1"), debug_span!("Robert")))]; + alice = alice.with_relays(HashSet::from([1]), &mut relays, clock.now); + + // Make some progress. 
(the fact that we only need 5 clock ticks means we are not relying on timeouts here) + for _ in 0..5 { + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + } + + alice.ping(ip("9.9.9.9"), ip("8.8.8.8"), &bob, clock.now); + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + assert_eq!(bob.packets_from(ip("9.9.9.9")).count(), 1); + + bob.ping(ip("8.8.8.8"), ip("9.9.9.9"), &alice, clock.now); + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + assert_eq!(alice.packets_from(ip("8.8.8.8")).count(), 1); +} + #[test] fn connection_times_out_after_20_seconds() { let (mut alice, _) = alice_and_bob(); @@ -695,6 +760,18 @@ impl EitherNode { EitherNode::Server(n) => n.reconnect(now), } } + + fn update_relays( + &mut self, + to_remove: HashSet<u64>, + to_add: &HashSet<(u64, SocketAddr, String, String, String)>, + now: Instant, + ) { + match self { + EitherNode::Server(s) => s.update_relays(to_remove, to_add, now), + EitherNode::Client(c) => c.update_relays(to_remove, to_add, now), + } + } } impl TestNode { @@ -713,7 +790,12 @@ impl TestNode { } } - fn with_relays(mut self, relays: &mut [(u64, TestRelay)], now: Instant) -> Self { + fn with_relays( + mut self, + to_remove: HashSet<u64>, + relays: &mut [(u64, TestRelay)], + now: Instant, + ) -> Self { let username = match self.node { EitherNode::Server(_) => "server", EitherNode::Client(_) => "client", @@ -734,10 +816,8 @@ impl TestNode { }) .collect::<HashSet<_>>(); - self.span.in_scope(|| match &mut self.node { - EitherNode::Server(s) => s.upsert_turn_servers(&turn_servers, now), - EitherNode::Client(c) => c.upsert_turn_servers(&turn_servers, now), - }); + self.span + .in_scope(|| self.node.update_relays(to_remove, &turn_servers, now)); self } diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index 87c6c35d6..d655c9473 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -73,9 +73,10 @@ where Ok(()) } - pub fn 
upsert_relays(&mut self, relays: Vec) { - self.role_state.upsert_relays( - turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)), + pub fn update_relays(&mut self, to_remove: HashSet, to_add: Vec) { + self.role_state.update_relays( + to_remove, + turn(&to_add, |addr| self.io.sockets_ref().can_handle(addr)), Instant::now(), ) } @@ -1002,12 +1003,13 @@ impl ClientState { true } - fn upsert_relays( + fn update_relays( &mut self, - relays: HashSet<(RelayId, SocketAddr, String, String, String)>, + to_remove: HashSet, + to_add: HashSet<(RelayId, SocketAddr, String, String, String)>, now: Instant, ) { - self.node.upsert_turn_servers(&relays, now); + self.node.update_relays(to_remove, &to_add, now); } } diff --git a/rust/connlib/tunnel/src/gateway.rs b/rust/connlib/tunnel/src/gateway.rs index 004cc881e..9d5903c48 100644 --- a/rust/connlib/tunnel/src/gateway.rs +++ b/rust/connlib/tunnel/src/gateway.rs @@ -318,11 +318,12 @@ impl GatewayState { self.buffered_events.pop_front() } - pub(crate) fn upsert_relays( + pub(crate) fn update_relays( &mut self, - relays: HashSet<(RelayId, SocketAddr, String, String, String)>, + to_remove: HashSet, + to_add: HashSet<(RelayId, SocketAddr, String, String, String)>, now: Instant, ) { - self.node.upsert_turn_servers(&relays, now); + self.node.update_relays(to_remove, &to_add, now); } } diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 7248471d1..8ba950464 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -5,7 +5,7 @@ use boringtun::x25519::StaticSecret; use connlib_shared::{ - messages::{ClientId, GatewayId, Relay, ResourceId, ReuseConnection}, + messages::{ClientId, GatewayId, Relay, RelayId, ResourceId, ReuseConnection}, Callbacks, Result, }; use io::Io; @@ -174,9 +174,10 @@ where }) } - pub fn upsert_relays(&mut self, relays: Vec) { - self.role_state.upsert_relays( - turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)), + pub fn update_relays(&mut self, 
to_remove: HashSet, to_add: Vec) { + self.role_state.update_relays( + to_remove, + turn(&to_add, |addr| self.io.sockets_ref().can_handle(addr)), Instant::now(), ) } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 3083026d7..f5850f12e 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -5,6 +5,7 @@ use crate::messages::{ use crate::CallbackHandler; use anyhow::Result; use boringtun::x25519::PublicKey; +use connlib_shared::messages::RelaysPresence; use connlib_shared::{ messages::{GatewayResponse, ResourceAccepted}, Dname, @@ -16,6 +17,7 @@ use firezone_tunnel::GatewayTunnel; use futures_bounded::Timeout; use ip_network::IpNetwork; use phoenix_channel::PhoenixChannel; +use std::collections::HashSet; use std::convert::Infallible; use std::task::{Context, Poll}; use std::time::Duration; @@ -174,6 +176,16 @@ impl Eventloop { } => { self.tunnel.remove_access(&client_id, &resource_id); } + phoenix_channel::Event::InboundMessage { + msg: + IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids, + connected, + }), + .. + } => self + .tunnel + .update_relays(HashSet::from_iter(disconnected_ids), connected), phoenix_channel::Event::InboundMessage { msg: IngressMessages::Init(_), .. 
diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index 18922f798..04b631d8d 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -8,6 +8,7 @@ use firezone_cli_utils::{setup_global_subscriber, CommonArgs}; use firezone_tunnel::{GatewayTunnel, Sockets}; use futures::{future, TryFutureExt}; use secrecy::{Secret, SecretString}; +use std::collections::HashSet; use std::convert::Infallible; use std::path::Path; use std::pin::pin; @@ -110,7 +111,7 @@ async fn run(login: LoginUrl, private_key: StaticSecret) -> Result { tunnel .set_interface(&init.interface) .context("Failed to set interface")?; - tunnel.upsert_relays(init.relays); + tunnel.update_relays(HashSet::default(), init.relays); let mut eventloop = Eventloop::new(tunnel, portal); diff --git a/rust/gateway/src/messages.rs b/rust/gateway/src/messages.rs index dbfe4fba2..9b6a7e699 100644 --- a/rust/gateway/src/messages.rs +++ b/rust/gateway/src/messages.rs @@ -1,8 +1,8 @@ use chrono::{serde::ts_seconds_option, DateTime, Utc}; use connlib_shared::{ messages::{ - ClientId, ClientPayload, GatewayResponse, Interface, Peer, Relay, ResourceDescription, - ResourceId, + ClientId, ClientPayload, GatewayResponse, Interface, Peer, Relay, RelaysPresence, + ResourceDescription, ResourceId, }, Dname, }; @@ -74,6 +74,7 @@ pub enum IngressMessages { IceCandidates(ClientIceCandidates), InvalidateIceCandidates(ClientIceCandidates), Init(InitGateway), + RelaysPresence(RelaysPresence), } /// A client's ice candidate message. 
@@ -114,6 +115,7 @@ pub struct ConnectionReady { #[cfg(test)] mod test { use super::*; + use connlib_shared::messages::Turn; use phoenix_channel::InitMessage; use phoenix_channel::PhoenixMessage; @@ -327,4 +329,48 @@ mod test { let ingress_message = serde_json::from_str::>(message).unwrap(); assert_eq!(m, ingress_message); } + + #[test] + fn relays_presence() { + let message = r#" + { + "event": "relays_presence", + "ref": null, + "topic": "gateway", + "payload": { + "disconnected_ids": [ + "e95f9517-2152-4677-a16a-fbb2687050a3", + "b0724bd1-a8cc-4faf-88cd-f21159cfec47" + ], + "connected": [ + { + "id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8", + "type": "turn", + "username": "1719367575:ZQHcVGkdnfgGmcP1", + "password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg", + "addr": "172.28.0.101:3478", + "expires_at": 1719367575 + } + ] + } + } + "#; + let expected = IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids: vec![ + "e95f9517-2152-4677-a16a-fbb2687050a3".parse().unwrap(), + "b0724bd1-a8cc-4faf-88cd-f21159cfec47".parse().unwrap(), + ], + connected: vec![Relay::Turn(Turn { + id: "0a133356-7a9e-4b9a-b413-0d95a5720fd8".parse().unwrap(), + expires_at: DateTime::from_timestamp(1719367575, 0).unwrap(), + addr: "172.28.0.101:3478".parse().unwrap(), + username: "1719367575:ZQHcVGkdnfgGmcP1".to_owned(), + password: "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg".to_owned(), + })], + }); + + let ingress_message = serde_json::from_str::(message).unwrap(); + + assert_eq!(ingress_message, expected); + } } diff --git a/scripts/tests/direct-curl-api-relay-down.sh b/scripts/tests/direct-curl-api-relay-down.sh index 2a590d832..47709acb7 100755 --- a/scripts/tests/direct-curl-api-relay-down.sh +++ b/scripts/tests/direct-curl-api-relay-down.sh @@ -4,6 +4,6 @@ source "./scripts/tests/lib.sh" client_curl_resource "172.20.0.100/get" -docker compose stop api relay # Stop portal & relay +docker compose stop api relay-1 relay-2 # Stop portal & relays client_curl_resource 
"172.20.0.100/get" diff --git a/scripts/tests/direct-dns-relay-down.sh b/scripts/tests/direct-dns-relay-down.sh index e736fec41..b3125bdc7 100755 --- a/scripts/tests/direct-dns-relay-down.sh +++ b/scripts/tests/direct-dns-relay-down.sh @@ -14,6 +14,6 @@ function run_test() { run_test -docker compose stop relay +docker compose stop relay-1 relay-2 run_test diff --git a/scripts/tests/lib.sh b/scripts/tests/lib.sh index 568089a88..991c99d48 100755 --- a/scripts/tests/lib.sh +++ b/scripts/tests/lib.sh @@ -10,8 +10,12 @@ function gateway() { docker compose exec -it gateway "$@" } -function relay() { - docker compose exec -it relay "$@" +function relay1() { + docker compose exec -it relay-1 "$@" +} + +function relay2() { + docker compose exec -it relay-2 "$@" } function install_iptables_drop_rules() { @@ -41,8 +45,8 @@ function client_nslookup() { } function assert_equals() { - local expected="$1" - local actual="$2" + local actual="$1" + local expected="$2" if [[ "$expected" != "$actual" ]]; then echo "Expected $expected but got $actual" diff --git a/scripts/tests/perf/relayed-tcp-client2server.sh b/scripts/tests/perf/relayed-tcp-client2server.sh index 8adcb2f7b..8d17911e9 100755 --- a/scripts/tests/perf/relayed-tcp-client2server.sh +++ b/scripts/tests/perf/relayed-tcp-client2server.sh @@ -9,6 +9,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' >>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/perf/relayed-tcp-server2client.sh b/scripts/tests/perf/relayed-tcp-server2client.sh index 0ba3e78d2..ef477ca09 100755 --- a/scripts/tests/perf/relayed-tcp-server2client.sh +++ b/scripts/tests/perf/relayed-tcp-server2client.sh @@ -10,6 +10,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' 
>>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/perf/relayed-udp-client2server.sh b/scripts/tests/perf/relayed-udp-client2server.sh index c453eedb6..4f23bb1d7 100755 --- a/scripts/tests/perf/relayed-udp-client2server.sh +++ b/scripts/tests/perf/relayed-udp-client2server.sh @@ -11,6 +11,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' >>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/perf/relayed-udp-server2client.sh b/scripts/tests/perf/relayed-udp-server2client.sh index 8afd962ba..dd1bd4ff7 100755 --- a/scripts/tests/perf/relayed-udp-server2client.sh +++ b/scripts/tests/perf/relayed-udp-server2client.sh @@ -12,6 +12,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' >>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/relay-graceful-shutdown.sh b/scripts/tests/relay-graceful-shutdown.sh index b06116547..76d7f72d1 100755 --- a/scripts/tests/relay-graceful-shutdown.sh +++ b/scripts/tests/relay-graceful-shutdown.sh @@ -7,7 +7,7 @@ install_iptables_drop_rules client_curl_resource "172.20.0.100/get" # Act: Send SIGTERM -docker compose kill relay --signal SIGTERM +docker compose kill relay-1 --signal SIGTERM sleep 2 # Closing websocket isn't instant. @@ -15,18 +15,18 @@ sleep 2 # Closing websocket isn't instant. 
client_curl_resource "172.20.0.100/get" # Assert: Websocket connection is cut -OPEN_SOCKETS=$(relay netstat -tn | grep "ESTABLISHED" | grep 8081 || true) # Portal listens on port 8081 +OPEN_SOCKETS=$(relay1 netstat -tn | grep "ESTABLISHED" | grep 8081 || true) # Portal listens on port 8081 test -z "$OPEN_SOCKETS" # Act: Send 2nd SIGTERM -docker compose kill relay --signal SIGTERM +docker compose kill relay-1 --signal SIGTERM sleep 5 # Wait for container to be fully exited # Seems to be necessary to return the correct state -docker compose ps relay --all +docker compose ps relay-1 --all sleep 1 # Assert: Container exited -container_state=$(docker compose ps relay --all --format json | jq --raw-output '.State') +container_state=$(docker compose ps relay-1 --all --format json | jq --raw-output '.State') assert_equals "$container_state" "exited" diff --git a/scripts/tests/relayed-curl-relay-restart.sh b/scripts/tests/relayed-curl-relay-restart.sh index 23ab61909..74aafc761 100755 --- a/scripts/tests/relayed-curl-relay-restart.sh +++ b/scripts/tests/relayed-curl-relay-restart.sh @@ -6,7 +6,8 @@ install_iptables_drop_rules client_curl_resource "172.20.0.100/get" -# Restart relay with new IP -PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay +# Restart relays with new IPs +RELAY_1_PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay-1 +RELAY_2_PUBLIC_IP4_ADDR="172.28.0.202" docker compose up -d relay-2 client_curl_resource "172.20.0.100/get" diff --git a/scripts/tests/relayed-dns-relay-restart.sh b/scripts/tests/relayed-dns-relay-restart.sh index d9478cc79..e48807dc0 100755 --- a/scripts/tests/relayed-dns-relay-restart.sh +++ b/scripts/tests/relayed-dns-relay-restart.sh @@ -16,7 +16,8 @@ install_iptables_drop_rules run_test -# Restart relay with new IP -PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay +# Restart relays with new IP +RELAY_1_PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay-1 +RELAY_2_PUBLIC_IP4_ADDR="172.28.0.202" docker 
compose up -d relay-2 run_test