From 81da120c170fb46c4edbaae9268f301bc75881e6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 20 Feb 2025 11:54:43 +1100 Subject: [PATCH] fix(phoenix-channel): report connection hiccups to upper layer (#8203) The WebSocket connection to the portal from within the Clients, Gateways and Relays may be temporarily interrupted by IO errors. In such cases we simply reconnect to it. This isn't as much of a problem for Clients and Gateways. For Relays however, a disconnect can be disruptive for customers because the portal will send `relays_presence` events to all Clients and Gateways. Any relayed connection will therefore be interrupted. See #8177. Relays run on our own infrastructure and we want to be notified if their connection flaps. In order to differentiate between these scenarios, we remove the logging from within `phoenix-channel` and report these connection hiccups one layer up. This allows Clients and Gateways to log them on DEBUG whereas the Relay can log them on WARN. Related: #8177 Related: #7004 --- rust/Cargo.lock | 1 + rust/connlib/clients/shared/src/eventloop.rs | 5 ++ rust/gateway/src/eventloop.rs | 5 ++ rust/phoenix-channel/Cargo.toml | 1 + rust/phoenix-channel/src/lib.rs | 58 +++++++++++--------- rust/relay/src/main.rs | 5 ++ 6 files changed, 48 insertions(+), 27 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index a4429569c..e095b678e 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4683,6 +4683,7 @@ dependencies = [ name = "phoenix-channel" version = "0.1.0" dependencies = [ + "anyhow", "backoff", "base64 0.22.1", "firezone-logging", diff --git a/rust/connlib/clients/shared/src/eventloop.rs b/rust/connlib/clients/shared/src/eventloop.rs index e6a3d9ade..6e06c68f4 100644 --- a/rust/connlib/clients/shared/src/eventloop.rs +++ b/rust/connlib/clients/shared/src/eventloop.rs @@ -193,6 +193,11 @@ where phoenix_channel::Event::Closed => { unimplemented!("Client never actively closes the portal connection") } + phoenix_channel::Event::Hiccup { + backoff, + max_elapsed_time, + error, + } => tracing::debug!(?backoff, ?max_elapsed_time, "{error:#}"), } } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 0290abfbd..0d9c02afd 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -376,6 +376,11 @@ impl Eventloop { phoenix_channel::Event::SuccessResponse { res: (), .. } | phoenix_channel::Event::HeartbeatSent | phoenix_channel::Event::JoinedRoom { .. } => {} + phoenix_channel::Event::Hiccup { + backoff, + max_elapsed_time, + error, + } => tracing::debug!(?backoff, ?max_elapsed_time, "{error:#}"), } } diff --git a/rust/phoenix-channel/Cargo.toml b/rust/phoenix-channel/Cargo.toml index 67cde49fe..7d92aad5b 100644 --- a/rust/phoenix-channel/Cargo.toml +++ b/rust/phoenix-channel/Cargo.toml @@ -6,6 +6,7 @@ license = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = { workspace = true } backoff = { workspace = true } base64 = { workspace = true } firezone-logging = { workspace = true } diff --git a/rust/phoenix-channel/src/lib.rs b/rust/phoenix-channel/src/lib.rs index 54ed3ee3e..b92a293be 100644 --- a/rust/phoenix-channel/src/lib.rs +++ b/rust/phoenix-channel/src/lib.rs @@ -77,11 +77,12 @@ impl State { fn connect( url: Url, addresses: Vec, + host: String, user_agent: String, socket_factory: Arc>, ) -> Self { Self::Connecting( - create_and_connect_websocket(url, addresses, user_agent, socket_factory).boxed(), + create_and_connect_websocket(url, addresses, host, user_agent, socket_factory).boxed(), ) } } @@ -89,17 +90,18 @@ impl State { async fn create_and_connect_websocket( url: Url, addresses: Vec, + host: String, user_agent: String, socket_factory: Arc>, ) -> Result>, InternalError> { - tracing::debug!(host = url.host().map(tracing::field::display), ?addresses, %user_agent, "Connecting to portal"); + tracing::debug!(%host, ?addresses, %user_agent, "Connecting to portal"); let duration = Duration::from_secs(5); let socket = tokio::time::timeout(duration, connect(addresses, &*socket_factory)) .await .map_err(|_| InternalError::Timeout { duration })??; - let (stream, _) = client_async_tls(make_request(url, user_agent)?, socket) + let (stream, _) = client_async_tls(make_request(url, host, user_agent), socket) .await .map_err(InternalError::WebSocket)?; @@ -125,10 +127,6 @@ async fn connect( } } - if errors.is_empty() { - return Err(InternalError::InvalidUrl); - } - Err(InternalError::SocketConnection(errors)) } @@ -162,8 +160,6 @@ enum InternalError { MissedHeartbeat, CloseMessage, StreamClosed, - InvalidUrl, - FailedToBuildRequest(tokio_tungstenite::tungstenite::http::Error), SocketConnection(Vec<(SocketAddr, std::io::Error)>), Timeout { duration: Duration }, } @@ -186,7 +182,6 @@ impl fmt::Display for InternalError { InternalError::MissedHeartbeat => write!(f, "portal did not respond to our heartbeat"), InternalError::CloseMessage => write!(f, "portal closed the websocket connection"), InternalError::StreamClosed => write!(f, "websocket stream was closed"), - InternalError::InvalidUrl => write!(f, "failed to resolve url"), InternalError::SocketConnection(errors) => { write!( f, @@ -200,9 +195,6 @@ impl fmt::Display for InternalError { InternalError::Timeout { duration, .. } => { write!(f, "operation timed out after {duration:?}") } - InternalError::FailedToBuildRequest(_) => { - write!(f, "failed to build request") - } } } } @@ -213,12 +205,10 @@ impl std::error::Error for InternalError { InternalError::WebSocket(tokio_tungstenite::tungstenite::Error::Http(_)) => None, InternalError::WebSocket(e) => Some(e), InternalError::Serde(e) => Some(e), - InternalError::FailedToBuildRequest(e) => Some(e), InternalError::SocketConnection(_) => None, InternalError::MissedHeartbeat => None, InternalError::CloseMessage => None, InternalError::StreamClosed => None, - InternalError::InvalidUrl => None, InternalError::Timeout { .. } => None, } } @@ -352,6 +342,7 @@ where self.state = State::connect( url.clone(), self.socket_addresses(), + self.host(), user_agent, self.socket_factory.clone(), ); @@ -422,6 +413,7 @@ where } Poll::Ready(Err(e)) => { let socket_addresses = self.socket_addresses(); + let host = self.host(); // If we don't have a backoff yet, this is the first error so create one. let reconnect_backoff = self @@ -442,20 +434,24 @@ where let user_agent = self.user_agent.clone(); let socket_factory = self.socket_factory.clone(); - tracing::debug!(?backoff, max_elapsed_time = ?reconnect_backoff.max_elapsed_time, "Reconnecting to portal on transient client error: {}", err_with_src(&e)); - self.state = State::Connecting(Box::pin(async move { tokio::time::sleep(backoff).await; create_and_connect_websocket( secret_url, socket_addresses, + host, user_agent, socket_factory, ) .await })); - continue; + return Poll::Ready(Ok(Event::Hiccup { + backoff, + max_elapsed_time: reconnect_backoff.max_elapsed_time, + error: anyhow::Error::new(e) + .context("Reconnecting to portal on transient error"), + })); } Poll::Pending => { // Save a waker in case we want to reset the `Connecting` state while we are waiting. @@ -666,6 +662,14 @@ where .map(|ip| SocketAddr::new(*ip, port)) .collect() } + + fn host(&self) -> String { + self.url_prototype + .expose_secret() + .host_and_port() + .0 + .to_owned() + } } #[derive(Debug)] @@ -690,6 +694,11 @@ pub enum Event { topic: String, msg: TInboundMsg, }, + Hiccup { + backoff: Duration, + max_elapsed_time: Option, + error: anyhow::Error, + }, /// The connection was closed successfully. Closed, } @@ -808,17 +817,14 @@ impl PhoenixMessage { } // This is basically the same as tungstenite does but we add some new headers (namely user-agent) -fn make_request(url: Url, user_agent: String) -> Result { +fn make_request(url: Url, host: String, user_agent: String) -> Request { let mut r = [0u8; 16]; OsRng.fill_bytes(&mut r); let key = base64::engine::general_purpose::STANDARD.encode(r); - let request = Request::builder() + Request::builder() .method("GET") - .header( - "Host", - url.host().ok_or(InternalError::InvalidUrl)?.to_string(), - ) + .header("Host", host) .header("Connection", "Upgrade") .header("Upgrade", "websocket") .header("Sec-WebSocket-Version", "13") @@ -826,9 +832,7 @@ fn make_request(url: Url, user_agent: String) -> Result .header("User-Agent", user_agent) .uri(url.to_string()) .body(()) - .map_err(InternalError::FailedToBuildRequest)?; - - Ok(request) + .expect("should always be able to build a request if we only pass strings to it") } #[derive(Debug, Deserialize, Serialize, Clone)] diff --git a/rust/relay/src/main.rs b/rust/relay/src/main.rs index 49d848f1e..321c3b254 100644 --- a/rust/relay/src/main.rs +++ b/rust/relay/src/main.rs @@ -618,6 +618,11 @@ where Event::Closed => { self.channel = None; } + Event::Hiccup { + backoff, + max_elapsed_time, + error, + } => tracing::warn!(?backoff, ?max_elapsed_time, "{error:#}"), } } }