chore(phoenix-channel): only flush after writing (#5510)

Currently, `phoenix-channel` calls `flush` manually to ensure we don't
have messages sitting in a buffer somewhere. This is somewhat wasteful
if we haven't actually written any message. We can move the flushing to
directly after sending the message.

To avoid further buffering on the TCP level, we disable Nagle's
algorithm to avoid buffering on the TCP level.
This commit is contained in:
Thomas Eizinger
2024-06-26 06:07:54 +10:00
committed by GitHub
parent 2a539410c5
commit 8fe43a8afe
2 changed files with 32 additions and 30 deletions

View File

@@ -313,7 +313,7 @@ services:
environment:
FIREZONE_DNS_CONTROL: "${FIREZONE_DNS_CONTROL:-etc-resolv-conf}"
FIREZONE_TOKEN: "n.SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkN2RhN2QxY2QtMTExYy00NGE3LWI1YWMtNDAyN2I5ZDIzMGU1bQAAACtBaUl5XzZwQmstV0xlUkFQenprQ0ZYTnFJWktXQnMyRGR3XzJ2Z0lRdkZnbgYAGUmu74wBYgABUYA.UN3vSLLcAMkHeEh5VHumPOutkuue8JA6wlxM9JxJEPE"
RUST_LOG: ${RUST_LOG:-firezone_linux_client=trace,wire=trace,connlib_client_shared=trace,firezone_tunnel=trace,connlib_shared=trace,boringtun=debug,snownet=debug,str0m=debug,info}
RUST_LOG: ${RUST_LOG:-firezone_linux_client=trace,wire=trace,connlib_client_shared=trace,firezone_tunnel=trace,connlib_shared=trace,boringtun=debug,snownet=debug,str0m=debug,phoenix_channel=debug,info}
FIREZONE_API_URL: ws://api:8081
init: true
build:

View File

@@ -2,6 +2,9 @@ mod heartbeat;
mod login_url;
use std::collections::{HashSet, VecDeque};
use std::mem;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::{fmt, future, marker::PhantomData};
use backoff::backoff::Backoff;
@@ -15,6 +18,7 @@ use secrecy::{ExposeSecret as _, Secret};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::task::{Context, Poll, Waker};
use tokio::net::TcpStream;
use tokio_tungstenite::connect_async_with_config;
use tokio_tungstenite::tungstenite::http::StatusCode;
use tokio_tungstenite::{
connect_async,
@@ -23,9 +27,6 @@ use tokio_tungstenite::{
};
pub use login_url::{LoginUrl, LoginUrlError};
use std::mem;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
pub struct PhoenixChannel<TInitReq, TInboundMsg, TOutboundRes> {
state: State,
@@ -57,6 +58,18 @@ enum State {
Closed,
}
impl State {
fn connect(url: Secret<LoginUrl>, user_agent: String) -> Self {
Self::Connecting(Box::pin(async move {
let (stream, _) = connect_async_with_config(make_request(url, user_agent), None, true)
.await
.map_err(InternalError::WebSocket)?;
Ok(stream)
}))
}
}
/// Creates a new [PhoenixChannel] to the given endpoint and waits for an `init` message.
///
/// The provided URL must contain a host.
@@ -229,13 +242,7 @@ where
reconnect_backoff,
url: url.clone(),
user_agent: user_agent.clone(),
state: State::Connecting(Box::pin(async move {
let (stream, _) = connect_async(make_request(url, user_agent))
.await
.map_err(InternalError::WebSocket)?;
Ok(stream)
})),
state: State::connect(url, user_agent),
waker: None,
pending_messages: Default::default(),
_phantom: PhantomData,
@@ -277,13 +284,7 @@ where
// 2. Set state to `Connecting` without a timer.
let url = self.url.clone();
let user_agent = self.user_agent.clone();
self.state = State::Connecting(Box::pin(async move {
let (stream, _) = connect_async(make_request(url, user_agent))
.await
.map_err(InternalError::WebSocket)?;
Ok(stream)
}));
self.state = State::connect(url, user_agent);
// 3. In case we were already re-connecting, we need to wake the suspended task.
if let Some(waker) = self.waker.take() {
@@ -385,6 +386,19 @@ where
match stream.start_send_unpin(Message::Text(message.clone())) {
Ok(()) => {
tracing::trace!(target: "wire", to="portal", %message);
match stream.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => {
tracing::trace!("Flushed websocket");
}
Poll::Ready(Err(e)) => {
self.reconnect_on_transient_error(
InternalError::WebSocket(e),
);
continue;
}
Poll::Pending => {}
}
}
Err(e) => {
self.pending_messages.push_front(message);
@@ -526,18 +540,6 @@ where
Poll::Pending => {}
}
// Priority 4: Flush out.
match stream.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => {
tracing::trace!("Flushed websocket");
}
Poll::Ready(Err(e)) => {
self.reconnect_on_transient_error(InternalError::WebSocket(e));
continue;
}
Poll::Pending => {}
}
return Poll::Pending;
}
}