feat(relay): allow configuration of OTLP exporter (#2050)

Allows configuration of an OTLP collector as an alternative to Google
Cloud Trace. We also add a temporary logger that allows us to print
things to stdout as we are setting up the more complicated tracing
infrastructure. This might be prove helpful during debugging!
This commit is contained in:
Thomas Eizinger
2023-09-13 10:47:42 +10:00
committed by GitHub
parent 782bbe9417
commit fbfce585b0
4 changed files with 111 additions and 14 deletions

33
rust/Cargo.lock generated
View File

@@ -2375,6 +2375,37 @@ dependencies = [
"opentelemetry_sdk",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca"
dependencies = [
"async-trait",
"futures",
"futures-util",
"http",
"opentelemetry",
"opentelemetry-proto",
"prost",
"thiserror",
"tokio",
"tonic",
]
[[package]]
name = "opentelemetry-proto"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c"
dependencies = [
"futures",
"futures-util",
"opentelemetry",
"prost",
"tonic",
]
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.11.0"
@@ -2935,6 +2966,7 @@ dependencies = [
"hex-literal",
"once_cell",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-stackdriver",
"phoenix-channel",
"prometheus-client",
@@ -2948,6 +2980,7 @@ dependencies = [
"test-strategy",
"tokio",
"tracing",
"tracing-core",
"tracing-opentelemetry",
"tracing-stackdriver",
"tracing-subscriber",

View File

@@ -19,7 +19,9 @@ tracing-stackdriver = { version = "0.7.2", features = ["opentelemetry"] }
opentelemetry-stackdriver = { version = "0.16.0", default-features = false, features = ["gcp_auth", "tls-native-roots"] }
tracing-opentelemetry = "0.19.0"
opentelemetry = { version = "0.19.0", features = ["rt-tokio"] }
opentelemetry-otlp = "0.12.0"
env_logger = "0.10.0"
tracing-core = "0.1.31"
bytes = "1.4.0"
sha2 = "0.10.6"
base64 = "0.21.4"

View File

@@ -4,6 +4,7 @@ use futures::channel::mpsc;
use futures::{future, FutureExt, SinkExt, StreamExt};
use opentelemetry::sdk::trace::TracerProvider;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_stackdriver::Authorizer;
use phoenix_channel::{Error, Event, PhoenixChannel};
use prometheus_client::registry::Registry;
@@ -21,7 +22,7 @@ use std::pin::Pin;
use std::task::Poll;
use std::time::SystemTime;
use tracing::level_filters::LevelFilter;
use tracing::Subscriber;
use tracing::{Span, Subscriber};
use tracing_stackdriver::CloudTraceConfiguration;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
@@ -68,6 +69,12 @@ struct Args {
/// Where to send trace data to.
#[arg(long, env)]
trace_collector: Option<TraceCollector>,
/// Which OTLP collector we should connect to.
///
/// This setting only has an effect if `TRACE_COLLECTOR` is set to `otlp`.
#[arg(env, default_value = "127.0.0.1:4317")]
otlp_grpc_endpoint: SocketAddr,
}
#[derive(clap::ValueEnum, Debug, Clone, Copy)]
@@ -81,18 +88,16 @@ enum LogFormat {
enum TraceCollector {
/// Sends traces to Google Cloud Trace.
GoogleCloudTrace,
// TODO: Extend with OTLP receiver
/// Sends traces to an OTLP collector.
Otlp,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
setup_tracing(&args).await?;
// Must create a root span for traces to be sampled.
let root = tracing::error_span!("root");
let _root = root.enter();
let root_span = setup_tracing(&args).await?;
let _guard = root_span.enter();
let public_addr = match (args.public_ip4_addr, args.public_ip6_addr) {
(Some(ip4), Some(ip6)) => IpStack::Dual { ip4, ip6 },
@@ -191,6 +196,8 @@ async fn main() -> Result<()> {
///
/// See [`log_layer`] for details on the base log layer.
///
/// ## Integration with Google Cloud Trace
///
/// If the user has specified [`TraceCollector::GoogleCloudTrace`], we will attempt to connect to Google Cloud Trace.
/// This requires authentication.
/// Here is how we will attempt to obtain those, for details see <https://docs.rs/gcp_auth/0.9.0/gcp_auth/struct.AuthenticationManager.html#method.new>.
@@ -199,21 +206,39 @@ async fn main() -> Result<()> {
/// 2. Look for credentials in `.config/gcloud/application_default_credentials.json`; if found, use these credentials to request refresh tokens.
/// 3. Send a HTTP request to the internal metadata server to retrieve a token; if it succeeds, use the default service account as the token source.
/// 4. Check if the `gcloud` tool is available on the PATH; if so, use the `gcloud auth print-access-token` command as the token source.
async fn setup_tracing(args: &Args) -> Result<()> {
let registry = tracing_subscriber::registry();
///
/// ## Integration with OTLP
///
/// If the user has specified [`TraceCollector::Otlp`], we will set up an OTLP-exporter that connects to an OTLP collector specified at `Args.otlp_grpc_endpoint`.
async fn setup_tracing(args: &Args) -> Result<Span> {
// Use `tracing_core` directly for the temp logger because that one does not initialize a `log` logger.
// A `log` Logger cannot be unset once set, so we can't use that for our temp logger during the setup.
let temp_logger_guard = tracing_core::dispatcher::set_default(
&tracing_subscriber::registry()
.with(log_layer(args, None))
.into(),
);
match args.trace_collector {
None => registry.with(log_layer(args, None)).try_init(),
None => {
drop(temp_logger_guard);
tracing_subscriber::registry().with(log_layer(args, None)).try_init()
}
Some(TraceCollector::GoogleCloudTrace) => {
tracing::trace!("Setting up Google-Cloud-Trace collector");
let authorizer = opentelemetry_stackdriver::GcpAuthorizer::new()
.await
.context("Failed to find GCP credentials")?;
let project_id = authorizer.project_id().to_owned();
tracing::trace!(%project_id, "Successfully retrieved authentication token for Google services");
let (exporter, driver) = opentelemetry_stackdriver::Builder::default()
.build(authorizer)
.await?;
.await.context("Failed to create StackDriverExporter")?;
tokio::spawn(driver);
let tracer = TracerProvider::builder()
@@ -221,15 +246,52 @@ async fn setup_tracing(args: &Args) -> Result<()> {
.build()
.tracer("relay");
registry
tracing::trace!("Successfully initialized trace provider on tokio runtime");
drop(temp_logger_guard);
tracing_subscriber::registry()
.with(log_layer(args, Some(project_id)))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
}
Some(TraceCollector::Otlp) => {
let grpc_endpoint = format!("http://{}", args.otlp_grpc_endpoint);
tracing::trace!(%grpc_endpoint, "Setting up OTLP exporter for collector");
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(grpc_endpoint);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.install_batch(opentelemetry::runtime::Tokio)
.context("Failed to create OTLP pipeline")?;
tracing::trace!("Successfully initialized trace provider on tokio runtime");
// TODO: This is where we could also configure metrics.
drop(temp_logger_guard);
tracing_subscriber::registry()
.with(log_layer(args, None))
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()
}
}
.context("Failed to init tracing")?;
Ok(())
// If we have a trace collector, we must define a root span, otherwise traces will not be sampled, i.e. discarded.
let root_span = if args.trace_collector.is_some() {
tracing::error_span!("root")
} else {
Span::none()
};
Ok(root_span)
}
/// Constructs the base log layer.

View File

@@ -28,7 +28,7 @@ locals {
},
{
name = "TRACE_COLLECTOR"
value = "google-cloud-trace"
value = "otlp"
},
{
name = "METRICS_ADDR"