diff --git a/Cargo.lock b/Cargo.lock index dbb1a29cea..592441cbcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2860,10 +2860,12 @@ dependencies = [ "http-body-util", "httptest", "hyper", + "integration-tests-o11y", "lazy_static", "mockall", "opentelemetry", "opentelemetry-appender-tracing", + "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry_sdk", "percent-encoding", diff --git a/src/gax-internal/Cargo.toml b/src/gax-internal/Cargo.toml index cba373343a..0227e310ce 100644 --- a/src/gax-internal/Cargo.toml +++ b/src/gax-internal/Cargo.toml @@ -134,8 +134,9 @@ bytes.workspace = true google-cloud-test-utils.workspace = true httptest.workspace = true mockall.workspace = true -opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "testing"] } opentelemetry-appender-tracing = { workspace = true } +opentelemetry-proto = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "testing"] } pretty_assertions.workspace = true scoped-env.workspace = true serde_with.workspace = true @@ -146,8 +147,9 @@ tracing = { workspace = true, features = ["std"] } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["json", "registry", "std"] } # Local crates -echo-server = { path = "echo-server" } -grpc-server = { path = "grpc-server" } +echo-server = { path = "echo-server" } +grpc-server = { path = "grpc-server" } +integration-tests-o11y = { path = "../../tests/o11y" } # reqwest panics if (1) configured to support TLS and (2) no default crypto # provider is configured via features or installed at runtime. Even if reqwest # is being used without TLS. Enabling the default provider in tests to diff --git a/src/gax-internal/src/grpc.rs b/src/gax-internal/src/grpc.rs index 6698bf4f68..4b4bcdad03 100644 --- a/src/gax-internal/src/grpc.rs +++ b/src/gax-internal/src/grpc.rs @@ -18,8 +18,6 @@ pub mod from_status; pub mod status; pub mod tonic; -#[cfg(google_cloud_unstable_tracing)] -use crate::observability::attributes::{self, keys::*, otel_status_codes}; use ::tonic::client::Grpc; use ::tonic::transport::Channel; use from_status::to_gax_error; @@ -45,8 +43,6 @@ use google_cloud_gax::retry_policy::{ }; use google_cloud_gax::retry_throttler::SharedRetryThrottler; use http::HeaderMap; -#[cfg(google_cloud_unstable_tracing)] -use opentelemetry_semantic_conventions::{attribute as otel_attr, trace as otel_trace}; use std::sync::Arc; use std::time::Duration; @@ -282,7 +278,6 @@ impl Client { Request: prost::Message + Clone + 'static, Response: prost::Message + Default + 'static, { - use ::tonic::IntoRequest; let headers = Self::make_headers(api_client_header, request_params, &options).await?; let headers = self.add_auth_headers(headers).await?; let metadata = tonic::MetadataMap::from_headers(headers); @@ -293,21 +288,52 @@ impl Client { let codec = tonic_prost::ProstCodec::::default(); let mut inner = self.inner.clone(); inner.ready().await.map_err(Error::io)?; + #[cfg(google_cloud_unstable_tracing)] + let span = self.create_grpc_span(&path, None); + #[cfg(google_cloud_unstable_tracing)] if let Some(recorder) = crate::observability::RequestRecorder::current() { recorder.on_grpc_request(&path); } - let result = inner - .server_streaming(request.into_request(), path, codec) - .await; + + let pending = inner + .server_streaming(request, path, codec) + .map_err(to_gax_error); + + #[cfg(not(google_cloud_unstable_tracing))] + let result = pending.await; + + // TODO(#5372): The span created by `WithTransportSpan` only covers stream initiation. + // Consider instrumenting the returned stream to capture errors during the stream's lifetime. + #[cfg(google_cloud_unstable_tracing)] - if let Some(recorder) = crate::observability::RequestRecorder::current() { - match &result { - Ok(_) => recorder.on_grpc_response(), - Err(e) => recorder.on_grpc_error(&to_gax_error(e.clone())), + let result = { + use crate::observability::{ + WithTransportLogging, WithTransportMetric, WithTransportSpan, + }; + + let pending = WithTransportMetric::new(self.metric.clone(), pending, 0); + let pending = WithTransportLogging::new(pending); + let pending = WithTransportSpan::new(span, pending); + + if let Some(recorder) = crate::observability::RequestRecorder::current() { + recorder.scope(pending).await + } else { + pending.await + } + }; + + match result { + Ok(response) => Ok(Ok(response)), + Err(err) => { + use std::error::Error as _; + if let Some(status) = err.source().and_then(|e| e.downcast_ref::()) { + Ok(Err(status.clone())) + } else { + Err(err) + } } } - Ok(result) } /// Runs the retry loop. @@ -373,46 +399,14 @@ impl Client { Response: prost::Message + std::default::Default + 'static, { #[cfg(google_cloud_unstable_tracing)] - let span = if let Some(attrs) = &self.tracing_attributes { - let rpc_method = path.path().trim_start_matches('/'); - let (service, version, repo, artifact) = if let Some(info) = attrs.instrumentation { - ( - Some(info.service_name), - Some(info.client_version), - Some("googleapis/google-cloud-rust"), - Some(info.client_artifact), - ) - } else { - (None, None, None, None) - }; - let resend_count = if _prior_attempt_count > 0 { + let span = self.create_grpc_span( + &path, + if _prior_attempt_count > 0 { Some(_prior_attempt_count) } else { None - }; - - tracing::info_span!( - "grpc.request", - { OTEL_NAME } = rpc_method, - { RPC_SYSTEM_NAME } = attributes::RPC_SYSTEM_GRPC, - { OTEL_KIND } = attributes::OTEL_KIND_CLIENT, - { otel_trace::RPC_METHOD } = rpc_method, - { otel_trace::SERVER_ADDRESS } = attrs.server_address, - { otel_trace::SERVER_PORT } = attrs.server_port, - { otel_attr::URL_DOMAIN } = attrs.url_domain, - { RPC_RESPONSE_STATUS_CODE } = tracing::field::Empty, - { OTEL_STATUS_CODE } = otel_status_codes::UNSET, - { otel_trace::ERROR_TYPE } = tracing::field::Empty, - { GCP_CLIENT_SERVICE } = service, - { GCP_CLIENT_VERSION } = version, - { GCP_CLIENT_REPO } = repo, - { GCP_CLIENT_ARTIFACT } = artifact, - { GCP_GRPC_RESEND_COUNT } = resend_count, - { GCP_RESOURCE_DESTINATION_ID } = tracing::field::Empty, - ) - } else { - tracing::Span::none() - }; + }, + ); #[allow(unused_mut)] let mut headers = self.add_auth_headers(headers).await?; @@ -626,6 +620,52 @@ impl Client { Ok(headers) } + #[cfg(google_cloud_unstable_tracing)] + fn create_grpc_span( + &self, + path: &http::uri::PathAndQuery, + resend_count: Option, + ) -> tracing::Span { + use crate::observability::attributes::{self, keys, otel_status_codes}; + use opentelemetry_semantic_conventions::attribute as otel_attr; + + if let Some(attrs) = &self.tracing_attributes { + let rpc_method = path.path().trim_start_matches('/'); + let (service, version, repo, artifact) = if let Some(info) = attrs.instrumentation { + ( + Some(info.service_name), + Some(info.client_version), + Some("googleapis/google-cloud-rust"), + Some(info.client_artifact), + ) + } else { + (None, None, None, None) + }; + + tracing::info_span!( + "grpc.request", + { keys::OTEL_NAME } = rpc_method, + { keys::RPC_SYSTEM_NAME } = attributes::RPC_SYSTEM_GRPC, + { keys::OTEL_KIND } = attributes::OTEL_KIND_CLIENT, + { keys::RPC_METHOD } = rpc_method, + { keys::SERVER_ADDRESS } = attrs.server_address, + { keys::SERVER_PORT } = attrs.server_port, + { otel_attr::URL_DOMAIN } = attrs.url_domain, + { keys::RPC_RESPONSE_STATUS_CODE } = tracing::field::Empty, + { keys::OTEL_STATUS_CODE } = otel_status_codes::UNSET, + { keys::ERROR_TYPE } = tracing::field::Empty, + { keys::GCP_CLIENT_SERVICE } = service, + { keys::GCP_CLIENT_VERSION } = version, + { keys::GCP_CLIENT_REPO } = repo, + { keys::GCP_CLIENT_ARTIFACT } = artifact, + { keys::GCP_GRPC_RESEND_COUNT } = resend_count, + { keys::GCP_RESOURCE_DESTINATION_ID } = tracing::field::Empty, + ) + } else { + tracing::Span::none() + } + } + fn get_retry_policy(&self, options: &RequestOptions) -> Arc { options .retry_policy() diff --git a/src/gax-internal/tests/grpc_observability.rs b/src/gax-internal/tests/grpc_observability.rs index 9eb1a21b31..73a05de722 100644 --- a/src/gax-internal/tests/grpc_observability.rs +++ b/src/gax-internal/tests/grpc_observability.rs @@ -510,6 +510,138 @@ mod tests { Ok(()) } + #[cfg(feature = "_internal-grpc-server-streaming")] + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[serial_test::serial] + async fn server_streaming_metrics() -> anyhow::Result<()> { + use integration_tests_o11y::mock_collector::MockCollector; + use integration_tests_o11y::otlp::metrics::Builder as MeterProviderBuilder; + use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; + + let (endpoint, _server) = start_echo_server().await?; + + let mock_collector = MockCollector::default(); + let otlp_endpoint = mock_collector.start().await; + + let meter_provider = MeterProviderBuilder::new("test-project", "integration-tests") + .with_endpoint( + otlp_endpoint + .parse::() + .expect("Failed to parse URI"), + ) + .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .build() + .await?; + opentelemetry::global::set_meter_provider(meter_provider.clone()); + + // Configure client with tracing enabled + let mut config = google_cloud_gax_internal::options::ClientConfig::default(); + config.tracing = true; + config.cred = Some(test_credentials()); + + let client = grpc::Client::new_with_instrumentation(config, &endpoint, &TEST_INFO).await?; + + // Send a server streaming request + let extensions = { + let mut e = tonic::Extensions::new(); + e.insert(tonic::GrpcMethod::new( + "google.test.v1.EchoServices", + "Expand", + )); + e + }; + + let request = google::test::v1::EchoRequest { + message: "test message".into(), + ..Default::default() + }; + + let recorder = RequestRecorder::new(*TEST_INFO); + recorder.on_client_request( + ClientRequestAttributes::default() + .set_url_template("/google.test.v1.EchoService/Expand"), + ); + let mut response_stream = recorder + .scope(async { + client + .server_streaming::<_, google::test::v1::EchoResponse>( + extensions, + http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Expand"), + request, + RequestOptions::default(), + "test-only-api-client/1.0", + "name=test-only", + ) + .await + }) + .await? + .into_inner(); + + // Receive responses + let mut responses = Vec::new(); + while let Some(res) = futures::StreamExt::next(&mut response_stream).await { + responses.push(res?); + } + assert!(!responses.is_empty()); + + // Force flush metrics + let _ = meter_provider.force_flush(); + + // Verify metrics + let mut metrics_requests = mock_collector.metrics.lock().expect("never poisoned"); + let mut found_duration_metric = false; + while let Some(req) = metrics_requests.pop() { + let req: tonic::Request = req; + let (_, _, metrics_request) = req.into_parts(); + for rm in metrics_request.resource_metrics { + for sm in rm.scope_metrics { + for m in sm.metrics { + if m.name.contains("gcp.client.attempt.duration") { + found_duration_metric = true; + if let Some( + opentelemetry_proto::tonic::metrics::v1::metric::Data::Histogram(h), + ) = m.data + { + let point = + h.data_points.first().expect("should have a data point"); + let mut metric_attributes = std::collections::HashMap::new(); + for kv in &point.attributes { + metric_attributes + .insert(kv.key.clone(), kv.value.clone().unwrap()); + } + + let get_metric_string = |key: &str| -> Option { + metric_attributes.get(key).and_then(|v| match &v.value { + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => { + Some(s.clone()) + } + _ => None, + }) + }; + + assert_eq!( + get_metric_string("rpc.system.name").as_deref(), + Some("grpc") + ); + assert_eq!( + get_metric_string("rpc.method").as_deref(), + Some("google.test.v1.EchoService/Expand") + ); + assert_eq!( + get_metric_string("rpc.response.status_code").as_deref(), + Some("OK") + ); + } + } + } + } + } + } + assert!(found_duration_metric, "Should have found duration metric"); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn cancellation() -> anyhow::Result<()> { use std::time::Duration;