diff --git a/rust/connlib/phoenix-channel/src/lib.rs b/rust/connlib/phoenix-channel/src/lib.rs index 1d1361154..c38d1b793 100644 --- a/rust/connlib/phoenix-channel/src/lib.rs +++ b/rust/connlib/phoenix-channel/src/lib.rs @@ -38,6 +38,7 @@ const MAX_BUFFERED_MESSAGES: usize = 32; // Chosen pretty arbitrarily. If we are pub struct PhoenixChannel { state: State, waker: Option, + pending_joins: VecDeque, pending_messages: VecDeque, next_request_id: u64, socket_factory: Arc>, @@ -277,6 +278,7 @@ where state: State::Closed, socket_factory, waker: None, + pending_joins: VecDeque::with_capacity(MAX_BUFFERED_MESSAGES), pending_messages: VecDeque::with_capacity(MAX_BUFFERED_MESSAGES), _phantom: PhantomData, heartbeat: tokio::time::interval(Duration::from_secs(30)), @@ -294,8 +296,8 @@ where /// If successful, a [`Event::JoinedRoom`] event will be emitted. pub fn join(&mut self, topic: impl Into, payload: impl Serialize) { let (request_id, msg) = self.make_message(topic, EgressControlMessage::PhxJoin(payload)); - self.pending_messages.push_front(msg); // Must send the join message before all others. + self.pending_joins.push_back(msg); self.pending_join_requests.insert(request_id); } @@ -463,7 +465,17 @@ where // Priority 1: Keep local buffers small and send pending messages. match stream.poll_ready_unpin(cx) { Poll::Ready(Ok(())) => { - if let Some(message) = self.pending_messages.pop_front() { + // Process join messages before other messages. + // Only process other messages if no room joins are pending. + let next_message = self.pending_joins.pop_front().or_else(|| { + if self.pending_join_requests.is_empty() { + return None; + } + + self.pending_messages.pop_front() + }); + + if let Some(message) = next_message { match stream.start_send_unpin(Message::Text(message.clone())) { Ok(()) => { tracing::trace!(target: "wire::api::send", %message); @@ -488,6 +500,7 @@ where self.reconnect_on_transient_error(InternalError::WebSocket(e)); } } + continue; } }