fix(phoenix-channel): queue join message before others (#4242)

This was discovered as part of
https://github.com/firezone/firezone/pull/4213. When we reconnect to the
portal, we first need to join the correct room before sending any
messages to it. For example, as a client, we need to join the `client`
room before sending messages in it.

This implementation is meant to be a quick fix. The "proper" solution
would be to keep track of which rooms we have joined and reset that upon
reconnect. Introducing such a state machine is a much larger refactoring
that is likely not going to make much of a difference for now because we
only join a fixed number of rooms and that will usually succeed.
This commit is contained in:
Thomas Eizinger
2024-03-21 12:17:59 +10:00
committed by GitHub
parent db62e7bacc
commit 45d31e0b62

View File

@@ -243,14 +243,18 @@ where
///
/// If successful, a [`Event::JoinedRoom`] event will be emitted.
pub fn join(&mut self, topic: impl Into<String>, payload: impl Serialize) {
let request_id = self.send_message(topic, EgressControlMessage::PhxJoin(payload));
let (request_id, msg) = self.make_message(topic, EgressControlMessage::PhxJoin(payload));
self.pending_messages.push_front(msg); // Must send the join message before all others.
self.pending_join_requests.insert(request_id);
}
/// Send a message to a topic.
pub fn send(&mut self, topic: impl Into<String>, message: impl Serialize) -> OutboundRequestId {
self.send_message(topic, message)
let (id, msg) = self.make_message(topic, message);
self.pending_messages.push_back(msg);
id
}
/// Reconnects to the portal.
@@ -445,7 +449,8 @@ where
// Priority 3: Handle heartbeats.
match self.heartbeat.poll(cx) {
Poll::Ready(Ok(msg)) => {
let id = self.send_message("phoenix", msg);
let (id, msg) = self.make_message("phoenix", msg);
self.pending_messages.push_back(msg);
self.heartbeat.set_id(id);
return Poll::Ready(Ok(Event::HeartbeatSent));
@@ -480,11 +485,11 @@ where
self.state = State::Connecting(future::ready(Err(e)).boxed())
}
fn send_message(
fn make_message(
&mut self,
topic: impl Into<String>,
payload: impl Serialize,
) -> OutboundRequestId {
) -> (OutboundRequestId, String) {
let request_id = self.fetch_add_request_id();
// We don't care about the reply type when serializing
@@ -495,9 +500,7 @@ where
))
.expect("we should always be able to serialize a join topic message");
self.pending_messages.push_back(msg);
request_id
(request_id, msg)
}
fn fetch_add_request_id(&mut self) -> OutboundRequestId {