diff --git a/chasm/lib/nexusoperation/library.go b/chasm/lib/nexusoperation/library.go index 04966174551..24f31678842 100644 --- a/chasm/lib/nexusoperation/library.go +++ b/chasm/lib/nexusoperation/library.go @@ -3,17 +3,32 @@ package nexusoperation import ( "go.temporal.io/server/chasm" nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" + "go.temporal.io/server/common/dynamicconfig" "google.golang.org/grpc" ) +type operationContextKeyType struct{} + +// OperationContextKey is the context key for OperationContext, registered as a CHASM component +// context value. Exported for use in tests that need to set up MockContext. +var OperationContextKey = operationContextKeyType{} + +// OperationContext holds dependencies injected into the chasm.Context for use by Operation methods. +type OperationContext struct { + MetricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig] +} + // componentOnlyLibrary registers just the components without task executors or gRPC handlers. // Used in the frontend to enable component ref serialization. type componentOnlyLibrary struct { chasm.UnimplementedLibrary + metricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig] } -func newComponentOnlyLibrary() *componentOnlyLibrary { - return &componentOnlyLibrary{} +func newComponentOnlyLibrary(dc *dynamicconfig.Collection) *componentOnlyLibrary { + return &componentOnlyLibrary{ + metricTagConfig: MetricTagConfiguration.Get(dc), + } } func (l *componentOnlyLibrary) Name() string { @@ -32,6 +47,11 @@ func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent { StatusSearchAttribute, ), chasm.WithBusinessIDAlias("OperationId"), + chasm.WithContextValues(map[any]any{ + OperationContextKey: &OperationContext{ + MetricTagConfig: l.metricTagConfig, + }, + }), ), chasm.NewRegistrableComponent[*Cancellation]("cancellation"), } @@ -61,8 +81,10 @@ func newLibrary( operationStartToCloseTimeoutTaskHandler *operationStartToCloseTimeoutTaskHandler, cancellationInvocationTaskHandler *cancellationInvocationTaskHandler, cancellationBackoffTaskHandler *cancellationBackoffTaskHandler, + dc *dynamicconfig.Collection, ) *Library { return &Library{ + componentOnlyLibrary: *newComponentOnlyLibrary(dc), handler: handler, operationBackoffTaskHandler: operationBackoffTaskHandler, operationInvocationTaskHandler: operationInvocationTaskHandler, diff --git a/chasm/lib/nexusoperation/metrics.go b/chasm/lib/nexusoperation/metrics.go index 9b68d5c4422..64f09da06e5 100644 --- a/chasm/lib/nexusoperation/metrics.go +++ b/chasm/lib/nexusoperation/metrics.go @@ -12,13 +12,46 @@ var OutboundRequestLatency = metrics.NewTimerDef( "nexus_outbound_latency", metrics.WithDescription("Latency of outbound Nexus requests made by the history service."), ) +var NexusOperationSuccessCount = metrics.NewCounterDef( + "nexus_operation_success", + metrics.WithDescription("Nexus Operations successfully completed."), +) +var NexusOperationFailedCount = metrics.NewCounterDef( + "nexus_operation_fail", + metrics.WithDescription("Nexus Operations failures."), +) +var NexusOperationCancelCount = metrics.NewCounterDef( + "nexus_operation_cancel", + metrics.WithDescription("Nexus Operations cancellations."), +) +var NexusOperationTerminateCount = metrics.NewCounterDef( + "nexus_operation_terminate", + metrics.WithDescription("Nexus Operations that were terminated before completion."), +) +var NexusOperationTimeoutCount = metrics.NewCounterDef( + "nexus_operation_timeout", + metrics.WithDescription("Nexus Operations that timed out before completion."), +) + +var NexusOperationScheduleToCloseLatency = metrics.NewTimerDef( + "nexus_operation_schedule_to_close_latency", + metrics.WithDescription("Duration from Nexus Operation scheduled time to terminal state."), +) +var NexusOperationScheduleToStartLatency = metrics.NewTimerDef( + "nexus_operation_schedule_to_start_latency", + metrics.WithDescription("Duration from Nexus Operation scheduled time to started time."), +) +var NexusOperationStartToCloseLatency = metrics.NewTimerDef( + "nexus_operation_start_to_close_latency", + metrics.WithDescription("Duration from Nexus Operation started time to completed time. Only emitted for async operations."), +) type NexusMetricTagConfig struct { - // Include service name as a metric tag + // Include service name as a metric tag. Used for caller and handler metrics. IncludeServiceTag bool - // Include operation name as a metric tag + // Include operation name as a metric tag. Used for caller and handler metrics. IncludeOperationTag bool - // Configuration for mapping request headers to metric tags + // Configuration for mapping request headers to metric tags. Only used for handler metrics. HeaderTagMappings []NexusHeaderTagMapping } diff --git a/chasm/lib/nexusoperation/operation.go b/chasm/lib/nexusoperation/operation.go index 1725e5fd5b3..9c8375dcf00 100644 --- a/chasm/lib/nexusoperation/operation.go +++ b/chasm/lib/nexusoperation/operation.go @@ -2,6 +2,7 @@ package nexusoperation import ( "fmt" + "strings" "time" "github.com/google/uuid" @@ -16,6 +17,7 @@ import ( "go.temporal.io/server/chasm" nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/metrics" commonnexus "go.temporal.io/server/common/nexus" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/softassert" @@ -44,6 +46,13 @@ var ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancel // ErrOperationAlreadyCompleted is returned when trying to cancel an operation that has already completed. var ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed") +const ( + // standaloneOperationWorkflowTypeName is the workflow type for tagging standalone operations. + // Used as the WorkflowTypeTag in metrics emitted from standalone operations. + // Do not change. It is exposed in metrics. + standaloneOperationWorkflowTypeName = "__temporal_standalone_nexus_operation__" +) + // InvocationData contains data needed to invoke a Nexus operation. type InvocationData struct { // Input is the operation input payload. @@ -69,6 +78,7 @@ type OperationStore interface { OnNexusOperationCancellationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error // NexusOperationInvocationData loads invocation data (Input, Header, NexusLinks) from the scheduled history event. NexusOperationInvocationData(ctx chasm.Context, operation *Operation) (InvocationData, error) + WorkflowTypeName() string } // Operation is a CHASM component that represents a Nexus operation. @@ -398,7 +408,10 @@ func (o *Operation) Terminate( } return chasm.TerminateComponentResponse{}, nil } - return chasm.TerminateComponentResponse{}, TransitionTerminated.Apply(o, ctx, EventTerminated{TerminateComponentRequest: req}) + + return chasm.TerminateComponentResponse{}, TransitionTerminated.Apply(o, ctx, EventTerminated{ + TerminateComponentRequest: req, + }) } func (o *Operation) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue { @@ -551,6 +564,99 @@ func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperatio return info } +// metricsHandler returns a metrics handler enriched with nexus operation tags. +func (o *Operation) metricsHandler(ctx chasm.Context) metrics.Handler { + namespaceName := ctx.NamespaceEntry().Name().String() + + wftt := standaloneOperationWorkflowTypeName + if store, ok := o.Store.TryGet(ctx); ok { + wftt = store.WorkflowTypeName() + } + tags := []metrics.Tag{ + metrics.NamespaceTag(namespaceName), + metrics.NexusEndpointTag(o.GetEndpoint()), + metrics.WorkflowTypeTag(wftt), + } + + // nolint:revive // unchecked-type-assertion: intentional panic on missing context value + opCtx, ok := ctx.Value(OperationContextKey).(*OperationContext) + if !ok { + softassert.Fail(ctx.Logger(), "operation context missing") + } else { + conf := opCtx.MetricTagConfig() + if conf.IncludeServiceTag { + tags = append(tags, metrics.NexusServiceTag(o.GetService())) + } + if conf.IncludeOperationTag { + tags = append(tags, metrics.NexusOperationTag(o.GetOperation())) + } + } + + return ctx.MetricsHandler().WithTags(tags...) +} + +func (o *Operation) emitOnSucceededMetrics(ctx chasm.Context, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_SUCCEEDED.String()), + ) + handler := o.metricsHandler(ctx) + NexusOperationSuccessCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnFailedMetrics(ctx chasm.Context, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_FAILED.String()), + ) + handler := o.metricsHandler(ctx) + NexusOperationFailedCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnCanceledMetrics(ctx chasm.Context, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_CANCELED.String()), + ) + handler := o.metricsHandler(ctx) + NexusOperationCancelCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnTimedOutMetrics(ctx chasm.Context, closeTime time.Time, timeoutType string) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_TIMED_OUT.String()), + ) + handler := o.metricsHandler(ctx) + NexusOperationTimeoutCount.With(handler).Record(1, metrics.StringTag("timeout_type", timeoutType)) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +func (o *Operation) emitOnTerminatedMetrics(ctx chasm.Context, closeTime time.Time) { + outcomeTag := metrics.OutcomeTag( + strings.ToLower(nexusoperationpb.OPERATION_STATUS_TERMINATED.String()), + ) + handler := o.metricsHandler(ctx) + NexusOperationTerminateCount.With(handler).Record(1) + o.emitLatencyMetrics(handler, closeTime, outcomeTag) +} + +// emitLatencyMetrics emits schedule-to-close, schedule-to-start, and start-to-close latencies. +func (o *Operation) emitLatencyMetrics(handler metrics.Handler, closeTime time.Time, outcomeTag metrics.Tag) { + scheduledTime := o.GetScheduledTime().AsTime() + NexusOperationScheduleToCloseLatency.With(handler).Record(closeTime.Sub(scheduledTime), outcomeTag) + + startedTime := o.GetStartedTime() + if startedTime != nil { + // Async operation that was started. + // Schedule-to-start latency is emitted in TransitionStarted. + NexusOperationStartToCloseLatency.With(handler).Record(closeTime.Sub(startedTime.AsTime()), outcomeTag) + } else { + // Sync operation or operation that never started. + // For sync ops, schedule-to-start equals schedule-to-close. + NexusOperationScheduleToStartLatency.With(handler).Record(closeTime.Sub(scheduledTime), outcomeTag) + } +} + func (o *Operation) closeTime(ctx chasm.Context) *timestamppb.Timestamp { if !o.LifecycleState(ctx).IsClosed() { return nil diff --git a/chasm/lib/nexusoperation/operation_statemachine.go b/chasm/lib/nexusoperation/operation_statemachine.go index f82290c2098..ba6f2498992 100644 --- a/chasm/lib/nexusoperation/operation_statemachine.go +++ b/chasm/lib/nexusoperation/operation_statemachine.go @@ -135,6 +135,10 @@ var TransitionStarted = chasm.NewTransition( // Store the operation token for async completion. o.OperationToken = event.OperationToken + // Emit schedule-to-start latency + metricsHandler := o.metricsHandler(ctx) + NexusOperationScheduleToStartLatency.With(metricsHandler).Record(startTime.Sub(o.GetScheduledTime().AsTime())) + // Emit a start-to-close timeout task if configured. if o.StartToCloseTimeout != nil && o.StartToCloseTimeout.AsDuration() != 0 { deadline := startTime.Add(o.StartToCloseTimeout.AsDuration()) @@ -185,6 +189,7 @@ var TransitionSucceeded = chasm.NewTransition( Successful: &nexusoperationpb.OperationOutcome_Successful{Result: event.Result}, } + o.emitOnSucceededMetrics(ctx, closeTime) // Terminal state - no tasks to emit. return nil }, @@ -212,6 +217,7 @@ var TransitionFailed = chasm.NewTransition( } // Attempts only execute in SCHEDULED, so that status identifies attempt-originated failures. fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED + o.emitOnFailedMetrics(ctx, closeTime) return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt) }, ) @@ -236,6 +242,7 @@ var TransitionCanceled = chasm.NewTransition( if event.CompleteTime != nil { closeTime = *event.CompleteTime } + o.emitOnCanceledMetrics(ctx, closeTime) // Attempts only execute in SCHEDULED, so that status identifies attempt-originated cancels. fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt) @@ -268,11 +275,12 @@ var TransitionTerminated = chasm.NewTransition( }, }, } + o.emitOnTerminatedMetrics(ctx, closeTime) return o.resolveUnsuccessfully(ctx, failure, closeTime, false) }, ) -// EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation. +// EventTimedOut is triggered when a timeout is triggered for an operation. type EventTimedOut struct { Failure *failurepb.Failure // FromAttempt is true when the failure came from an invocation attempt. @@ -287,6 +295,9 @@ var TransitionTimedOut = chasm.NewTransition( }, nexusoperationpb.OPERATION_STATUS_TIMED_OUT, func(o *Operation, ctx chasm.MutableContext, event EventTimedOut) error { - return o.resolveUnsuccessfully(ctx, event.Failure, ctx.Now(o), event.FromAttempt) + closeTime := ctx.Now(o) + timeoutType := event.Failure.GetTimeoutFailureInfo().GetTimeoutType().String() + o.emitOnTimedOutMetrics(ctx, closeTime, timeoutType) + return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, event.FromAttempt) }, ) diff --git a/chasm/lib/nexusoperation/operation_statemachine_test.go b/chasm/lib/nexusoperation/operation_statemachine_test.go index dd07b11349c..e30007d08e3 100644 --- a/chasm/lib/nexusoperation/operation_statemachine_test.go +++ b/chasm/lib/nexusoperation/operation_statemachine_test.go @@ -1,15 +1,21 @@ package nexusoperation import ( + "context" "testing" "time" "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" failurepb "go.temporal.io/api/failure/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/metrics/metricstest" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/testing/protorequire" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" @@ -295,6 +301,14 @@ func TestTransitionStarted(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -354,14 +368,31 @@ func TestTransitionSucceeded(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + metricsHandler := metricstest.NewCaptureHandler() + capture := metricsHandler.StartCapture() + defer metricsHandler.StopCapture(capture) + ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + HandleMetricsHandler: func() metrics.Handler { return metricsHandler }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{ + IncludeServiceTag: true, + IncludeOperationTag: true, + }), + }), }, } operation := newTestOperation() operation.Status = nexusoperationpb.OPERATION_STATUS_STARTED + operation.StartedTime = operation.ScheduledTime err := TransitionSucceeded.Apply(operation, ctx, EventSucceeded{ CompleteTime: tc.completeTime, @@ -376,6 +407,38 @@ func TestTransitionSucceeded(t *testing.T) { require.NotNil(t, outcome.GetSuccessful()) protorequire.ProtoEqual(t, tc.result, outcome.GetSuccessful().GetResult()) require.Empty(t, ctx.Tasks) + + snap := capture.Snapshot() + + successCount := snap[NexusOperationSuccessCount.Name()] + require.Len(t, successCount, 1) + require.Equal(t, int64(1), successCount[0].Value) + require.Equal(t, "ns-name", successCount[0].Tags["namespace"]) + require.Equal(t, "test-endpoint", successCount[0].Tags["nexus_endpoint"]) + require.Equal(t, "test-service", successCount[0].Tags["nexus_service"]) + require.Equal(t, "test-operation", successCount[0].Tags["nexus_operation"]) + require.Equal(t, standaloneOperationWorkflowTypeName, successCount[0].Tags["workflowType"]) + + expectedLatency := tc.expectedClosedTime.Sub(operation.ScheduledTime.AsTime()) + + stcLatency := snap[NexusOperationScheduleToCloseLatency.Name()] + require.Len(t, stcLatency, 1) + require.Equal(t, expectedLatency, stcLatency[0].Value) + require.Equal(t, "succeeded", stcLatency[0].Tags["outcome"]) + + startToCloseLatency := snap[NexusOperationStartToCloseLatency.Name()] + require.Len(t, startToCloseLatency, 1) + require.Equal(t, expectedLatency, startToCloseLatency[0].Value) + require.Equal(t, "succeeded", startToCloseLatency[0].Tags["outcome"]) + + // Schedule-to-start is emitted in TransitionStarted, not from the success path + // when the operation was already started. + require.Empty(t, snap[NexusOperationScheduleToStartLatency.Name()]) + + require.Empty(t, snap[NexusOperationFailedCount.Name()]) + require.Empty(t, snap[NexusOperationCancelCount.Name()]) + require.Empty(t, snap[NexusOperationTimeoutCount.Name()]) + require.Empty(t, snap[NexusOperationTerminateCount.Name()]) }) } } @@ -430,6 +493,14 @@ func TestTransitionFailed(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -500,6 +571,14 @@ func TestTransitionCanceled(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -563,6 +642,14 @@ func TestTransitionTimedOut(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -625,6 +712,14 @@ func TestTransitionTerminated(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0, + ) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } operation := newTestOperation() diff --git a/chasm/lib/nexusoperation/operation_tasks_test.go b/chasm/lib/nexusoperation/operation_tasks_test.go index 9873169b39c..70266a1c2e3 100644 --- a/chasm/lib/nexusoperation/operation_tasks_test.go +++ b/chasm/lib/nexusoperation/operation_tasks_test.go @@ -50,6 +50,7 @@ type invocationTaskTestEnv struct { op *Operation mockEngine *chasm.MockEngine timeSource *clock.EventTimeSource + nsRegistry *namespace.MockRegistry } func newInvocationTaskTestEnv( @@ -70,6 +71,7 @@ func newInvocationTaskTestEnv( nsRegistry := namespace.NewMockRegistry(ctrl) nsRegistry.EXPECT().GetNamespaceByID(namespace.ID("ns-id")).Return( namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0), nil) + nsRegistry.EXPECT().GetNamespaceName(namespace.ID("ns-id")).Return(namespace.Name("ns-name"), nil).AnyTimes() callbackTmpl, err := template.New("callback").Parse("http://localhost/callback") require.NoError(t, err) @@ -131,6 +133,7 @@ func newInvocationTaskTestEnv( op: op, mockEngine: mockEngine, timeSource: timeSource, + nsRegistry: nsRegistry, } } @@ -174,6 +177,19 @@ func (e *invocationTaskTestEnv) setupUpdateComponent() { HandleNow: func(_ chasm.Component) time.Time { return e.timeSource.Now() }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{ + NamespaceID: "ns-id", + BusinessID: "wf-id", + RunID: "run-id", + } + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } err := updateFn(mockCtx, e.op) @@ -749,6 +765,15 @@ func TestScheduleToStartTimeoutTaskHandler_Execute(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -815,6 +840,15 @@ func TestStartToCloseTimeoutTaskHandler_Execute(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } @@ -876,6 +910,15 @@ func TestScheduleToCloseTimeoutTaskHandler_Execute(t *testing.T) { ctx := &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } diff --git a/chasm/lib/nexusoperation/operation_test.go b/chasm/lib/nexusoperation/operation_test.go index 8cf690d6d5f..9e65ceb4de9 100644 --- a/chasm/lib/nexusoperation/operation_test.go +++ b/chasm/lib/nexusoperation/operation_test.go @@ -1,6 +1,7 @@ package nexusoperation import ( + "context" "slices" "testing" "time" @@ -12,9 +13,12 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/common/testing/protoutils" + "go.uber.org/mock/gomock" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -110,10 +114,23 @@ func TestHandleNexusCompletion(t *testing.T) { require.NoError(t, TransitionStarted.Apply(op, ctx, EventStarted{OperationToken: "tok"})) return op } + ctrl := gomock.NewController(t) + nsRegistry := namespace.NewMockRegistry(ctrl) + nsRegistry.EXPECT().GetNamespaceName(namespace.ID("ns-id")).Return(namespace.Name("ns-name"), nil).AnyTimes() + newCtx := func() *chasm.MockMutableContext { return &chasm.MockMutableContext{ MockContext: chasm.MockContext{ HandleNow: func(chasm.Component) time.Time { return defaultTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{NamespaceID: "ns-id"} + }, + HandleNamespaceEntry: func() *namespace.Namespace { + return namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0) + }, + GoCtx: context.WithValue(context.Background(), OperationContextKey, &OperationContext{ + MetricTagConfig: dynamicconfig.GetTypedPropertyFn(NexusMetricTagConfig{}), + }), }, } } diff --git a/chasm/lib/nexusoperation/tasks_test_helpers_test.go b/chasm/lib/nexusoperation/tasks_test_helpers_test.go index b8da24a6c06..9e551028b28 100644 --- a/chasm/lib/nexusoperation/tasks_test_helpers_test.go +++ b/chasm/lib/nexusoperation/tasks_test_helpers_test.go @@ -108,6 +108,10 @@ func (m *mockStoreComponent) OnNexusOperationCancellationFailed(ctx chasm.Mutabl return TransitionCancellationFailed.Apply(cancellation, ctx, EventCancellationFailed{Failure: cause}) } +func (m *mockStoreComponent) WorkflowTypeName() string { + return "" +} + // mockStoreLibrary registers the mockStoreComponent so the CHASM tree can work with it. type mockStoreLibrary struct { chasm.UnimplementedLibrary diff --git a/chasm/lib/nexusoperation/workflow/commands_test.go b/chasm/lib/nexusoperation/workflow/commands_test.go index c73ebda7224..da2a56ede83 100644 --- a/chasm/lib/nexusoperation/workflow/commands_test.go +++ b/chasm/lib/nexusoperation/workflow/commands_test.go @@ -110,6 +110,12 @@ func newTestContext(t *testing.T, cfg *nexusoperation.Config) testContext { HandleEndpointByName: func(name string) (*persistencespb.NexusEndpointEntry, error) { return endpointReg.GetByName(context.Background(), tests.GlobalNamespaceEntry.ID(), name) }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{ + NamespaceID: tests.GlobalNamespaceEntry.ID().String(), + } + }, + GoCtx: context.WithValue(context.Background(), nexusoperation.OperationContextKey, &nexusoperation.OperationContext{MetricTagConfig: dynamicconfig.GetTypedPropertyFn(nexusoperation.NexusMetricTagConfig{})}), }, }, chReg) diff --git a/chasm/lib/nexusoperation/workflow/events.go b/chasm/lib/nexusoperation/workflow/events.go index 2483e57cff3..4fbd84fe59f 100644 --- a/chasm/lib/nexusoperation/workflow/events.go +++ b/chasm/lib/nexusoperation/workflow/events.go @@ -218,8 +218,8 @@ func (d CompletedEventDefinition) Apply(ctx chasm.MutableContext, wf *chasmworkf completeTime := event.GetEventTime().AsTime() if err := nexusoperation.TransitionSucceeded.Apply(op, ctx, nexusoperation.EventSucceeded{ - Result: attrs.GetResult(), CompleteTime: &completeTime, + Result: attrs.GetResult(), }); err != nil { return err } diff --git a/chasm/lib/workflow/workflow.go b/chasm/lib/workflow/workflow.go index 61593615124..fd4b189dc0d 100644 --- a/chasm/lib/workflow/workflow.go +++ b/chasm/lib/workflow/workflow.go @@ -432,6 +432,10 @@ func (w *Workflow) NexusOperationInvocationData( }, nil } +func (w *Workflow) WorkflowTypeName() string { + return w.GetWorkflowTypeName() +} + func (w *Workflow) GetNexusCompletion( ctx chasm.Context, requestID string, diff --git a/chasm/ms_pointer.go b/chasm/ms_pointer.go index 41ad04033c1..f46cfad2329 100644 --- a/chasm/ms_pointer.go +++ b/chasm/ms_pointer.go @@ -47,3 +47,8 @@ func (m MSPointer) LoadHistoryEvent(ctx Context, token []byte) (*historypb.Histo func (m MSPointer) GetNexusCompletion(ctx Context, requestID string) (nexusrpc.CompleteOperationOptions, error) { return m.backend.GetNexusCompletion(ctx.goContext(), requestID) } + +// GetWorkflowTypeName retrieves the workflow type name from the underlying mutable state. +func (m MSPointer) GetWorkflowTypeName() string { + return m.backend.GetExecutionInfo().GetWorkflowTypeName() +} diff --git a/components/nexusoperations/config.go b/components/nexusoperations/config.go index 722b8c23ec6..686f18b3067 100644 --- a/components/nexusoperations/config.go +++ b/components/nexusoperations/config.go @@ -147,9 +147,9 @@ var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting( "component.nexusoperations.metrics.tags", chasmnexus.NexusMetricTagConfig{}, `Controls which metric tags are included with Nexus operation metrics. This configuration supports: -1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag) -2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag) -3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings) +1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag). Used by callers and handlers. +2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag). Used by callers and handlers. +3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings). Only used by handlers. Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration. Adding high-cardinality tags (like unique operation names) can significantly increase metric storage