Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions chasm/lib/nexusoperation/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,32 @@ package nexusoperation
import (
"go.temporal.io/server/chasm"
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
"go.temporal.io/server/common/dynamicconfig"
"google.golang.org/grpc"
)

// operationContextKeyType is the unexported, zero-size type of OperationContextKey. Using a
// dedicated type keeps the key from colliding with context keys declared by other packages.
type operationContextKeyType struct{}

// OperationContextKey is the key under which the *OperationContext is registered as a CHASM
// component context value. It is exported so tests can populate a MockContext with it.
var OperationContextKey operationContextKeyType

// OperationContext holds dependencies injected into the chasm.Context for use by Operation methods.
// It is registered as a component context value under OperationContextKey and read back by
// Operation.metricsHandler.
type OperationContext struct {
// MetricTagConfig returns the NexusMetricTagConfig that decides which optional tags
// (service, operation) are added to operation metrics.
MetricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig]
}

// componentOnlyLibrary registers just the components without task executors or gRPC handlers.
// Used in the frontend to enable component ref serialization.
type componentOnlyLibrary struct {
chasm.UnimplementedLibrary
// metricTagConfig is placed into the OperationContext context value that Components registers.
metricTagConfig dynamicconfig.TypedPropertyFn[NexusMetricTagConfig]
}

func newComponentOnlyLibrary() *componentOnlyLibrary {
return &componentOnlyLibrary{}
// newComponentOnlyLibrary builds a componentOnlyLibrary whose metric tag configuration is
// resolved from the given dynamic config collection via MetricTagConfiguration.
func newComponentOnlyLibrary(dc *dynamicconfig.Collection) *componentOnlyLibrary {
	lib := new(componentOnlyLibrary)
	lib.metricTagConfig = MetricTagConfiguration.Get(dc)
	return lib
}

func (l *componentOnlyLibrary) Name() string {
Expand All @@ -32,6 +47,11 @@ func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent {
StatusSearchAttribute,
),
chasm.WithBusinessIDAlias("OperationId"),
chasm.WithContextValues(map[any]any{
OperationContextKey: &OperationContext{
MetricTagConfig: l.metricTagConfig,
},
}),
),
chasm.NewRegistrableComponent[*Cancellation]("cancellation"),
}
Expand Down Expand Up @@ -61,8 +81,10 @@ func newLibrary(
operationStartToCloseTimeoutTaskHandler *operationStartToCloseTimeoutTaskHandler,
cancellationInvocationTaskHandler *cancellationInvocationTaskHandler,
cancellationBackoffTaskHandler *cancellationBackoffTaskHandler,
dc *dynamicconfig.Collection,
) *Library {
return &Library{
componentOnlyLibrary: *newComponentOnlyLibrary(dc),
handler: handler,
operationBackoffTaskHandler: operationBackoffTaskHandler,
operationInvocationTaskHandler: operationInvocationTaskHandler,
Expand Down
39 changes: 36 additions & 3 deletions chasm/lib/nexusoperation/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,46 @@ var OutboundRequestLatency = metrics.NewTimerDef(
"nexus_outbound_latency",
metrics.WithDescription("Latency of outbound Nexus requests made by the history service."),
)
// Terminal-outcome counters for Nexus Operations. Each one is incremented by the matching
// Operation.emitOn*Metrics method.
var (
	// NexusOperationSuccessCount counts operations that completed successfully.
	NexusOperationSuccessCount = metrics.NewCounterDef(
		"nexus_operation_success",
		metrics.WithDescription("Nexus Operations successfully completed."),
	)

	// NexusOperationFailedCount counts operations that failed.
	NexusOperationFailedCount = metrics.NewCounterDef(
		"nexus_operation_fail",
		metrics.WithDescription("Nexus Operations failures."),
	)

	// NexusOperationCancelCount counts operations that were canceled.
	NexusOperationCancelCount = metrics.NewCounterDef(
		"nexus_operation_cancel",
		metrics.WithDescription("Nexus Operations cancellations."),
	)

	// NexusOperationTerminateCount counts operations that were terminated.
	NexusOperationTerminateCount = metrics.NewCounterDef(
		"nexus_operation_terminate",
		metrics.WithDescription("Nexus Operations that were terminated before completion."),
	)

	// NexusOperationTimeoutCount counts operations that timed out. It is recorded with an
	// additional "timeout_type" tag.
	NexusOperationTimeoutCount = metrics.NewCounterDef(
		"nexus_operation_timeout",
		metrics.WithDescription("Nexus Operations that timed out before completion."),
	)
)

// Latency timers for Nexus Operations.
var (
	// NexusOperationScheduleToCloseLatency measures scheduled time to terminal state.
	NexusOperationScheduleToCloseLatency = metrics.NewTimerDef(
		"nexus_operation_schedule_to_close_latency",
		metrics.WithDescription("Duration from Nexus Operation scheduled time to terminal state."),
	)

	// NexusOperationScheduleToStartLatency measures scheduled time to started time.
	NexusOperationScheduleToStartLatency = metrics.NewTimerDef(
		"nexus_operation_schedule_to_start_latency",
		metrics.WithDescription("Duration from Nexus Operation scheduled time to started time."),
	)

	// NexusOperationStartToCloseLatency measures started time to completed time.
	NexusOperationStartToCloseLatency = metrics.NewTimerDef(
		"nexus_operation_start_to_close_latency",
		metrics.WithDescription("Duration from Nexus Operation started time to completed time. Only emitted for async operations."),
	)
)
Copy link
Copy Markdown
Contributor Author

@S15 S15 Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bergundy I think I've addressed everything mentioned in the previous version of this PR, except I haven't done anything with the names here.

#10026 (comment)

I agree, I don't really care for the names here, but I think it is good to be consistent with activities.

I double-checked, and activity uses:

activity_success
activity_fail
activity_cancel
activity_terminate
activity_timeout

and I have matched those.


// NexusMetricTagConfig selects which optional tags are attached to Nexus metrics.
type NexusMetricTagConfig struct {
// Include service name as a metric tag. Used for caller and handler metrics.
IncludeServiceTag bool
// Include operation name as a metric tag. Used for caller and handler metrics.
IncludeOperationTag bool
// Configuration for mapping request headers to metric tags. Only used for handler metrics.
HeaderTagMappings []NexusHeaderTagMapping
}

Expand Down
108 changes: 107 additions & 1 deletion chasm/lib/nexusoperation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nexusoperation

import (
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand All @@ -16,6 +17,7 @@ import (
"go.temporal.io/server/chasm"
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/metrics"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/softassert"
Expand Down Expand Up @@ -44,6 +46,13 @@ var ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancel
// ErrOperationAlreadyCompleted is returned when trying to cancel an operation that has already completed.
var ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed")

// standaloneOperationWorkflowTypeName is the value reported in the WorkflowTypeTag of metrics
// emitted from standalone operations, i.e. operations that have no store to supply a workflow
// type name.
//
// Do not change: the value is exposed in metrics.
const standaloneOperationWorkflowTypeName = "__temporal_standalone_nexus_operation__"

// InvocationData contains data needed to invoke a Nexus operation.
type InvocationData struct {
// Input is the operation input payload.
Expand All @@ -69,6 +78,7 @@ type OperationStore interface {
OnNexusOperationCancellationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
// NexusOperationInvocationData loads invocation data (Input, Header, NexusLinks) from the scheduled history event.
NexusOperationInvocationData(ctx chasm.Context, operation *Operation) (InvocationData, error)
WorkflowTypeName() string
}

// Operation is a CHASM component that represents a Nexus operation.
Expand Down Expand Up @@ -398,7 +408,10 @@ func (o *Operation) Terminate(
}
return chasm.TerminateComponentResponse{}, nil
}
return chasm.TerminateComponentResponse{}, TransitionTerminated.Apply(o, ctx, EventTerminated{TerminateComponentRequest: req})

return chasm.TerminateComponentResponse{}, TransitionTerminated.Apply(o, ctx, EventTerminated{
TerminateComponentRequest: req,
})
}

func (o *Operation) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue {
Expand Down Expand Up @@ -551,6 +564,99 @@ func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperatio
return info
}

// metricsHandler returns the context's metrics handler enriched with Nexus operation tags.
//
// The namespace, endpoint and workflow type tags are always attached. The workflow type comes
// from the operation's store when o.Store.TryGet reports one, and is
// standaloneOperationWorkflowTypeName otherwise. The service and operation tags are attached
// only when enabled by the NexusMetricTagConfig carried in the OperationContext context value.
//
// When that context value is absent, is a nil *OperationContext, or carries a nil
// MetricTagConfig, the problem is reported through softassert.Fail and the optional tags are
// skipped rather than dereferencing a nil value in a metrics-only code path.
func (o *Operation) metricsHandler(ctx chasm.Context) metrics.Handler {
	namespaceName := ctx.NamespaceEntry().Name().String()

	workflowType := standaloneOperationWorkflowTypeName
	if store, ok := o.Store.TryGet(ctx); ok {
		workflowType = store.WorkflowTypeName()
	}

	// Capacity covers the three mandatory tags plus the two optional ones.
	tags := make([]metrics.Tag, 0, 5)
	tags = append(tags,
		metrics.NamespaceTag(namespaceName),
		metrics.NexusEndpointTag(o.GetEndpoint()),
		metrics.WorkflowTypeTag(workflowType),
	)

	// The assertion is checked (comma-ok form), so a missing value cannot panic here. A typed
	// nil pointer still satisfies the assertion, hence the explicit nil checks.
	opCtx, ok := ctx.Value(OperationContextKey).(*OperationContext)
	switch {
	case !ok || opCtx == nil:
		softassert.Fail(ctx.Logger(), "operation context missing")
	case opCtx.MetricTagConfig == nil:
		softassert.Fail(ctx.Logger(), "operation context metric tag config missing")
	default:
		conf := opCtx.MetricTagConfig()
		if conf.IncludeServiceTag {
			tags = append(tags, metrics.NexusServiceTag(o.GetService()))
		}
		if conf.IncludeOperationTag {
			tags = append(tags, metrics.NexusOperationTag(o.GetOperation()))
		}
	}

	return ctx.MetricsHandler().WithTags(tags...)
}

// emitOnSucceededMetrics increments NexusOperationSuccessCount and records the latency timers
// with an outcome tag derived from OPERATION_STATUS_SUCCEEDED.
func (o *Operation) emitOnSucceededMetrics(ctx chasm.Context, closeTime time.Time) {
	status := nexusoperationpb.OPERATION_STATUS_SUCCEEDED.String()
	outcome := metrics.OutcomeTag(strings.ToLower(status))

	h := o.metricsHandler(ctx)
	NexusOperationSuccessCount.With(h).Record(1)
	o.emitLatencyMetrics(h, closeTime, outcome)
}

// emitOnFailedMetrics increments NexusOperationFailedCount and records the latency timers
// with an outcome tag derived from OPERATION_STATUS_FAILED.
func (o *Operation) emitOnFailedMetrics(ctx chasm.Context, closeTime time.Time) {
	status := nexusoperationpb.OPERATION_STATUS_FAILED.String()
	outcome := metrics.OutcomeTag(strings.ToLower(status))

	h := o.metricsHandler(ctx)
	NexusOperationFailedCount.With(h).Record(1)
	o.emitLatencyMetrics(h, closeTime, outcome)
}

// emitOnCanceledMetrics increments NexusOperationCancelCount and records the latency timers
// with an outcome tag derived from OPERATION_STATUS_CANCELED.
func (o *Operation) emitOnCanceledMetrics(ctx chasm.Context, closeTime time.Time) {
	status := nexusoperationpb.OPERATION_STATUS_CANCELED.String()
	outcome := metrics.OutcomeTag(strings.ToLower(status))

	h := o.metricsHandler(ctx)
	NexusOperationCancelCount.With(h).Record(1)
	o.emitLatencyMetrics(h, closeTime, outcome)
}

// emitOnTimedOutMetrics increments NexusOperationTimeoutCount, tagging the count with the
// given timeoutType under "timeout_type", and records the latency timers with an outcome tag
// derived from OPERATION_STATUS_TIMED_OUT.
func (o *Operation) emitOnTimedOutMetrics(ctx chasm.Context, closeTime time.Time, timeoutType string) {
	status := nexusoperationpb.OPERATION_STATUS_TIMED_OUT.String()
	outcome := metrics.OutcomeTag(strings.ToLower(status))

	h := o.metricsHandler(ctx)
	timeoutTag := metrics.StringTag("timeout_type", timeoutType)
	NexusOperationTimeoutCount.With(h).Record(1, timeoutTag)
	o.emitLatencyMetrics(h, closeTime, outcome)
}

// emitOnTerminatedMetrics increments NexusOperationTerminateCount and records the latency
// timers with an outcome tag derived from OPERATION_STATUS_TERMINATED.
func (o *Operation) emitOnTerminatedMetrics(ctx chasm.Context, closeTime time.Time) {
	status := nexusoperationpb.OPERATION_STATUS_TERMINATED.String()
	outcome := metrics.OutcomeTag(strings.ToLower(status))

	h := o.metricsHandler(ctx)
	NexusOperationTerminateCount.With(h).Record(1)
	o.emitLatencyMetrics(h, closeTime, outcome)
}

// emitLatencyMetrics records the close-time latency timers, each tagged with outcomeTag.
//
// Schedule-to-close is always recorded. If the operation has a started time, start-to-close is
// recorded as well; otherwise schedule-to-start is recorded with the schedule-to-close duration.
//
// NOTE(review): TransitionStarted records NexusOperationScheduleToStartLatency without an
// outcome tag, while this function records it with one — confirm the differing tag sets on the
// same timer are intended.
func (o *Operation) emitLatencyMetrics(handler metrics.Handler, closeTime time.Time, outcomeTag metrics.Tag) {
	sinceScheduled := closeTime.Sub(o.GetScheduledTime().AsTime())
	NexusOperationScheduleToCloseLatency.With(handler).Record(sinceScheduled, outcomeTag)

	started := o.GetStartedTime()
	if started == nil {
		// Sync operation or operation that never started.
		// For sync ops, schedule-to-start equals schedule-to-close.
		NexusOperationScheduleToStartLatency.With(handler).Record(sinceScheduled, outcomeTag)
		return
	}

	// Async operation that was started; its schedule-to-start latency was already emitted in
	// TransitionStarted.
	sinceStarted := closeTime.Sub(started.AsTime())
	NexusOperationStartToCloseLatency.With(handler).Record(sinceStarted, outcomeTag)
}

func (o *Operation) closeTime(ctx chasm.Context) *timestamppb.Timestamp {
if !o.LifecycleState(ctx).IsClosed() {
return nil
Expand Down
15 changes: 13 additions & 2 deletions chasm/lib/nexusoperation/operation_statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ var TransitionStarted = chasm.NewTransition(
// Store the operation token for async completion.
o.OperationToken = event.OperationToken

// Emit schedule-to-start latency
metricsHandler := o.metricsHandler(ctx)
NexusOperationScheduleToStartLatency.With(metricsHandler).Record(startTime.Sub(o.GetScheduledTime().AsTime()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to wrap my head around the fact that emitLatencyMetrics also emits NexusOperationScheduleToStartLatency. Wouldn't we be double-counting?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emitLatencyMetrics only emits NexusOperationScheduleToStartLatency if startedTime is nil, which implies we never transitioned to started. As I understand it, that happens for sync operations (I think it could also happen with completion before start).


// Emit a start-to-close timeout task if configured.
if o.StartToCloseTimeout != nil && o.StartToCloseTimeout.AsDuration() != 0 {
deadline := startTime.Add(o.StartToCloseTimeout.AsDuration())
Expand Down Expand Up @@ -185,6 +189,7 @@ var TransitionSucceeded = chasm.NewTransition(
Successful: &nexusoperationpb.OperationOutcome_Successful{Result: event.Result},
}

o.emitOnSucceededMetrics(ctx, closeTime)
// Terminal state - no tasks to emit.
return nil
},
Expand Down Expand Up @@ -212,6 +217,7 @@ var TransitionFailed = chasm.NewTransition(
}
// Attempts only execute in SCHEDULED, so that status identifies attempt-originated failures.
fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED
o.emitOnFailedMetrics(ctx, closeTime)
return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt)
},
)
Expand All @@ -236,6 +242,7 @@ var TransitionCanceled = chasm.NewTransition(
if event.CompleteTime != nil {
closeTime = *event.CompleteTime
}
o.emitOnCanceledMetrics(ctx, closeTime)
// Attempts only execute in SCHEDULED, so that status identifies attempt-originated cancels.
Comment thread
S15 marked this conversation as resolved.
fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED
return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt)
Expand Down Expand Up @@ -268,11 +275,12 @@ var TransitionTerminated = chasm.NewTransition(
},
},
}
o.emitOnTerminatedMetrics(ctx, closeTime)
return o.resolveUnsuccessfully(ctx, failure, closeTime, false)
},
)

// EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
// EventTimedOut is triggered when a timeout is triggered for an operation.
type EventTimedOut struct {
Failure *failurepb.Failure
// FromAttempt is true when the failure came from an invocation attempt.
Expand All @@ -287,6 +295,9 @@ var TransitionTimedOut = chasm.NewTransition(
},
nexusoperationpb.OPERATION_STATUS_TIMED_OUT,
func(o *Operation, ctx chasm.MutableContext, event EventTimedOut) error {
return o.resolveUnsuccessfully(ctx, event.Failure, ctx.Now(o), event.FromAttempt)
closeTime := ctx.Now(o)
timeoutType := event.Failure.GetTimeoutFailureInfo().GetTimeoutType().String()
o.emitOnTimedOutMetrics(ctx, closeTime, timeoutType)
return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, event.FromAttempt)
},
)
Loading
Loading