diff --git a/rust/Cargo.lock b/rust/Cargo.lock index a4429569c..e095b678e 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4683,6 +4683,7 @@ dependencies = [ name = "phoenix-channel" version = "0.1.0" dependencies = [ + "anyhow", "backoff", "base64 0.22.1", "firezone-logging", diff --git a/rust/connlib/clients/shared/src/eventloop.rs b/rust/connlib/clients/shared/src/eventloop.rs index e6a3d9ade..6e06c68f4 100644 --- a/rust/connlib/clients/shared/src/eventloop.rs +++ b/rust/connlib/clients/shared/src/eventloop.rs @@ -193,6 +193,11 @@ where phoenix_channel::Event::Closed => { unimplemented!("Client never actively closes the portal connection") } + phoenix_channel::Event::Hiccup { + backoff, + max_elapsed_time, + error, + } => tracing::debug!(?backoff, ?max_elapsed_time, "{error:#}"), } } diff --git a/rust/gateway/src/eventloop.rs b/rust/gateway/src/eventloop.rs index 0290abfbd..0d9c02afd 100644 --- a/rust/gateway/src/eventloop.rs +++ b/rust/gateway/src/eventloop.rs @@ -376,6 +376,11 @@ impl Eventloop { phoenix_channel::Event::SuccessResponse { res: (), .. } | phoenix_channel::Event::HeartbeatSent | phoenix_channel::Event::JoinedRoom { .. } => {} + phoenix_channel::Event::Hiccup { + backoff, + max_elapsed_time, + error, + } => tracing::debug!(?backoff, ?max_elapsed_time, "{error:#}"), } } diff --git a/rust/phoenix-channel/Cargo.toml b/rust/phoenix-channel/Cargo.toml index 67cde49fe..7d92aad5b 100644 --- a/rust/phoenix-channel/Cargo.toml +++ b/rust/phoenix-channel/Cargo.toml @@ -6,6 +6,7 @@ license = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = { workspace = true } backoff = { workspace = true } base64 = { workspace = true } firezone-logging = { workspace = true } diff --git a/rust/phoenix-channel/src/lib.rs b/rust/phoenix-channel/src/lib.rs index 54ed3ee3e..b92a293be 100644 --- a/rust/phoenix-channel/src/lib.rs +++ b/rust/phoenix-channel/src/lib.rs @@ -77,11 +77,12 @@ impl State { fn connect( url: Url, addresses: Vec, + host: String, user_agent: String, socket_factory: Arc>, ) -> Self { Self::Connecting( - create_and_connect_websocket(url, addresses, user_agent, socket_factory).boxed(), + create_and_connect_websocket(url, addresses, host, user_agent, socket_factory).boxed(), ) } } @@ -89,17 +90,18 @@ impl State { async fn create_and_connect_websocket( url: Url, addresses: Vec, + host: String, user_agent: String, socket_factory: Arc>, ) -> Result>, InternalError> { - tracing::debug!(host = url.host().map(tracing::field::display), ?addresses, %user_agent, "Connecting to portal"); + tracing::debug!(%host, ?addresses, %user_agent, "Connecting to portal"); let duration = Duration::from_secs(5); let socket = tokio::time::timeout(duration, connect(addresses, &*socket_factory)) .await .map_err(|_| InternalError::Timeout { duration })??; - let (stream, _) = client_async_tls(make_request(url, user_agent)?, socket) + let (stream, _) = client_async_tls(make_request(url, host, user_agent), socket) .await .map_err(InternalError::WebSocket)?; @@ -125,10 +127,6 @@ async fn connect( } } - if errors.is_empty() { - return Err(InternalError::InvalidUrl); - } - Err(InternalError::SocketConnection(errors)) } @@ -162,8 +160,6 @@ enum InternalError { MissedHeartbeat, CloseMessage, StreamClosed, - InvalidUrl, - FailedToBuildRequest(tokio_tungstenite::tungstenite::http::Error), SocketConnection(Vec<(SocketAddr, std::io::Error)>), Timeout { duration: Duration }, } @@ -186,7 +182,6 @@ impl fmt::Display for InternalError { InternalError::MissedHeartbeat => write!(f, "portal did not respond to our heartbeat"), InternalError::CloseMessage => write!(f, "portal closed the websocket connection"), InternalError::StreamClosed => write!(f, "websocket stream was closed"), - InternalError::InvalidUrl => write!(f, "failed to resolve url"), InternalError::SocketConnection(errors) => { write!( f, @@ -200,9 +195,6 @@ impl fmt::Display for InternalError { InternalError::Timeout { duration, .. } => { write!(f, "operation timed out after {duration:?}") } - InternalError::FailedToBuildRequest(_) => { - write!(f, "failed to build request") - } } } } @@ -213,12 +205,10 @@ impl std::error::Error for InternalError { InternalError::WebSocket(tokio_tungstenite::tungstenite::Error::Http(_)) => None, InternalError::WebSocket(e) => Some(e), InternalError::Serde(e) => Some(e), - InternalError::FailedToBuildRequest(e) => Some(e), InternalError::SocketConnection(_) => None, InternalError::MissedHeartbeat => None, InternalError::CloseMessage => None, InternalError::StreamClosed => None, - InternalError::InvalidUrl => None, InternalError::Timeout { .. } => None, } } @@ -352,6 +342,7 @@ where self.state = State::connect( url.clone(), self.socket_addresses(), + self.host(), user_agent, self.socket_factory.clone(), ); @@ -422,6 +413,7 @@ where } Poll::Ready(Err(e)) => { let socket_addresses = self.socket_addresses(); + let host = self.host(); // If we don't have a backoff yet, this is the first error so create one. let reconnect_backoff = self @@ -442,20 +434,24 @@ where let user_agent = self.user_agent.clone(); let socket_factory = self.socket_factory.clone(); - tracing::debug!(?backoff, max_elapsed_time = ?reconnect_backoff.max_elapsed_time, "Reconnecting to portal on transient client error: {}", err_with_src(&e)); - self.state = State::Connecting(Box::pin(async move { tokio::time::sleep(backoff).await; create_and_connect_websocket( secret_url, socket_addresses, + host, user_agent, socket_factory, ) .await })); - continue; + return Poll::Ready(Ok(Event::Hiccup { + backoff, + max_elapsed_time: reconnect_backoff.max_elapsed_time, + error: anyhow::Error::new(e) + .context("Reconnecting to portal on transient error"), + })); } Poll::Pending => { // Save a waker in case we want to reset the `Connecting` state while we are waiting. @@ -666,6 +662,14 @@ where .map(|ip| SocketAddr::new(*ip, port)) .collect() } + + fn host(&self) -> String { + self.url_prototype + .expose_secret() + .host_and_port() + .0 + .to_owned() + } } #[derive(Debug)] @@ -690,6 +694,11 @@ pub enum Event { topic: String, msg: TInboundMsg, }, + Hiccup { + backoff: Duration, + max_elapsed_time: Option, + error: anyhow::Error, + }, /// The connection was closed successfully. Closed, } @@ -808,17 +817,14 @@ impl PhoenixMessage { } // This is basically the same as tungstenite does but we add some new headers (namely user-agent) -fn make_request(url: Url, user_agent: String) -> Result { +fn make_request(url: Url, host: String, user_agent: String) -> Request { let mut r = [0u8; 16]; OsRng.fill_bytes(&mut r); let key = base64::engine::general_purpose::STANDARD.encode(r); - let request = Request::builder() + Request::builder() .method("GET") - .header( - "Host", - url.host().ok_or(InternalError::InvalidUrl)?.to_string(), - ) + .header("Host", host) .header("Connection", "Upgrade") .header("Upgrade", "websocket") .header("Sec-WebSocket-Version", "13") @@ -826,9 +832,7 @@ fn make_request(url: Url, user_agent: String) -> Result .header("User-Agent", user_agent) .uri(url.to_string()) .body(()) - .map_err(InternalError::FailedToBuildRequest)?; - - Ok(request) + .expect("should always be able to build a request if we only pass strings to it") } #[derive(Debug, Deserialize, Serialize, Clone)] diff --git a/rust/relay/src/main.rs b/rust/relay/src/main.rs index 49d848f1e..321c3b254 100644 --- a/rust/relay/src/main.rs +++ b/rust/relay/src/main.rs @@ -618,6 +618,11 @@ where Event::Closed => { self.channel = None; } + Event::Hiccup { + backoff, + max_elapsed_time, + error, + } => tracing::warn!(?backoff, ?max_elapsed_time, "{error:#}"), } } }