From 018dde3cc92958a3111989def5c2dc7735cf6876 Mon Sep 17 00:00:00 2001 From: Mirko <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Thu, 6 Feb 2025 18:28:55 +0200 Subject: [PATCH] feat: remote tracing context (#669) --- CHANGELOG.md | 1 + Cargo.lock | 1 + crates/block-producer/src/server.rs | 15 ++-- crates/block-producer/src/store/mod.rs | 10 ++- crates/rpc/src/server/api.rs | 22 +++-- crates/store/src/server/mod.rs | 1 + crates/utils/Cargo.toml | 1 + crates/utils/src/errors.rs | 3 + crates/utils/src/lib.rs | 1 + crates/utils/src/tracing/grpc.rs | 116 +++++++++++++++++++++++++ crates/utils/src/tracing/mod.rs | 1 + 11 files changed, 158 insertions(+), 14 deletions(-) create mode 100644 crates/utils/src/tracing/grpc.rs create mode 100644 crates/utils/src/tracing/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1637ec794..65a7ef2c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Enhancements - Add an optional open-telemetry trace exporter (#659). +- Support tracing across gRPC boundaries using remote tracing context (#669). ### Changes diff --git a/Cargo.lock b/Cargo.lock index 5e6116ea9..e9eac679d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1957,6 +1957,7 @@ version = "0.8.0" dependencies = [ "anyhow", "figment", + "http", "itertools 0.14.0", "miden-objects", "opentelemetry", diff --git a/crates/block-producer/src/server.rs b/crates/block-producer/src/server.rs index 2eaf0e506..251d0b339 100644 --- a/crates/block-producer/src/server.rs +++ b/crates/block-producer/src/server.rs @@ -7,6 +7,7 @@ use miden_node_proto::generated::{ use miden_node_utils::{ errors::ApiError, formatting::{format_input_notes, format_output_notes}, + tracing::grpc::OtelInterceptor, }; use miden_objects::{ block::BlockNumber, transaction::ProvenTransaction, utils::serde::Deserializable, @@ -52,11 +53,14 @@ impl BlockProducer { pub async fn init(config: BlockProducerConfig) -> Result { info!(target: COMPONENT, %config, "Initializing server"); - let store = StoreClient::new( - store_client::ApiClient::connect(config.store_url.to_string()) - .await - .map_err(|err| ApiError::DatabaseConnectionFailed(err.to_string()))?, - ); + let channel = tonic::transport::Endpoint::try_from(config.store_url.to_string()) + .map_err(|err| ApiError::InvalidStoreUrl(err.to_string()))? + .connect() + .await + .map_err(|err| ApiError::DatabaseConnectionFailed(err.to_string()))?; + + let store = store_client::ApiClient::with_interceptor(channel, OtelInterceptor); + let store = StoreClient::new(store); let latest_header = store .latest_header() @@ -208,6 +212,7 @@ impl BlockProducerRpcServer { async fn serve(self, listener: TcpListener) -> Result<(), tonic::transport::Error> { tonic::transport::Server::builder() + .trace_fn(miden_node_utils::tracing::grpc::block_producer_trace_fn) .add_service(api_server::ApiServer::new(self)) .serve_with_incoming(TcpListenerStream::new(listener)) .await diff --git a/crates/block-producer/src/store/mod.rs b/crates/block-producer/src/store/mod.rs index 1aa2638fd..2c4a36fe9 100644 --- a/crates/block-producer/src/store/mod.rs +++ b/crates/block-producer/src/store/mod.rs @@ -19,7 +19,7 @@ use miden_node_proto::{ }, AccountState, }; -use miden_node_utils::formatting::format_opt; +use miden_node_utils::{formatting::format_opt, tracing::grpc::OtelInterceptor}; use miden_objects::{ account::AccountId, block::{Block, BlockHeader, BlockNumber}, @@ -29,7 +29,7 @@ use miden_objects::{ Digest, }; use miden_processor::crypto::RpoDigest; -use tonic::transport::Channel; +use tonic::{service::interceptor::InterceptedService, transport::Channel}; use tracing::{debug, info, instrument}; use crate::{block::BlockInputs, errors::StoreError, COMPONENT}; @@ -121,17 +121,19 @@ impl TryFrom for TransactionInputs { // STORE CLIENT // ================================================================================================ +type InnerClient = store_client::ApiClient>; + /// Interface to the store's gRPC API. /// /// Essentially just a thin wrapper around the generated gRPC client which improves type safety. #[derive(Clone)] pub struct StoreClient { - inner: store_client::ApiClient, + inner: InnerClient, } impl StoreClient { /// TODO: this should probably take store connection string and create a connection internally - pub fn new(store: store_client::ApiClient) -> Self { + pub fn new(store: InnerClient) -> Self { Self { inner: store } } diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index e9fe29a2a..fbb233eb4 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -18,12 +18,14 @@ use miden_node_proto::{ }, try_convert, }; +use miden_node_utils::tracing::grpc::OtelInterceptor; use miden_objects::{ account::AccountId, crypto::hash::rpo::RpoDigest, transaction::ProvenTransaction, utils::serde::Deserializable, Digest, MAX_NUM_FOREIGN_ACCOUNTS, MIN_PROOF_SECURITY_LEVEL, }; use miden_tx::TransactionVerifier; use tonic::{ + service::interceptor::InterceptedService, transport::{Channel, Error}, Request, Response, Status, }; @@ -34,19 +36,29 @@ use crate::{config::RpcConfig, COMPONENT}; // RPC API // ================================================================================================ +type StoreClient = store_client::ApiClient>; +type BlockProducerClient = + block_producer_client::ApiClient>; + pub struct RpcApi { - store: store_client::ApiClient, - block_producer: block_producer_client::ApiClient, + store: StoreClient, + block_producer: BlockProducerClient, } impl RpcApi { pub(super) async fn from_config(config: &RpcConfig) -> Result { - let store = store_client::ApiClient::connect(config.store_url.to_string()).await?; + let channel = tonic::transport::Endpoint::try_from(config.store_url.to_string())? + .connect() + .await?; + let store = store_client::ApiClient::with_interceptor(channel, OtelInterceptor); info!(target: COMPONENT, store_endpoint = config.store_url.as_str(), "Store client initialized"); + let channel = tonic::transport::Endpoint::try_from(config.block_producer_url.to_string())? + .connect() + .await?; let block_producer = - block_producer_client::ApiClient::connect(config.block_producer_url.to_string()) - .await?; + block_producer_client::ApiClient::with_interceptor(channel, OtelInterceptor); + info!( target: COMPONENT, block_producer_endpoint = config.block_producer_url.as_str(), diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index 6ef429d08..2b65a1dc2 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -62,6 +62,7 @@ impl Store { /// Note: this blocks until the server dies. pub async fn serve(self) -> Result<(), ApiError> { tonic::transport::Server::builder() + .trace_fn(miden_node_utils::tracing::grpc::store_trace_fn) .add_service(self.api_service) .serve_with_incoming(TcpListenerStream::new(self.listener)) .await diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index c37684c8f..b411cd291 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -21,6 +21,7 @@ vergen = ["dep:vergen", "dep:vergen-gitcl"] [dependencies] anyhow = { version = "1.0" } figment = { version = "0.10", features = ["env", "toml"] } +http = "1.2" itertools = { workspace = true } miden-objects = { workspace = true } opentelemetry = "0.27" diff --git a/crates/utils/src/errors.rs b/crates/utils/src/errors.rs index 283da7c5b..0bed649e8 100644 --- a/crates/utils/src/errors.rs +++ b/crates/utils/src/errors.rs @@ -21,4 +21,7 @@ pub enum ApiError { #[error("connection to the database has failed: {0}")] DatabaseConnectionFailed(String), + + #[error("parsing store url failed: {0}")] + InvalidStoreUrl(String), } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 275ee7e35..8c71955d5 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -3,4 +3,5 @@ pub mod crypto; pub mod errors; pub mod formatting; pub mod logging; +pub mod tracing; pub mod version; diff --git a/crates/utils/src/tracing/grpc.rs b/crates/utils/src/tracing/grpc.rs new file mode 100644 index 000000000..1e951af12 --- /dev/null +++ b/crates/utils/src/tracing/grpc.rs @@ -0,0 +1,116 @@ +use tracing_opentelemetry::OpenTelemetrySpanExt; + +/// A [`trace_fn`](tonic::transport::server::Server) implementation for the block producer which +/// adds open-telemetry information to the span. +/// +/// Creates an `info` span following the open-telemetry standard: `block-producer.rpc/{method}`. +/// Additionally also pulls in remote tracing context which allows the server trace to be connected +/// to the client's origin trace. +pub fn block_producer_trace_fn(request: &http::Request<()>) -> tracing::Span { + let span = if let Some("SubmitProvenTransaction") = request.uri().path().rsplit('/').next() { + tracing::info_span!("block-producer.rpc/SubmitProvenTransaction") + } else { + tracing::info_span!("block-producer.rpc/Unknown") + }; + + add_otel_span_attributes(span, request) +} + +/// A [`trace_fn`](tonic::transport::server::Server) implementation for the store which adds +/// open-telemetry information to the span. +/// +/// Creates an `info` span following the open-telemetry standard: `store.rpc/{method}`. Additionally +/// also pulls in remote tracing context which allows the server trace to be connected to the +/// client's origin trace. +pub fn store_trace_fn(request: &http::Request<()>) -> tracing::Span { + let span = match request.uri().path().rsplit('/').next() { + Some("ApplyBlock") => tracing::info_span!("store.rpc/ApplyBlock"), + Some("CheckNullifiers") => tracing::info_span!("store.rpc/CheckNullifiers"), + Some("CheckNullifiersByPrefix") => tracing::info_span!("store.rpc/CheckNullifiersByPrefix"), + Some("GetAccountDetails") => tracing::info_span!("store.rpc/GetAccountDetails"), + Some("GetAccountProofs") => tracing::info_span!("store.rpc/GetAccountProofs"), + Some("GetAccountStateDelta") => tracing::info_span!("store.rpc/GetAccountStateDelta"), + Some("GetBlockByNumber") => tracing::info_span!("store.rpc/GetBlockByNumber"), + Some("GetBlockHeaderByNumber") => tracing::info_span!("store.rpc/GetBlockHeaderByNumber"), + Some("GetBlockInputs") => tracing::info_span!("store.rpc/GetBlockInputs"), + Some("GetBatchInputs") => tracing::info_span!("store.rpc/GetBatchInputs"), + Some("GetNotesById") => tracing::info_span!("store.rpc/GetNotesById"), + Some("GetTransactionInputs") => tracing::info_span!("store.rpc/GetTransactionInputs"), + Some("SyncNotes") => tracing::info_span!("store.rpc/SyncNotes"), + Some("SyncState") => tracing::info_span!("store.rpc/SyncState"), + _ => tracing::info_span!("store.rpc/Unknown"), + }; + + add_otel_span_attributes(span, request) +} + +/// Adds remote tracing context to the span. +/// +/// Could be expanded in the future by adding in more open-telemetry properties. +fn add_otel_span_attributes(span: tracing::Span, request: &http::Request<()>) -> tracing::Span { + // Pull the open-telemetry parent context using the HTTP extractor. We could make a more + // generic gRPC extractor by utilising the gRPC metadata. However that + // (a) requires cloning headers, + // (b) we would have to write this ourselves, and + // (c) gRPC metadata is transferred using HTTP headers in any case. + use tracing_opentelemetry::OpenTelemetrySpanExt; + let otel_ctx = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&MetadataExtractor(&tonic::metadata::MetadataMap::from_headers( + request.headers().clone(), + ))) + }); + span.set_parent(otel_ctx); + + span +} + +/// Injects open-telemetry remote context into traces. +#[derive(Copy, Clone)] +pub struct OtelInterceptor; + +impl tonic::service::Interceptor for OtelInterceptor { + fn call( + &mut self, + mut request: tonic::Request<()>, + ) -> Result, tonic::Status> { + let ctx = tracing::Span::current().context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&ctx, &mut MetadataInjector(request.metadata_mut())); + }); + + Ok(request) + } +} + +struct MetadataExtractor<'a>(&'a tonic::metadata::MetadataMap); +impl opentelemetry::propagation::Extractor for MetadataExtractor<'_> { + /// Get a value for a key from the `MetadataMap`. If the value can't be converted to &str, + /// returns None + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + /// Collect all the keys from the `MetadataMap`. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::>() + } +} + +struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap); +impl opentelemetry::propagation::Injector for MetadataInjector<'_> { + /// Set a key and value in the `MetadataMap`. Does nothing if the key or value are not valid + /// inputs + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } +} diff --git a/crates/utils/src/tracing/mod.rs b/crates/utils/src/tracing/mod.rs new file mode 100644 index 000000000..773d491c6 --- /dev/null +++ b/crates/utils/src/tracing/mod.rs @@ -0,0 +1 @@ +pub mod grpc;