From 1896db343c2d8ba447a50f8d006305c437e154c1 Mon Sep 17 00:00:00 2001 From: Sam Mathis Date: Fri, 24 Apr 2026 15:50:59 -0700 Subject: [PATCH 1/6] Add caller-side operation state machine metrics --- chasm/lib/nexusoperation/library.go | 30 +++++- chasm/lib/nexusoperation/metrics.go | 36 +++++++ chasm/lib/nexusoperation/operation.go | 101 +++++++++++++++++- .../nexusoperation/operation_statemachine.go | 39 ++++++- .../operation_statemachine_test.go | 52 +++++++++ .../nexusoperation/operation_tasks_test.go | 43 ++++++++ chasm/lib/nexusoperation/operation_test.go | 17 +++ .../nexusoperation/tasks_test_helpers_test.go | 4 + .../nexusoperation/workflow/commands_test.go | 6 ++ chasm/lib/nexusoperation/workflow/events.go | 5 +- chasm/lib/workflow/workflow.go | 4 + chasm/ms_pointer.go | 5 + components/nexusoperations/config.go | 4 +- 13 files changed, 338 insertions(+), 8 deletions(-) diff --git a/chasm/lib/nexusoperation/library.go b/chasm/lib/nexusoperation/library.go index 04966174551..c74e6499969 100644 --- a/chasm/lib/nexusoperation/library.go +++ b/chasm/lib/nexusoperation/library.go @@ -3,17 +3,35 @@ package nexusoperation import ( "go.temporal.io/server/chasm" nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/namespace" "google.golang.org/grpc" ) +type operationContextKeyType struct{} + +// OperationContextKey is the context key for OperationContext, registered as a CHASM component +// context value. Exported for use in tests that need to set up MockContext. +var OperationContextKey = operationContextKeyType{} + +// OperationContext holds dependencies injected into the chasm.Context for use by Operation methods. +type OperationContext struct { + MetricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig] +} + // componentOnlyLibrary registers just the components without task executors or gRPC handlers. // Used in the frontend to enable component ref serialization. type componentOnlyLibrary struct { chasm.UnimplementedLibrary + namespaceRegistry namespace.Registry + metricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig] } -func newComponentOnlyLibrary() *componentOnlyLibrary { - return &componentOnlyLibrary{} +func newComponentOnlyLibrary(namespaceRegistry namespace.Registry, dc *dynamicconfig.Collection) *componentOnlyLibrary { + return &componentOnlyLibrary{ + namespaceRegistry: namespaceRegistry, + metricTagConfig: MetricTagConfiguration.Get(dc), + } } func (l *componentOnlyLibrary) Name() string { @@ -32,6 +50,11 @@ func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent { StatusSearchAttribute, ), chasm.WithBusinessIDAlias("OperationId"), + chasm.WithContextValues(map[any]any{ + OperationContextKey: &OperationContext{ + MetricTagConfig: l.metricTagConfig, + }, + }), ), chasm.NewRegistrableComponent[*Cancellation]("cancellation"), } @@ -61,8 +84,11 @@ func newLibrary( operationStartToCloseTimeoutTaskHandler *operationStartToCloseTimeoutTaskHandler, cancellationInvocationTaskHandler *cancellationInvocationTaskHandler, cancellationBackoffTaskHandler *cancellationBackoffTaskHandler, + namespaceRegistry namespace.Registry, + dc *dynamicconfig.Collection, ) *Library { return &Library{ + componentOnlyLibrary: *newComponentOnlyLibrary(namespaceRegistry, dc), handler: handler, operationBackoffTaskHandler: operationBackoffTaskHandler, operationInvocationTaskHandler: operationInvocationTaskHandler, diff --git a/chasm/lib/nexusoperation/metrics.go b/chasm/lib/nexusoperation/metrics.go index 9b68d5c4422..fac5ee4f7f2 100644 --- a/chasm/lib/nexusoperation/metrics.go +++ b/chasm/lib/nexusoperation/metrics.go @@ -12,7 +12,43 @@ var OutboundRequestLatency = metrics.NewTimerDef( "nexus_outbound_latency", metrics.WithDescription("Latency of outbound Nexus requests made by the history service."), ) +var NexusOperationSuccessCount = metrics.NewCounterDef( + "nexus_operation_success_count", + metrics.WithDescription("Nexus Operations successfully completed."), +) +var NexusOperationFailedCount = metrics.NewCounterDef( + "nexus_operation_failed_count", + metrics.WithDescription("Nexus Operations failures."), +) +var NexusOperationCancelCount = metrics.NewCounterDef( + "nexus_operation_cancel_count", + metrics.WithDescription("Nexus Operations cancellations."), +) +var NexusOperationTerminateCount = metrics.NewCounterDef( + "nexus_operation_terminate_count", + metrics.WithDescription("Nexus Operations that were terminated before completion."), +) +var NexusOperationTimeoutCount = metrics.NewCounterDef( + "nexus_operation_timeout_count", + metrics.WithDescription("Nexus Operations that timed out before completion."), +) + +var NexusOperationScheduleToCloseLatency = metrics.NewTimerDef( + "nexus_operation_schedule_to_close_latency", + metrics.WithDescription("Duration from Nexus Operation scheduled time to terminal state."), +) +var NexusOperationScheduleToStartLatency = metrics.NewTimerDef( + "nexus_operation_schedule_to_start_latency", + metrics.WithDescription("Duration from Nexus Operation scheduled time to started time."), +) +var NexusOperationStartToCloseLatency = metrics.NewTimerDef( + "nexus_operation_start_to_close_latency", + metrics.WithDescription("Duration from Nexus Operation scheduled time to completed time. Only emitted for async operations."), +) +// NexusMetricTagConfig controls which optional metric tags are included with Nexus operation +// metrics. Enabling service or operation tags applies to both caller and handler metrics. HeaderTagMappings is +// not used for caller metrics. type NexusMetricTagConfig struct { // Include service name as a metric tag IncludeServiceTag bool diff --git a/chasm/lib/nexusoperation/operation.go b/chasm/lib/nexusoperation/operation.go index 1725e5fd5b3..bcd4b7e1347 100644 --- a/chasm/lib/nexusoperation/operation.go +++ b/chasm/lib/nexusoperation/operation.go @@ -2,6 +2,7 @@ package nexusoperation import ( "fmt" + "strings" "time" "github.com/google/uuid" @@ -16,6 +17,7 @@ import ( "go.temporal.io/server/chasm" nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/metrics" commonnexus "go.temporal.io/server/common/nexus" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/softassert" @@ -44,6 +46,12 @@ var ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancel // ErrOperationAlreadyCompleted is returned when trying to cancel an operation that has already completed. var ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed") +const ( + // WorkflowTypeTag is a required workflow tag for standalone operations to ensure consistent + // metric labeling between workflows and activities. + WorkflowTypeTag = "__temporal_standalone_nexus_operation__" +) + // InvocationData contains data needed to invoke a Nexus operation. type InvocationData struct { // Input is the operation input payload. @@ -69,6 +77,7 @@ type OperationStore interface { OnNexusOperationCancellationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error // NexusOperationInvocationData loads invocation data (Input, Header, NexusLinks) from the scheduled history event. NexusOperationInvocationData(ctx chasm.Context, operation *Operation) (InvocationData, error) + WorkflowTypeTag() string } // Operation is a CHASM component that represents a Nexus operation. @@ -231,9 +240,11 @@ func (o *Operation) onTimedOut(ctx chasm.MutableContext, cause *failurepb.Failur if store, ok := o.Store.TryGet(ctx); ok { return store.OnNexusOperationTimedOut(ctx, o, cause, fromAttempt) } + timeoutType := cause.GetTimeoutFailureInfo().GetTimeoutType() return TransitionTimedOut.Apply(o, ctx, EventTimedOut{ Failure: cause, FromAttempt: fromAttempt, + TimeoutType: timeoutType, }) } @@ -398,7 +409,10 @@ func (o *Operation) Terminate( } return chasm.TerminateComponentResponse{}, nil } - return chasm.TerminateComponentResponse{}, TransitionTerminated.Apply(o, ctx, EventTerminated{TerminateComponentRequest: req}) + + return chasm.TerminateComponentResponse{}, TransitionTerminated.Apply(o, ctx, EventTerminated{ + TerminateComponentRequest: req, + }) } func (o *Operation) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue { @@ -551,6 +565,91 @@ func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperatio return info } +// EnrichMetricsHandler returns a metrics handler enriched with nexus operation tags. +// Panics if the context value is missing, which indicates a library registration bug. +func (o *Operation) enrichMetricsHandler(ctx chasm.Context) (metrics.Handler, error) { + // nolint:revive // unchecked-type-assertion: intentional panic on missing context value + namespaceName := ctx.NamespaceEntry().Name().String() + + wftt := WorkflowTypeTag + if store, ok := o.Store.TryGet(ctx); ok { + wftt = store.WorkflowTypeTag() + } + tags := []metrics.Tag{ + metrics.NamespaceTag(namespaceName), + metrics.NexusEndpointTag(o.GetEndpoint()), + metrics.WorkflowTypeTag(wftt), + } + + opCtx := ctx.Value(OperationContextKey).(*OperationContext) + conf := opCtx.MetricTagConfig() + if conf.IncludeServiceTag { + tags = append(tags, metrics.NexusServiceTag(o.GetService())) + } + if conf.IncludeOperationTag { + tags = append(tags, metrics.NexusOperationTag(o.GetOperation())) + } + + return ctx.MetricsHandler().WithTags(tags...), nil +} + +func (o *Operation) emitOnSucceededMetrics(handler metrics.Handler, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_SUCCEEDED.String()), + ) + NexusOperationSuccessCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnFailedMetrics(handler metrics.Handler, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_FAILED.String()), + ) + NexusOperationFailedCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnCanceledMetrics(handler metrics.Handler, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_CANCELED.String()), + ) + NexusOperationCancelCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnTimedOutMetrics(handler metrics.Handler, closeTime time.Time, timeoutType string) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_TIMED_OUT.String()), + ) + NexusOperationTimeoutCount.With(handler).Record(1, metrics.StringTag("timeout_type", timeoutType)) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnTerminatedMetrics(handler metrics.Handler, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_TERMINATED.String()), + ) + NexusOperationTerminateCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +// emitLatencyMetrics emits schedule-to-close, schedule-to-start, and start-to-close latencies. +func (o *Operation) emitLatencyMetrics(handler metrics.Handler, closeTime time.Time, outcomeTag metrics.Tag) { + scheduledTime := o.GetScheduledTime().AsTime() + NexusOperationScheduleToCloseLatency.With(handler).Record(closeTime.Sub(scheduledTime), outcomeTag) + + startedTime := o.GetStartedTime() + if startedTime != nil { + // Async operation that was started. + // Schedule-to-start latency is emitted in TransitionStarted. + NexusOperationStartToCloseLatency.With(handler).Record(closeTime.Sub(startedTime.AsTime()), outcomeTag) + } else { + // Sync operation or operation that never started. + // For sync ops, schedule-to-start equals schedule-to-close. + NexusOperationScheduleToStartLatency.With(handler).Record(closeTime.Sub(scheduledTime), outcomeTag) + } +} + func (o *Operation) closeTime(ctx chasm.Context) *timestamppb.Timestamp { if !o.LifecycleState(ctx).IsClosed() { return nil diff --git a/chasm/lib/nexusoperation/operation_statemachine.go b/chasm/lib/nexusoperation/operation_statemachine.go index f82290c2098..1b22c36040c 100644 --- a/chasm/lib/nexusoperation/operation_statemachine.go +++ b/chasm/lib/nexusoperation/operation_statemachine.go @@ -4,6 +4,7 @@ import ( "time" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" @@ -135,6 +136,13 @@ var TransitionStarted = chasm.NewTransition( // Store the operation token for async completion. o.OperationToken = event.OperationToken + // Emit schedule-to-start latency on the transition to started. + metricsHandler, err := o.enrichMetricsHandler(ctx) + if err != nil { + return err + } + NexusOperationScheduleToStartLatency.With(metricsHandler).Record(startTime.Sub(o.GetScheduledTime().AsTime())) + // Emit a start-to-close timeout task if configured. if o.StartToCloseTimeout != nil && o.StartToCloseTimeout.AsDuration() != 0 { deadline := startTime.Add(o.StartToCloseTimeout.AsDuration()) @@ -185,6 +193,11 @@ var TransitionSucceeded = chasm.NewTransition( Successful: &nexusoperationpb.OperationOutcome_Successful{Result: event.Result}, } + metricsHandler, err := o.enrichMetricsHandler(ctx) + if err != nil { + return err + } + o.emitOnSucceededMetrics(metricsHandler, closeTime) // Terminal state - no tasks to emit. return nil }, @@ -210,8 +223,13 @@ var TransitionFailed = chasm.NewTransition( if event.CompleteTime != nil { closeTime = *event.CompleteTime } + metricsHandler, err := o.enrichMetricsHandler(ctx) + if err != nil { + return err + } // Attempts only execute in SCHEDULED, so that status identifies attempt-originated failures. fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED + o.emitOnFailedMetrics(metricsHandler, closeTime) return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt) }, ) @@ -236,7 +254,12 @@ var TransitionCanceled = chasm.NewTransition( if event.CompleteTime != nil { closeTime = *event.CompleteTime } + metricsHandler, err := o.enrichMetricsHandler(ctx) + if err != nil { + return err + } // Attempts only execute in SCHEDULED, so that status identifies attempt-originated cancels. + o.emitOnCanceledMetrics(metricsHandler, closeTime) fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt) }, @@ -268,15 +291,21 @@ var TransitionTerminated = chasm.NewTransition( }, }, } + metricsHandler, err := o.enrichMetricsHandler(ctx) + if err != nil { + return err + } + o.emitOnTerminatedMetrics(metricsHandler, closeTime) return o.resolveUnsuccessfully(ctx, failure, closeTime, false) }, ) -// EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation. +// EventTimedOut is triggered when a timeout is triggered for an operation. type EventTimedOut struct { Failure *failurepb.Failure // FromAttempt is true when the failure came from an invocation attempt. FromAttempt bool + TimeoutType enumspb.TimeoutType } var TransitionTimedOut = chasm.NewTransition( @@ -287,6 +316,12 @@ var TransitionTimedOut = chasm.NewTransition( }, nexusoperationpb.OPERATION_STATUS_TIMED_OUT, func(o *Operation, ctx chasm.MutableContext, event EventTimedOut) error { - return o.resolveUnsuccessfully(ctx, event.Failure, ctx.Now(o), event.FromAttempt) + closeTime := ctx.Now(o) + metricsHandler, err := o.enrichMetricsHandler(ctx) + if err != nil { + return err + } + o.emitOnTimedOutMetrics(metricsHandler, closeTime, event.TimeoutType.String()) + return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, event.FromAttempt) }, ) diff --git a/chasm/lib/nexusoperation/operation_statemachine_test.go b/chasm/lib/nexusoperation/operation_statemachine_test.go index dd07b11349c..692580490a0 100644 --- a/chasm/lib/nexusoperation/operation_statemachine_test.go +++ b/chasm/lib/nexusoperation/operation_statemachine_test.go @@ -1,15 +1,19 @@ package nexusoperation import ( + "context" "testing" "time" "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" failurepb "go.temporal.io/api/failure/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/testing/protorequire" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" @@ -295,6 +299,14 @@ func TestTransitionStarted(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -357,6 +369,14 @@ func TestTransitionSucceeded(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -430,6 +450,14 @@ func TestTransitionFailed(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -500,6 +528,14 @@ func TestTransitionCanceled(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -563,6 +599,14 @@ func TestTransitionTimedOut(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -625,6 +669,14 @@ func TestTransitionTerminated(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } operation := newTestOperation() diff --git a/chasm/lib/nexusoperation/operation_tasks_test.go b/chasm/lib/nexusoperation/operation_tasks_test.go index 9873169b39c..70266a1c2e3 100644 --- a/chasm/lib/nexusoperation/operation_tasks_test.go +++ b/chasm/lib/nexusoperation/operation_tasks_test.go @@ -50,6 +50,7 @@ type invocationTaskTestEnv struct { op *Operation mockEngine *chasm.MockEngine timeSource *clock.EventTimeSource + nsRegistry *namespace.MockRegistry } func newInvocationTaskTestEnv( @@ -70,6 +71,7 @@ func newInvocationTaskTestEnv( nsRegistry := namespace.NewMockRegistry(ctrl) nsRegistry.EXPECT().GetNamespaceByID(namespace.ID("ns-id")).Return( namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0), nil) + nsRegistry.EXPECT().GetNamespaceName(namespace.ID("ns-id")).Return(namespace.Name("ns-name"), nil).AnyTimes() callbackTmpl, err := template.New("callback").Parse("http://localhost/callback") require.NoError(t, err) @@ -131,6 +133,7 @@ func newInvocationTaskTestEnv( op: op, mockEngine: mockEngine, timeSource: timeSource, + nsRegistry: nsRegistry, } } @@ -174,6 +177,19 @@ func (e *invocationTaskTestEnv) setupUpdateComponent() { HandleNow: func(_ chasm.Component) time.Time { return e.timeSource.Now() }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{ + NamespaceID: "ns-id", + BusinessID: "wf-id", + RunID: "run-id", + } + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } err := updateFn(mockCtx, e.op) @@ -749,6 +765,15 @@ func TestScheduleToStartTimeoutTaskHandler_Execute(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -815,6 +840,15 @@ func TestStartToCloseTimeoutTaskHandler_Execute(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -876,6 +910,15 @@ func TestScheduleToCloseTimeoutTaskHandler_Execute(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } diff --git a/chasm/lib/nexusoperation/operation_test.go b/chasm/lib/nexusoperation/operation_test.go index 8cf690d6d5f..9e65ceb4de9 100644 --- a/chasm/lib/nexusoperation/operation_test.go +++ b/chasm/lib/nexusoperation/operation_test.go @@ -1,6 +1,7 @@ package nexusoperation import ( + "context" "slices" "testing" "time" @@ -12,9 +13,12 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/common/testing/protoutils" + "go.uber.org/mock/gomock" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -110,10 +114,23 @@ func TestHandleNexusCompletion(t *testing.T) { require.NoError(t, TransitionStarted.Apply(op, ctx, EventStarted{OperationToken: "tok"})) return op } + ctrl := gomock.NewController(t) + nsRegistry := namespace.NewMockRegistry(ctrl) + nsRegistry.EXPECT().GetNamespaceName(namespace.ID("ns-id")).Return(namespace.Name("ns-name"), nil).AnyTimes() + newCtx := func() *chasm.MockMutableContext { return &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } } diff --git a/chasm/lib/nexusoperation/tasks_test_helpers_test.go b/chasm/lib/nexusoperation/tasks_test_helpers_test.go index b8da24a6c06..803cf4c00d9 100644 --- a/chasm/lib/nexusoperation/tasks_test_helpers_test.go +++ b/chasm/lib/nexusoperation/tasks_test_helpers_test.go @@ -108,6 +108,10 @@ func (m *mockStoreComponent) OnNexusOperationCancellationFailed(ctx chasm.Mutabl return TransitionCancellationFailed.Apply(cancellation, ctx, EventCancellationFailed{Failure: cause}) } +func (m *mockStoreComponent) WorkflowTypeTag() string { + return "" +} + // mockStoreLibrary registers the mockStoreComponent so the CHASM tree can work with it. type mockStoreLibrary struct { chasm.UnimplementedLibrary diff --git a/chasm/lib/nexusoperation/workflow/commands_test.go b/chasm/lib/nexusoperation/workflow/commands_test.go index c73ebda7224..da2a56ede83 100644 --- a/chasm/lib/nexusoperation/workflow/commands_test.go +++ b/chasm/lib/nexusoperation/workflow/commands_test.go @@ -110,6 +110,12 @@ func newTestContext(t *testing.T, cfg *nexusoperation.Config) testContext { HandleEndpointByName: func(name string) (*persistencespb.NexusEndpointEntry, error) { return endpointReg.GetByName(context.Background(), tests.GlobalNamespaceEntry.ID(), name) }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{ + NamespaceID: tests.GlobalNamespaceEntry.ID().String(), + } + }, + GoCtx: context.WithValue(context.Background(), nexusoperation.OperationContextKey, &nexusoperation.OperationContext{MetricTagConfig: dynamicconfig.GetTypedPropertyFn(nexusoperation.NexusMetricTagConfig{})}), }, }, chReg) diff --git a/chasm/lib/nexusoperation/workflow/events.go b/chasm/lib/nexusoperation/workflow/events.go index 2483e57cff3..90cc68a75c6 100644 --- a/chasm/lib/nexusoperation/workflow/events.go +++ b/chasm/lib/nexusoperation/workflow/events.go @@ -218,8 +218,8 @@ func (d CompletedEventDefinition) Apply(ctx chasm.MutableContext, wf *chasmworkf completeTime := event.GetEventTime().AsTime() if err := nexusoperation.TransitionSucceeded.Apply(op, ctx, nexusoperation.EventSucceeded{ - Result: attrs.GetResult(), CompleteTime: &completeTime, + Result: attrs.GetResult(), }); err != nil { return err } @@ -331,7 +331,8 @@ func (d TimedOutEventDefinition) Apply(ctx chasm.MutableContext, wf *chasmworkfl op := field.Get(ctx) if err := nexusoperation.TransitionTimedOut.Apply(op, ctx, nexusoperation.EventTimedOut{ - Failure: attrs.GetFailure().GetCause(), + Failure: attrs.GetFailure().GetCause(), + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, }); err != nil { return err } diff --git a/chasm/lib/workflow/workflow.go b/chasm/lib/workflow/workflow.go index 61593615124..205e5aca45e 100644 --- a/chasm/lib/workflow/workflow.go +++ b/chasm/lib/workflow/workflow.go @@ -432,6 +432,10 @@ func (w *Workflow) NexusOperationInvocationData( }, nil } +func (w *Workflow) WorkflowTypeTag() string { + return w.MSPointer.GetWorkflowTypeName() +} + func (w *Workflow) GetNexusCompletion( ctx chasm.Context, requestID string, diff --git a/chasm/ms_pointer.go b/chasm/ms_pointer.go index 41ad04033c1..f46cfad2329 100644 --- a/chasm/ms_pointer.go +++ b/chasm/ms_pointer.go @@ -47,3 +47,8 @@ func (m MSPointer) LoadHistoryEvent(ctx Context, token []byte) (*historypb.Histo func (m MSPointer) GetNexusCompletion(ctx Context, requestID string) (nexusrpc.CompleteOperationOptions, error) { return m.backend.GetNexusCompletion(ctx.goContext(), requestID) } + +// GetWorkflowTypeName retrieves the workflow type name from the underlying mutable state. +func (m MSPointer) GetWorkflowTypeName() string { + return m.backend.GetExecutionInfo().GetWorkflowTypeName() +} diff --git a/components/nexusoperations/config.go b/components/nexusoperations/config.go index 722b8c23ec6..226cf7b9518 100644 --- a/components/nexusoperations/config.go +++ b/components/nexusoperations/config.go @@ -149,7 +149,9 @@ var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting( `Controls which metric tags are included with Nexus operation metrics. This configuration supports: 1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag) 2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag) -3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings) +3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings), unused by callers. + +Enabling service or operation tags will enable metric tags for both caller and handlers. Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration. Adding high-cardinality tags (like unique operation names) can significantly increase metric storage From 2fd2bd618e9291a391a7d393dfb32d7a1da31c99 Mon Sep 17 00:00:00 2001 From: Sam Mathis Date: Fri, 24 Apr 2026 15:58:48 -0700 Subject: [PATCH 2/6] Don't pass timeouttype separate from failure --- chasm/lib/nexusoperation/operation.go | 2 -- chasm/lib/nexusoperation/operation_statemachine.go | 5 ++--- chasm/lib/nexusoperation/workflow/events.go | 3 +-- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/chasm/lib/nexusoperation/operation.go b/chasm/lib/nexusoperation/operation.go index bcd4b7e1347..20e33d0c745 100644 --- a/chasm/lib/nexusoperation/operation.go +++ b/chasm/lib/nexusoperation/operation.go @@ -240,11 +240,9 @@ func (o *Operation) onTimedOut(ctx chasm.MutableContext, cause *failurepb.Failur if store, ok := o.Store.TryGet(ctx); ok { return store.OnNexusOperationTimedOut(ctx, o, cause, fromAttempt) } - timeoutType := cause.GetTimeoutFailureInfo().GetTimeoutType() return TransitionTimedOut.Apply(o, ctx, EventTimedOut{ Failure: cause, FromAttempt: fromAttempt, - TimeoutType: timeoutType, }) } diff --git a/chasm/lib/nexusoperation/operation_statemachine.go b/chasm/lib/nexusoperation/operation_statemachine.go index 1b22c36040c..e4ad2a0c3c5 100644 --- a/chasm/lib/nexusoperation/operation_statemachine.go +++ b/chasm/lib/nexusoperation/operation_statemachine.go @@ -4,7 +4,6 @@ import ( "time" commonpb "go.temporal.io/api/common/v1" - enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" @@ -305,7 +304,6 @@ type EventTimedOut struct { Failure *failurepb.Failure // FromAttempt is true when the failure came from an invocation attempt. FromAttempt bool - TimeoutType enumspb.TimeoutType } var TransitionTimedOut = chasm.NewTransition( @@ -321,7 +319,8 @@ var TransitionTimedOut = chasm.NewTransition( if err != nil { return err } - o.emitOnTimedOutMetrics(metricsHandler, closeTime, event.TimeoutType.String()) + timeoutType := event.Failure.GetTimeoutFailureInfo().GetTimeoutType().String() + o.emitOnTimedOutMetrics(metricsHandler, closeTime, timeoutType) return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, event.FromAttempt) }, ) diff --git a/chasm/lib/nexusoperation/workflow/events.go b/chasm/lib/nexusoperation/workflow/events.go index 90cc68a75c6..4fbd84fe59f 100644 --- a/chasm/lib/nexusoperation/workflow/events.go +++ b/chasm/lib/nexusoperation/workflow/events.go @@ -331,8 +331,7 @@ func (d TimedOutEventDefinition) Apply(ctx chasm.MutableContext, wf *chasmworkfl op := field.Get(ctx) if err := nexusoperation.TransitionTimedOut.Apply(op, ctx, nexusoperation.EventTimedOut{ - Failure: attrs.GetFailure().GetCause(), - TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + Failure: attrs.GetFailure().GetCause(), }); err != nil { return err } From 219d5e002ae300cd7b405600aec87afa97dd97bf Mon Sep 17 00:00:00 2001 From: Sam Mathis Date: Fri, 24 Apr 2026 16:11:41 -0700 Subject: [PATCH 3/6] linting --- chasm/lib/nexusoperation/operation.go | 2 +- chasm/lib/workflow/workflow.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chasm/lib/nexusoperation/operation.go b/chasm/lib/nexusoperation/operation.go index 20e33d0c745..fd28c17567e 100644 --- a/chasm/lib/nexusoperation/operation.go +++ b/chasm/lib/nexusoperation/operation.go @@ -566,7 +566,6 @@ func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperatio // EnrichMetricsHandler returns a metrics handler enriched with nexus operation tags. // Panics if the context value is missing, which indicates a library registration bug. func (o *Operation) enrichMetricsHandler(ctx chasm.Context) (metrics.Handler, error) { - // nolint:revive // unchecked-type-assertion: intentional panic on missing context value namespaceName := ctx.NamespaceEntry().Name().String() wftt := WorkflowTypeTag @@ -579,6 +578,7 @@ func (o *Operation) enrichMetricsHandler(ctx chasm.Context) (metrics.Handler, er metrics.WorkflowTypeTag(wftt), } + // nolint:revive // unchecked-type-assertion: intentional panic on missing context value opCtx := ctx.Value(OperationContextKey).(*OperationContext) conf := opCtx.MetricTagConfig() if conf.IncludeServiceTag { diff --git a/chasm/lib/workflow/workflow.go b/chasm/lib/workflow/workflow.go index 205e5aca45e..d0722546b8f 100644 --- a/chasm/lib/workflow/workflow.go +++ b/chasm/lib/workflow/workflow.go @@ -433,7 +433,7 @@ func (w *Workflow) NexusOperationInvocationData( } func (w *Workflow) WorkflowTypeTag() string { - return w.MSPointer.GetWorkflowTypeName() + return w.GetWorkflowTypeName() } func (w *Workflow) GetNexusCompletion( From 359c1d35b73e5bd2ba0b28496999e7cdbb518e11 Mon Sep 17 00:00:00 2001 From: Sam Mathis Date: Mon, 27 Apr 2026 14:05:25 -0700 Subject: [PATCH 4/6] Match SANO metrics to activities --- chasm/lib/nexusoperation/metrics.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/chasm/lib/nexusoperation/metrics.go b/chasm/lib/nexusoperation/metrics.go index fac5ee4f7f2..ca438604b63 100644 --- a/chasm/lib/nexusoperation/metrics.go +++ b/chasm/lib/nexusoperation/metrics.go @@ -13,23 +13,23 @@ var OutboundRequestLatency = metrics.NewTimerDef( metrics.WithDescription("Latency of outbound Nexus requests made by the history service."), ) var NexusOperationSuccessCount = metrics.NewCounterDef( - "nexus_operation_success_count", + "nexus_operation_success", metrics.WithDescription("Nexus Operations successfully completed."), ) var NexusOperationFailedCount = metrics.NewCounterDef( - "nexus_operation_failed_count", + "nexus_operation_fail", metrics.WithDescription("Nexus Operations failures."), ) var NexusOperationCancelCount = metrics.NewCounterDef( - "nexus_operation_cancel_count", + "nexus_operation_cancel", metrics.WithDescription("Nexus Operations cancellations."), ) var NexusOperationTerminateCount = metrics.NewCounterDef( - "nexus_operation_terminate_count", + "nexus_operation_terminate", metrics.WithDescription("Nexus Operations that were terminated before completion."), ) var NexusOperationTimeoutCount = metrics.NewCounterDef( - "nexus_operation_timeout_count", + "nexus_operation_timeout", metrics.WithDescription("Nexus Operations that timed out before completion."), ) From 0086243128a6fc0959d2ecdf913d7c8e34844af3 Mon Sep 17 00:00:00 2001 From: Sam Mathis Date: Tue, 28 Apr 2026 13:42:11 -0700 Subject: [PATCH 5/6] PR feedback --- chasm/lib/nexusoperation/library.go | 12 ++--- chasm/lib/nexusoperation/metrics.go | 11 ++-- chasm/lib/nexusoperation/operation.go | 53 +++++++++++-------- .../nexusoperation/operation_statemachine.go | 37 +++---------- .../nexusoperation/tasks_test_helpers_test.go | 2 +- chasm/lib/workflow/workflow.go | 2 +- components/nexusoperations/config.go | 8 ++- 7 files changed, 51 insertions(+), 74 deletions(-) diff --git a/chasm/lib/nexusoperation/library.go b/chasm/lib/nexusoperation/library.go index c74e6499969..24f31678842 100644 --- a/chasm/lib/nexusoperation/library.go +++ b/chasm/lib/nexusoperation/library.go @@ -4,7 +4,6 @@ import ( "go.temporal.io/server/chasm" nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" "go.temporal.io/server/common/dynamicconfig" - "go.temporal.io/server/common/namespace" "google.golang.org/grpc" ) @@ -23,14 +22,12 @@ type OperationContext struct { // Used in the frontend to enable component ref serialization. type componentOnlyLibrary struct { chasm.UnimplementedLibrary - namespaceRegistry namespace.Registry - metricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig] + metricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig] } -func newComponentOnlyLibrary(namespaceRegistry namespace.Registry, dc *dynamicconfig.Collection) *componentOnlyLibrary { +func newComponentOnlyLibrary(dc *dynamicconfig.Collection) *componentOnlyLibrary { return &componentOnlyLibrary{ - namespaceRegistry: namespaceRegistry, - metricTagConfig: MetricTagConfiguration.Get(dc), + metricTagConfig: MetricTagConfiguration.Get(dc), } } @@ -84,11 +81,10 @@ func newLibrary( operationStartToCloseTimeoutTaskHandler *operationStartToCloseTimeoutTaskHandler, cancellationInvocationTaskHandler *cancellationInvocationTaskHandler, cancellationBackoffTaskHandler *cancellationBackoffTaskHandler, - namespaceRegistry namespace.Registry, dc *dynamicconfig.Collection, ) *Library { return &Library{ - componentOnlyLibrary: *newComponentOnlyLibrary(namespaceRegistry, dc), + componentOnlyLibrary: *newComponentOnlyLibrary(dc), handler: handler, operationBackoffTaskHandler: operationBackoffTaskHandler, operationInvocationTaskHandler: operationInvocationTaskHandler, diff --git a/chasm/lib/nexusoperation/metrics.go b/chasm/lib/nexusoperation/metrics.go index ca438604b63..64f09da06e5 100644 --- a/chasm/lib/nexusoperation/metrics.go +++ b/chasm/lib/nexusoperation/metrics.go @@ -43,18 +43,15 @@ var NexusOperationScheduleToStartLatency = metrics.NewTimerDef( ) var NexusOperationStartToCloseLatency = metrics.NewTimerDef( "nexus_operation_start_to_close_latency", - metrics.WithDescription("Duration from Nexus Operation scheduled time to completed time. Only emitted for async operations."), + metrics.WithDescription("Duration from Nexus Operation started time to completed time. Only emitted for async operations."), ) -// NexusMetricTagConfig controls which optional metric tags are included with Nexus operation -// metrics. Enabling service or operation tags applies to both caller and handler metrics. HeaderTagMappings is -// not used for caller metrics. type NexusMetricTagConfig struct { - // Include service name as a metric tag + // Include service name as a metric tag. Used for caller and handler metrics. IncludeServiceTag bool - // Include operation name as a metric tag + // Include operation name as a metric tag. Used for caller and handler metrics. IncludeOperationTag bool - // Configuration for mapping request headers to metric tags + // Configuration for mapping request headers to metric tags. Only used for handler metrics. HeaderTagMappings []NexusHeaderTagMapping } diff --git a/chasm/lib/nexusoperation/operation.go b/chasm/lib/nexusoperation/operation.go index fd28c17567e..9c8375dcf00 100644 --- a/chasm/lib/nexusoperation/operation.go +++ b/chasm/lib/nexusoperation/operation.go @@ -47,9 +47,10 @@ var ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancel var ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed") const ( - // WorkflowTypeTag is a required workflow tag for standalone operations to ensure consistent - // metric labeling between workflows and activities. - WorkflowTypeTag = "__temporal_standalone_nexus_operation__" + // standaloneOperationWorkflowTypeName is the workflow type for tagging standalone operations. + // Used as the WorkflowTypeTag in metrics emitted from standalone operations. + // Do not change. It is exposed in metrics. + standaloneOperationWorkflowTypeName = "__temporal_standalone_nexus_operation__" ) // InvocationData contains data needed to invoke a Nexus operation. @@ -77,7 +78,7 @@ type OperationStore interface { OnNexusOperationCancellationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error // NexusOperationInvocationData loads invocation data (Input, Header, NexusLinks) from the scheduled history event. NexusOperationInvocationData(ctx chasm.Context, operation *Operation) (InvocationData, error) - WorkflowTypeTag() string + WorkflowTypeName() string } // Operation is a CHASM component that represents a Nexus operation. @@ -563,14 +564,13 @@ func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperatio return info } -// EnrichMetricsHandler returns a metrics handler enriched with nexus operation tags. -// Panics if the context value is missing, which indicates a library registration bug. -func (o *Operation) enrichMetricsHandler(ctx chasm.Context) (metrics.Handler, error) { +// metricsHandler returns a metrics handler enriched with nexus operation tags. +func (o *Operation) metricsHandler(ctx chasm.Context) metrics.Handler { namespaceName := ctx.NamespaceEntry().Name().String() - wftt := WorkflowTypeTag + wftt := standaloneOperationWorkflowTypeName if store, ok := o.Store.TryGet(ctx); ok { - wftt = store.WorkflowTypeTag() + wftt = store.WorkflowTypeName() } tags := []metrics.Tag{ metrics.NamespaceTag(namespaceName), @@ -579,54 +579,63 @@ func (o *Operation) enrichMetricsHandler(ctx chasm.Context) (metrics.Handler, er } // nolint:revive // unchecked-type-assertion: intentional panic on missing context value - opCtx := ctx.Value(OperationContextKey).(*OperationContext) - conf := opCtx.MetricTagConfig() - if conf.IncludeServiceTag { - tags = append(tags, metrics.NexusServiceTag(o.GetService())) - } - if conf.IncludeOperationTag { - tags = append(tags, metrics.NexusOperationTag(o.GetOperation())) + opCtx, ok := ctx.Value(OperationContextKey).(*OperationContext) + if !ok { + softassert.Fail(ctx.Logger(), "operation context missing") + } else { + conf := opCtx.MetricTagConfig() + if conf.IncludeServiceTag { + tags = append(tags, metrics.NexusServiceTag(o.GetService())) + } + if conf.IncludeOperationTag { + tags = append(tags, metrics.NexusOperationTag(o.GetOperation())) + } } - return ctx.MetricsHandler().WithTags(tags...), nil + return ctx.MetricsHandler().WithTags(tags...) } -func (o *Operation) emitOnSucceededMetrics(handler metrics.Handler, closeTime time.Time) { +func (o *Operation) emitOnSucceededMetrics(ctx chasm.Context, closeTime time.Time) { outcomeTag := metrics.OutcomeTag( strings.ToLower(nexusoperationpb.OPERATION_STATUS_SUCCEEDED.String()), ) + handler := o.metricsHandler(ctx) NexusOperationSuccessCount.With(handler).Record(1) o.emitLatencyMetrics(handler, closeTime, outcomeTag) } -func (o *Operation) emitOnFailedMetrics(handler metrics.Handler, closeTime time.Time) { +func (o *Operation) emitOnFailedMetrics(ctx chasm.Context, closeTime time.Time) { outcomeTag := metrics.OutcomeTag( strings.ToLower(nexusoperationpb.OPERATION_STATUS_FAILED.String()), ) + handler := o.metricsHandler(ctx) NexusOperationFailedCount.With(handler).Record(1) o.emitLatencyMetrics(handler, closeTime, outcomeTag) } -func (o *Operation) emitOnCanceledMetrics(handler metrics.Handler, closeTime time.Time) { +func (o *Operation) emitOnCanceledMetrics(ctx chasm.Context, closeTime time.Time) { outcomeTag := metrics.OutcomeTag( strings.ToLower(nexusoperationpb.OPERATION_STATUS_CANCELED.String()), ) + handler := o.metricsHandler(ctx) NexusOperationCancelCount.With(handler).Record(1) o.emitLatencyMetrics(handler, closeTime, outcomeTag) } -func (o *Operation) emitOnTimedOutMetrics(handler metrics.Handler, closeTime time.Time, timeoutType string) { +func (o *Operation) emitOnTimedOutMetrics(ctx chasm.Context, closeTime time.Time, timeoutType string) { outcomeTag := metrics.OutcomeTag( strings.ToLower(nexusoperationpb.OPERATION_STATUS_TIMED_OUT.String()), ) + handler := o.metricsHandler(ctx) NexusOperationTimeoutCount.With(handler).Record(1, metrics.StringTag("timeout_type", timeoutType)) o.emitLatencyMetrics(handler, closeTime, outcomeTag) } -func (o *Operation) emitOnTerminatedMetrics(handler metrics.Handler, closeTime time.Time) { +func (o *Operation) emitOnTerminatedMetrics(ctx chasm.Context, closeTime time.Time) { outcomeTag := metrics.OutcomeTag( strings.ToLower(nexusoperationpb.OPERATION_STATUS_TERMINATED.String()), ) + handler := o.metricsHandler(ctx) NexusOperationTerminateCount.With(handler).Record(1) o.emitLatencyMetrics(handler, closeTime, outcomeTag) } diff --git a/chasm/lib/nexusoperation/operation_statemachine.go b/chasm/lib/nexusoperation/operation_statemachine.go index e4ad2a0c3c5..ba6f2498992 100644 --- a/chasm/lib/nexusoperation/operation_statemachine.go +++ b/chasm/lib/nexusoperation/operation_statemachine.go @@ -135,11 +135,8 @@ var TransitionStarted = chasm.NewTransition( // Store the operation token for async completion. o.OperationToken = event.OperationToken - // Emit schedule-to-start latency on the transition to started. - metricsHandler, err := o.enrichMetricsHandler(ctx) - if err != nil { - return err - } + // Emit schedule-to-start latency + metricsHandler := o.metricsHandler(ctx) NexusOperationScheduleToStartLatency.With(metricsHandler).Record(startTime.Sub(o.GetScheduledTime().AsTime())) // Emit a start-to-close timeout task if configured. @@ -192,11 +189,7 @@ var TransitionSucceeded = chasm.NewTransition( Successful: &nexusoperationpb.OperationOutcome_Successful{Result: event.Result}, } - metricsHandler, err := o.enrichMetricsHandler(ctx) - if err != nil { - return err - } - o.emitOnSucceededMetrics(metricsHandler, closeTime) + o.emitOnSucceededMetrics(ctx, closeTime) // Terminal state - no tasks to emit. return nil }, @@ -222,13 +215,9 @@ var TransitionFailed = chasm.NewTransition( if event.CompleteTime != nil { closeTime = *event.CompleteTime } - metricsHandler, err := o.enrichMetricsHandler(ctx) - if err != nil { - return err - } // Attempts only execute in SCHEDULED, so that status identifies attempt-originated failures. fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED - o.emitOnFailedMetrics(metricsHandler, closeTime) + o.emitOnFailedMetrics(ctx, closeTime) return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt) }, ) @@ -253,12 +242,8 @@ var TransitionCanceled = chasm.NewTransition( if event.CompleteTime != nil { closeTime = *event.CompleteTime } - metricsHandler, err := o.enrichMetricsHandler(ctx) - if err != nil { - return err - } + o.emitOnCanceledMetrics(ctx, closeTime) // Attempts only execute in SCHEDULED, so that status identifies attempt-originated cancels. - o.emitOnCanceledMetrics(metricsHandler, closeTime) fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt) }, @@ -290,11 +275,7 @@ var TransitionTerminated = chasm.NewTransition( }, }, } - metricsHandler, err := o.enrichMetricsHandler(ctx) - if err != nil { - return err - } - o.emitOnTerminatedMetrics(metricsHandler, closeTime) + o.emitOnTerminatedMetrics(ctx, closeTime) return o.resolveUnsuccessfully(ctx, failure, closeTime, false) }, ) @@ -315,12 +296,8 @@ var TransitionTimedOut = chasm.NewTransition( nexusoperationpb.OPERATION_STATUS_TIMED_OUT, func(o *Operation, ctx chasm.MutableContext, event EventTimedOut) error { closeTime := ctx.Now(o) - metricsHandler, err := o.enrichMetricsHandler(ctx) - if err != nil { - return err - } timeoutType := event.Failure.GetTimeoutFailureInfo().GetTimeoutType().String() - o.emitOnTimedOutMetrics(metricsHandler, closeTime, timeoutType) + o.emitOnTimedOutMetrics(ctx, closeTime, timeoutType) return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, event.FromAttempt) }, ) diff --git a/chasm/lib/nexusoperation/tasks_test_helpers_test.go b/chasm/lib/nexusoperation/tasks_test_helpers_test.go index 803cf4c00d9..9e551028b28 100644 --- a/chasm/lib/nexusoperation/tasks_test_helpers_test.go +++ b/chasm/lib/nexusoperation/tasks_test_helpers_test.go @@ -108,7 +108,7 @@ func (m *mockStoreComponent) OnNexusOperationCancellationFailed(ctx chasm.Mutabl return TransitionCancellationFailed.Apply(cancellation, ctx, EventCancellationFailed{Failure: cause}) } -func (m *mockStoreComponent) WorkflowTypeTag() string { +func (m *mockStoreComponent) WorkflowTypeName() string { return "" } diff --git a/chasm/lib/workflow/workflow.go b/chasm/lib/workflow/workflow.go index d0722546b8f..fd4b189dc0d 100644 --- a/chasm/lib/workflow/workflow.go +++ b/chasm/lib/workflow/workflow.go @@ -432,7 +432,7 @@ func (w *Workflow) NexusOperationInvocationData( }, nil } -func (w *Workflow) WorkflowTypeTag() string { +func (w *Workflow) WorkflowTypeName() string { return w.GetWorkflowTypeName() } diff --git a/components/nexusoperations/config.go b/components/nexusoperations/config.go index 226cf7b9518..686f18b3067 100644 --- a/components/nexusoperations/config.go +++ b/components/nexusoperations/config.go @@ -147,11 +147,9 @@ var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting( "component.nexusoperations.metrics.tags", chasmnexus.NexusMetricTagConfig{}, `Controls which metric tags are included with Nexus operation metrics. This configuration supports: -1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag) -2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag) -3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings), unused by callers. - -Enabling service or operation tags will enable metric tags for both caller and handlers. +1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag). Used by callers and handlers. +2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag). Used by callers and handlers. +3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings). Only used by handlers. Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration. Adding high-cardinality tags (like unique operation names) can significantly increase metric storage From 78653e10e86b6bb1772cee08171834d3c74fdb7c Mon Sep 17 00:00:00 2001 From: Sam Mathis Date: Tue, 28 Apr 2026 14:08:01 -0700 Subject: [PATCH 6/6] Add tests for metrics capture --- .../operation_statemachine_test.go | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/chasm/lib/nexusoperation/operation_statemachine_test.go b/chasm/lib/nexusoperation/operation_statemachine_test.go index 692580490a0..e30007d08e3 100644 --- a/chasm/lib/nexusoperation/operation_statemachine_test.go +++ b/chasm/lib/nexusoperation/operation_statemachine_test.go @@ -13,6 +13,8 @@ import ( "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/metrics/metricstest" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/testing/protorequire" "google.golang.org/protobuf/proto" @@ -366,6 +368,10 @@ func TestTransitionSucceeded(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + metricsHandler := metricstest.NewCaptureHandler() + capture := metricsHandler.StartCapture() + defer metricsHandler.StopCapture(capture) + ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, @@ -374,14 +380,19 @@ func TestTransitionSucceeded(t *testing.T) { &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, ) }, + HandleMetricsHandler: func() metrics.Handler { return metricsHandler }, GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ - MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{ + IncludeServiceTag: true, + IncludeOperationTag: true, + }), }), }, } operation := newTestOperation() operation.Status = nexusoperationpb.OPERATION_STATUS_STARTED + operation.StartedTime = operation.ScheduledTime err := TransitionSucceeded.Apply(operation, ctx, EventSucceeded{ CompleteTime: tc.completeTime, @@ -396,6 +407,38 @@ func TestTransitionSucceeded(t *testing.T) { require.NotNil(t, outcome.GetSuccessful()) protorequire.ProtoEqual(t, tc.result, outcome.GetSuccessful().GetResult()) require.Empty(t, ctx.Tasks) + + snap := capture.Snapshot() + + successCount := snap[NexusOperationSuccessCount.Name()] + require.Len(t, successCount, 1) + require.Equal(t, int64(1), successCount[0].Value) + require.Equal(t, "ns-name", successCount[0].Tags["namespace"]) + require.Equal(t, "test-endpoint", successCount[0].Tags["nexus_endpoint"]) + require.Equal(t, "test-service", successCount[0].Tags["nexus_service"]) + require.Equal(t, "test-operation", successCount[0].Tags["nexus_operation"]) + require.Equal(t, standaloneOperationWorkflowTypeName, successCount[0].Tags["workflowType"]) + + expectedLatency := tc.expectedClosedTime.Sub(operation.ScheduledTime.AsTime()) + + stcLatency := snap[NexusOperationScheduleToCloseLatency.Name()] + require.Len(t, stcLatency, 1) + require.Equal(t, expectedLatency, stcLatency[0].Value) + require.Equal(t, "succeeded", stcLatency[0].Tags["outcome"]) + + startToCloseLatency := snap[NexusOperationStartToCloseLatency.Name()] + require.Len(t, startToCloseLatency, 1) + require.Equal(t, expectedLatency, startToCloseLatency[0].Value) + require.Equal(t, "succeeded", startToCloseLatency[0].Tags["outcome"]) + + // Schedule-to-start is emitted in TransitionStarted, not from the success path + // when the operation was already started. + require.Empty(t, snap[NexusOperationScheduleToStartLatency.Name()]) + + require.Empty(t, snap[NexusOperationFailedCount.Name()]) + require.Empty(t, snap[NexusOperationCancelCount.Name()]) + require.Empty(t, snap[NexusOperationTimeoutCount.Name()]) + require.Empty(t, snap[NexusOperationTerminateCount.Name()]) }) } }