feat(relay): drastically improve usefulness of spans (#2056)

This commit is contained in:
Thomas Eizinger
2023-09-14 14:04:20 +10:00
committed by GitHub
parent 89d7b0f5f4
commit 608488d718
4 changed files with 119 additions and 68 deletions

View File

@@ -69,10 +69,10 @@ where
///
/// The provided URL must contain a host.
/// Additionally, you must already provide any query parameters required for authentication.
pub async fn connect(uri: Url, user_agent: String) -> Result<Self, Error> {
pub async fn connect(url: Url, user_agent: String) -> Result<Self, Error> {
tracing::trace!("Trying to connect to the portal...");
let (stream, _) = connect_async(make_request(&uri, user_agent)?).await?;
let (stream, _) = connect_async(make_request(&url, user_agent)?).await?;
tracing::trace!("Successfully connected to portal");

View File

@@ -3,6 +3,7 @@ use clap::Parser;
use futures::channel::mpsc;
use futures::{future, FutureExt, SinkExt, StreamExt};
use opentelemetry::sdk::export::metrics;
use opentelemetry::{sdk, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use phoenix_channel::{Error, Event, PhoenixChannel};
use rand::rngs::StdRng;
@@ -18,7 +19,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::task::Poll;
use std::time::SystemTime;
use tracing::{level_filters::LevelFilter, Subscriber};
use tracing::{level_filters::LevelFilter, Instrument, Subscriber};
use tracing_core::Dispatch;
use tracing_stackdriver::CloudTraceConfiguration;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
@@ -105,60 +106,15 @@ async fn main() -> Result<()> {
args.highest_port,
);
let channel = if let Some(token) = args.portal_token {
let mut url = args.portal_ws_url.clone();
if !url.path().is_empty() {
tracing::warn!("Overwriting path component of portal URL with '/relay/websocket'");
}
let channel = if let Some(token) = args.portal_token.as_ref() {
let url = args.portal_ws_url.clone();
let stamp_secret = server.auth_secret().to_string();
url.set_path("relay/websocket");
url.query_pairs_mut().append_pair("token", &token);
let span = tracing::error_span!("connect_to_portal", config_url = %url);
if let Some(public_ip4_addr) = args.public_ip4_addr {
url.query_pairs_mut()
.append_pair("ipv4", &public_ip4_addr.to_string());
}
if let Some(public_ip6_addr) = args.public_ip6_addr {
url.query_pairs_mut()
.append_pair("ipv6", &public_ip6_addr.to_string());
}
let mut channel = PhoenixChannel::<InboundPortalMessage, ()>::connect(
url,
format!("relay/{}", env!("CARGO_PKG_VERSION")),
)
.await
.context("Failed to connect to the portal")?;
tracing::info!("Connected to portal, waiting for init message",);
channel.join(
"relay",
JoinMessage {
stamp_secret: server.auth_secret().to_string(),
},
);
loop {
match future::poll_fn(|cx| channel.poll(cx))
.await
.context("portal connection failed")?
{
Event::JoinedRoom { topic } if topic == "relay" => {
tracing::info!("Joined relay room on portal")
}
Event::InboundMessage {
topic,
msg: InboundPortalMessage::Init {},
} => {
tracing::info!("Received init message from portal on topic {topic}, starting relay activities");
break Some(channel);
}
other => {
tracing::debug!("Unhandled message from portal: {other:?}");
}
}
}
connect_to_portal(&args, token, url, stamp_secret)
.instrument(span)
.await?
} else {
tracing::warn!("No portal token supplied, starting standalone mode");
@@ -203,11 +159,15 @@ async fn setup_tracing(args: &Args) -> Result<()> {
.tonic()
.with_endpoint(grpc_endpoint.clone());
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.install_batch(opentelemetry::runtime::Tokio)
.context("Failed to create OTLP trace pipeline")?;
let tracer =
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_trace_config(sdk::trace::Config::default().with_resource(
sdk::Resource::new(vec![KeyValue::new("service.name", "relay")]),
))
.install_batch(opentelemetry::runtime::Tokio)
.context("Failed to create OTLP trace pipeline")?;
tracing::trace!("Successfully initialized trace provider on tokio runtime");
@@ -274,6 +234,63 @@ where
log_layer.with_filter(env_filter).boxed()
}
/// Connects to the portal and waits until it sends the `Init` message.
///
/// The path of `url` is replaced with `relay/websocket`. The `token` is appended as the `token`
/// query parameter, and the public IPv4 / IPv6 addresses from `args` are appended as `ipv4` /
/// `ipv6` query parameters when they are set. Once the connection is established, the `relay`
/// topic is joined with a [`JoinMessage`] carrying `stamp_secret`, and events are polled until
/// an `Init` message arrives.
///
/// On success this function always returns `Ok(Some(channel))`.
///
/// # Errors
///
/// Returns an error if the connection to the portal cannot be established or if polling the
/// channel fails before the `Init` message has been received.
async fn connect_to_portal(
    args: &Args,
    token: &str,
    mut url: Url,
    stamp_secret: String,
) -> Result<Option<PhoenixChannel<InboundPortalMessage, ()>>> {
    // URLs with a special scheme (such as `ws` / `wss`) always report a path of at least "/",
    // so a bare `is_empty` check would warn on every start. Only warn when a real path is
    // about to be discarded.
    if !matches!(url.path(), "" | "/") {
        tracing::warn!("Overwriting path component of portal URL with '/relay/websocket'");
    }

    url.set_path("relay/websocket");
    url.query_pairs_mut().append_pair("token", token);

    if let Some(public_ip4_addr) = args.public_ip4_addr {
        url.query_pairs_mut()
            .append_pair("ipv4", &public_ip4_addr.to_string());
    }
    if let Some(public_ip6_addr) = args.public_ip6_addr {
        url.query_pairs_mut()
            .append_pair("ipv6", &public_ip6_addr.to_string());
    }

    let mut channel = PhoenixChannel::<InboundPortalMessage, ()>::connect(
        url,
        format!("relay/{}", env!("CARGO_PKG_VERSION")),
    )
    .await
    .context("Failed to connect to the portal")?;

    tracing::info!("Connected to portal, waiting for init message");

    channel.join("relay", JoinMessage { stamp_secret });

    // Keep polling the channel until the portal sends the init message; every other event is
    // only logged.
    loop {
        match future::poll_fn(|cx| channel.poll(cx))
            .await
            .context("portal connection failed")?
        {
            Event::JoinedRoom { topic } if topic == "relay" => {
                tracing::info!("Joined relay room on portal")
            }
            Event::InboundMessage {
                topic,
                msg: InboundPortalMessage::Init {},
            } => {
                tracing::info!(
                    "Received init message from portal on topic {topic}, starting relay activities"
                );

                return Ok(Some(channel));
            }
            other => {
                tracing::debug!("Unhandled message from portal: {other:?}");
            }
        }
    }
}
#[derive(serde::Serialize, PartialEq, Debug)]
struct JoinMessage {
stamp_secret: String,
@@ -362,6 +379,9 @@ where
}
fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> {
let span = tracing::error_span!("Eventloop::poll");
let _guard = span.enter();
loop {
let now = SystemTime::now();
@@ -369,6 +389,9 @@ where
if let Some(next_command) = self.server.next_command() {
match next_command {
Command::SendMessage { payload, recipient } => {
let span = tracing::error_span!("Command::SendMessage");
let _guard = span.enter();
let sender = match recipient.family() {
AddressFamily::V4 => &mut self.outbound_ip4_data_sender,
AddressFamily::V6 => &mut self.outbound_ip6_data_sender,
@@ -387,12 +410,19 @@ where
}
}
Command::CreateAllocation { id, family, port } => {
let span =
tracing::error_span!("Command::CreateAllocation", %id, %family, %port);
let _guard = span.enter();
self.allocations.insert(
(id, family),
Allocation::new(self.relay_data_sender.clone(), id, family, port),
);
}
Command::FreeAllocation { id, family } => {
let span = tracing::error_span!("Command::FreeAllocation", %id, %family);
let _guard = span.enter();
if self.allocations.remove(&(id, family)).is_none() {
tracing::debug!("Unknown allocation {id}");
continue;
@@ -401,6 +431,9 @@ where
tracing::info!("Freeing addresses of allocation {id}");
}
Command::Wake { deadline } => {
let span = tracing::error_span!("Command::Wake", ?deadline);
let _guard = span.enter();
match deadline.duration_since(now) {
Ok(duration) => {
tracing::trace!(?duration, "Suspending event loop")
@@ -418,6 +451,9 @@ where
Pin::new(&mut self.sleep).reset(deadline);
}
Command::ForwardData { id, data, receiver } => {
let span = tracing::error_span!("Command::ForwardData", %id, %receiver);
let _guard = span.enter();
let mut allocation = match self.allocations.entry((id, receiver.family())) {
Entry::Occupied(entry) => entry,
Entry::Vacant(_) => {

View File

@@ -213,7 +213,8 @@ where
///
/// After calling this method, you should call [`Server::next_command`] until it returns `None`.
pub fn handle_client_input(&mut self, bytes: &[u8], sender: SocketAddr, now: SystemTime) {
let span = tracing::error_span!("client", %sender, transaction_id = field::Empty);
let span =
tracing::error_span!("handle_client_input", %sender, transaction_id = field::Empty);
let _guard = span.enter();
if tracing::enabled!(target: "wire", tracing::Level::TRACE) {
@@ -313,7 +314,7 @@ where
sender: SocketAddr,
allocation_id: AllocationId,
) {
let span = tracing::error_span!("peer", %sender, %allocation_id, recipient = field::Empty, channel = field::Empty);
let span = tracing::error_span!("handle_relay_input", %sender, %allocation_id, recipient = field::Empty, channel = field::Empty);
let _guard = span.enter();
if tracing::enabled!(target: "wire", tracing::Level::TRACE) {
@@ -369,6 +370,9 @@ where
}
pub fn handle_deadline_reached(&mut self, now: SystemTime) {
let span = tracing::error_span!("handle_deadline_reached", ?now);
let _guard = span.enter();
for action in self.time_events.pending_actions(now) {
match action {
TimedAction::ExpireAllocation(id) => {
@@ -409,11 +413,20 @@ where
/// Handles a failed allocation by deleting it.
///
/// The deletion runs inside the `handle_allocation_failed` span, which records the `id`.
pub fn handle_allocation_failed(&mut self, id: AllocationId) {
    tracing::error_span!("handle_allocation_failed", %id).in_scope(|| self.delete_allocation(id))
}
/// Pops the next pending command from the front of the queue, if there is one.
///
/// When the queue is non-empty, a trace event with the number of queued commands is emitted
/// before popping.
pub fn next_command(&mut self) -> Option<Command> {
    match self.pending_commands.len() {
        0 => {}
        num_commands => tracing::trace!(%num_commands, "Popping next command"),
    }

    self.pending_commands.pop_front()
}
@@ -439,8 +452,7 @@ where
) -> Result<(), Message<Attribute>> {
let effective_lifetime = request.effective_lifetime();
let span =
tracing::error_span!("allocate", lifetime = %effective_lifetime.lifetime().as_secs());
let span = tracing::error_span!("handle_allocate_request", lifetime = %effective_lifetime.lifetime().as_secs());
let _guard = span.enter();
self.verify_auth(&request, now)?;
@@ -548,8 +560,7 @@ where
) -> Result<(), Message<Attribute>> {
let effective_lifetime = request.effective_lifetime();
let span =
tracing::error_span!("refresh", lifetime = %effective_lifetime.lifetime().as_secs());
let span = tracing::error_span!("handle_refresh_request", lifetime = %effective_lifetime.lifetime().as_secs());
let _guard = span.enter();
self.verify_auth(&request, now)?;
@@ -608,7 +619,7 @@ where
let requested_channel = request.channel_number().value();
let peer_address = request.xor_peer_address().address();
let span = tracing::error_span!("channel_bind", %requested_channel, %peer_address, allocation = field::Empty);
let span = tracing::error_span!("handle_channel_bind_request", %requested_channel, %peer_address, allocation = field::Empty);
let _guard = span.enter();
self.verify_auth(&request, now)?;
@@ -699,7 +710,7 @@ where
let channel_number = message.channel();
let data = message.data();
let span = tracing::error_span!("channel_data", channel = %channel_number, recipient = field::Empty);
let span = tracing::error_span!("handle_channel_data_message", channel = %channel_number, recipient = field::Empty);
let _guard = span.enter();
let Some(channel) = self.channels_by_number.get(&channel_number) else {
@@ -878,6 +889,8 @@ where
fn delete_allocation(&mut self, id: AllocationId) {
let Some(client) = self.clients_by_allocation.remove(&id) else {
tracing::debug!("Unknown allocation");
return;
};
let allocation = self

View File

@@ -39,6 +39,8 @@ where
let remaining_actions = self.events.split_off(split_index);
let events = mem::replace(&mut self.events, remaining_actions);
tracing::trace!("Got {} pending actions", events.len());
events.into_iter().map(|event| event.action)
}