fix(phoenix-channel): re-queue message upon send failure (#4294)

Previously, we would lose one message to the portal upon failing to send
it. We now mitigate this in two ways:

1. We now also check the result of `poll_ready` and, if it reports an
error, don't pop a message off our buffer in the first place.
2. If sending still fails, we re-queue it to the front of the buffer.

In certain scenarios, as discovered in the logs from #4058, the previous
behaviour might have caused the loss of the "answer" message from a gateway
to the client, resulting in a state mismatch where the gateway thinks the
connection is established and the client times out waiting for the answer.
This commit is contained in:
Thomas Eizinger
2024-03-26 07:00:18 +11:00
committed by GitHub
parent cd5cde6ce1
commit 7e68dff5b5

View File

@@ -335,18 +335,26 @@ where
};
// Priority 1: Keep local buffers small and send pending messages.
if stream.poll_ready_unpin(cx).is_ready() {
if let Some(message) = self.pending_messages.pop_front() {
tracing::trace!(target: "wire", to="portal", %message);
match stream.start_send_unpin(Message::Text(message)) {
Ok(()) => {}
Err(e) => {
self.reconnect_on_transient_error(InternalError::WebSocket(e));
match stream.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => {
if let Some(message) = self.pending_messages.pop_front() {
match stream.start_send_unpin(Message::Text(message.clone())) {
Ok(()) => {
tracing::trace!(target: "wire", to="portal", %message);
}
Err(e) => {
self.pending_messages.push_front(message);
self.reconnect_on_transient_error(InternalError::WebSocket(e));
}
}
continue;
}
}
Poll::Ready(Err(e)) => {
self.reconnect_on_transient_error(InternalError::WebSocket(e));
continue;
}
Poll::Pending => {}
}
// Priority 2: Handle incoming messages.