From 7e68dff5b5f343cd8e647a3f7ddabeb8e8bdaa71 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 26 Mar 2024 07:00:18 +1100 Subject: [PATCH] fix(phoenix-channel): re-queue message upon send failure (#4294) Previously, we would lose one message to the portal upon failing to send it. We now mitigate this in two ways: 1. We also check the error from `poll_ready` and don't even pop a message off from our buffer. 2. If sending still fails, we re-queue it to the front of the buffer. In certain scenarios as discovered in logs from #4058, this might have caused a loss of the "answer" message from a gateway to the client, resulting in a state mismatch where the gateway thinks the connection is established and the client times out on waiting for the answer. --- rust/phoenix-channel/src/lib.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/rust/phoenix-channel/src/lib.rs b/rust/phoenix-channel/src/lib.rs index 4242493dc..043a32e9d 100644 --- a/rust/phoenix-channel/src/lib.rs +++ b/rust/phoenix-channel/src/lib.rs @@ -335,18 +335,26 @@ where }; // Priority 1: Keep local buffers small and send pending messages. - if stream.poll_ready_unpin(cx).is_ready() { - if let Some(message) = self.pending_messages.pop_front() { - tracing::trace!(target: "wire", to="portal", %message); - - match stream.start_send_unpin(Message::Text(message)) { - Ok(()) => {} - Err(e) => { - self.reconnect_on_transient_error(InternalError::WebSocket(e)); + match stream.poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => { + if let Some(message) = self.pending_messages.pop_front() { + match stream.start_send_unpin(Message::Text(message.clone())) { + Ok(()) => { + tracing::trace!(target: "wire", to="portal", %message); + } + Err(e) => { + self.pending_messages.push_front(message); + self.reconnect_on_transient_error(InternalError::WebSocket(e)); + } } + continue; } + } + Poll::Ready(Err(e)) => { + self.reconnect_on_transient_error(InternalError::WebSocket(e)); continue; } + Poll::Pending => {} } // Priority 2: Handle incoming messages.