fix(phoenix-channel): report connection hiccups to upper layer (#8203)

The WebSocket connection to the portal from within the Clients, Gateways
and Relays may be temporarily interrupted by IO errors. In such cases we
simply reconnect to it. This isn't as much of a problem for Clients and
Gateways. For Relays however, a disconnect can be disruptive for
customers because the portal will send `relays_presence` events to all
Clients and Gateways. Any relayed connection will therefore be
interrupted. See #8177.

Relays run on our own infrastructure and we want to be notified if their
connection flaps.

In order to differentiate between these scenarios, we remove the logging
from within `phoenix-channel` and report these connection hiccups one
layer up. This allows Clients and Gateways to log them on DEBUG whereas
the Relay can log them on WARN.

Related: #8177 
Related: #7004
This commit is contained in:
Thomas Eizinger
2025-02-20 11:54:43 +11:00
committed by GitHub
parent cad84922db
commit 81da120c17
6 changed files with 48 additions and 27 deletions

1
rust/Cargo.lock generated
View File

@@ -4683,6 +4683,7 @@ dependencies = [
name = "phoenix-channel"
version = "0.1.0"
dependencies = [
"anyhow",
"backoff",
"base64 0.22.1",
"firezone-logging",

View File

@@ -193,6 +193,11 @@ where
phoenix_channel::Event::Closed => {
unimplemented!("Client never actively closes the portal connection")
}
phoenix_channel::Event::Hiccup {
backoff,
max_elapsed_time,
error,
} => tracing::debug!(?backoff, ?max_elapsed_time, "{error:#}"),
}
}

View File

@@ -376,6 +376,11 @@ impl Eventloop {
phoenix_channel::Event::SuccessResponse { res: (), .. }
| phoenix_channel::Event::HeartbeatSent
| phoenix_channel::Event::JoinedRoom { .. } => {}
phoenix_channel::Event::Hiccup {
backoff,
max_elapsed_time,
error,
} => tracing::debug!(?backoff, ?max_elapsed_time, "{error:#}"),
}
}

View File

@@ -6,6 +6,7 @@ license = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = { workspace = true }
backoff = { workspace = true }
base64 = { workspace = true }
firezone-logging = { workspace = true }

View File

@@ -77,11 +77,12 @@ impl State {
fn connect(
url: Url,
addresses: Vec<SocketAddr>,
host: String,
user_agent: String,
socket_factory: Arc<dyn SocketFactory<TcpSocket>>,
) -> Self {
Self::Connecting(
create_and_connect_websocket(url, addresses, user_agent, socket_factory).boxed(),
create_and_connect_websocket(url, addresses, host, user_agent, socket_factory).boxed(),
)
}
}
@@ -89,17 +90,18 @@ impl State {
async fn create_and_connect_websocket(
url: Url,
addresses: Vec<SocketAddr>,
host: String,
user_agent: String,
socket_factory: Arc<dyn SocketFactory<TcpSocket>>,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, InternalError> {
tracing::debug!(host = url.host().map(tracing::field::display), ?addresses, %user_agent, "Connecting to portal");
tracing::debug!(%host, ?addresses, %user_agent, "Connecting to portal");
let duration = Duration::from_secs(5);
let socket = tokio::time::timeout(duration, connect(addresses, &*socket_factory))
.await
.map_err(|_| InternalError::Timeout { duration })??;
let (stream, _) = client_async_tls(make_request(url, user_agent)?, socket)
let (stream, _) = client_async_tls(make_request(url, host, user_agent), socket)
.await
.map_err(InternalError::WebSocket)?;
@@ -125,10 +127,6 @@ async fn connect(
}
}
if errors.is_empty() {
return Err(InternalError::InvalidUrl);
}
Err(InternalError::SocketConnection(errors))
}
@@ -162,8 +160,6 @@ enum InternalError {
MissedHeartbeat,
CloseMessage,
StreamClosed,
InvalidUrl,
FailedToBuildRequest(tokio_tungstenite::tungstenite::http::Error),
SocketConnection(Vec<(SocketAddr, std::io::Error)>),
Timeout { duration: Duration },
}
@@ -186,7 +182,6 @@ impl fmt::Display for InternalError {
InternalError::MissedHeartbeat => write!(f, "portal did not respond to our heartbeat"),
InternalError::CloseMessage => write!(f, "portal closed the websocket connection"),
InternalError::StreamClosed => write!(f, "websocket stream was closed"),
InternalError::InvalidUrl => write!(f, "failed to resolve url"),
InternalError::SocketConnection(errors) => {
write!(
f,
@@ -200,9 +195,6 @@ impl fmt::Display for InternalError {
InternalError::Timeout { duration, .. } => {
write!(f, "operation timed out after {duration:?}")
}
InternalError::FailedToBuildRequest(_) => {
write!(f, "failed to build request")
}
}
}
}
@@ -213,12 +205,10 @@ impl std::error::Error for InternalError {
InternalError::WebSocket(tokio_tungstenite::tungstenite::Error::Http(_)) => None,
InternalError::WebSocket(e) => Some(e),
InternalError::Serde(e) => Some(e),
InternalError::FailedToBuildRequest(e) => Some(e),
InternalError::SocketConnection(_) => None,
InternalError::MissedHeartbeat => None,
InternalError::CloseMessage => None,
InternalError::StreamClosed => None,
InternalError::InvalidUrl => None,
InternalError::Timeout { .. } => None,
}
}
@@ -352,6 +342,7 @@ where
self.state = State::connect(
url.clone(),
self.socket_addresses(),
self.host(),
user_agent,
self.socket_factory.clone(),
);
@@ -422,6 +413,7 @@ where
}
Poll::Ready(Err(e)) => {
let socket_addresses = self.socket_addresses();
let host = self.host();
// If we don't have a backoff yet, this is the first error so create one.
let reconnect_backoff = self
@@ -442,20 +434,24 @@ where
let user_agent = self.user_agent.clone();
let socket_factory = self.socket_factory.clone();
tracing::debug!(?backoff, max_elapsed_time = ?reconnect_backoff.max_elapsed_time, "Reconnecting to portal on transient client error: {}", err_with_src(&e));
self.state = State::Connecting(Box::pin(async move {
tokio::time::sleep(backoff).await;
create_and_connect_websocket(
secret_url,
socket_addresses,
host,
user_agent,
socket_factory,
)
.await
}));
continue;
return Poll::Ready(Ok(Event::Hiccup {
backoff,
max_elapsed_time: reconnect_backoff.max_elapsed_time,
error: anyhow::Error::new(e)
.context("Reconnecting to portal on transient error"),
}));
}
Poll::Pending => {
// Save a waker in case we want to reset the `Connecting` state while we are waiting.
@@ -666,6 +662,14 @@ where
.map(|ip| SocketAddr::new(*ip, port))
.collect()
}
fn host(&self) -> String {
self.url_prototype
.expose_secret()
.host_and_port()
.0
.to_owned()
}
}
#[derive(Debug)]
@@ -690,6 +694,11 @@ pub enum Event<TInboundMsg, TOutboundRes> {
topic: String,
msg: TInboundMsg,
},
Hiccup {
backoff: Duration,
max_elapsed_time: Option<Duration>,
error: anyhow::Error,
},
/// The connection was closed successfully.
Closed,
}
@@ -808,17 +817,14 @@ impl<T, R> PhoenixMessage<T, R> {
}
// This is basically the same as tungstenite does but we add some new headers (namely user-agent)
fn make_request(url: Url, user_agent: String) -> Result<Request, InternalError> {
fn make_request(url: Url, host: String, user_agent: String) -> Request {
let mut r = [0u8; 16];
OsRng.fill_bytes(&mut r);
let key = base64::engine::general_purpose::STANDARD.encode(r);
let request = Request::builder()
Request::builder()
.method("GET")
.header(
"Host",
url.host().ok_or(InternalError::InvalidUrl)?.to_string(),
)
.header("Host", host)
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
@@ -826,9 +832,7 @@ fn make_request(url: Url, user_agent: String) -> Result<Request, InternalError>
.header("User-Agent", user_agent)
.uri(url.to_string())
.body(())
.map_err(InternalError::FailedToBuildRequest)?;
Ok(request)
.expect("should always be able to build a request if we only pass strings to it")
}
#[derive(Debug, Deserialize, Serialize, Clone)]

View File

@@ -618,6 +618,11 @@ where
Event::Closed => {
self.channel = None;
}
Event::Hiccup {
backoff,
max_elapsed_time,
error,
} => tracing::warn!(?backoff, ?max_elapsed_time, "{error:#}"),
}
}
}