Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/gax-internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ bytes.workspace = true
google-cloud-test-utils.workspace = true
httptest.workspace = true
mockall.workspace = true
opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "testing"] }
opentelemetry-appender-tracing = { workspace = true }
opentelemetry-proto = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "testing"] }
pretty_assertions.workspace = true
scoped-env.workspace = true
serde_with.workspace = true
Expand All @@ -146,8 +147,9 @@ tracing = { workspace = true, features = ["std"] }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json", "registry", "std"] }
# Local crates
echo-server = { path = "echo-server" }
grpc-server = { path = "grpc-server" }
echo-server = { path = "echo-server" }
grpc-server = { path = "grpc-server" }
integration-tests-o11y = { path = "../../tests/o11y" }
# reqwest panics if (1) configured to support TLS and (2) no default crypto
# provider is configured via features or installed at runtime. Even if reqwest
# is being used without TLS. Enabling the default provider in tests to
Expand Down
140 changes: 90 additions & 50 deletions src/gax-internal/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ pub mod from_status;
pub mod status;
pub mod tonic;

#[cfg(google_cloud_unstable_tracing)]
use crate::observability::attributes::{self, keys::*, otel_status_codes};
use ::tonic::client::Grpc;
use ::tonic::transport::Channel;
use from_status::to_gax_error;
Expand All @@ -45,8 +43,6 @@ use google_cloud_gax::retry_policy::{
};
use google_cloud_gax::retry_throttler::SharedRetryThrottler;
use http::HeaderMap;
#[cfg(google_cloud_unstable_tracing)]
use opentelemetry_semantic_conventions::{attribute as otel_attr, trace as otel_trace};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -282,7 +278,6 @@ impl Client {
Request: prost::Message + Clone + 'static,
Response: prost::Message + Default + 'static,
{
use ::tonic::IntoRequest;
let headers = Self::make_headers(api_client_header, request_params, &options).await?;
let headers = self.add_auth_headers(headers).await?;
let metadata = tonic::MetadataMap::from_headers(headers);
Expand All @@ -293,21 +288,52 @@ impl Client {
let codec = tonic_prost::ProstCodec::<Request, Response>::default();
let mut inner = self.inner.clone();
inner.ready().await.map_err(Error::io)?;
#[cfg(google_cloud_unstable_tracing)]
let span = self.create_grpc_span(&path, None);

#[cfg(google_cloud_unstable_tracing)]
if let Some(recorder) = crate::observability::RequestRecorder::current() {
recorder.on_grpc_request(&path);
}
let result = inner
.server_streaming(request.into_request(), path, codec)
.await;

let pending = inner
.server_streaming(request, path, codec)
.map_err(to_gax_error);

#[cfg(not(google_cloud_unstable_tracing))]
let result = pending.await;

// TODO(#5372): The span created by `WithTransportSpan` only covers stream initiation.
// Consider instrumenting the returned stream to capture errors during the stream's lifetime.

#[cfg(google_cloud_unstable_tracing)]
if let Some(recorder) = crate::observability::RequestRecorder::current() {
match &result {
Ok(_) => recorder.on_grpc_response(),
Err(e) => recorder.on_grpc_error(&to_gax_error(e.clone())),
let result = {
use crate::observability::{
WithTransportLogging, WithTransportMetric, WithTransportSpan,
};

let pending = WithTransportMetric::new(self.metric.clone(), pending, 0);
let pending = WithTransportLogging::new(pending);
let pending = WithTransportSpan::new(span, pending);
Comment on lines +315 to +317
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(no action on this pr), it might be nice to add some helpers that allow us to chain these more easily so the code can look like

let pending = pending
  .with_transport_metric(self.metric.clone(), 0)
  .with_transport_logging()
  .with_transport_spane(span);


if let Some(recorder) = crate::observability::RequestRecorder::current() {
recorder.scope(pending).await
} else {
pending.await
}
};

match result {
Ok(response) => Ok(Ok(response)),
Err(err) => {
use std::error::Error as _;
if let Some(status) = err.source().and_then(|e| e.downcast_ref::<tonic::Status>()) {
Ok(Err(status.clone()))
} else {
Err(err)
}
}
}
Ok(result)
}

/// Runs the retry loop.
Expand Down Expand Up @@ -373,46 +399,14 @@ impl Client {
Response: prost::Message + std::default::Default + 'static,
{
#[cfg(google_cloud_unstable_tracing)]
let span = if let Some(attrs) = &self.tracing_attributes {
let rpc_method = path.path().trim_start_matches('/');
let (service, version, repo, artifact) = if let Some(info) = attrs.instrumentation {
(
Some(info.service_name),
Some(info.client_version),
Some("googleapis/google-cloud-rust"),
Some(info.client_artifact),
)
} else {
(None, None, None, None)
};
let resend_count = if _prior_attempt_count > 0 {
let span = self.create_grpc_span(
&path,
if _prior_attempt_count > 0 {
Some(_prior_attempt_count)
} else {
None
Comment on lines +404 to 407
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if _prior_attempt_count > 0 {
Some(_prior_attempt_count)
} else {
None
(_prior_attempt_count > 0).then_some(_prior_attempt_count)

};

tracing::info_span!(
"grpc.request",
{ OTEL_NAME } = rpc_method,
{ RPC_SYSTEM_NAME } = attributes::RPC_SYSTEM_GRPC,
{ OTEL_KIND } = attributes::OTEL_KIND_CLIENT,
{ otel_trace::RPC_METHOD } = rpc_method,
{ otel_trace::SERVER_ADDRESS } = attrs.server_address,
{ otel_trace::SERVER_PORT } = attrs.server_port,
{ otel_attr::URL_DOMAIN } = attrs.url_domain,
{ RPC_RESPONSE_STATUS_CODE } = tracing::field::Empty,
{ OTEL_STATUS_CODE } = otel_status_codes::UNSET,
{ otel_trace::ERROR_TYPE } = tracing::field::Empty,
{ GCP_CLIENT_SERVICE } = service,
{ GCP_CLIENT_VERSION } = version,
{ GCP_CLIENT_REPO } = repo,
{ GCP_CLIENT_ARTIFACT } = artifact,
{ GCP_GRPC_RESEND_COUNT } = resend_count,
{ GCP_RESOURCE_DESTINATION_ID } = tracing::field::Empty,
)
} else {
tracing::Span::none()
};
},
);

#[allow(unused_mut)]
let mut headers = self.add_auth_headers(headers).await?;
Expand Down Expand Up @@ -626,6 +620,52 @@ impl Client {
Ok(headers)
}

#[cfg(google_cloud_unstable_tracing)]
fn create_grpc_span(
&self,
path: &http::uri::PathAndQuery,
resend_count: Option<i64>,
) -> tracing::Span {
use crate::observability::attributes::{self, keys, otel_status_codes};
use opentelemetry_semantic_conventions::attribute as otel_attr;

if let Some(attrs) = &self.tracing_attributes {
let rpc_method = path.path().trim_start_matches('/');
let (service, version, repo, artifact) = if let Some(info) = attrs.instrumentation {
(
Some(info.service_name),
Some(info.client_version),
Some("googleapis/google-cloud-rust"),
Some(info.client_artifact),
)
} else {
(None, None, None, None)
};
Comment on lines +634 to +643
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let (service, version, repo, artifact) = if let Some(info) = attrs.instrumentation {
(
Some(info.service_name),
Some(info.client_version),
Some("googleapis/google-cloud-rust"),
Some(info.client_artifact),
)
} else {
(None, None, None, None)
};
let (service, version, repo, artifact) = attrs.instrumentation
.map(|info| (
Some(info.service_name),
Some(info.client_version),
Some("googleapis/google-cloud-rust"),
Some(info.client_artifact),
))
.unwrap_or_default();


tracing::info_span!(
"grpc.request",
{ keys::OTEL_NAME } = rpc_method,
{ keys::RPC_SYSTEM_NAME } = attributes::RPC_SYSTEM_GRPC,
{ keys::OTEL_KIND } = attributes::OTEL_KIND_CLIENT,
{ keys::RPC_METHOD } = rpc_method,
{ keys::SERVER_ADDRESS } = attrs.server_address,
{ keys::SERVER_PORT } = attrs.server_port,
{ otel_attr::URL_DOMAIN } = attrs.url_domain,
{ keys::RPC_RESPONSE_STATUS_CODE } = tracing::field::Empty,
{ keys::OTEL_STATUS_CODE } = otel_status_codes::UNSET,
{ keys::ERROR_TYPE } = tracing::field::Empty,
{ keys::GCP_CLIENT_SERVICE } = service,
{ keys::GCP_CLIENT_VERSION } = version,
{ keys::GCP_CLIENT_REPO } = repo,
{ keys::GCP_CLIENT_ARTIFACT } = artifact,
{ keys::GCP_GRPC_RESEND_COUNT } = resend_count,
{ keys::GCP_RESOURCE_DESTINATION_ID } = tracing::field::Empty,
)
} else {
tracing::Span::none()
}
}

fn get_retry_policy(&self, options: &RequestOptions) -> Arc<dyn RetryPolicy> {
options
.retry_policy()
Expand Down
132 changes: 132 additions & 0 deletions src/gax-internal/tests/grpc_observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,138 @@ mod tests {
Ok(())
}

#[cfg(feature = "_internal-grpc-server-streaming")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial_test::serial]
async fn server_streaming_metrics() -> anyhow::Result<()> {
use integration_tests_o11y::mock_collector::MockCollector;
use integration_tests_o11y::otlp::metrics::Builder as MeterProviderBuilder;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;

let (endpoint, _server) = start_echo_server().await?;

let mock_collector = MockCollector::default();
let otlp_endpoint = mock_collector.start().await;

let meter_provider = MeterProviderBuilder::new("test-project", "integration-tests")
.with_endpoint(
otlp_endpoint
.parse::<http::Uri>()
.expect("Failed to parse URI"),
)
.with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build())
.build()
.await?;
opentelemetry::global::set_meter_provider(meter_provider.clone());

// Configure client with tracing enabled
let mut config = google_cloud_gax_internal::options::ClientConfig::default();
config.tracing = true;
config.cred = Some(test_credentials());

let client = grpc::Client::new_with_instrumentation(config, &endpoint, &TEST_INFO).await?;

// Send a server streaming request
let extensions = {
let mut e = tonic::Extensions::new();
e.insert(tonic::GrpcMethod::new(
"google.test.v1.EchoServices",
"Expand",
));
e
};

let request = google::test::v1::EchoRequest {
message: "test message".into(),
..Default::default()
};

let recorder = RequestRecorder::new(*TEST_INFO);
recorder.on_client_request(
ClientRequestAttributes::default()
.set_url_template("/google.test.v1.EchoService/Expand"),
);
let mut response_stream = recorder
.scope(async {
client
.server_streaming::<_, google::test::v1::EchoResponse>(
extensions,
http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Expand"),
request,
RequestOptions::default(),
"test-only-api-client/1.0",
"name=test-only",
)
.await
})
.await?
.into_inner();

// Receive responses
let mut responses = Vec::new();
while let Some(res) = futures::StreamExt::next(&mut response_stream).await {
responses.push(res?);
}
assert!(!responses.is_empty());

// Force flush metrics
let _ = meter_provider.force_flush();

// Verify metrics
let mut metrics_requests = mock_collector.metrics.lock().expect("never poisoned");
let mut found_duration_metric = false;
while let Some(req) = metrics_requests.pop() {
let req: tonic::Request<ExportMetricsServiceRequest> = req;
let (_, _, metrics_request) = req.into_parts();
for rm in metrics_request.resource_metrics {
for sm in rm.scope_metrics {
for m in sm.metrics {
if m.name.contains("gcp.client.attempt.duration") {
found_duration_metric = true;
if let Some(
opentelemetry_proto::tonic::metrics::v1::metric::Data::Histogram(h),
) = m.data
{
let point =
h.data_points.first().expect("should have a data point");
let mut metric_attributes = std::collections::HashMap::new();
for kv in &point.attributes {
metric_attributes
.insert(kv.key.clone(), kv.value.clone().unwrap());
}

let get_metric_string = |key: &str| -> Option<String> {
metric_attributes.get(key).and_then(|v| match &v.value {
Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
Some(s.clone())
}
_ => None,
})
};

assert_eq!(
get_metric_string("rpc.system.name").as_deref(),
Some("grpc")
);
assert_eq!(
get_metric_string("rpc.method").as_deref(),
Some("google.test.v1.EchoService/Expand")
);
assert_eq!(
get_metric_string("rpc.response.status_code").as_deref(),
Some("OK")
);
}
}
}
}
}
}
assert!(found_duration_metric, "Should have found duration metric");

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cancellation() -> anyhow::Result<()> {
use std::time::Duration;
Expand Down
Loading