fix(phoenix-channel): lazily create backoff timer (#7414)

Our `phoenix-channel` component is responsible for maintaining a
WebSocket connection to the portal. In case that connection fails, we
want to reconnect to it using an exponential backoff, eventually giving
up after a certain amount of time.

Unfortunately, the code we have today doesn't quite do that. An
`ExponentialBackoff` has a setting for the `max_elapsed_time`.
Regardless of how many and how often we retry something, we won't ever
wait longer than this amount of time. For the Relay, this is set to
15min. For other components its indefinite (Gateway, headless-client),
or very long (30 days for Android, 1 day for Apple).

The point in time from which this duration is counted is when the
`ExponentialBackoff` is **constructed** which translates to when we
**first** connected to the portal. As a result, our backoff would
immediately fail on the first error if it has been longer than
`max_elapsed_time` since we first connected. For most components, this
codepath is not relevant because the `max_elapsed_time` is so long. For
the Relay however, that is only 15 minutes so chances are, the Relay
would immediately fail (and get rebooted) on the first connection error
with the portal.

To fix this, we now lazily create the `ExponentialBackoff` on the first
error.

This bug has some interesting consequences: When a relay reboots, it
looses all its state, i.e. allocations, channel bindings, available
nonces etc, stamp-secret. Thus, all credentials and state that got
distributed to Clients and Gateways get invalidated, causing disconnects
from the Relay. We have observed these alerts in Sentry for a while and
couldn't explain them. Most likely, this is the root cause for those
because whilst a Relay disconnects, the portal also cannot detect its
presence and pro-actively inform Clients and Gateways to no longer use
this Relay.
This commit is contained in:
Thomas Eizinger
2024-11-29 20:19:11 +00:00
committed by GitHub
parent c6e7e6192e
commit 932f6791fb
7 changed files with 53 additions and 32 deletions

View File

@@ -366,9 +366,11 @@ fn connect(
get_user_agent(Some(os_version), env!("CARGO_PKG_VERSION")),
"client",
(),
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(MAX_PARTITION_TIME))
.build(),
|| {
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(MAX_PARTITION_TIME))
.build()
},
tcp_socket_factory,
)?;
let session = Session::connect(

View File

@@ -242,9 +242,11 @@ impl WrappedSession {
get_user_agent(os_version_override, env!("CARGO_PKG_VERSION")),
"client",
(),
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(MAX_PARTITION_TIME))
.build(),
|| {
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(MAX_PARTITION_TIME))
.build()
},
Arc::new(socket_factory::tcp),
)?;
let session = Session::connect(

View File

@@ -129,9 +129,11 @@ async fn run(login: LoginUrl<PublicKeyParam>) -> Result<Infallible> {
get_user_agent(None, env!("CARGO_PKG_VERSION")),
PHOENIX_TOPIC,
(),
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(None)
.build(),
|| {
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(None)
.build()
},
Arc::new(tcp_socket_factory),
)?;

View File

@@ -585,9 +585,11 @@ impl<'a> Handler<'a> {
get_user_agent(None, "1.3.14"),
"client",
(),
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(Duration::from_secs(60 * 60 * 24 * 30)))
.build(),
|| {
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(Duration::from_secs(60 * 60 * 24 * 30)))
.build()
},
Arc::new(tcp_socket_factory),
)?; // Turn this `io::Error` directly into an `Error` so we can distinguish it from others in the GUI client.

View File

@@ -218,9 +218,11 @@ fn main() -> Result<()> {
get_user_agent(None, env!("CARGO_PKG_VERSION")),
"client",
(),
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(max_partition_time)
.build(),
move || {
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(max_partition_time)
.build()
},
Arc::new(tcp_socket_factory),
)?;
let session = Session::connect(

View File

@@ -54,7 +54,8 @@ pub struct PhoenixChannel<TInitReq, TInboundMsg, TOutboundRes, TFinish> {
url_prototype: Secret<LoginUrl<TFinish>>,
last_url: Option<Url>,
user_agent: String,
reconnect_backoff: ExponentialBackoff,
make_reconnect_backoff: Box<dyn Fn() -> ExponentialBackoff + Send>,
reconnect_backoff: Option<ExponentialBackoff>,
resolved_addresses: Vec<IpAddr>,
@@ -135,8 +136,8 @@ pub enum Error {
Client(StatusCode),
#[error("token expired")]
TokenExpired,
#[error("max retries reached")]
MaxRetriesReached,
#[error("max retries reached: {final_error}")]
MaxRetriesReached { final_error: String },
#[error("login failed: {0}")]
LoginFailed(ErrorReply),
}
@@ -146,7 +147,7 @@ impl Error {
match self {
Error::Client(s) => s == &StatusCode::UNAUTHORIZED || s == &StatusCode::FORBIDDEN,
Error::TokenExpired => true,
Error::MaxRetriesReached => false,
Error::MaxRetriesReached { .. } => false,
Error::LoginFailed(_) => false,
}
}
@@ -259,7 +260,7 @@ where
user_agent: String,
login: &'static str,
init_req: TInitReq,
reconnect_backoff: ExponentialBackoff,
make_reconnect_backoff: impl Fn() -> ExponentialBackoff + Send + 'static,
socket_factory: Arc<dyn SocketFactory<TcpSocket>>,
) -> io::Result<Self> {
let next_request_id = Arc::new(AtomicU64::new(0));
@@ -276,7 +277,8 @@ where
.collect();
Ok(Self {
reconnect_backoff,
make_reconnect_backoff: Box::new(make_reconnect_backoff),
reconnect_backoff: None,
url_prototype: url,
user_agent,
state: State::Closed,
@@ -332,7 +334,7 @@ where
}
// 1. Reset the backoff.
self.reconnect_backoff.reset();
self.reconnect_backoff = None;
// 2. Set state to `Connecting` without a timer.
let user_agent = self.user_agent.clone();
@@ -391,7 +393,7 @@ where
State::Connected(stream) => stream,
State::Connecting(future) => match future.poll_unpin(cx) {
Poll::Ready(Ok(stream)) => {
self.reconnect_backoff.reset();
self.reconnect_backoff = None;
self.heartbeat.reset();
self.state = State::Connected(stream);
@@ -408,10 +410,18 @@ where
return Poll::Ready(Err(Error::Client(r.status())));
}
Poll::Ready(Err(e)) => {
let Some(backoff) = self.reconnect_backoff.next_backoff() else {
tracing::warn!("Reconnect backoff expired");
return Poll::Ready(Err(Error::MaxRetriesReached));
};
let socket_addresses = self.socket_addresses();
// If we don't have a backoff yet, this is the first error so create one.
let reconnect_backoff = self
.reconnect_backoff
.get_or_insert_with(|| (self.make_reconnect_backoff)());
let backoff = reconnect_backoff.next_backoff().ok_or_else(|| {
Error::MaxRetriesReached {
final_error: err_with_src(&e).to_string(),
}
})?;
let secret_url = self
.last_url
@@ -420,9 +430,8 @@ where
.clone();
let user_agent = self.user_agent.clone();
let socket_factory = self.socket_factory.clone();
let socket_addresses = self.socket_addresses();
tracing::debug!(?backoff, max_elapsed_time = ?self.reconnect_backoff.max_elapsed_time, "Reconnecting to portal on transient client error: {}", err_with_src(&e));
tracing::debug!(?backoff, max_elapsed_time = ?reconnect_backoff.max_elapsed_time, "Reconnecting to portal on transient client error: {}", err_with_src(&e));
self.state = State::Connecting(Box::pin(async move {
tokio::time::sleep(backoff).await;

View File

@@ -185,9 +185,11 @@ async fn try_main(args: Args) -> Result<()> {
JoinMessage {
stamp_secret: server.auth_secret().expose_secret().to_string(),
},
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(MAX_PARTITION_TIME))
.build(),
|| {
ExponentialBackoffBuilder::default()
.with_max_elapsed_time(Some(MAX_PARTITION_TIME))
.build()
},
Arc::new(socket_factory::tcp),
)?;
channel.connect(NoParams);