From 8fe43a8afed3c984f1bbce5ddcc82d48f3f2c31d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 26 Jun 2024 06:07:54 +1000 Subject: [PATCH] chore(phoenix-channel): only flush after writing (#5510) Currently, `phoenix-channel` calls `flush` manually to ensure we don't have messages sitting in a buffer somewhere. This is somewhat wasteful if we haven't actually written any message. We can move the flushing to directly after sending the message. To avoid further buffering on the TCP level, we disable Nagle's algorithm to avoid buffering on the TCP level. --- docker-compose.yml | 2 +- rust/phoenix-channel/src/lib.rs | 60 +++++++++++++++++---------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 131364c4d..cd6204fff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -313,7 +313,7 @@ services: environment: FIREZONE_DNS_CONTROL: "${FIREZONE_DNS_CONTROL:-etc-resolv-conf}" FIREZONE_TOKEN: "n.SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkN2RhN2QxY2QtMTExYy00NGE3LWI1YWMtNDAyN2I5ZDIzMGU1bQAAACtBaUl5XzZwQmstV0xlUkFQenprQ0ZYTnFJWktXQnMyRGR3XzJ2Z0lRdkZnbgYAGUmu74wBYgABUYA.UN3vSLLcAMkHeEh5VHumPOutkuue8JA6wlxM9JxJEPE" - RUST_LOG: ${RUST_LOG:-firezone_linux_client=trace,wire=trace,connlib_client_shared=trace,firezone_tunnel=trace,connlib_shared=trace,boringtun=debug,snownet=debug,str0m=debug,info} + RUST_LOG: ${RUST_LOG:-firezone_linux_client=trace,wire=trace,connlib_client_shared=trace,firezone_tunnel=trace,connlib_shared=trace,boringtun=debug,snownet=debug,str0m=debug,phoenix_channel=debug,info} FIREZONE_API_URL: ws://api:8081 init: true build: diff --git a/rust/phoenix-channel/src/lib.rs b/rust/phoenix-channel/src/lib.rs index 69bf1546e..0f4e63c4c 100644 --- a/rust/phoenix-channel/src/lib.rs +++ b/rust/phoenix-channel/src/lib.rs @@ -2,6 +2,9 @@ mod heartbeat; mod login_url; use std::collections::{HashSet, VecDeque}; +use std::mem; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; use std::{fmt, future, marker::PhantomData}; use backoff::backoff::Backoff; @@ -15,6 +18,7 @@ use secrecy::{ExposeSecret as _, Secret}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::task::{Context, Poll, Waker}; use tokio::net::TcpStream; +use tokio_tungstenite::connect_async_with_config; use tokio_tungstenite::tungstenite::http::StatusCode; use tokio_tungstenite::{ connect_async, @@ -23,9 +27,6 @@ use tokio_tungstenite::{ }; pub use login_url::{LoginUrl, LoginUrlError}; -use std::mem; -use std::sync::atomic::AtomicU64; -use std::sync::Arc; pub struct PhoenixChannel { state: State, @@ -57,6 +58,18 @@ enum State { Closed, } +impl State { + fn connect(url: Secret, user_agent: String) -> Self { + Self::Connecting(Box::pin(async move { + let (stream, _) = connect_async_with_config(make_request(url, user_agent), None, true) + .await + .map_err(InternalError::WebSocket)?; + + Ok(stream) + })) + } +} + /// Creates a new [PhoenixChannel] to the given endpoint and waits for an `init` message. /// /// The provided URL must contain a host. @@ -229,13 +242,7 @@ where reconnect_backoff, url: url.clone(), user_agent: user_agent.clone(), - state: State::Connecting(Box::pin(async move { - let (stream, _) = connect_async(make_request(url, user_agent)) - .await - .map_err(InternalError::WebSocket)?; - - Ok(stream) - })), + state: State::connect(url, user_agent), waker: None, pending_messages: Default::default(), _phantom: PhantomData, @@ -277,13 +284,7 @@ where // 2. Set state to `Connecting` without a timer. let url = self.url.clone(); let user_agent = self.user_agent.clone(); - self.state = State::Connecting(Box::pin(async move { - let (stream, _) = connect_async(make_request(url, user_agent)) - .await - .map_err(InternalError::WebSocket)?; - - Ok(stream) - })); + self.state = State::connect(url, user_agent); // 3. In case we were already re-connecting, we need to wake the suspended task. if let Some(waker) = self.waker.take() { @@ -385,6 +386,19 @@ where match stream.start_send_unpin(Message::Text(message.clone())) { Ok(()) => { tracing::trace!(target: "wire", to="portal", %message); + + match stream.poll_flush_unpin(cx) { + Poll::Ready(Ok(())) => { + tracing::trace!("Flushed websocket"); + } + Poll::Ready(Err(e)) => { + self.reconnect_on_transient_error( + InternalError::WebSocket(e), + ); + continue; + } + Poll::Pending => {} + } } Err(e) => { self.pending_messages.push_front(message); @@ -526,18 +540,6 @@ where Poll::Pending => {} } - // Priority 4: Flush out. - match stream.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => { - tracing::trace!("Flushed websocket"); - } - Poll::Ready(Err(e)) => { - self.reconnect_on_transient_error(InternalError::WebSocket(e)); - continue; - } - Poll::Pending => {} - } - return Poll::Pending; } }