From 45d31e0b6255a0a696520ce12044f242ba4f5550 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 21 Mar 2024 12:17:59 +1000 Subject: [PATCH] fix(phoenix-channel): queue `join` message before others (#4242) This was discovered as part of https://github.com/firezone/firezone/pull/4213. When we reconnect to the portal, we first need to join the correct room before sending any messages to it. For example, as a client, we need to join the `client` room before sending messages in it. This implementation is meant to be a quick fix. The "proper" solution would be to keep track of which rooms we have joined and reset that upon reconnect. Introducing such a state machine is a much larger refactoring that is likely not going to make much of a difference for now because we only join a fixed number of rooms and that will usually succeed. --- rust/phoenix-channel/src/lib.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/rust/phoenix-channel/src/lib.rs b/rust/phoenix-channel/src/lib.rs index 12a5639a6..4242493dc 100644 --- a/rust/phoenix-channel/src/lib.rs +++ b/rust/phoenix-channel/src/lib.rs @@ -243,14 +243,18 @@ where /// /// If successful, a [`Event::JoinedRoom`] event will be emitted. pub fn join(&mut self, topic: impl Into, payload: impl Serialize) { - let request_id = self.send_message(topic, EgressControlMessage::PhxJoin(payload)); + let (request_id, msg) = self.make_message(topic, EgressControlMessage::PhxJoin(payload)); + self.pending_messages.push_front(msg); // Must send the join message before all others. self.pending_join_requests.insert(request_id); } /// Send a message to a topic. pub fn send(&mut self, topic: impl Into, message: impl Serialize) -> OutboundRequestId { - self.send_message(topic, message) + let (id, msg) = self.make_message(topic, message); + self.pending_messages.push_back(msg); + + id } /// Reconnects to the portal. @@ -445,7 +449,8 @@ where // Priority 3: Handle heartbeats. match self.heartbeat.poll(cx) { Poll::Ready(Ok(msg)) => { - let id = self.send_message("phoenix", msg); + let (id, msg) = self.make_message("phoenix", msg); + self.pending_messages.push_back(msg); self.heartbeat.set_id(id); return Poll::Ready(Ok(Event::HeartbeatSent)); @@ -480,11 +485,11 @@ where self.state = State::Connecting(future::ready(Err(e)).boxed()) } - fn send_message( + fn make_message( &mut self, topic: impl Into, payload: impl Serialize, - ) -> OutboundRequestId { + ) -> (OutboundRequestId, String) { let request_id = self.fetch_add_request_id(); // We don't care about the reply type when serializing @@ -495,9 +500,7 @@ where )) .expect("we should always be able to serialize a join topic message"); - self.pending_messages.push_back(msg); - - request_id + (request_id, msg) } fn fetch_add_request_id(&mut self) -> OutboundRequestId {