Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/gax-internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ bytes.workspace = true
google-cloud-test-utils.workspace = true
httptest.workspace = true
mockall.workspace = true
opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "testing"] }
opentelemetry-appender-tracing = { workspace = true }
opentelemetry-proto = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "testing"] }
pretty_assertions.workspace = true
scoped-env.workspace = true
serde_with.workspace = true
Expand All @@ -146,8 +147,9 @@ tracing = { workspace = true, features = ["std"] }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json", "registry", "std"] }
# Local crates
echo-server = { path = "echo-server" }
grpc-server = { path = "grpc-server" }
echo-server = { path = "echo-server" }
grpc-server = { path = "grpc-server" }
integration-tests-o11y = { path = "../../tests/o11y" }
# reqwest panics if (1) configured to support TLS and (2) no default crypto
# provider is configured via features or installed at runtime. Even if reqwest
# is being used without TLS. Enabling the default provider in tests to
Expand Down
85 changes: 71 additions & 14 deletions src/gax-internal/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,21 +223,85 @@ impl Client {
let codec = tonic_prost::ProstCodec::<Request, Response>::default();
let mut inner = self.inner.clone();
inner.ready().await.map_err(Error::io)?;
#[cfg(google_cloud_unstable_tracing)]
let span = if let Some(attrs) = &self.tracing_attributes {
let rpc_method = path.path().trim_start_matches('/');
let (service, version, repo, artifact) = if let Some(info) = attrs.instrumentation {
(
Some(info.service_name),
Some(info.client_version),
Some("googleapis/google-cloud-rust"),
Some(info.client_artifact),
)
} else {
(None, None, None, None)
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can alternatively be written as

Suggested change
};
let (service, version, repo, artifact) = attrs.instrumentation
.map(|info| (
Some(info.service_name),
Some(info.client_version),
Some("googleapis/google-cloud-rust"),
Some(info.client_artifact),
))
.unwrap_or_default();


tracing::info_span!(
"grpc.request",
{ OTEL_NAME } = rpc_method,
{ RPC_SYSTEM_NAME } = attributes::RPC_SYSTEM_GRPC,
{ OTEL_KIND } = attributes::OTEL_KIND_CLIENT,
{ otel_trace::RPC_METHOD } = rpc_method,
{ otel_trace::SERVER_ADDRESS } = attrs.server_address,
{ otel_trace::SERVER_PORT } = attrs.server_port,
{ otel_attr::URL_DOMAIN } = attrs.url_domain,
{ RPC_RESPONSE_STATUS_CODE } = tracing::field::Empty,
{ OTEL_STATUS_CODE } = otel_status_codes::UNSET,
{ otel_trace::ERROR_TYPE } = tracing::field::Empty,
{ GCP_CLIENT_SERVICE } = service,
{ GCP_CLIENT_VERSION } = version,
{ GCP_CLIENT_REPO } = repo,
{ GCP_CLIENT_ARTIFACT } = artifact,
{ GCP_GRPC_RESEND_COUNT } = None::<i64>,
{ GCP_RESOURCE_DESTINATION_ID } = tracing::field::Empty,
Comment on lines +240 to +257
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possible code clean up for later: This seems like it could share code with #5375

)
} else {
tracing::Span::none()
};

#[cfg(google_cloud_unstable_tracing)]
if let Some(recorder) = crate::observability::RequestRecorder::current() {
recorder.on_grpc_request(&path);
}
let result = inner

let pending = inner
.streaming(request.into_streaming_request(), path, codec)
.await;
.map_err(to_gax_error);

#[cfg(not(google_cloud_unstable_tracing))]
let result = pending.await;

// TODO(#5372): The span created by `WithTransportSpan` only covers stream initiation.
// Consider instrumenting the returned stream to capture errors during the stream's lifetime.
#[cfg(google_cloud_unstable_tracing)]
if let Some(recorder) = crate::observability::RequestRecorder::current() {
match &result {
Ok(_) => recorder.on_grpc_response(),
Err(e) => recorder.on_grpc_error(&to_gax_error(e.clone())),
let result = {
use crate::observability::{
WithTransportLogging, WithTransportMetric, WithTransportSpan,
};

let pending = WithTransportMetric::new(self.metric.clone(), pending, 0);
let pending = WithTransportLogging::new(pending);
let pending = WithTransportSpan::new(span, pending);

if let Some(recorder) = crate::observability::RequestRecorder::current() {
recorder.scope(pending).await
} else {
pending.await
}
};

match result {
Ok(response) => Ok(Ok(response)),
Err(err) => {
use std::error::Error as _;
if let Some(status) = err.source().and_then(|e| e.downcast_ref::<tonic::Status>()) {
Ok(Err(status.clone()))
} else {
Err(err)
}
}
}
Ok(result)
}

/// Opens a server stream.
Expand Down Expand Up @@ -300,13 +364,6 @@ impl Client {
let result = inner
.server_streaming(request.into_request(), path, codec)
.await;
#[cfg(google_cloud_unstable_tracing)]
if let Some(recorder) = crate::observability::RequestRecorder::current() {
match &result {
Ok(_) => recorder.on_grpc_response(),
Err(e) => recorder.on_grpc_error(&to_gax_error(e.clone())),
}
}
Ok(result)
}

Expand Down
196 changes: 173 additions & 23 deletions src/gax-internal/tests/grpc_observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ mod tests {
Ok(())
}

#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_grpc_streaming_span() -> anyhow::Result<()> {
let (endpoint, _server) = start_echo_server().await?;
Expand All @@ -374,15 +373,23 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(4);
let request_stream = tokio_stream::wrappers::ReceiverStream::new(rx);

let mut response_stream = client
.bidi_stream::<_, google::test::v1::EchoResponse>(
extensions,
http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Chat"),
request_stream,
RequestOptions::default(),
"test-only-api-client/1.0",
"name=test-only",
)
let recorder = RequestRecorder::new(*TEST_INFO);
recorder.on_client_request(
ClientRequestAttributes::default().set_url_template("/google.test.v1.EchoService/Chat"),
);
let mut response_stream = recorder
.scope(async {
client
.bidi_stream::<_, google::test::v1::EchoResponse>(
extensions,
http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Chat"),
request_stream,
RequestOptions::default(),
"test-only-api-client/1.0",
"name=test-only",
)
.await
})
.await?
.into_inner();

Expand Down Expand Up @@ -429,7 +436,6 @@ mod tests {
Ok(())
}

#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn streaming_error() -> anyhow::Result<()> {
let (endpoint, _server) = start_echo_server().await?;
Expand All @@ -453,15 +459,23 @@ mod tests {
let (tx, rx) = tokio::sync::mpsc::channel(4);
let request_stream = tokio_stream::wrappers::ReceiverStream::new(rx);

let mut response_stream = client
.bidi_stream::<_, google::test::v1::EchoResponse>(
extensions,
http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Chat"),
request_stream,
RequestOptions::default(),
"test-only-api-client/1.0",
"name=test-only",
)
let recorder = RequestRecorder::new(*TEST_INFO);
recorder.on_client_request(
ClientRequestAttributes::default().set_url_template("/google.test.v1.EchoService/Chat"),
);
let mut response_stream = recorder
.scope(async {
client
.bidi_stream::<_, google::test::v1::EchoResponse>(
extensions,
http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Chat"),
request_stream,
RequestOptions::default(),
"test-only-api-client/1.0",
"name=test-only",
)
.await
})
.await?
.into_inner();

Expand Down Expand Up @@ -497,9 +511,8 @@ mod tests {
("server.address", expected_host.clone().into()),
("server.port", (expected_port as i64).into()),
("url.domain", expected_host.into()),
("otel.status_code", "ERROR".into()),
("rpc.response.status_code", "INVALID_ARGUMENT".into()), // INVALID_ARGUMENT = 3
("error.type", "INVALID_ARGUMENT".into()),
("otel.status_code", "UNSET".into()),
("rpc.response.status_code", "OK".into()),
]
.into_iter()
.map(|(k, v)| (k.to_string(), v))
Expand All @@ -510,6 +523,143 @@ mod tests {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial_test::serial]
async fn streaming_metrics() -> anyhow::Result<()> {
use integration_tests_o11y::mock_collector::MockCollector;
use integration_tests_o11y::otlp::metrics::Builder as MeterProviderBuilder;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;

let (endpoint, _server) = start_echo_server().await?;

let mock_collector = MockCollector::default();
let otlp_endpoint = mock_collector.start().await;

let meter_provider = MeterProviderBuilder::new("test-project", "integration-tests")
.with_endpoint(
otlp_endpoint
.parse::<http::Uri>()
.expect("Failed to parse URI"),
)
.with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build())
.build()
.await?;
opentelemetry::global::set_meter_provider(meter_provider.clone());

// Configure client with tracing enabled
let mut config = google_cloud_gax_internal::options::ClientConfig::default();
config.tracing = true;
config.cred = Some(test_credentials());

let client = grpc::Client::new_with_instrumentation(config, &endpoint, &TEST_INFO).await?;

// Send a streaming request
let extensions = {
let mut e = tonic::Extensions::new();
e.insert(tonic::GrpcMethod::new(
"google.test.v1.EchoServices",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No action required: EchoServices is used in all of our tests for the service name for GrpcMetho, but I think this maybe should actually be google.test.v1.EchoService (no s) which is the name we use in the paths, but it just doesn't have any affect on the test (I replaced all instances and reran the tests and it doesn't break anything).

"Chat",
));
e
};

let (tx, rx) = tokio::sync::mpsc::channel(4);
let request_stream = tokio_stream::wrappers::ReceiverStream::new(rx);

let recorder = RequestRecorder::new(*TEST_INFO);
recorder.on_client_request(
ClientRequestAttributes::default().set_url_template("/google.test.v1.EchoService/Chat"),
);
let mut response_stream = recorder
.scope(async {
client
.bidi_stream::<_, google::test::v1::EchoResponse>(
extensions,
http::uri::PathAndQuery::from_static("/google.test.v1.EchoService/Chat"),
request_stream,
RequestOptions::default(),
"test-only-api-client/1.0",
"name=test-only",
)
.await
})
.await?
.into_inner();

// Send a message
let request = google::test::v1::EchoRequest {
message: "test message".into(),
..Default::default()
};
tx.send(request).await?;

// Receive a response
let response = response_stream.next().await.expect("stream closed")?;
assert_eq!(response.message, "test message");

// Close the stream
drop(tx);
let next = response_stream.next().await;
assert!(next.is_none(), "{next:?}");

// Force flush metrics
let _ = meter_provider.force_flush();

// Verify metrics
let mut metrics_requests = mock_collector.metrics.lock().expect("never poisoned");
let mut found_duration_metric = false;
while let Some(req) = metrics_requests.pop() {
let req: tonic::Request<ExportMetricsServiceRequest> = req;
let (_, _, metrics_request) = req.into_parts();
for rm in metrics_request.resource_metrics {
for sm in rm.scope_metrics {
for m in sm.metrics {
if m.name.contains("gcp.client.attempt.duration") {
found_duration_metric = true;
if let Some(
opentelemetry_proto::tonic::metrics::v1::metric::Data::Histogram(h),
) = m.data
{
let point =
h.data_points.first().expect("should have a data point");
let mut metric_attributes = std::collections::HashMap::new();
for kv in &point.attributes {
metric_attributes
.insert(kv.key.clone(), kv.value.clone().unwrap());
}

let get_metric_string = |key: &str| -> Option<String> {
metric_attributes.get(key).and_then(|v| match &v.value {
Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) => {
Some(s.clone())
}
_ => None,
})
};

assert_eq!(
get_metric_string("rpc.system.name").as_deref(),
Some("grpc")
);
assert_eq!(
get_metric_string("rpc.method").as_deref(),
Some("google.test.v1.EchoService/Chat")
);
assert_eq!(
get_metric_string("rpc.response.status_code").as_deref(),
Some("OK")
);
}
}
}
}
}
}
assert!(found_duration_metric, "Should have found duration metric");

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cancellation() -> anyhow::Result<()> {
use std::time::Duration;
Expand Down
1 change: 0 additions & 1 deletion tests/o11y/tests/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
mod storage {
use google_cloud_test_utils::errors::anydump;

#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn run() -> anyhow::Result<()> {
integration_tests_o11y::e2e::storage::run()
Expand Down
Loading