diff --git a/rust/phoenix-channel/src/lib.rs b/rust/phoenix-channel/src/lib.rs index e87010289..5ef4f1eb6 100644 --- a/rust/phoenix-channel/src/lib.rs +++ b/rust/phoenix-channel/src/lib.rs @@ -69,10 +69,10 @@ where /// /// The provided URL must contain a host. /// Additionally, you must already provide any query parameters required for authentication. - pub async fn connect(uri: Url, user_agent: String) -> Result { + pub async fn connect(url: Url, user_agent: String) -> Result { tracing::trace!("Trying to connect to the portal..."); - let (stream, _) = connect_async(make_request(&uri, user_agent)?).await?; + let (stream, _) = connect_async(make_request(&url, user_agent)?).await?; tracing::trace!("Successfully connected to portal"); diff --git a/rust/relay/src/main.rs b/rust/relay/src/main.rs index e157d40bb..069f99860 100644 --- a/rust/relay/src/main.rs +++ b/rust/relay/src/main.rs @@ -3,6 +3,7 @@ use clap::Parser; use futures::channel::mpsc; use futures::{future, FutureExt, SinkExt, StreamExt}; use opentelemetry::sdk::export::metrics; +use opentelemetry::{sdk, KeyValue}; use opentelemetry_otlp::WithExportConfig; use phoenix_channel::{Error, Event, PhoenixChannel}; use rand::rngs::StdRng; @@ -18,7 +19,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::pin::Pin; use std::task::Poll; use std::time::SystemTime; -use tracing::{level_filters::LevelFilter, Subscriber}; +use tracing::{level_filters::LevelFilter, Instrument, Subscriber}; use tracing_core::Dispatch; use tracing_stackdriver::CloudTraceConfiguration; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; @@ -105,60 +106,15 @@ async fn main() -> Result<()> { args.highest_port, ); - let channel = if let Some(token) = args.portal_token { - let mut url = args.portal_ws_url.clone(); - if !url.path().is_empty() { - tracing::warn!("Overwriting path component of portal URL with '/relay/websocket'"); - } + let channel = if let Some(token) = args.portal_token.as_ref() { + let url = args.portal_ws_url.clone(); + let stamp_secret = server.auth_secret().to_string(); - url.set_path("relay/websocket"); - url.query_pairs_mut().append_pair("token", &token); + let span = tracing::error_span!("connect_to_portal", config_url = %url); - if let Some(public_ip4_addr) = args.public_ip4_addr { - url.query_pairs_mut() - .append_pair("ipv4", &public_ip4_addr.to_string()); - } - if let Some(public_ip6_addr) = args.public_ip6_addr { - url.query_pairs_mut() - .append_pair("ipv6", &public_ip6_addr.to_string()); - } - - let mut channel = PhoenixChannel::::connect( - url, - format!("relay/{}", env!("CARGO_PKG_VERSION")), - ) - .await - .context("Failed to connect to the portal")?; - - tracing::info!("Connected to portal, waiting for init message",); - - channel.join( - "relay", - JoinMessage { - stamp_secret: server.auth_secret().to_string(), - }, - ); - - loop { - match future::poll_fn(|cx| channel.poll(cx)) - .await - .context("portal connection failed")? - { - Event::JoinedRoom { topic } if topic == "relay" => { - tracing::info!("Joined relay room on portal") - } - Event::InboundMessage { - topic, - msg: InboundPortalMessage::Init {}, - } => { - tracing::info!("Received init message from portal on topic {topic}, starting relay activities"); - break Some(channel); - } - other => { - tracing::debug!("Unhandled message from portal: {other:?}"); - } - } - } + connect_to_portal(&args, token, url, stamp_secret) + .instrument(span) + .await? } else { tracing::warn!("No portal token supplied, starting standalone mode"); @@ -203,11 +159,15 @@ async fn setup_tracing(args: &Args) -> Result<()> { .tonic() .with_endpoint(grpc_endpoint.clone()); - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(exporter) - .install_batch(opentelemetry::runtime::Tokio) - .context("Failed to create OTLP trace pipeline")?; + let tracer = + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .with_trace_config(sdk::trace::Config::default().with_resource( + sdk::Resource::new(vec![KeyValue::new("service.name", "relay")]), + )) + .install_batch(opentelemetry::runtime::Tokio) + .context("Failed to create OTLP trace pipeline")?; tracing::trace!("Successfully initialized trace provider on tokio runtime"); @@ -274,6 +234,63 @@ where log_layer.with_filter(env_filter).boxed() } +async fn connect_to_portal( + args: &Args, + token: &str, + mut url: Url, + stamp_secret: String, +) -> Result>> { + if !url.path().is_empty() { + tracing::warn!("Overwriting path component of portal URL with '/relay/websocket'"); + } + + url.set_path("relay/websocket"); + url.query_pairs_mut().append_pair("token", token); + + if let Some(public_ip4_addr) = args.public_ip4_addr { + url.query_pairs_mut() + .append_pair("ipv4", &public_ip4_addr.to_string()); + } + if let Some(public_ip6_addr) = args.public_ip6_addr { + url.query_pairs_mut() + .append_pair("ipv6", &public_ip6_addr.to_string()); + } + + let mut channel = PhoenixChannel::::connect( + url, + format!("relay/{}", env!("CARGO_PKG_VERSION")), + ) + .await + .context("Failed to connect to the portal")?; + + tracing::info!("Connected to portal, waiting for init message",); + + channel.join("relay", JoinMessage { stamp_secret }); + + loop { + match future::poll_fn(|cx| channel.poll(cx)) + .await + .context("portal connection failed")? + { + Event::JoinedRoom { topic } if topic == "relay" => { + tracing::info!("Joined relay room on portal") + } + Event::InboundMessage { + topic, + msg: InboundPortalMessage::Init {}, + } => { + tracing::info!( + "Received init message from portal on topic {topic}, starting relay activities" + ); + return Ok(Some(channel)); + } + other => { + tracing::debug!("Unhandled message from portal: {other:?}"); + } + } + } +} + #[derive(serde::Serialize, PartialEq, Debug)] struct JoinMessage { stamp_secret: String, @@ -362,6 +379,9 @@ where } fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + let span = tracing::error_span!("Eventloop::poll"); + let _guard = span.enter(); + loop { let now = SystemTime::now(); @@ -369,6 +389,9 @@ where if let Some(next_command) = self.server.next_command() { match next_command { Command::SendMessage { payload, recipient } => { + let span = tracing::error_span!("Command::SendMessage"); + let _guard = span.enter(); + let sender = match recipient.family() { AddressFamily::V4 => &mut self.outbound_ip4_data_sender, AddressFamily::V6 => &mut self.outbound_ip6_data_sender, @@ -387,12 +410,19 @@ where } } Command::CreateAllocation { id, family, port } => { + let span = + tracing::error_span!("Command::CreateAllocation", %id, %family, %port); + let _guard = span.enter(); + self.allocations.insert( (id, family), Allocation::new(self.relay_data_sender.clone(), id, family, port), ); } Command::FreeAllocation { id, family } => { + let span = tracing::error_span!("Command::FreeAllocation", %id, %family); + let _guard = span.enter(); + if self.allocations.remove(&(id, family)).is_none() { tracing::debug!("Unknown allocation {id}"); continue; @@ -401,6 +431,9 @@ where tracing::info!("Freeing addresses of allocation {id}"); } Command::Wake { deadline } => { + let span = tracing::error_span!("Command::Wake", ?deadline); + let _guard = span.enter(); + match deadline.duration_since(now) { Ok(duration) => { tracing::trace!(?duration, "Suspending event loop") @@ -418,6 +451,9 @@ where Pin::new(&mut self.sleep).reset(deadline); } Command::ForwardData { id, data, receiver } => { + let span = tracing::error_span!("Command::ForwardData", %id, %receiver); + let _guard = span.enter(); + let mut allocation = match self.allocations.entry((id, receiver.family())) { Entry::Occupied(entry) => entry, Entry::Vacant(_) => { diff --git a/rust/relay/src/server.rs b/rust/relay/src/server.rs index 08d29ddb3..6b56f2015 100644 --- a/rust/relay/src/server.rs +++ b/rust/relay/src/server.rs @@ -213,7 +213,8 @@ where /// /// After calling this method, you should call [`Server::next_command`] until it returns `None`. pub fn handle_client_input(&mut self, bytes: &[u8], sender: SocketAddr, now: SystemTime) { - let span = tracing::error_span!("client", %sender, transaction_id = field::Empty); + let span = + tracing::error_span!("handle_client_input", %sender, transaction_id = field::Empty); let _guard = span.enter(); if tracing::enabled!(target: "wire", tracing::Level::TRACE) { @@ -313,7 +314,7 @@ where sender: SocketAddr, allocation_id: AllocationId, ) { - let span = tracing::error_span!("peer", %sender, %allocation_id, recipient = field::Empty, channel = field::Empty); + let span = tracing::error_span!("handle_relay_input", %sender, %allocation_id, recipient = field::Empty, channel = field::Empty); let _guard = span.enter(); if tracing::enabled!(target: "wire", tracing::Level::TRACE) { @@ -369,6 +370,9 @@ where } pub fn handle_deadline_reached(&mut self, now: SystemTime) { + let span = tracing::error_span!("handle_deadline_reached", ?now); + let _guard = span.enter(); + for action in self.time_events.pending_actions(now) { match action { TimedAction::ExpireAllocation(id) => { @@ -409,11 +413,20 @@ where /// An allocation failed. pub fn handle_allocation_failed(&mut self, id: AllocationId) { + let span = tracing::error_span!("handle_allocation_failed", %id); + let _guard = span.enter(); + self.delete_allocation(id) } /// Return the next command to be executed. pub fn next_command(&mut self) -> Option { + let num_commands = self.pending_commands.len(); + + if num_commands > 0 { + tracing::trace!(%num_commands, "Popping next command"); + } + self.pending_commands.pop_front() } @@ -439,8 +452,7 @@ where ) -> Result<(), Message> { let effective_lifetime = request.effective_lifetime(); - let span = - tracing::error_span!("allocate", lifetime = %effective_lifetime.lifetime().as_secs()); + let span = tracing::error_span!("handle_allocate_request", lifetime = %effective_lifetime.lifetime().as_secs()); let _guard = span.enter(); self.verify_auth(&request, now)?; @@ -548,8 +560,7 @@ where ) -> Result<(), Message> { let effective_lifetime = request.effective_lifetime(); - let span = - tracing::error_span!("refresh", lifetime = %effective_lifetime.lifetime().as_secs()); + let span = tracing::error_span!("handle_refresh_request", lifetime = %effective_lifetime.lifetime().as_secs()); let _guard = span.enter(); self.verify_auth(&request, now)?; @@ -608,7 +619,7 @@ where let requested_channel = request.channel_number().value(); let peer_address = request.xor_peer_address().address(); - let span = tracing::error_span!("channel_bind", %requested_channel, %peer_address, allocation = field::Empty); + let span = tracing::error_span!("handle_channel_bind_request", %requested_channel, %peer_address, allocation = field::Empty); let _guard = span.enter(); self.verify_auth(&request, now)?; @@ -699,7 +710,7 @@ where let channel_number = message.channel(); let data = message.data(); - let span = tracing::error_span!("channel_data", channel = %channel_number, recipient = field::Empty); + let span = tracing::error_span!("handle_channel_data_message", channel = %channel_number, recipient = field::Empty); let _guard = span.enter(); let Some(channel) = self.channels_by_number.get(&channel_number) else { @@ -878,6 +889,8 @@ where fn delete_allocation(&mut self, id: AllocationId) { let Some(client) = self.clients_by_allocation.remove(&id) else { + tracing::debug!("Unknown allocation"); + return; }; let allocation = self diff --git a/rust/relay/src/time_events.rs b/rust/relay/src/time_events.rs index 256f558b2..5f91250dd 100644 --- a/rust/relay/src/time_events.rs +++ b/rust/relay/src/time_events.rs @@ -39,6 +39,8 @@ where let remaining_actions = self.events.split_off(split_index); let events = mem::replace(&mut self.events, remaining_actions); + tracing::trace!("Got {} pending actions", events.len()); + events.into_iter().map(|event| event.action) }