From 1c9aa391324441f9be53b71bd0ee2207c56ba465 Mon Sep 17 00:00:00 2001 From: Sam Mathis Date: Wed, 22 Apr 2026 14:24:12 -0700 Subject: [PATCH] Adds nexus_caller and nexus_caller_type tags --- common/metrics/metric_defs.go | 6 ++++ common/metrics/tags.go | 14 ++++++++ service/frontend/nexus_handler.go | 15 ++++++-- tests/nexus_api_test.go | 60 ++++++++++++++++++++++++++++--- 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 930b6bcd52c..5b0e176e40c 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -21,6 +21,8 @@ const ( nexusEndpointTagName = "nexus_endpoint" nexusServiceTagName = "nexus_service" nexusOperationTagName = "nexus_operation" + nexusCallerTagName = "nexus_caller" + nexusCallerTypeTagName = "nexus_caller_type" outcomeTagName = "outcome" versionedTagName = "versioned" resourceExhaustedTag = "resource_exhausted_cause" @@ -733,6 +735,10 @@ var ( "nexus_request_preprocess_errors", WithDescription("The number of Nexus requests for which pre-processing failed."), ) + NexusRequestErrors = NewCounterDef( + "nexus_request_error_count", + WithDescription("The number of Nexus requests that resulted in errors."), + ) NexusLatency = NewTimerDef( "nexus_latency", WithDescription("Latency of Nexus requests."), diff --git a/common/metrics/tags.go b/common/metrics/tags.go index a165bd57a4c..6a0c13ae2ff 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -395,6 +395,20 @@ func NexusOperationTag(value string) Tag { return Tag{Key: nexusOperationTagName, Value: value} } +func NexusCallerTag(value string) Tag { + if len(value) == 0 { + value = unknownValue + } + return Tag{Key: nexusCallerTagName, Value: value} +} + +func NexusCallerTypeTag(value string) Tag { + if len(value) == 0 { + value = unknownValue + } + return Tag{Key: nexusCallerTypeTagName, Value: value} +} + // HttpStatusTag returns a new httpStatusTag. func HttpStatusTag(value int) Tag { return Tag{Key: httpStatusTagName, Value: strconv.Itoa(value)} diff --git a/service/frontend/nexus_handler.go b/service/frontend/nexus_handler.go index e04dbfb6789..c07ebc3f322 100644 --- a/service/frontend/nexus_handler.go +++ b/service/frontend/nexus_handler.go @@ -106,6 +106,9 @@ func (c *operationContext) capturePanicAndRecordMetrics(ctxPtr *context.Context, // Record Nexus-specific metrics metrics.NexusRequests.With(c.metricsHandler).Record(1) metrics.NexusLatency.With(c.metricsHandler).Record(time.Since(c.requestStartTime)) + if *errPtr != nil { + metrics.NexusRequestErrors.With(c.metricsHandler).Record(1) + } // Record general telemetry metrics metrics.ServiceRequests.With(c.metricsHandlerForInterceptors).Record(1) @@ -282,7 +285,7 @@ func (c *operationContext) shouldForwardRequest(ctx context.Context, header nexu } // enrichNexusOperationMetrics enhances metrics with additional Nexus operation context based on configuration. -func (c *operationContext) enrichNexusOperationMetrics(service, operation string, requestHeader nexus.Header) { +func (c *operationContext) enrichNexusOperationMetrics(ctx context.Context, service, operation string, requestHeader nexus.Header) { conf := c.metricTagConfig() var tags []metrics.Tag @@ -299,6 +302,12 @@ func (c *operationContext) enrichNexusOperationMetrics(service, operation string tags = append(tags, metrics.StringTag(mapping.TargetTag, requestHeader.Get(mapping.SourceHeader))) } + // Add caller tags from the context. + callerInfo := headers.GetCallerInfo(ctx) + tags = append(tags, metrics.NexusCallerTag(callerInfo.CallerName)) + // Namespace is the only valid caller_type. To be updated when there are more kinds + tags = append(tags, metrics.NexusCallerTypeTag("namespace")) + if len(tags) > 0 { c.metricsHandler = c.metricsHandler.WithTags(tags...) } @@ -392,7 +401,7 @@ func (h *nexusHandler) StartOperation( return nil, err } ctx = oc.augmentContext(ctx, options.Header) - oc.enrichNexusOperationMetrics(service, operation, options.Header) + oc.enrichNexusOperationMetrics(ctx, service, operation, options.Header) defer oc.capturePanicAndRecordMetrics(&ctx, &retErr) var links []*nexuspb.Link @@ -630,7 +639,7 @@ func (h *nexusHandler) CancelOperation(ctx context.Context, service, operation, return err } ctx = oc.augmentContext(ctx, options.Header) - oc.enrichNexusOperationMetrics(service, operation, options.Header) + oc.enrichNexusOperationMetrics(ctx, service, operation, options.Header) defer oc.capturePanicAndRecordMetrics(&ctx, &retErr) request := oc.matchingRequest(&nexuspb.Request{ diff --git a/tests/nexus_api_test.go b/tests/nexus_api_test.go index e45fddfdf30..85a810c323c 100644 --- a/tests/nexus_api_test.go +++ b/tests/nexus_api_test.go @@ -317,16 +317,42 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures requests := capture.Metric("nexus_requests") s.Len(requests, 1) - s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome}) + s.Subset(requests[0].Tags, map[string]string{ + "namespace": env.Namespace().String(), + "method": "StartNexusOperation", + "outcome": tc.outcome, + "nexus_caller": env.Namespace().String(), + "nexus_caller_type": "namespace", + }) s.Contains(requests[0].Tags, "nexus_endpoint") s.Equal(int64(1), requests[0].Value) s.Equal(metrics.MetricUnit(""), requests[0].Unit) latency := capture.Metric("nexus_latency") s.Len(latency, 1) - s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome}) + s.Subset(latency[0].Tags, map[string]string{ + "namespace": env.Namespace().String(), + "method": "StartNexusOperation", + "outcome": tc.outcome, + "nexus_caller": env.Namespace().String(), + "nexus_caller_type": "namespace", + }) s.Contains(latency[0].Tags, "nexus_endpoint") + // Verify error counter is emitted for error outcomes and absent for success. + errorRequests := capture.Metric("nexus_request_error_count") + if tc.outcome == "sync_success" || tc.outcome == "async_success" { + s.Empty(errorRequests) + } else { + s.Len(errorRequests, 1) + s.Subset(errorRequests[0].Tags, map[string]string{ + "namespace": env.Namespace().String(), + "method": "StartNexusOperation", + "outcome": tc.outcome, + }) + s.Equal(int64(1), errorRequests[0].Value) + } + // Ensure that StartOperation request is tracked as part of normal service telemetry metrics s.Condition(func() bool { for _, m := range capture.Metric("service_requests") { @@ -576,16 +602,42 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure requests := capture.Metric("nexus_requests") s.Len(requests, 1) - s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome}) + s.Subset(requests[0].Tags, map[string]string{ + "namespace": env.Namespace().String(), + "method": "CancelNexusOperation", + "outcome": tc.outcome, + "nexus_caller": env.Namespace().String(), + "nexus_caller_type": "namespace", + }) s.Contains(requests[0].Tags, "nexus_endpoint") s.Equal(int64(1), requests[0].Value) s.Equal(metrics.MetricUnit(""), requests[0].Unit) latency := capture.Metric("nexus_latency") s.Len(latency, 1) - s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome}) + s.Subset(latency[0].Tags, map[string]string{ + "namespace": env.Namespace().String(), + "method": "CancelNexusOperation", + "outcome": tc.outcome, + "nexus_caller": env.Namespace().String(), + "nexus_caller_type": "namespace", + }) s.Contains(latency[0].Tags, "nexus_endpoint") + // Verify error counter is emitted for error outcomes and absent for success. + errorRequests := capture.Metric("nexus_request_error_count") + if tc.outcome == "success" { + s.Empty(errorRequests) + } else { + s.Len(errorRequests, 1) + s.Subset(errorRequests[0].Tags, map[string]string{ + "namespace": env.Namespace().String(), + "method": "CancelNexusOperation", + "outcome": tc.outcome, + }) + s.Equal(int64(1), errorRequests[0].Value) + } + // Ensure that CancelOperation request is tracked as part of normal service telemetry metrics s.Condition(func() bool { for _, m := range capture.Metric("service_requests") {