Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const (
nexusEndpointTagName = "nexus_endpoint"
nexusServiceTagName = "nexus_service"
nexusOperationTagName = "nexus_operation"
nexusCallerTagName = "nexus_caller"
nexusCallerTypeTagName = "nexus_caller_type"
outcomeTagName = "outcome"
versionedTagName = "versioned"
resourceExhaustedTag = "resource_exhausted_cause"
Expand Down Expand Up @@ -733,6 +735,10 @@ var (
"nexus_request_preprocess_errors",
WithDescription("The number of Nexus requests for which pre-processing failed."),
)
NexusRequestErrors = NewCounterDef(
"nexus_request_error_count",
WithDescription("The number of Nexus requests that resulted in errors."),
)
NexusLatency = NewTimerDef(
"nexus_latency",
WithDescription("Latency of Nexus requests."),
Expand Down
14 changes: 14 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,20 @@ func NexusOperationTag(value string) Tag {
return Tag{Key: nexusOperationTagName, Value: value}
}

func NexusCallerTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return Tag{Key: nexusCallerTagName, Value: value}
}

func NexusCallerTypeTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return Tag{Key: nexusCallerTypeTagName, Value: value}
}

// HttpStatusTag returns a new httpStatusTag.
func HttpStatusTag(value int) Tag {
return Tag{Key: httpStatusTagName, Value: strconv.Itoa(value)}
Expand Down
15 changes: 12 additions & 3 deletions service/frontend/nexus_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func (c *operationContext) capturePanicAndRecordMetrics(ctxPtr *context.Context,
// Record Nexus-specific metrics
metrics.NexusRequests.With(c.metricsHandler).Record(1)
metrics.NexusLatency.With(c.metricsHandler).Record(time.Since(c.requestStartTime))
if *errPtr != nil {
metrics.NexusRequestErrors.With(c.metricsHandler).Record(1)
}

// Record general telemetry metrics
metrics.ServiceRequests.With(c.metricsHandlerForInterceptors).Record(1)
Expand Down Expand Up @@ -282,7 +285,7 @@ func (c *operationContext) shouldForwardRequest(ctx context.Context, header nexu
}

// enrichNexusOperationMetrics enhances metrics with additional Nexus operation context based on configuration.
func (c *operationContext) enrichNexusOperationMetrics(service, operation string, requestHeader nexus.Header) {
func (c *operationContext) enrichNexusOperationMetrics(ctx context.Context, service, operation string, requestHeader nexus.Header) {
conf := c.metricTagConfig()

var tags []metrics.Tag
Expand All @@ -299,6 +302,12 @@ func (c *operationContext) enrichNexusOperationMetrics(service, operation string
tags = append(tags, metrics.StringTag(mapping.TargetTag, requestHeader.Get(mapping.SourceHeader)))
}

// Add caller tags from the context.
callerInfo := headers.GetCallerInfo(ctx)
tags = append(tags, metrics.NexusCallerTag(callerInfo.CallerName))
// Namespace is the only valid caller_type. To be updated when there are more kinds
tags = append(tags, metrics.NexusCallerTypeTag("namespace"))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's a better way to do this. callerInfo.CallerType seems to be something else (which is maybe a hint that using callerInfo at all is wrong here?).


if len(tags) > 0 {
c.metricsHandler = c.metricsHandler.WithTags(tags...)
}
Expand Down Expand Up @@ -392,7 +401,7 @@ func (h *nexusHandler) StartOperation(
return nil, err
}
ctx = oc.augmentContext(ctx, options.Header)
oc.enrichNexusOperationMetrics(service, operation, options.Header)
oc.enrichNexusOperationMetrics(ctx, service, operation, options.Header)
defer oc.capturePanicAndRecordMetrics(&ctx, &retErr)

var links []*nexuspb.Link
Expand Down Expand Up @@ -630,7 +639,7 @@ func (h *nexusHandler) CancelOperation(ctx context.Context, service, operation,
return err
}
ctx = oc.augmentContext(ctx, options.Header)
oc.enrichNexusOperationMetrics(service, operation, options.Header)
oc.enrichNexusOperationMetrics(ctx, service, operation, options.Header)
defer oc.capturePanicAndRecordMetrics(&ctx, &retErr)

request := oc.matchingRequest(&nexuspb.Request{
Expand Down
60 changes: 56 additions & 4 deletions tests/nexus_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,42 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures

requests := capture.Metric("nexus_requests")
s.Len(requests, 1)
s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome})
s.Subset(requests[0].Tags, map[string]string{
"namespace": env.Namespace().String(),
"method": "StartNexusOperation",
"outcome": tc.outcome,
"nexus_caller": env.Namespace().String(),
"nexus_caller_type": "namespace",
})
s.Contains(requests[0].Tags, "nexus_endpoint")
s.Equal(int64(1), requests[0].Value)
s.Equal(metrics.MetricUnit(""), requests[0].Unit)

latency := capture.Metric("nexus_latency")
s.Len(latency, 1)
s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome})
s.Subset(latency[0].Tags, map[string]string{
"namespace": env.Namespace().String(),
"method": "StartNexusOperation",
"outcome": tc.outcome,
"nexus_caller": env.Namespace().String(),
"nexus_caller_type": "namespace",
})
s.Contains(latency[0].Tags, "nexus_endpoint")

// Verify error counter is emitted for error outcomes and absent for success.
errorRequests := capture.Metric("nexus_request_error_count")
if tc.outcome == "sync_success" || tc.outcome == "async_success" {
s.Empty(errorRequests)
} else {
s.Len(errorRequests, 1)
s.Subset(errorRequests[0].Tags, map[string]string{
"namespace": env.Namespace().String(),
"method": "StartNexusOperation",
"outcome": tc.outcome,
})
s.Equal(int64(1), errorRequests[0].Value)
}

// Ensure that StartOperation request is tracked as part of normal service telemetry metrics
s.Condition(func() bool {
for _, m := range capture.Metric("service_requests") {
Expand Down Expand Up @@ -576,16 +602,42 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure

requests := capture.Metric("nexus_requests")
s.Len(requests, 1)
s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome})
s.Subset(requests[0].Tags, map[string]string{
"namespace": env.Namespace().String(),
"method": "CancelNexusOperation",
"outcome": tc.outcome,
"nexus_caller": env.Namespace().String(),
"nexus_caller_type": "namespace",
})
s.Contains(requests[0].Tags, "nexus_endpoint")
s.Equal(int64(1), requests[0].Value)
s.Equal(metrics.MetricUnit(""), requests[0].Unit)

latency := capture.Metric("nexus_latency")
s.Len(latency, 1)
s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome})
s.Subset(latency[0].Tags, map[string]string{
"namespace": env.Namespace().String(),
"method": "CancelNexusOperation",
"outcome": tc.outcome,
"nexus_caller": env.Namespace().String(),
"nexus_caller_type": "namespace",
})
s.Contains(latency[0].Tags, "nexus_endpoint")

// Verify error counter is emitted for error outcomes and absent for success.
errorRequests := capture.Metric("nexus_request_error_count")
if tc.outcome == "success" {
s.Empty(errorRequests)
} else {
s.Len(errorRequests, 1)
s.Subset(errorRequests[0].Tags, map[string]string{
"namespace": env.Namespace().String(),
"method": "CancelNexusOperation",
"outcome": tc.outcome,
})
s.Equal(int64(1), errorRequests[0].Value)
}

// Ensure that CancelOperation request is tracked as part of normal service telemetry metrics
s.Condition(func() bool {
for _, m := range capture.Metric("service_requests") {
Expand Down
Loading