diff --git a/docker-compose.yml b/docker-compose.yml index 131364c4d..cd6204fff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -313,7 +313,7 @@ services: environment: FIREZONE_DNS_CONTROL: "${FIREZONE_DNS_CONTROL:-etc-resolv-conf}" FIREZONE_TOKEN: "n.SFMyNTY.g2gDaANtAAAAJGM4OWJjYzhjLTkzOTItNGRhZS1hNDBkLTg4OGFlZjZkMjhlMG0AAAAkN2RhN2QxY2QtMTExYy00NGE3LWI1YWMtNDAyN2I5ZDIzMGU1bQAAACtBaUl5XzZwQmstV0xlUkFQenprQ0ZYTnFJWktXQnMyRGR3XzJ2Z0lRdkZnbgYAGUmu74wBYgABUYA.UN3vSLLcAMkHeEh5VHumPOutkuue8JA6wlxM9JxJEPE" - RUST_LOG: ${RUST_LOG:-firezone_linux_client=trace,wire=trace,connlib_client_shared=trace,firezone_tunnel=trace,connlib_shared=trace,boringtun=debug,snownet=debug,str0m=debug,info} + RUST_LOG: ${RUST_LOG:-firezone_linux_client=trace,wire=trace,connlib_client_shared=trace,firezone_tunnel=trace,connlib_shared=trace,boringtun=debug,snownet=debug,str0m=debug,phoenix_channel=debug,info} FIREZONE_API_URL: ws://api:8081 init: true build: diff --git a/rust/phoenix-channel/src/lib.rs b/rust/phoenix-channel/src/lib.rs index 69bf1546e..0f4e63c4c 100644 --- a/rust/phoenix-channel/src/lib.rs +++ b/rust/phoenix-channel/src/lib.rs @@ -2,6 +2,9 @@ mod heartbeat; mod login_url; use std::collections::{HashSet, VecDeque}; +use std::mem; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; use std::{fmt, future, marker::PhantomData}; use backoff::backoff::Backoff; @@ -15,6 +18,7 @@ use secrecy::{ExposeSecret as _, Secret}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::task::{Context, Poll, Waker}; use tokio::net::TcpStream; +use tokio_tungstenite::connect_async_with_config; use tokio_tungstenite::tungstenite::http::StatusCode; use tokio_tungstenite::{ connect_async, @@ -23,9 +27,6 @@ use tokio_tungstenite::{ }; pub use login_url::{LoginUrl, LoginUrlError}; -use std::mem; -use std::sync::atomic::AtomicU64; -use std::sync::Arc; pub struct PhoenixChannel { state: State, @@ -57,6 +58,18 @@ enum State { Closed, } +impl State { + fn connect(url: Secret, user_agent: String) -> Self { + Self::Connecting(Box::pin(async move { + let (stream, _) = connect_async_with_config(make_request(url, user_agent), None, true) + .await + .map_err(InternalError::WebSocket)?; + + Ok(stream) + })) + } +} + /// Creates a new [PhoenixChannel] to the given endpoint and waits for an `init` message. /// /// The provided URL must contain a host. @@ -229,13 +242,7 @@ where reconnect_backoff, url: url.clone(), user_agent: user_agent.clone(), - state: State::Connecting(Box::pin(async move { - let (stream, _) = connect_async(make_request(url, user_agent)) - .await - .map_err(InternalError::WebSocket)?; - - Ok(stream) - })), + state: State::connect(url, user_agent), waker: None, pending_messages: Default::default(), _phantom: PhantomData, @@ -277,13 +284,7 @@ where // 2. Set state to `Connecting` without a timer. let url = self.url.clone(); let user_agent = self.user_agent.clone(); - self.state = State::Connecting(Box::pin(async move { - let (stream, _) = connect_async(make_request(url, user_agent)) - .await - .map_err(InternalError::WebSocket)?; - - Ok(stream) - })); + self.state = State::connect(url, user_agent); // 3. In case we were already re-connecting, we need to wake the suspended task. if let Some(waker) = self.waker.take() { @@ -385,6 +386,19 @@ where match stream.start_send_unpin(Message::Text(message.clone())) { Ok(()) => { tracing::trace!(target: "wire", to="portal", %message); + + match stream.poll_flush_unpin(cx) { + Poll::Ready(Ok(())) => { + tracing::trace!("Flushed websocket"); + } + Poll::Ready(Err(e)) => { + self.reconnect_on_transient_error( + InternalError::WebSocket(e), + ); + continue; + } + Poll::Pending => {} + } } Err(e) => { self.pending_messages.push_front(message); @@ -526,18 +540,6 @@ where Poll::Pending => {} } - // Priority 4: Flush out. - match stream.poll_flush_unpin(cx) { - Poll::Ready(Ok(())) => { - tracing::trace!("Flushed websocket"); - } - Poll::Ready(Err(e)) => { - self.reconnect_on_transient_error(InternalError::WebSocket(e)); - continue; - } - Poll::Pending => {} - } - return Poll::Pending; } }