diff --git a/.github/workflows/_integration_tests.yml b/.github/workflows/_integration_tests.yml index 10c9f2cfc..ad60735c0 100644 --- a/.github/workflows/_integration_tests.yml +++ b/.github/workflows/_integration_tests.yml @@ -128,7 +128,8 @@ jobs: # Start one-by-one to avoid variability in service startup order docker compose up -d dns.httpbin httpbin download.httpbin docker compose up -d api web domain --no-build - docker compose up -d relay --no-build + docker compose up -d relay-1 --no-build + docker compose up -d relay-2 --no-build docker compose up -d gateway --no-build docker compose up -d client --no-build @@ -137,9 +138,12 @@ jobs: - name: Show Client logs if: "!cancelled()" run: docker compose logs client - - name: Show Relay logs + - name: Show Relay-1 logs if: "!cancelled()" - run: docker compose logs relay + run: docker compose logs relay-1 + - name: Show Relay-2 logs + if: "!cancelled()" + run: docker compose logs relay-2 - name: Show Gateway logs if: "!cancelled()" run: docker compose logs gateway diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d14545a9c..9a0715d66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -223,7 +223,7 @@ jobs: # Start services in the same order each time for the tests docker compose up -d iperf3 docker compose up -d api web domain --no-build - docker compose up -d relay --no-build + docker compose up -d relay-1 relay-2 --no-build docker compose up -d gateway --no-build docker compose up -d client --no-build - name: 'Performance test: ${{ matrix.test_name }}' @@ -243,9 +243,12 @@ jobs: - name: Show Client UDP stats if: "!cancelled()" run: docker compose exec client cat /proc/net/udp - - name: Show Relay logs + - name: Show Relay-1 logs if: "!cancelled()" - run: docker compose logs relay + run: docker compose logs relay-1 + - name: Show Relay-2 logs + if: "!cancelled()" + run: docker compose logs relay-2 - name: Show Gateway logs if: "!cancelled()" run: docker compose logs gateway diff --git a/docker-compose.yml b/docker-compose.yml index 181b1085a..b4389cb4b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -389,9 +389,9 @@ services: resources: ipv4_address: 172.20.0.110 - relay: + relay-1: environment: - PUBLIC_IP4_ADDR: ${PUBLIC_IP4_ADDR:-172.28.0.101} + PUBLIC_IP4_ADDR: ${RELAY_1_PUBLIC_IP4_ADDR:-172.28.0.101} # PUBLIC_IP6_ADDR: fcff:3990:3990::101 LOWEST_PORT: 55555 HIGHEST_PORT: 55666 @@ -429,7 +429,40 @@ services: - 3478:3478/udp networks: app: - ipv4_address: ${PUBLIC_IP4_ADDR:-172.28.0.101} + ipv4_address: ${RELAY_1_PUBLIC_IP4_ADDR:-172.28.0.101} + + relay-2: + environment: + PUBLIC_IP4_ADDR: ${RELAY_2_PUBLIC_IP4_ADDR:-172.28.0.201} + # PUBLIC_IP6_ADDR: fcff:3990:3990::101 + # Token for self-hosted Relay + # FIREZONE_TOKEN: ".SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkNTQ5YzQxMDctMTQ5Mi00ZjhmLWE0ZWMtYTlkMmE2NmQ4YWE5bQAAADhQVTVBSVRFMU84VkRWTk1ITU9BQzc3RElLTU9HVERJQTY3MlM2RzFBQjAyT1MzNEg1TUUwPT09PW4GAEngLBONAWIAAVGA.E-f2MFdGMX7JTL2jwoHBdWcUd2G3UNz2JRZLbQrlf0k" + # Token for global Relay + FIREZONE_TOKEN: ".SFMyNTY.g2gDaAN3A25pbG0AAAAkZTgyZmNkYzEtMDU3YS00MDE1LWI5MGItM2IxOGYwZjI4MDUzbQAAADhDMTROR0E4N0VKUlIwM0c0UVBSMDdBOUM2Rzc4NFRTU1RIU0Y0VEk1VDBHRDhENkwwVlJHPT09PW4GADXgLBONAWIAAVGA.dShU17FgnvO2GLcTSnBBTDoqQ2tScuG7qjiyKhhlq8s" + RUST_LOG: "debug" + RUST_BACKTRACE: 1 + FIREZONE_API_URL: ws://api:8081 + build: + target: dev + context: rust + dockerfile: Dockerfile + cache_from: + - type=registry,ref=us-east1-docker.pkg.dev/firezone-staging/cache/relay:main + args: + PACKAGE: firezone-relay + image: ${RELAY_IMAGE:-us-east1-docker.pkg.dev/firezone-staging/firezone/dev/relay}:${RELAY_TAG:-main} + healthcheck: + test: ["CMD-SHELL", "lsof -i UDP | grep firezone-relay"] + start_period: 10s + interval: 30s + retries: 5 + timeout: 5s + depends_on: + api: + condition: "service_healthy" + networks: + app: + ipv4_address: ${RELAY_2_PUBLIC_IP4_ADDR:-172.28.0.201} # IPv6 is currently causing flakiness with GH actions and on our testbed. diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 7db900c0a..755e2390b 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -3388,7 +3388,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] diff --git a/rust/connlib/clients/shared/src/eventloop.rs b/rust/connlib/clients/shared/src/eventloop.rs index 6a7b1b18c..fb91ac8e4 100644 --- a/rust/connlib/clients/shared/src/eventloop.rs +++ b/rust/connlib/clients/shared/src/eventloop.rs @@ -7,13 +7,13 @@ use crate::{ }; use anyhow::Result; use connlib_shared::{ - messages::{ConnectionAccepted, GatewayResponse, ResourceAccepted, ResourceId}, + messages::{ConnectionAccepted, GatewayResponse, RelaysPresence, ResourceAccepted, ResourceId}, Callbacks, }; use firezone_tunnel::ClientTunnel; use phoenix_channel::{ErrorReply, OutboundRequestId, PhoenixChannel}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, net::IpAddr, task::{Context, Poll}, }; @@ -202,7 +202,7 @@ where tracing::info!("Firezone Started!"); let _ = self.tunnel.set_resources(resources); - self.tunnel.upsert_relays(relays) + self.tunnel.update_relays(HashSet::default(), relays) } IngressMessages::ResourceCreatedOrUpdated(resource) => { let resource_id = resource.id(); @@ -214,6 +214,12 @@ where IngressMessages::ResourceDeleted(resource) => { self.tunnel.remove_resources(&[resource]); } + IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids, + connected, + }) => self + .tunnel + .update_relays(HashSet::from_iter(disconnected_ids), connected), IngressMessages::InvalidateIceCandidates(GatewayIceCandidates { gateway_id, candidates, diff --git a/rust/connlib/clients/shared/src/messages.rs b/rust/connlib/clients/shared/src/messages.rs index 8b05b41e1..4227133a7 100644 --- a/rust/connlib/clients/shared/src/messages.rs +++ b/rust/connlib/clients/shared/src/messages.rs @@ -1,6 +1,6 @@ use connlib_shared::messages::{ - GatewayId, GatewayResponse, Interface, Key, Relay, RequestConnection, ResourceDescription, - ResourceId, ReuseConnection, + GatewayId, GatewayResponse, Interface, Key, Relay, RelaysPresence, RequestConnection, + ResourceDescription, ResourceId, ReuseConnection, }; use serde::{Deserialize, Serialize}; use std::{collections::HashSet, net::IpAddr}; @@ -50,6 +50,8 @@ pub enum IngressMessages { InvalidateIceCandidates(GatewayIceCandidates), ConfigChanged(ConfigUpdate), + + RelaysPresence(RelaysPresence), } #[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] @@ -575,4 +577,48 @@ mod test { let reply_message = serde_json::from_str(message).unwrap(); assert_eq!(m, reply_message); } + + #[test] + fn relays_presence() { + let message = r#" + { + "event": "relays_presence", + "ref": null, + "topic": "client", + "payload": { + "disconnected_ids": [ + "e95f9517-2152-4677-a16a-fbb2687050a3", + "b0724bd1-a8cc-4faf-88cd-f21159cfec47" + ], + "connected": [ + { + "id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8", + "type": "turn", + "username": "1719367575:ZQHcVGkdnfgGmcP1", + "password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg", + "addr": "172.28.0.101:3478", + "expires_at": 1719367575 + } + ] + } + } + "#; + let expected = IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids: vec![ + "e95f9517-2152-4677-a16a-fbb2687050a3".parse().unwrap(), + "b0724bd1-a8cc-4faf-88cd-f21159cfec47".parse().unwrap(), + ], + connected: vec![Relay::Turn(Turn { + id: "0a133356-7a9e-4b9a-b413-0d95a5720fd8".parse().unwrap(), + expires_at: DateTime::from_timestamp(1719367575, 0).unwrap(), + addr: "172.28.0.101:3478".parse().unwrap(), + username: "1719367575:ZQHcVGkdnfgGmcP1".to_owned(), + password: "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg".to_owned(), + })], + }); + + let ingress_message = serde_json::from_str::(message).unwrap(); + + assert_eq!(ingress_message, expected); + } } diff --git a/rust/connlib/shared/src/messages.rs b/rust/connlib/shared/src/messages.rs index c92993d9d..7a30a0da2 100644 --- a/rust/connlib/shared/src/messages.rs +++ b/rust/connlib/shared/src/messages.rs @@ -413,6 +413,15 @@ pub struct Stun { pub addr: SocketAddr, } +/// A update to the presence of several relays. +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +pub struct RelaysPresence { + /// These relays have disconnected from the portal. We need to stop using them. + pub disconnected_ids: Vec, + /// These relays are still online. We can/should use these. + pub connected: Vec, +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/rust/connlib/snownet/src/node.rs b/rust/connlib/snownet/src/node.rs index 58d52d9bc..26cffc85c 100644 --- a/rust/connlib/snownet/src/node.rs +++ b/rust/connlib/snownet/src/node.rs @@ -469,12 +469,32 @@ where self.buffered_transmits.pop_front() } - pub fn upsert_turn_servers( + pub fn update_relays( &mut self, - servers: &HashSet<(RId, SocketAddr, String, String, String)>, + to_remove: HashSet, + to_add: &HashSet<(RId, SocketAddr, String, String, String)>, now: Instant, ) { - for (id, server, username, password, realm) in servers { + // First, invalidate all candidates from relays that we should stop using. + for id in to_remove { + let Some(allocation) = self.allocations.remove(&id) else { + continue; + }; + + for (id, agent) in self.connections.agents_mut() { + let _span = info_span!("connection", %id).entered(); + + for candidate in allocation + .current_candidates() + .filter(|c| c.kind() == CandidateKind::Relayed) + { + agent.invalidate_candidate(&candidate); + } + } + } + + // Second, upsert all new relays. + for (id, server, username, password, realm) in to_add { let Ok(username) = Username::new(username.to_owned()) else { tracing::debug!(%username, "Invalid TURN username"); continue; @@ -494,7 +514,7 @@ where Allocation::new(*server, username, password.clone(), realm, now), ); - tracing::info!(address = %server, "Added new TURN server"); + tracing::info!(%id, address = %server, "Added new TURN server"); } } @@ -782,7 +802,7 @@ where }; self.upsert_stun_servers(&stun_servers, now); - self.upsert_turn_servers(&turn_servers, now); + self.update_relays(HashSet::default(), &turn_servers, now); let mut agent = IceAgent::new(); agent.set_controlling(true); @@ -884,7 +904,7 @@ where }; self.upsert_stun_servers(&stun_servers, now); - self.upsert_turn_servers(&turn_servers, now); + self.update_relays(HashSet::default(), &turn_servers, now); let mut agent = IceAgent::new(); agent.set_controlling(false); @@ -1723,6 +1743,7 @@ where self.agent .local_candidates() .iter() + .filter(|c| !c.discarded()) .find(|c| c.addr() == source) } diff --git a/rust/connlib/snownet/tests/lib.rs b/rust/connlib/snownet/tests/lib.rs index 3deb6039f..243bc76fa 100644 --- a/rust/connlib/snownet/tests/lib.rs +++ b/rust/connlib/snownet/tests/lib.rs @@ -57,10 +57,16 @@ fn smoke_relayed() { 1, TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")), )]; - let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80") - .with_relays(&mut relays, clock.now); - let mut bob = - TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now); + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); let firewall = Firewall::default() .with_block_rule(&alice, &bob) .with_block_rule(&bob, &alice); @@ -96,10 +102,16 @@ fn reconnect_discovers_new_interface() { 1, TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")), )]; - let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80") - .with_relays(&mut relays, clock.now); - let mut bob = - TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays(&mut relays, clock.now); + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); handshake(&mut alice, &mut bob, &clock); @@ -139,6 +151,59 @@ fn reconnect_discovers_new_interface() { assert_eq!(bob.failed_connections().count(), 0); } +#[test] +fn migrate_connection_to_new_relay() { + let _guard = setup_tracing(); + let mut clock = Clock::new(); + + let (alice, bob) = alice_and_bob(); + + let mut relays = [( + 1, + TestRelay::new(IpAddr::V4(Ipv4Addr::LOCALHOST), debug_span!("Roger")), + )]; + let mut alice = TestNode::new(debug_span!("Alice"), alice, "1.1.1.1:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let mut bob = TestNode::new(debug_span!("Bob"), bob, "2.2.2.2:80").with_relays( + HashSet::default(), + &mut relays, + clock.now, + ); + let firewall = Firewall::default() + .with_block_rule(&alice, &bob) + .with_block_rule(&bob, &alice); + + handshake(&mut alice, &mut bob, &clock); + + loop { + if alice.is_connected_to(&bob) && bob.is_connected_to(&alice) { + break; + } + + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + } + + // Swap out the relays. "Roger" is being removed (ID 1) and "Robert" is being added (ID 2). + let mut relays = [(2, TestRelay::new(ip("10.0.0.1"), debug_span!("Robert")))]; + alice = alice.with_relays(HashSet::from([1]), &mut relays, clock.now); + + // Make some progress. (the fact that we only need 5 clock ticks means we are no relying on timeouts here) + for _ in 0..5 { + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + } + + alice.ping(ip("9.9.9.9"), ip("8.8.8.8"), &bob, clock.now); + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + assert_eq!(bob.packets_from(ip("9.9.9.9")).count(), 1); + + bob.ping(ip("8.8.8.8"), ip("9.9.9.9"), &alice, clock.now); + progress(&mut alice, &mut bob, &mut relays, &firewall, &mut clock); + assert_eq!(alice.packets_from(ip("8.8.8.8")).count(), 1); +} + #[test] fn connection_times_out_after_20_seconds() { let (mut alice, _) = alice_and_bob(); @@ -695,6 +760,18 @@ impl EitherNode { EitherNode::Server(n) => n.reconnect(now), } } + + fn update_relays( + &mut self, + to_remove: HashSet, + to_add: &HashSet<(u64, SocketAddr, String, String, String)>, + now: Instant, + ) { + match self { + EitherNode::Server(s) => s.update_relays(to_remove, to_add, now), + EitherNode::Client(c) => c.update_relays(to_remove, to_add, now), + } + } } impl TestNode { @@ -713,7 +790,12 @@ impl TestNode { } } - fn with_relays(mut self, relays: &mut [(u64, TestRelay)], now: Instant) -> Self { + fn with_relays( + mut self, + to_remove: HashSet, + relays: &mut [(u64, TestRelay)], + now: Instant, + ) -> Self { let username = match self.node { EitherNode::Server(_) => "server", EitherNode::Client(_) => "client", @@ -734,10 +816,8 @@ impl TestNode { }) .collect::>(); - self.span.in_scope(|| match &mut self.node { - EitherNode::Server(s) => s.upsert_turn_servers(&turn_servers, now), - EitherNode::Client(c) => c.upsert_turn_servers(&turn_servers, now), - }); + self.span + .in_scope(|| self.node.update_relays(to_remove, &turn_servers, now)); self } diff --git a/rust/connlib/tunnel/src/client.rs b/rust/connlib/tunnel/src/client.rs index 87c6c35d6..d655c9473 100644 --- a/rust/connlib/tunnel/src/client.rs +++ b/rust/connlib/tunnel/src/client.rs @@ -73,9 +73,10 @@ where Ok(()) } - pub fn upsert_relays(&mut self, relays: Vec) { - self.role_state.upsert_relays( - turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)), + pub fn update_relays(&mut self, to_remove: HashSet, to_add: Vec) { + self.role_state.update_relays( + to_remove, + turn(&to_add, |addr| self.io.sockets_ref().can_handle(addr)), Instant::now(), ) } @@ -1002,12 +1003,13 @@ impl ClientState { true } - fn upsert_relays( + fn update_relays( &mut self, - relays: HashSet<(RelayId, SocketAddr, String, String, String)>, + to_remove: HashSet, + to_add: HashSet<(RelayId, SocketAddr, String, String, String)>, now: Instant, ) { - self.node.upsert_turn_servers(&relays, now); + self.node.update_relays(to_remove, &to_add, now); } } diff --git a/rust/connlib/tunnel/src/gateway.rs b/rust/connlib/tunnel/src/gateway.rs index 004cc881e..9d5903c48 100644 --- a/rust/connlib/tunnel/src/gateway.rs +++ b/rust/connlib/tunnel/src/gateway.rs @@ -318,11 +318,12 @@ impl GatewayState { self.buffered_events.pop_front() } - pub(crate) fn upsert_relays( + pub(crate) fn update_relays( &mut self, - relays: HashSet<(RelayId, SocketAddr, String, String, String)>, + to_remove: HashSet, + to_add: HashSet<(RelayId, SocketAddr, String, String, String)>, now: Instant, ) { - self.node.upsert_turn_servers(&relays, now); + self.node.update_relays(to_remove, &to_add, now); } } diff --git a/rust/connlib/tunnel/src/lib.rs b/rust/connlib/tunnel/src/lib.rs index 7248471d1..8ba950464 100644 --- a/rust/connlib/tunnel/src/lib.rs +++ b/rust/connlib/tunnel/src/lib.rs @@ -5,7 +5,7 @@ use boringtun::x25519::StaticSecret; use connlib_shared::{ - messages::{ClientId, GatewayId, Relay, ResourceId, ReuseConnection}, + messages::{ClientId, GatewayId, Relay, RelayId, ResourceId, ReuseConnection}, Callbacks, Result, }; use io::Io; @@ -174,9 +174,10 @@ where }) } - pub fn upsert_relays(&mut self, relays: Vec) { - self.role_state.upsert_relays( - turn(&relays, |addr| self.io.sockets_ref().can_handle(addr)), + pub fn update_relays(&mut self, to_remove: HashSet, to_add: Vec) { + self.role_state.update_relays( + to_remove, + turn(&to_add, |addr| self.io.sockets_ref().can_handle(addr)), Instant::now(), ) } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 3083026d7..f5850f12e 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -5,6 +5,7 @@ use crate::messages::{ use crate::CallbackHandler; use anyhow::Result; use boringtun::x25519::PublicKey; +use connlib_shared::messages::RelaysPresence; use connlib_shared::{ messages::{GatewayResponse, ResourceAccepted}, Dname, @@ -16,6 +17,7 @@ use firezone_tunnel::GatewayTunnel; use futures_bounded::Timeout; use ip_network::IpNetwork; use phoenix_channel::PhoenixChannel; +use std::collections::HashSet; use std::convert::Infallible; use std::task::{Context, Poll}; use std::time::Duration; @@ -174,6 +176,16 @@ impl Eventloop { } => { self.tunnel.remove_access(&client_id, &resource_id); } + phoenix_channel::Event::InboundMessage { + msg: + IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids, + connected, + }), + .. + } => self + .tunnel + .update_relays(HashSet::from_iter(disconnected_ids), connected), phoenix_channel::Event::InboundMessage { msg: IngressMessages::Init(_), .. diff --git a/rust/gateway/src/main.rs b/rust/gateway/src/main.rs index 18922f798..04b631d8d 100644 --- a/rust/gateway/src/main.rs +++ b/rust/gateway/src/main.rs @@ -8,6 +8,7 @@ use firezone_cli_utils::{setup_global_subscriber, CommonArgs}; use firezone_tunnel::{GatewayTunnel, Sockets}; use futures::{future, TryFutureExt}; use secrecy::{Secret, SecretString}; +use std::collections::HashSet; use std::convert::Infallible; use std::path::Path; use std::pin::pin; @@ -110,7 +111,7 @@ async fn run(login: LoginUrl, private_key: StaticSecret) -> Result { tunnel .set_interface(&init.interface) .context("Failed to set interface")?; - tunnel.upsert_relays(init.relays); + tunnel.update_relays(HashSet::default(), init.relays); let mut eventloop = Eventloop::new(tunnel, portal); diff --git a/rust/gateway/src/messages.rs b/rust/gateway/src/messages.rs index dbfe4fba2..9b6a7e699 100644 --- a/rust/gateway/src/messages.rs +++ b/rust/gateway/src/messages.rs @@ -1,8 +1,8 @@ use chrono::{serde::ts_seconds_option, DateTime, Utc}; use connlib_shared::{ messages::{ - ClientId, ClientPayload, GatewayResponse, Interface, Peer, Relay, ResourceDescription, - ResourceId, + ClientId, ClientPayload, GatewayResponse, Interface, Peer, Relay, RelaysPresence, + ResourceDescription, ResourceId, }, Dname, }; @@ -74,6 +74,7 @@ pub enum IngressMessages { IceCandidates(ClientIceCandidates), InvalidateIceCandidates(ClientIceCandidates), Init(InitGateway), + RelaysPresence(RelaysPresence), } /// A client's ice candidate message. @@ -114,6 +115,7 @@ pub struct ConnectionReady { #[cfg(test)] mod test { use super::*; + use connlib_shared::messages::Turn; use phoenix_channel::InitMessage; use phoenix_channel::PhoenixMessage; @@ -327,4 +329,48 @@ mod test { let ingress_message = serde_json::from_str::>(message).unwrap(); assert_eq!(m, ingress_message); } + + #[test] + fn relays_presence() { + let message = r#" + { + "event": "relays_presence", + "ref": null, + "topic": "gateway", + "payload": { + "disconnected_ids": [ + "e95f9517-2152-4677-a16a-fbb2687050a3", + "b0724bd1-a8cc-4faf-88cd-f21159cfec47" + ], + "connected": [ + { + "id": "0a133356-7a9e-4b9a-b413-0d95a5720fd8", + "type": "turn", + "username": "1719367575:ZQHcVGkdnfgGmcP1", + "password": "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg", + "addr": "172.28.0.101:3478", + "expires_at": 1719367575 + } + ] + } + } + "#; + let expected = IngressMessages::RelaysPresence(RelaysPresence { + disconnected_ids: vec![ + "e95f9517-2152-4677-a16a-fbb2687050a3".parse().unwrap(), + "b0724bd1-a8cc-4faf-88cd-f21159cfec47".parse().unwrap(), + ], + connected: vec![Relay::Turn(Turn { + id: "0a133356-7a9e-4b9a-b413-0d95a5720fd8".parse().unwrap(), + expires_at: DateTime::from_timestamp(1719367575, 0).unwrap(), + addr: "172.28.0.101:3478".parse().unwrap(), + username: "1719367575:ZQHcVGkdnfgGmcP1".to_owned(), + password: "ZWYiBeFHOJyYq0mcwAXjRpcuXIJJpzWlOXVdxwttrWg".to_owned(), + })], + }); + + let ingress_message = serde_json::from_str::(message).unwrap(); + + assert_eq!(ingress_message, expected); + } } diff --git a/scripts/tests/direct-curl-api-relay-down.sh b/scripts/tests/direct-curl-api-relay-down.sh index 2a590d832..47709acb7 100755 --- a/scripts/tests/direct-curl-api-relay-down.sh +++ b/scripts/tests/direct-curl-api-relay-down.sh @@ -4,6 +4,6 @@ source "./scripts/tests/lib.sh" client_curl_resource "172.20.0.100/get" -docker compose stop api relay # Stop portal & relay +docker compose stop api relay-1 relay-2 # Stop portal & relays client_curl_resource "172.20.0.100/get" diff --git a/scripts/tests/direct-dns-relay-down.sh b/scripts/tests/direct-dns-relay-down.sh index e736fec41..b3125bdc7 100755 --- a/scripts/tests/direct-dns-relay-down.sh +++ b/scripts/tests/direct-dns-relay-down.sh @@ -14,6 +14,6 @@ function run_test() { run_test -docker compose stop relay +docker compose stop relay-1 relay-2 run_test diff --git a/scripts/tests/lib.sh b/scripts/tests/lib.sh index 568089a88..991c99d48 100755 --- a/scripts/tests/lib.sh +++ b/scripts/tests/lib.sh @@ -10,8 +10,12 @@ function gateway() { docker compose exec -it gateway "$@" } -function relay() { - docker compose exec -it relay "$@" +function relay1() { + docker compose exec -it relay-1 "$@" +} + +function relay2() { + docker compose exec -it relay-2 "$@" } function install_iptables_drop_rules() { @@ -41,8 +45,8 @@ function client_nslookup() { } function assert_equals() { - local expected="$1" - local actual="$2" + local actual="$1" + local expected="$2" if [[ "$expected" != "$actual" ]]; then echo "Expected $expected but got $actual" diff --git a/scripts/tests/perf/relayed-tcp-client2server.sh b/scripts/tests/perf/relayed-tcp-client2server.sh index 8adcb2f7b..8d17911e9 100755 --- a/scripts/tests/perf/relayed-tcp-client2server.sh +++ b/scripts/tests/perf/relayed-tcp-client2server.sh @@ -9,6 +9,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' >>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/perf/relayed-tcp-server2client.sh b/scripts/tests/perf/relayed-tcp-server2client.sh index 0ba3e78d2..ef477ca09 100755 --- a/scripts/tests/perf/relayed-tcp-server2client.sh +++ b/scripts/tests/perf/relayed-tcp-server2client.sh @@ -10,6 +10,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' >>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/perf/relayed-udp-client2server.sh b/scripts/tests/perf/relayed-udp-client2server.sh index c453eedb6..4f23bb1d7 100755 --- a/scripts/tests/perf/relayed-udp-client2server.sh +++ b/scripts/tests/perf/relayed-udp-client2server.sh @@ -11,6 +11,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' >>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/perf/relayed-udp-server2client.sh b/scripts/tests/perf/relayed-udp-server2client.sh index 8afd962ba..dd1bd4ff7 100755 --- a/scripts/tests/perf/relayed-udp-server2client.sh +++ b/scripts/tests/perf/relayed-udp-server2client.sh @@ -12,6 +12,7 @@ docker compose exec --env RUST_LOG=info -it client /bin/sh -c 'iperf3 \ --client 172.20.0.110 \ --json' >>"${TEST_NAME}.json" -assert_process_state "relay" "S" +assert_process_state "relay-1" "S" +assert_process_state "relay-2" "S" assert_process_state "gateway" "S" assert_process_state "client" "S" diff --git a/scripts/tests/relay-graceful-shutdown.sh b/scripts/tests/relay-graceful-shutdown.sh index b06116547..76d7f72d1 100755 --- a/scripts/tests/relay-graceful-shutdown.sh +++ b/scripts/tests/relay-graceful-shutdown.sh @@ -7,7 +7,7 @@ install_iptables_drop_rules client_curl_resource "172.20.0.100/get" # Act: Send SIGTERM -docker compose kill relay --signal SIGTERM +docker compose kill relay-1 --signal SIGTERM sleep 2 # Closing websocket isn't instant. @@ -15,18 +15,18 @@ sleep 2 # Closing websocket isn't instant. client_curl_resource "172.20.0.100/get" # Assert: Websocket connection is cut -OPEN_SOCKETS=$(relay netstat -tn | grep "ESTABLISHED" | grep 8081 || true) # Portal listens on port 8081 +OPEN_SOCKETS=$(relay1 netstat -tn | grep "ESTABLISHED" | grep 8081 || true) # Portal listens on port 8081 test -z "$OPEN_SOCKETS" # Act: Send 2nd SIGTERM -docker compose kill relay --signal SIGTERM +docker compose kill relay-1 --signal SIGTERM sleep 5 # Wait for container to be fully exited # Seems to be necessary to return the correct state -docker compose ps relay --all +docker compose ps relay-1 --all sleep 1 # Assert: Container exited -container_state=$(docker compose ps relay --all --format json | jq --raw-output '.State') +container_state=$(docker compose ps relay-1 --all --format json | jq --raw-output '.State') assert_equals "$container_state" "exited" diff --git a/scripts/tests/relayed-curl-relay-restart.sh b/scripts/tests/relayed-curl-relay-restart.sh index 23ab61909..74aafc761 100755 --- a/scripts/tests/relayed-curl-relay-restart.sh +++ b/scripts/tests/relayed-curl-relay-restart.sh @@ -6,7 +6,8 @@ install_iptables_drop_rules client_curl_resource "172.20.0.100/get" -# Restart relay with new IP -PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay +# Restart relays with new IPs +RELAY_1_PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay-1 +RELAY_2_PUBLIC_IP4_ADDR="172.28.0.202" docker compose up -d relay-2 client_curl_resource "172.20.0.100/get" diff --git a/scripts/tests/relayed-dns-relay-restart.sh b/scripts/tests/relayed-dns-relay-restart.sh index d9478cc79..e48807dc0 100755 --- a/scripts/tests/relayed-dns-relay-restart.sh +++ b/scripts/tests/relayed-dns-relay-restart.sh @@ -16,7 +16,8 @@ install_iptables_drop_rules run_test -# Restart relay with new IP -PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay +# Restart relays with new IP +RELAY_1_PUBLIC_IP4_ADDR="172.28.0.102" docker compose up -d relay-1 +RELAY_2_PUBLIC_IP4_ADDR="172.28.0.202" docker compose up -d relay-2 run_test