diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index a609c352c..3d0973aae 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -23,7 +23,9 @@ import { encodeWorkflowIdConflictPolicy, WorkflowIdConflictPolicy, compilePriority, + LoadedDataConverter, } from '@temporalio/common'; +import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context'; import { encodeUserMetadata } from '@temporalio/common/lib/internal-non-workflow/codec-helpers'; import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; @@ -527,6 +529,14 @@ export class WorkflowClient extends BaseClient { return this.connection.workflowService; } + protected dataConverterWithWorkflowContext(workflowId: string): LoadedDataConverter { + return withSerializationContext(this.dataConverter, { + type: 'workflow', + namespace: this.options.namespace, + workflowId, + }); + } + protected async _start( workflowTypeOrFunc: string | T, options: WorkflowStartOptions, @@ -793,6 +803,7 @@ export class WorkflowClient extends BaseClient { runId?: string, opts?: WorkflowResultOptions ): Promise> { + const dataConverter = this.dataConverterWithWorkflowContext(workflowId); const followRuns = opts?.followRuns ?? true; const execution: temporal.api.common.v1.IWorkflowExecution = { workflowId, runId }; const req: GetWorkflowExecutionHistoryRequest = { @@ -832,7 +843,7 @@ export class WorkflowClient extends BaseClient { // Note that we can only return one value from our workflow function in JS. // Ignore any other payloads in result const [result] = await decodeArrayFromPayloads( - this.dataConverter, + dataConverter, ev.workflowExecutionCompletedEventAttributes.result?.payloads ); return result as any; @@ -845,16 +856,13 @@ export class WorkflowClient extends BaseClient { const { failure, retryState } = ev.workflowExecutionFailedEventAttributes; throw new WorkflowFailedError( 'Workflow execution failed', - await decodeOptionalFailureToOptionalError(this.dataConverter, failure), + await decodeOptionalFailureToOptionalError(dataConverter, failure), decodeRetryState(retryState) ); } else if (ev.workflowExecutionCanceledEventAttributes) { const failure = new CancelledFailure( 'Workflow canceled', - await decodeArrayFromPayloads( - this.dataConverter, - ev.workflowExecutionCanceledEventAttributes.details?.payloads - ) + await decodeArrayFromPayloads(dataConverter, ev.workflowExecutionCanceledEventAttributes.details?.payloads) ); failure.stack = ''; throw new WorkflowFailedError('Workflow execution cancelled', failure, RetryState.NON_RETRYABLE_FAILURE); @@ -941,13 +949,14 @@ export class WorkflowClient extends BaseClient { * Used as the final function of the query interceptor chain */ protected async _queryWorkflowHandler(input: WorkflowQueryInput): Promise { + const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!); const req: temporal.api.workflowservice.v1.IQueryWorkflowRequest = { queryRejectCondition: input.queryRejectCondition, namespace: this.options.namespace, execution: input.workflowExecution, query: { queryType: input.queryType, - queryArgs: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, + queryArgs: { payloads: await encodeToPayloads(dataConverter, ...input.args) }, header: { fields: input.headers }, }, }; @@ -973,13 +982,14 @@ export class WorkflowClient extends BaseClient { throw new TypeError('Invalid response from server'); } // We ignore anything but the first result - return await decodeFromPayloadsAtIndex(this.dataConverter, 0, response.queryResult?.payloads); + return await decodeFromPayloadsAtIndex(dataConverter, 0, response.queryResult?.payloads); } protected async _createUpdateWorkflowRequest( lifecycleStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage, input: WorkflowStartUpdateInput ): Promise { + const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!); const updateId = input.options?.updateId ?? uuid4(); return { namespace: this.options.namespace, @@ -996,7 +1006,7 @@ export class WorkflowClient extends BaseClient { input: { header: { fields: input.headers }, name: input.updateName, - args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, + args: { payloads: await encodeToPayloads(dataConverter, ...input.args) }, }, }, }; @@ -1140,6 +1150,7 @@ export class WorkflowClient extends BaseClient { workflowRunId?: string, outcome?: temporal.api.update.v1.IOutcome ): WorkflowUpdateHandle { + const dataConverter = this.dataConverterWithWorkflowContext(workflowId); return { updateId, workflowId, @@ -1150,10 +1161,10 @@ export class WorkflowClient extends BaseClient { if (completedOutcome.failure) { throw new WorkflowUpdateFailedError( 'Workflow Update failed', - await decodeOptionalFailureToOptionalError(this.dataConverter, completedOutcome.failure) + await decodeOptionalFailureToOptionalError(dataConverter, completedOutcome.failure) ); } else { - return await decodeFromPayloadsAtIndex(this.dataConverter, 0, completedOutcome.success?.payloads); + return await decodeFromPayloadsAtIndex(dataConverter, 0, completedOutcome.success?.payloads); } }, }; @@ -1194,6 +1205,7 @@ export class WorkflowClient extends BaseClient { * Used as the final function of the signal interceptor chain */ protected async _signalWorkflowHandler(input: WorkflowSignalInput): Promise { + const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!); const req: temporal.api.workflowservice.v1.ISignalWorkflowExecutionRequest = { identity: this.options.identity, namespace: this.options.namespace, @@ -1202,7 +1214,7 @@ export class WorkflowClient extends BaseClient { // control is unused, signalName: input.signalName, header: { fields: input.headers }, - input: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) }, + input: { payloads: await encodeToPayloads(dataConverter, ...input.args) }, }; try { await this.workflowService.signalWorkflowExecution(req); @@ -1219,6 +1231,7 @@ export class WorkflowClient extends BaseClient { protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise { const { identity } = this.options; const { options, workflowType, signalName, signalArgs, headers } = input; + const dataConverter = this.dataConverterWithWorkflowContext(options.workflowId); const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = { namespace: this.options.namespace, identity, @@ -1227,9 +1240,9 @@ export class WorkflowClient extends BaseClient { workflowIdReusePolicy: encodeWorkflowIdReusePolicy(options.workflowIdReusePolicy), workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(options.workflowIdConflictPolicy), workflowType: { name: workflowType }, - input: { payloads: await encodeToPayloads(this.dataConverter, ...options.args) }, + input: { payloads: await encodeToPayloads(dataConverter, ...options.args) }, signalName, - signalInput: { payloads: await encodeToPayloads(this.dataConverter, ...signalArgs) }, + signalInput: { payloads: await encodeToPayloads(dataConverter, ...signalArgs) }, taskQueue: { kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL, name: options.taskQueue, @@ -1239,7 +1252,7 @@ export class WorkflowClient extends BaseClient { workflowTaskTimeout: options.workflowTaskTimeout, workflowStartDelay: options.startDelay, retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, - memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined, + memo: options.memo ? { fields: await encodeMapToPayloads(dataConverter, options.memo) } : undefined, searchAttributes: options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? { @@ -1248,7 +1261,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: options.cronSchedule, header: { fields: headers }, - userMetadata: await encodeUserMetadata(this.dataConverter, options.staticSummary, options.staticDetails), + userMetadata: await encodeUserMetadata(dataConverter, options.staticSummary, options.staticDetails), priority: options.priority ? compilePriority(options.priority) : undefined, versioningOverride: options.versioningOverride ?? undefined, }; @@ -1299,6 +1312,7 @@ export class WorkflowClient extends BaseClient { protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; + const dataConverter = this.dataConverterWithWorkflowContext(opts.workflowId); const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol] ?.supportsEagerStart; @@ -1318,7 +1332,7 @@ export class WorkflowClient extends BaseClient { workflowIdReusePolicy: encodeWorkflowIdReusePolicy(opts.workflowIdReusePolicy), workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(opts.workflowIdConflictPolicy), workflowType: { name: workflowType }, - input: { payloads: await encodeToPayloads(this.dataConverter, ...opts.args) }, + input: { payloads: await encodeToPayloads(dataConverter, ...opts.args) }, taskQueue: { kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL, name: opts.taskQueue, @@ -1328,7 +1342,7 @@ export class WorkflowClient extends BaseClient { workflowTaskTimeout: opts.workflowTaskTimeout, workflowStartDelay: opts.startDelay, retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined, - memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined, + memo: opts.memo ? { fields: await encodeMapToPayloads(dataConverter, opts.memo) } : undefined, searchAttributes: opts.searchAttributes || opts.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? { @@ -1337,7 +1351,7 @@ export class WorkflowClient extends BaseClient { : undefined, cronSchedule: opts.cronSchedule, header: { fields: headers }, - userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails), + userMetadata: await encodeUserMetadata(dataConverter, opts.staticSummary, opts.staticDetails), priority: opts.priority ? compilePriority(opts.priority) : undefined, versioningOverride: opts.versioningOverride ?? undefined, requestEagerExecution: opts.requestEagerStart, @@ -1353,12 +1367,13 @@ export class WorkflowClient extends BaseClient { protected async _terminateWorkflowHandler( input: WorkflowTerminateInput ): Promise { + const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!); const req: temporal.api.workflowservice.v1.ITerminateWorkflowExecutionRequest = { namespace: this.options.namespace, identity: this.options.identity, ...input, details: { - payloads: input.details ? await encodeToPayloads(this.dataConverter, ...input.details) : undefined, + payloads: input.details ? await encodeToPayloads(dataConverter, ...input.details) : undefined, }, firstExecutionRunId: input.firstExecutionRunId, }; diff --git a/packages/common/src/converter/failure-converter.ts b/packages/common/src/converter/failure-converter.ts index 21138262c..c26c83ce1 100644 --- a/packages/common/src/converter/failure-converter.ts +++ b/packages/common/src/converter/failure-converter.ts @@ -25,6 +25,7 @@ import { isError } from '../type-helpers'; import { msOptionalToTs } from '../time'; import { encode } from '../encoding'; import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from './payload-converter'; +import type { SerializationContext } from './serialization-context'; // Can't import proto enums into the workflow sandbox, use this helper type and enum converter instead. const NexusHandlerErrorRetryBehavior = { @@ -109,6 +110,17 @@ export interface FailureConverter { * The returned error must be an instance of `TemporalFailure`. */ failureToError(err: ProtoFailure, payloadConverter: PayloadConverter): Error; + + /** + * Optionally return a converter bound to the current serialization context. + * + * The SDK may call this for individual failure conversion operations across + * different workflows and activities. Implementations should treat this as + * side-effect free and may return `this` when no rebinding is needed. + * + * @experimental Serialization context is an experimental feature and may change. + */ + withContext?(context: SerializationContext): FailureConverter; } /** diff --git a/packages/common/src/converter/payload-converter.ts b/packages/common/src/converter/payload-converter.ts index ba8b7dce7..851630ce5 100644 --- a/packages/common/src/converter/payload-converter.ts +++ b/packages/common/src/converter/payload-converter.ts @@ -1,6 +1,7 @@ import { decode, encode } from '../encoding'; import { PayloadConverterError, ValueError } from '../errors'; import { Payload } from '../interfaces'; +import type { SerializationContext } from './serialization-context'; import { encodingKeys, encodingTypes, METADATA_ENCODING_KEY } from './types'; /** @@ -25,6 +26,17 @@ export interface PayloadConverter { * Converts a {@link Payload} back to a value. */ fromPayload(payload: Payload): T; + + /** + * Optionally return a converter bound to the current serialization context. + * + * The SDK may call this for individual conversion operations across different + * workflows and activities. Implementations should treat this as side-effect + * free and may return `this` when no rebinding is needed. + * + * @experimental Serialization context is an experimental feature and may change. + */ + withContext?(context: SerializationContext): PayloadConverter; } /** diff --git a/packages/common/src/converter/serialization-context.ts b/packages/common/src/converter/serialization-context.ts new file mode 100644 index 000000000..6a4795392 --- /dev/null +++ b/packages/common/src/converter/serialization-context.ts @@ -0,0 +1,107 @@ +import type { LoadedDataConverter } from './data-converter'; +import type { FailureConverter } from './failure-converter'; +import type { PayloadConverter } from './payload-converter'; + +/** + * Context for payloads owned by a workflow. + * + * This is used when converting payloads sent to or received from a workflow. + * If a workflow interacts with a child workflow or an external workflow, this + * context refers to that target workflow. + * + * @experimental Serialization context is an experimental feature and may change. + */ +export interface WorkflowSerializationContext { + /** Always `'workflow'` for workflow-owned payloads. */ + type: 'workflow'; + /** Namespace of the workflow that owns the payload. */ + namespace: string; + /** Workflow ID of the workflow that owns the payload. */ + workflowId: string; +} + +/** + * Context for payloads owned by an activity. + * + * This is used when converting activity arguments, results, heartbeat details, + * and activity-related failures. + * + * @experimental Serialization context is an experimental feature and may change. + */ +export interface ActivitySerializationContext { + /** Always `'activity'` for activity-owned payloads. */ + type: 'activity'; + /** Namespace of the activity that owns the payload. */ + namespace: string; + /** + * Activity ID of the activity that owns the payload. + * + * This may be omitted when context is supplied manually and the caller does + * not know the activity ID. + */ + activityId?: string; + /** Workflow ID of the workflow that scheduled the activity, when known. */ + workflowId?: string; + /** Whether the activity is a local activity started from a workflow. */ + isLocal: boolean; +} + +/** + * Context passed to payload and failure converters. + * + * The context describes the workflow or activity whose payload is being converted. + * For example: + * - `client.workflow.start()` uses the target workflow's context. + * - `executeChild()` uses the child workflow's context, not the parent's. + * - `scheduleActivity()` uses the scheduled activity's context. + * + * @experimental Serialization context is an experimental feature and may change. + */ +export type SerializationContext = WorkflowSerializationContext | ActivitySerializationContext; + +/** + * Return a payload converter bound to `context` if the converter supports context binding. + */ +export function withPayloadConverterContext( + converter: PayloadConverter, + context: SerializationContext +): PayloadConverter { + return converter.withContext?.(context) ?? converter; +} + +/** + * Return a failure converter bound to `context` if the converter supports context binding. + */ +export function withFailureConverterContext( + converter: FailureConverter, + context: SerializationContext +): FailureConverter { + return converter.withContext?.(context) ?? converter; +} + +/** + * Return a loaded data converter with its payload and failure converters bound to `context`. + * + * Internal helper for non-workflow code paths. Workflow-isolate code should bind the individual + * payload or failure converter directly to avoid pulling unnecessary code into the workflow bundle. + * + * NOTE: this does *not* bind `context` to payload codecs + */ +// ts-prune-ignore-next +export function withSerializationContext( + converter: LoadedDataConverter, + context: SerializationContext +): LoadedDataConverter { + const payloadConverter = withPayloadConverterContext(converter.payloadConverter, context); + const failureConverter = withFailureConverterContext(converter.failureConverter, context); + + if (payloadConverter === converter.payloadConverter && failureConverter === converter.failureConverter) { + return converter; + } + + return { + payloadConverter, + failureConverter, + payloadCodecs: converter.payloadCodecs, + }; +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 343bb6fd0..51bf074fd 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -14,6 +14,11 @@ export * from './converter/data-converter'; export * from './converter/failure-converter'; export * from './converter/payload-codec'; export * from './converter/payload-converter'; +export type { + ActivitySerializationContext, + SerializationContext, + WorkflowSerializationContext, +} from './converter/serialization-context'; export * from './converter/types'; export * from './deprecated-time'; export * from './errors'; diff --git a/packages/test/src/activities/serialization-context.ts b/packages/test/src/activities/serialization-context.ts new file mode 100644 index 000000000..1057963ba --- /dev/null +++ b/packages/test/src/activities/serialization-context.ts @@ -0,0 +1,14 @@ +import { activityInfo, heartbeat } from '@temporalio/activity'; +import { ContextTrace, withLabel } from '../payload-converters/serialization-context-converter'; + +export async function echoTrace(inputTrace: ContextTrace, value: T): Promise> { + return withLabel(inputTrace, value); +} + +export async function heartbeatTrace(inputTrace: ContextTrace, value: T): Promise> { + if (activityInfo().attempt === 1) { + heartbeat(withLabel(inputTrace, value)); + throw new Error('retry me'); + } + return activityInfo().heartbeatDetails as ContextTrace; +} diff --git a/packages/test/src/payload-converters/serialization-context-converter.ts b/packages/test/src/payload-converters/serialization-context-converter.ts new file mode 100644 index 000000000..158009632 --- /dev/null +++ b/packages/test/src/payload-converters/serialization-context-converter.ts @@ -0,0 +1,123 @@ +import { + Payload, + PayloadConverter, + FailureConverter, + ProtoFailure, + defaultPayloadConverter, + defaultFailureConverter, + SerializationContext, +} from '@temporalio/common'; + +export interface ContextTrace { + label: T; + trace: string[]; +} + +export function makeContextTrace(label: T): ContextTrace { + return { + label, + trace: [], + }; +} + +export function withLabel(existing: ContextTrace, label: T): ContextTrace { + return { label, trace: existing.trace }; +} + +function isContextTrace(maybeTrace: unknown): maybeTrace is ContextTrace { + return ( + typeof maybeTrace === 'object' && + maybeTrace !== null && + 'label' in maybeTrace && + 'trace' in maybeTrace && + Array.isArray(maybeTrace.trace) + ); +} + +function ctxToTraceStr(context: SerializationContext): string { + const parts = [context.type, context.namespace]; + + if (context.workflowId) parts.push(context.workflowId); + + if (context.type === 'activity') { + if (context.activityId) parts.push(context.activityId); + parts.push(String(context.isLocal)); + } + + return parts.join('.'); +} + +export class FreePayloadConverter implements PayloadConverter { + withContext(context: SerializationContext): PayloadConverter { + return new BoundPayloadConverter(context); + } + + toPayload(value: T): Payload { + if (isContextTrace(value)) { + value.trace.push(`payload.encode.free|${value.label}`); + } + return defaultPayloadConverter.toPayload(value); + } + + fromPayload(payload: Payload): T { + const value = defaultPayloadConverter.fromPayload(payload); + if (isContextTrace(value)) { + value.trace.push(`payload.decode.free|${value.label}`); + } + return value as T; + } +} + +class BoundPayloadConverter implements PayloadConverter { + constructor(private readonly context: SerializationContext) {} + + toPayload(value: T): Payload { + if (isContextTrace(value)) { + value.trace.push(`payload.encode.bound|${value.label}|${ctxToTraceStr(this.context)}`); + } + return defaultPayloadConverter.toPayload(value); + } + + fromPayload(payload: Payload): T { + const value = defaultPayloadConverter.fromPayload(payload); + if (isContextTrace(value)) { + value.trace.push(`payload.decode.bound|${value.label}|${ctxToTraceStr(this.context)}`); + } + return value as T; + } +} + +export class FreeFailureConverter implements FailureConverter { + errorToFailure(err: unknown, payloadConverter: PayloadConverter): ProtoFailure { + const failure = defaultFailureConverter.errorToFailure(err, payloadConverter); + const existing = failure.message ?? ''; + failure.message = `failure.encode.free|${existing}`; + return failure; + } + failureToError(err: ProtoFailure, payloadConverter: PayloadConverter): Error { + const error = defaultFailureConverter.failureToError(err, payloadConverter); + error.message = `failure.decode.free|${error.message}`; + return error; + } + withContext?(context: SerializationContext): FailureConverter { + return new BoundFailureConverter(context); + } +} + +class BoundFailureConverter implements FailureConverter { + constructor(private readonly context: SerializationContext) {} + errorToFailure(err: unknown, payloadConverter: PayloadConverter): ProtoFailure { + const failure = defaultFailureConverter.errorToFailure(err, payloadConverter); + const existing = failure.message ?? ''; + failure.message = `failure.encode.bound|${ctxToTraceStr(this.context)}|${existing}`; + return failure; + } + failureToError(err: ProtoFailure, payloadConverter: PayloadConverter): Error { + const error = defaultFailureConverter.failureToError(err, payloadConverter); + error.message = `failure.decode.bound|${ctxToTraceStr(this.context)}|${error.message}`; + return error; + } +} + +export const payloadConverter = new FreePayloadConverter(); +export const failureConverter = new FreeFailureConverter(); diff --git a/packages/test/src/test-otel.ts b/packages/test/src/test-otel.ts index b640b95ae..b1cb987d4 100644 --- a/packages/test/src/test-otel.ts +++ b/packages/test/src/test-otel.ts @@ -518,7 +518,7 @@ if (RUN_INTEGRATION_TESTS) { tracer, spanName: `test-thrown-${String(thrown)}`, fn: () => { - throw thrown; // eslint-disable-line no-throw-literal + throw thrown; }, }); t.fail('expected instrumentSync to throw'); diff --git a/packages/test/src/test-serialization-context.ts b/packages/test/src/test-serialization-context.ts new file mode 100644 index 000000000..f49237340 --- /dev/null +++ b/packages/test/src/test-serialization-context.ts @@ -0,0 +1,524 @@ +import { randomUUID } from 'crypto'; +import { Client, WorkflowFailedError } from '@temporalio/client'; +import { workflowInterceptorModules } from '@temporalio/testing'; +import { bundleWorkflowCode } from '@temporalio/worker'; +import { decodeOptionalSinglePayload } from '@temporalio/common/lib/internal-non-workflow'; +import { bundlerOptions, TestWorkflowEnvironment } from './helpers'; +import { + makeConfigurableEnvironmentTestFn, + configurableHelpers, + createTestWorkflowEnvironment, + Context, +} from './helpers-integration'; +import { + currentWorkflowContext, + echoQuery, + echoUpdate, + unblockSignal, + messagePassingContexts, + wfContextWithRemoteActivity, + wfContextWithHeartbeatDetails, + wfContextWithLocalActivity, + wfContextWithContinueAsNew, + wfContextWithChildWorkflow, + wfFailureContext, + wfActivityFailureContext, + wfExternalSignalFailureContext, + wfExternalCancelFailureContext, + wfContextSmoke, + wfContextWithUpsertMemo, + wfContextWithTimerSummary, + wfChildWorkflowFailureContext, + wfExternalSignalSuccessContext, + wfLocalActivityFailureContext, +} from './workflows/serialization-context'; +import { makeContextTrace } from './payload-converters/serialization-context-converter'; +import { echoTrace, heartbeatTrace } from './activities/serialization-context'; +import { throwAnError } from './activities'; + +const converterPath = require.resolve('./payload-converters/serialization-context-converter'); +const dataConverter = { payloadConverterPath: converterPath, failureConverterPath: converterPath }; + +const test = makeConfigurableEnvironmentTestFn({ + createTestContext: async () => { + const env = await createTestWorkflowEnvironment(); + const workflowBundle = await bundleWorkflowCode({ + ...bundlerOptions, + workflowInterceptorModules: [...workflowInterceptorModules], + workflowsPath: require.resolve('./workflows/serialization-context'), + payloadConverterPath: converterPath, + failureConverterPath: converterPath, + }); + return { env, workflowBundle }; + }, + teardown: async (c) => { + await c.env.teardown(); + }, +}); + +function makeClient(env: TestWorkflowEnvironment): Client { + return new Client({ + connection: env.client.connection, + namespace: env.client.options.namespace, + dataConverter, + }); +} + +// Helper to assert workflow serialization context in trace string +function workflowCtx(workflowId: string): string { + return `workflow.default.${workflowId}`; +} +// Helper to assert activity serialization context in trace string +function activityCtx(workflowId: string, activityId = '1', isLocal = false): string { + return `activity.default.${workflowId}.${activityId}.${isLocal}`; +} + +// Helper to assert payload encoding in trace string +function enc(label: string, ctx: string): string { + return `payload.encode.bound|${label}|${ctx}`; +} +// Helper to assert payload decoding in trace string +function dec(label: string, ctx: string): string { + return `payload.decode.bound|${label}|${ctx}`; +} +// Helper to assert paired payload encode/decode operations in trace strings +function encdec(label: string, ctx: string): string[] { + return [enc(label, ctx), dec(label, ctx)]; +} + +test('workflow start/result payloads carry workflow context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + await worker.runUntil(async () => { + const handle = await client.workflow.start(currentWorkflowContext, { + args: [makeContextTrace('wf-input')], + workflowId, + taskQueue: h.taskQueue, + }); + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [...encdec('wf-input', wf), ...encdec('wf-output', wf)], + }); + }); +}); + +test('query/signal/update carry workflow context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + await worker.runUntil(async () => { + const handle = await client.workflow.start(messagePassingContexts, { + workflowId, + taskQueue: h.taskQueue, + }); + const queryTrace = await handle.query(echoQuery, makeContextTrace('query-input')); + const updateTrace = await handle.executeUpdate(echoUpdate, { args: [makeContextTrace('update-input')] }); + await handle.signal(unblockSignal, makeContextTrace('signal-input')); + const signalTrace = await handle.result(); + t.deepEqual(queryTrace, { + label: 'query-output', + trace: [...encdec('query-input', wf), ...encdec('query-output', wf)], + }); + t.deepEqual(updateTrace, { + label: 'update-output', + trace: [...encdec('update-input', wf), ...encdec('update-output', wf)], + }); + t.deepEqual(signalTrace, { + label: 'signal-received', + trace: [...encdec('signal-input', wf), ...encdec('signal-received', wf)], + }); + }); +}); + +test('activity carries serialization context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter, activities: { echoTrace } }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + const act = activityCtx(workflowId); + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfContextWithRemoteActivity, { + args: [makeContextTrace('wf-input')], + workflowId, + taskQueue: h.taskQueue, + }); + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [ + ...encdec('wf-input', wf), + ...encdec('activity-input', act), + ...encdec('activity-output', act), + ...encdec('wf-output', wf), + ], + }); + }); +}); + +test('activity heartbeat carries workflow context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ + dataConverter, + activities: { heartbeatTrace }, + }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + const act = activityCtx(workflowId); + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfContextWithHeartbeatDetails, { + args: [makeContextTrace('wf-input')], + workflowId, + taskQueue: h.taskQueue, + }); + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [ + ...encdec('wf-input', wf), + ...encdec('activity-input', act), + ...encdec('activity-heartbeat-details', act), + ...encdec('activity-heartbeat-details', act), + ...encdec('wf-output', wf), + ], + }); + }); +}); + +test('local activity carries serialization context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter, activities: { echoTrace } }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + const localAct = activityCtx(workflowId, '1', true); + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfContextWithLocalActivity, { + args: [makeContextTrace('wf-input')], + workflowId, + taskQueue: h.taskQueue, + }); + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [ + ...encdec('wf-input', wf), + ...encdec('local-activity-input', localAct), + ...encdec('local-activity-output', localAct), + ...encdec('wf-output', wf), + ], + }); + }); +}); + +test('workflow continue-as-new carry workflow context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfContextWithContinueAsNew, { + args: [makeContextTrace('wf-input'), true], + workflowId, + taskQueue: h.taskQueue, + }); + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [...encdec('wf-input', wf), ...encdec('continue-as-new', wf), ...encdec('wf-output', wf)], + }); + }); +}); + +test('child workflow carry workflow context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const childId = `child-wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + const childWf = workflowCtx(childId); + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfContextWithChildWorkflow, { + args: [makeContextTrace('parent-wf-input'), childId], + workflowId, + taskQueue: h.taskQueue, + }); + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'parent-wf-output', + trace: [ + ...encdec('parent-wf-input', wf), + ...encdec('child-wf-input', childWf), + ...encdec('child-wf-output', childWf), + ...encdec('parent-wf-output', wf), + ], + }); + }); +}); + +test('workflow failure carries workflow failure context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfFailureContext, { + workflowId, + taskQueue: h.taskQueue, + }); + + const err = (await t.throwsAsync(handle.result(), { + instanceOf: WorkflowFailedError, + })) as WorkflowFailedError; + + t.is(err.cause?.message, `failure.decode.bound|${wf}|failure.encode.bound|${wf}|wf-failure`); + }); +}); + +test('activity failure observed by workflow carries workflow decode context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ + dataConverter, + activities: { throwAnError }, + }); + const workflowId = `wf-id-${randomUUID()}`; + const act = activityCtx(workflowId); + + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfActivityFailureContext, { + workflowId, + taskQueue: h.taskQueue, + }); + + const message = await handle.result(); + t.is(message, `failure.decode.bound|${act}|Activity task failed`); + }); +}); + +test('external signal failure carries target workflow decode context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const missingWfId = `missing-wf-id-${randomUUID()}`; + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfExternalSignalFailureContext, { + workflowId: `wf-id-${randomUUID()}`, + taskQueue: h.taskQueue, + args: [missingWfId], + }); + + const message = await handle.result(); + t.is( + message, + `failure.decode.bound|workflow.default.${missingWfId}|Unable to signal external workflow because it was not found` + ); + }); +}); + +test('external cancel failure carries target workflow decode context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const missingWfId = `missing-wf-id-${randomUUID()}`; + + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfExternalCancelFailureContext, { + workflowId: `wf-id-${randomUUID()}`, + taskQueue: h.taskQueue, + args: [missingWfId], + }); + + const message = await handle.result(); + t.is( + message, + `failure.decode.bound|workflow.default.${missingWfId}|Unable to cancel external workflow because not found` + ); + }); +}); + +test('workflow upsertMemo carries workflow context on encode', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfContextWithUpsertMemo, { + args: [makeContextTrace('wf-input')], + workflowId, + taskQueue: h.taskQueue, + }); + + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [...encdec('wf-input', wf), enc('memo-upsert', wf), ...encdec('wf-output', wf)], + }); + }); +}); + +// Timer summary is string metadata, not a ContextTrace payload so there is no +// ContextTrace to inspect. +// This test only verifies that the summary still round-trips through history. +// (it does not prove serialization-context tracing the way the ContextTrace tests do) +test('timer summary still serializes', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + + await worker.runUntil(async () => { + await client.workflow.execute(wfContextWithTimerSummary, { + args: [makeContextTrace('wf-input')], + workflowId, + taskQueue: h.taskQueue, + }); + + const resp = await t.context.env.client.workflowService.getWorkflowExecutionHistory({ + namespace: t.context.env.client.options.namespace, + execution: { workflowId }, + }); + + const timerStarted = resp.history?.events?.find((e) => e.timerStartedEventAttributes != null); + + t.is( + await decodeOptionalSinglePayload( + t.context.env.client.options.loadedDataConverter, + timerStarted?.userMetadata?.summary + ), + 'timer-summary' + ); + }); +}); + +test('workflow with many payload boundaries', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter, activities: { echoTrace } }); + const workflowId = `wf-id-${randomUUID()}`; + const childId = `child-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + const childWf1 = workflowCtx(childId); + const childWf2 = workflowCtx(childId + '-2'); + const act = activityCtx(workflowId, '1', false); + const localAct = activityCtx(workflowId, '2', true); + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfContextSmoke, { + args: [makeContextTrace('wf-input'), true, childId], + workflowId, + taskQueue: h.taskQueue, + }); + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [ + ...encdec('wf-input', wf), + + ...encdec('activity-input', act), + ...encdec('activity-output', act), + + ...encdec('local-activity-input', localAct), + ...encdec('local-activity-output', localAct), + + ...encdec('child-wf-input', childWf1), + ...encdec('child-wf-output', childWf1), + + ...encdec('continue-as-new', wf), + + ...encdec('activity-input', act), + ...encdec('activity-output', act), + + ...encdec('local-activity-input', localAct), + ...encdec('local-activity-output', localAct), + + ...encdec('child-wf-input', childWf2), + ...encdec('child-wf-output', childWf2), + + ...encdec('wf-output', wf), + ], + }); + }); +}); + +test('external signal success carries target workflow context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const childId = `child-wf-id-${randomUUID()}`; + const wf = workflowCtx(workflowId); + const childWf = workflowCtx(childId); + + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfExternalSignalSuccessContext, { + args: [makeContextTrace('wf-input'), childId], + workflowId, + taskQueue: h.taskQueue, + }); + + const wfTrace = await handle.result(); + t.deepEqual(wfTrace, { + label: 'wf-output', + trace: [ + ...encdec('wf-input', wf), + ...encdec('signal-input', childWf), + ...encdec('signal-received', childWf), + ...encdec('wf-output', wf), + ], + }); + }); +}); + +test('child workflow failure observed by parent carries child workflow decode context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ dataConverter }); + const workflowId = `wf-id-${randomUUID()}`; + const childId = `child-wf-id-${randomUUID()}`; + const childWf = workflowCtx(childId); + + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfChildWorkflowFailureContext, { + args: [childId], + workflowId, + taskQueue: h.taskQueue, + }); + + const message = await handle.result(); + t.is(message, `failure.decode.bound|${childWf}|Child Workflow execution failed`); + }); +}); + +test('local activity failure observed by workflow carries local activity decode context', async (t) => { + const h = configurableHelpers(t, t.context.workflowBundle, t.context.env); + const client = makeClient(t.context.env); + const worker = await h.createWorker({ + dataConverter, + activities: { throwAnError }, + }); + const workflowId = `wf-id-${randomUUID()}`; + const localAct = activityCtx(workflowId, '1', true); + + await worker.runUntil(async () => { + const handle = await client.workflow.start(wfLocalActivityFailureContext, { + workflowId, + taskQueue: h.taskQueue, + }); + + const message = await handle.result(); + t.is(message, `failure.decode.bound|${localAct}|failure.encode.bound|${localAct}|local-activity-failure`); + }); +}); diff --git a/packages/test/src/test-worker-heartbeats.ts b/packages/test/src/test-worker-heartbeats.ts index df62a107b..fd347fa84 100644 --- a/packages/test/src/test-worker-heartbeats.ts +++ b/packages/test/src/test-worker-heartbeats.ts @@ -172,3 +172,38 @@ test('No heartbeat is emitted with rogue activity', async (t) => { }); t.deepEqual(heartbeatsSeen, [1]); }); + +test('activity start heartbeat-details decode failure is encoded with activity serialization context', async (t) => { + const converterPath = require.resolve('./payload-converters/serialization-context-converter'); + + const worker = isolateFreeWorker({ + namespace: 'worker-test', + taskQueue: 'unused', + dataConverter: { + payloadConverterPath: converterPath, + failureConverterPath: converterPath, + }, + activities: { + async rapidHeartbeater() { + throw new Error('should not execute'); + }, + }, + }); + + await worker.runUntil(async () => { + const completion = await worker.native.runActivityTask({ + taskToken: Buffer.from(uuid4()), + start: { + activityType: 'rapidHeartbeater', + activityId: 'act-1', + workflowExecution: { workflowId: 'wfid', runId: 'runid' }, + heartbeatDetails: [{} as any], + }, + }); + + t.is( + completion.result?.failed?.failure?.message, + 'failure.encode.bound|activity.worker-test.wfid.act-1.false|Failed to parse heartbeat details for activity act-1: Unknown encoding: ' + ); + }); +}); diff --git a/packages/test/src/workflows/serialization-context.ts b/packages/test/src/workflows/serialization-context.ts new file mode 100644 index 000000000..213f233fe --- /dev/null +++ b/packages/test/src/workflows/serialization-context.ts @@ -0,0 +1,223 @@ +import * as wf from '@temporalio/workflow'; +import { withLabel, type ContextTrace } from '../payload-converters/serialization-context-converter'; + +/** + * Workflow fixtures for serialization-context tests. + * + * Payload-path fixtures cover serialization across: + * - workflow start and result + * - query, update, and signal payloads + * - child workflows + * - remote activities + * - local activities + * - heartbeat details + * - continue-as-new + * - workflow memo + * - workflow timer + * - one small smoke workflow that combines several boundaries + * + * Failure-path fixtures cover serialization across: + * - workflow completion failure + * - activity failure observed from a workflow + * - external workflow signal failure + * - external workflow cancel failure + */ + +export type Trace = ContextTrace; + +export const { echoTrace, throwAnError, heartbeatTrace } = wf.proxyActivities<{ + echoTrace(input: Trace, label: string): Promise; + throwAnError(useApplicationFailure: boolean, message: string): Promise; + heartbeatTrace(input: Trace, label: string): Promise; +}>({ + startToCloseTimeout: '10s', + heartbeatTimeout: '1s', + retry: { maximumAttempts: 2 }, +}); + +const localActivities = wf.proxyLocalActivities<{ + echoTrace(input: Trace, label: string): Promise; + throwAnError(useApplicationFailure: boolean, message: string): Promise; +}>({ + startToCloseTimeout: '10s', +}); + +export const unblockSignal = wf.defineSignal<[Trace]>('unblock'); +export const echoQuery = wf.defineQuery('echoQuery'); +export const echoUpdate = wf.defineUpdate('echoUpdate'); + +export async function currentWorkflowContext(ctxTrace: Trace): Promise { + return workflowOutput(ctxTrace); +} + +export async function messagePassingContexts(): Promise { + let receivedSignal: Trace | undefined; + + wf.setHandler(echoQuery, (trace) => queryOutput(trace)); + wf.setHandler(echoUpdate, (trace) => updateOutput(trace)); + wf.setHandler(unblockSignal, (trace) => { + receivedSignal = signalReceived(trace); + }); + + await wf.condition(() => receivedSignal !== undefined); + return receivedSignal as Trace; +} + +export async function wfContextWithRemoteActivity(inputTrace: Trace): Promise { + const fromActivity = await echoTrace(activityInput(inputTrace), 'activity-output'); + return workflowOutput(fromActivity); +} + +export async function wfContextWithHeartbeatDetails(inputTrace: Trace): Promise { + const fromHeartbeat = await heartbeatTrace(activityInput(inputTrace), 'activity-heartbeat-details'); + return workflowOutput(fromHeartbeat); +} + +export async function wfContextWithLocalActivity(inputTrace: Trace): Promise { + const fromLocalActivity = await localActivities.echoTrace(localActivityInput(inputTrace), 'local-activity-output'); + return workflowOutput(fromLocalActivity); +} + +export async function wfContextWithContinueAsNew(inputTrace: Trace, shouldContinueAsNew: boolean): Promise { + if (shouldContinueAsNew) { + return await wf.continueAsNew(continueAsNewInput(inputTrace), false); + } + + return workflowOutput(inputTrace); +} + +export async function childWorkflowContext(inputTrace: Trace): Promise { + return childWorkflowOutput(inputTrace); +} + +export async function wfContextWithChildWorkflow(inputTrace: Trace, childId: string): Promise { + const fromChild = await wf.executeChild(childWorkflowContext, { + args: [childWorkflowInput(inputTrace)], + workflowId: childId, + }); + return parentWorkflowOutput(fromChild); +} + +export async function wfFailureContext(): Promise { + throw wf.ApplicationFailure.nonRetryable('wf-failure'); +} + +export async function wfActivityFailureContext(): Promise { + try { + await throwAnError(true, 'activity-failure'); + return 'unexpected success'; + } catch (err) { + return (err as Error).message; + } +} + +export async function wfExternalSignalFailureContext(missingWorkflowId: string): Promise { + try { + const handle = wf.getExternalWorkflowHandle(missingWorkflowId); + await handle.signal(unblockSignal, { label: 'signal', trace: [] }); + return 'unexpected success'; + } catch (err) { + return (err as Error).message; + } +} + +export async function wfExternalCancelFailureContext(missingWorkflowId: string): Promise { + try { + const handle = wf.getExternalWorkflowHandle(missingWorkflowId); + await handle.cancel(); + return 'unexpected success'; + } catch (err) { + return (err as Error).message; + } +} + +export async function wfContextWithUpsertMemo(inputTrace: Trace): Promise { + wf.upsertMemo({ + probe: withLabel(inputTrace, 'memo-upsert'), + }); + return workflowOutput(inputTrace); +} + +export async function wfContextWithTimerSummary(inputTrace: Trace): Promise { + await wf.sleep(1, { summary: 'timer-summary' }); + return workflowOutput(inputTrace); +} + +// Run a bunch of boundaries in a single workflow +export async function wfContextSmoke(trace: Trace, shouldContinueAsNew: boolean, childId: string): Promise { + trace = await echoTrace(activityInput(trace), 'activity-output'); + trace = await localActivities.echoTrace(localActivityInput(trace), 'local-activity-output'); + trace = await wf.executeChild(childWorkflowContext, { + args: [childWorkflowInput(trace)], + workflowId: childId, + }); + + if (shouldContinueAsNew) { + return await wf.continueAsNew(continueAsNewInput(trace), false, childId + '-2'); + } + return workflowOutput(trace); +} + +export async function signalReceivingChildWorkflow(): Promise { + let receivedSignal: Trace | undefined; + + wf.setHandler(unblockSignal, (trace) => { + receivedSignal = signalReceived(trace); + }); + + await wf.condition(() => receivedSignal !== undefined); + return receivedSignal as Trace; +} + +export async function wfExternalSignalSuccessContext(inputTrace: Trace, childId: string): Promise { + const child = await wf.startChild(signalReceivingChildWorkflow, { + workflowId: childId, + }); + + const external = wf.getExternalWorkflowHandle(child.workflowId, child.firstExecutionRunId); + await external.signal(unblockSignal, withLabel(inputTrace, 'signal-input')); + + const childTrace = await child.result(); + return workflowOutput(childTrace); +} + +export async function failingChildWorkflow(): Promise { + throw wf.ApplicationFailure.nonRetryable('child-wf-failure'); +} + +export async function wfChildWorkflowFailureContext(childId: string): Promise { + try { + await wf.executeChild(failingChildWorkflow, { + workflowId: childId, + }); + return 'unexpected success'; + } catch (err) { + return (err as Error).message; + } +} + +export async function wfLocalActivityFailureContext(): Promise { + try { + await localActivities.throwAnError(true, 'local-activity-failure'); + return 'unexpected success'; + } catch (err) { + return (err as Error).message; + } +} + +// TODO: add cases for describe (static metadata - summary/details) and list (memo) + +// Trace labelling helpers +export const workflowOutput = (trace: Trace): Trace => withLabel(trace, 'wf-output'); +export const childWorkflowInput = (trace: Trace): Trace => withLabel(trace, 'child-wf-input'); +export const childWorkflowOutput = (trace: Trace): Trace => withLabel(trace, 'child-wf-output'); +export const parentWorkflowOutput = (trace: Trace): Trace => withLabel(trace, 'parent-wf-output'); +export const continueAsNewInput = (trace: Trace): Trace => withLabel(trace, 'continue-as-new'); + +export const queryOutput = (trace: Trace): Trace => withLabel(trace, 'query-output'); +export const updateOutput = (trace: Trace): Trace => withLabel(trace, 'update-output'); +export const signalReceived = (trace: Trace): Trace => withLabel(trace, 'signal-received'); + +export const activityInput = (trace: Trace): Trace => withLabel(trace, 'activity-input'); +export const activityHeartbeatInput = (trace: Trace): Trace => withLabel(trace, 'activity-heartbeat-input'); +export const localActivityInput = (trace: Trace): Trace => withLabel(trace, 'local-activity-input'); diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index d4bec3475..6f1c4b687 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -22,6 +22,7 @@ import type { RawSourceMap } from 'source-map'; import * as nexus from 'nexus-rpc'; import { Info as ActivityInfo } from '@temporalio/activity'; import { + ActivitySerializationContext, DataConverter, decompileRetryPolicy, defaultPayloadConverter, @@ -44,6 +45,7 @@ import { encodeErrorToFailure, encodeToPayload, } from '@temporalio/common/lib/internal-non-workflow'; +import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context'; import { historyFromJSON } from '@temporalio/common/lib/proto-utils'; import { Duration, @@ -1068,6 +1070,11 @@ export class Worker { switch (variant) { case 'start': { let info: ActivityInfo | undefined = undefined; + const start = task.start as NonNullableObject; + const loadedDataConverter = withSerializationContext( + this.options.loadedDataConverter, + activitySerializationContextFromTaskStart(start, this.options.namespace) + ); try { if (activity !== undefined) { throw new IllegalStateError( @@ -1076,7 +1083,7 @@ export class Worker { } info = await extractActivityInfo( task, - this.options.loadedDataConverter, + loadedDataConverter, this.options.namespace, this.options.taskQueue ); @@ -1095,7 +1102,7 @@ export class Worker { } let args: unknown[]; try { - args = await decodeArrayFromPayloads(this.options.loadedDataConverter, task.start?.input); + args = await decodeArrayFromPayloads(loadedDataConverter, task.start?.input); } catch (err) { throw ApplicationFailure.fromError(err, { message: `Failed to parse activity args for activity ${activityType}: ${errorMessage(err)}`, @@ -1113,7 +1120,7 @@ export class Worker { activity = new Activity( info, fn, - this.options.loadedDataConverter, + loadedDataConverter, (details) => this.activityHeartbeatSubject.next({ type: 'heartbeat', @@ -1149,7 +1156,7 @@ export class Worker { type: 'result', result: { failed: { - failure: await encodeErrorToFailure(this.options.loadedDataConverter, error), + failure: await encodeErrorToFailure(loadedDataConverter, error), }, }, }; @@ -1763,7 +1770,13 @@ export class Worker { let payload: Payload; try { try { - payload = await encodeToPayload(this.options.loadedDataConverter, details); + payload = await encodeToPayload( + withSerializationContext( + this.options.loadedDataConverter, + activitySerializationContextFromInfo(info) + ), + details + ); } catch (error: any) { this.logger.warn('Failed to encode heartbeat details, cancelling Activity', { error, @@ -2243,6 +2256,29 @@ async function extractActivityInfo( }; } +function activitySerializationContextFromInfo(info: ActivityInfo): ActivitySerializationContext { + return { + type: 'activity', + namespace: info.activityNamespace, + activityId: info.activityId, + workflowId: info.workflowExecution.workflowId, + isLocal: info.isLocal, + }; +} + +function activitySerializationContextFromTaskStart( + start: NonNullableObject, + activityNamespace: string +): ActivitySerializationContext { + return { + type: 'activity', + namespace: activityNamespace, + activityId: start.activityId || undefined, + workflowId: start.workflowExecution?.workflowId ?? undefined, + isLocal: start.isLocal, + }; +} + /** * A utility function to await a promise with a timeout. * diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index a76e9b844..eaf5edb71 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -1,5 +1,6 @@ import type { RawSourceMap } from 'source-map'; import { + ActivitySerializationContext, defaultFailureConverter, FailureConverter, PayloadConverter, @@ -24,11 +25,16 @@ import { WorkflowFunctionWithOptions, VersioningBehavior, WorkflowDefinitionOptions, + WorkflowSerializationContext, } from '@temporalio/common'; import { decodeSearchAttributes, decodeTypedSearchAttributes, } from '@temporalio/common/lib/converter/payload-search-attributes'; +import { + withFailureConverterContext, + withPayloadConverterContext, +} from '@temporalio/common/lib/converter/serialization-context'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; import type { coresdk, temporal } from '@temporalio/proto'; @@ -102,9 +108,10 @@ export interface PromiseStackStore { promiseToStack: Map, Stack>; } -export interface Completion { +export interface Completion { resolve(val: Success): void; reject(reason: Error): void; + context?: Context; } export interface Condition { @@ -164,13 +171,13 @@ export class Activator implements ActivationHandler { */ readonly completions = { timer: new Map>(), - activity: new Map>(), + activity: new Map>(), nexusOperationStart: new Map>(), nexusOperationComplete: new Map>(), - childWorkflowStart: new Map>(), - childWorkflowComplete: new Map>(), - signalWorkflow: new Map>(), - cancelWorkflow: new Map>(), + childWorkflowStart: new Map>(), + childWorkflowComplete: new Map>(), + signalWorkflow: new Map>(), + cancelWorkflow: new Map>(), }; /** @@ -550,12 +557,13 @@ export class Activator implements ActivationHandler { public startWorkflow(activation: coresdk.workflow_activation.IInitializeWorkflow): void { const execute = composeInterceptors(this.interceptors.inbound, 'execute', this.startWorkflowNextHandler.bind(this)); + const payloadConverter = this.payloadConverterWithWorkflowContext(); untrackPromise( executeWithLifecycleLogging(() => execute({ headers: activation.headers ?? {}, - args: arrayFromPayloads(this.payloadConverter, activation.arguments), + args: arrayFromPayloads(payloadConverter, activation.arguments), }) ).then(this.completeWorkflow.bind(this), this.handleWorkflowFailure.bind(this)) ); @@ -563,6 +571,8 @@ export class Activator implements ActivationHandler { public initializeWorkflow(activation: coresdk.workflow_activation.IInitializeWorkflow): void { const { continuedFailure, lastCompletionResult, memo, searchAttributes } = activation; + const payloadConverter = this.payloadConverterWithWorkflowContext(); + const failureConverter = this.failureConverterWithWorkflowContext(); // Most things related to initialization have already been handled in the constructor this.mutateWorkflowInfo((info) => ({ @@ -571,12 +581,10 @@ export class Activator implements ActivationHandler { searchAttributes: decodeSearchAttributes(searchAttributes?.indexedFields), typedSearchAttributes: decodeTypedSearchAttributes(searchAttributes?.indexedFields), - memo: mapFromPayloads(this.payloadConverter, memo?.fields), - lastResult: fromPayloadsAtIndex(this.payloadConverter, 0, lastCompletionResult?.payloads), + memo: mapFromPayloads(payloadConverter, memo?.fields), + lastResult: fromPayloadsAtIndex(payloadConverter, 0, lastCompletionResult?.payloads), lastFailure: - continuedFailure != null - ? this.failureConverter.failureToError(continuedFailure, this.payloadConverter) - : undefined, + continuedFailure != null ? failureConverter.failureToError(continuedFailure, payloadConverter) : undefined, })); if (this.workflowDefinitionOptionsGetter) { this.versioningBehavior = this.workflowDefinitionOptionsGetter().versioningBehavior; @@ -599,23 +607,29 @@ export class Activator implements ActivationHandler { if (!activation.result) { throw new TypeError('Got ResolveActivity activation with no result'); } - const { resolve, reject } = this.consumeCompletion('activity', getSeq(activation)); + const { resolve, reject, context } = this.consumeCompletion('activity', getSeq(activation)); + const payloadConverter = context + ? withPayloadConverterContext(this.payloadConverter, context) + : this.payloadConverter; + const failureConverter = context + ? withFailureConverterContext(this.failureConverter, context) + : this.failureConverter; if (activation.result.completed) { const completed = activation.result.completed; - const result = completed.result ? this.payloadConverter.fromPayload(completed.result) : undefined; + const result = completed.result ? payloadConverter.fromPayload(completed.result) : undefined; resolve(result); } else if (activation.result.failed) { const { failure } = activation.result.failed; if (failure == null) { throw new TypeError('Got failed result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } else if (activation.result.cancelled) { const { failure } = activation.result.cancelled; if (failure == null) { throw new TypeError('Got cancelled result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } else if (activation.result.backoff) { reject(new LocalActivityDoBackoff(activation.result.backoff)); } @@ -624,7 +638,7 @@ export class Activator implements ActivationHandler { public resolveChildWorkflowExecutionStart( activation: coresdk.workflow_activation.IResolveChildWorkflowExecutionStart ): void { - const { resolve, reject } = this.consumeCompletion('childWorkflowStart', getSeq(activation)); + const { resolve, reject, context } = this.consumeCompletion('childWorkflowStart', getSeq(activation)); if (activation.succeeded) { if (!activation.succeeded.runId) { throw new TypeError('Got ResolveChildWorkflowExecutionStart with no runId'); @@ -648,7 +662,13 @@ export class Activator implements ActivationHandler { if (!activation.cancelled.failure) { throw new TypeError('Got no failure in cancelled variant'); } - reject(this.failureToError(activation.cancelled.failure)); + const payloadConverter = context + ? withPayloadConverterContext(this.payloadConverter, context) + : this.payloadConverter; + const failureConverter = context + ? withFailureConverterContext(this.failureConverter, context) + : this.failureConverter; + reject(failureConverter.failureToError(activation.cancelled.failure, payloadConverter)); } else { throw new TypeError('Got ResolveChildWorkflowExecutionStart with no status'); } @@ -658,23 +678,29 @@ export class Activator implements ActivationHandler { if (!activation.result) { throw new TypeError('Got ResolveChildWorkflowExecution activation with no result'); } - const { resolve, reject } = this.consumeCompletion('childWorkflowComplete', getSeq(activation)); + const { resolve, reject, context } = this.consumeCompletion('childWorkflowComplete', getSeq(activation)); + const payloadConverter = context + ? withPayloadConverterContext(this.payloadConverter, context) + : this.payloadConverter; + const failureConverter = context + ? withFailureConverterContext(this.failureConverter, context) + : this.failureConverter; if (activation.result.completed) { const completed = activation.result.completed; - const result = completed.result ? this.payloadConverter.fromPayload(completed.result) : undefined; + const result = completed.result ? payloadConverter.fromPayload(completed.result) : undefined; resolve(result); } else if (activation.result.failed) { const { failure } = activation.result.failed; if (failure == null) { throw new TypeError('Got failed result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } else if (activation.result.cancelled) { const { failure } = activation.result.cancelled; if (failure == null) { throw new TypeError('Got cancelled result with no failure attribute'); } - reject(this.failureToError(failure)); + reject(failureConverter.failureToError(failure, payloadConverter)); } } @@ -775,9 +801,10 @@ export class Activator implements ActivationHandler { queryType === ENHANCED_STACK_TRACE_QUERY_NAME; const interceptors = isInternalQuery ? [] : this.interceptors.inbound; const execute = composeInterceptors(interceptors, 'handleQuery', this.queryWorkflowNextHandler.bind(this)); + const payloadConverter = this.payloadConverterWithWorkflowContext(); execute({ queryName: queryType, - args: arrayFromPayloads(this.payloadConverter, activation.arguments), + args: arrayFromPayloads(payloadConverter, activation.arguments), queryId, headers: headers ?? {}, }).then( @@ -827,12 +854,15 @@ export class Activator implements ActivationHandler { return; } - const makeInput = (): UpdateInput => ({ - updateId, - args: arrayFromPayloads(this.payloadConverter, activation.input), - name, - headers: headers ?? {}, - }); + const makeInput = (): UpdateInput => { + const payloadConverter = this.payloadConverterWithWorkflowContext(); + return { + updateId, + args: arrayFromPayloads(payloadConverter, activation.input), + name, + headers: headers ?? {}, + }; + }; // The implementation below is responsible for upholding, and constrained // by, the following contract: @@ -1003,8 +1033,9 @@ export class Activator implements ActivationHandler { const signalExecutionNum = this.signalHandlerExecutionSeq++; this.inProgressSignals.set(signalExecutionNum, { name: signalName, unfinishedPolicy }); const execute = composeInterceptors(interceptors, 'handleSignal', this.signalWorkflowNextHandler.bind(this)); + const payloadConverter = this.payloadConverterWithWorkflowContext(); execute({ - args: arrayFromPayloads(this.payloadConverter, activation.input), + args: arrayFromPayloads(payloadConverter, activation.input), signalName, headers: headers ?? {}, }) @@ -1029,9 +1060,15 @@ export class Activator implements ActivationHandler { } public resolveSignalExternalWorkflow(activation: coresdk.workflow_activation.IResolveSignalExternalWorkflow): void { - const { resolve, reject } = this.consumeCompletion('signalWorkflow', getSeq(activation)); + const { resolve, reject, context } = this.consumeCompletion('signalWorkflow', getSeq(activation)); + const payloadConverter = context + ? withPayloadConverterContext(this.payloadConverter, context) + : this.payloadConverter; + const failureConverter = context + ? withFailureConverterContext(this.failureConverter, context) + : this.failureConverter; if (activation.failure) { - reject(this.failureToError(activation.failure)); + reject(failureConverter.failureToError(activation.failure, payloadConverter)); } else { resolve(undefined); } @@ -1040,9 +1077,15 @@ export class Activator implements ActivationHandler { public resolveRequestCancelExternalWorkflow( activation: coresdk.workflow_activation.IResolveRequestCancelExternalWorkflow ): void { - const { resolve, reject } = this.consumeCompletion('cancelWorkflow', getSeq(activation)); + const { resolve, reject, context } = this.consumeCompletion('cancelWorkflow', getSeq(activation)); + const payloadConverter = context + ? withPayloadConverterContext(this.payloadConverter, context) + : this.payloadConverter; + const failureConverter = context + ? withFailureConverterContext(this.failureConverter, context) + : this.failureConverter; if (activation.failure) { - reject(this.failureToError(activation.failure)); + reject(failureConverter.failureToError(activation.failure, payloadConverter)); } else { resolve(undefined); } @@ -1204,8 +1247,9 @@ export class Activator implements ActivationHandler { } private completeQuery(queryId: string, result: unknown): void { + const payloadConverter = this.payloadConverterWithWorkflowContext(); this.pushCommand({ - respondToQuery: { queryId, succeeded: { response: this.payloadConverter.toPayload(result) } }, + respondToQuery: { queryId, succeeded: { response: payloadConverter.toPayload(result) } }, }); } @@ -1223,8 +1267,9 @@ export class Activator implements ActivationHandler { } private completeUpdate(protocolInstanceId: string, result: unknown): void { + const payloadConverter = this.payloadConverterWithWorkflowContext(); this.pushCommand({ - updateResponse: { protocolInstanceId, completed: this.payloadConverter.toPayload(result) }, + updateResponse: { protocolInstanceId, completed: payloadConverter.toPayload(result) }, }); } @@ -1262,10 +1307,11 @@ export class Activator implements ActivationHandler { } private completeWorkflow(result: unknown): void { + const payloadConverter = this.payloadConverterWithWorkflowContext(); this.pushCommand( { completeWorkflowExecution: { - result: this.payloadConverter.toPayload(result), + result: payloadConverter.toPayload(result), }, }, true @@ -1273,11 +1319,31 @@ export class Activator implements ActivationHandler { } errorToFailure(err: unknown): ProtoFailure { - return this.failureConverter.errorToFailure(err, this.payloadConverter); + const payloadConverter = this.payloadConverterWithWorkflowContext(); + const failureConverter = this.failureConverterWithWorkflowContext(); + return failureConverter.errorToFailure(err, payloadConverter); } failureToError(failure: ProtoFailure): Error { - return this.failureConverter.failureToError(failure, this.payloadConverter); + const payloadConverter = this.payloadConverterWithWorkflowContext(); + const failureConverter = this.failureConverterWithWorkflowContext(); + return failureConverter.failureToError(failure, payloadConverter); + } + + private workflowSerializationContext(): WorkflowSerializationContext { + return { + type: 'workflow', + namespace: this.info.namespace, + workflowId: this.info.workflowId, + }; + } + + private payloadConverterWithWorkflowContext(): PayloadConverter { + return withPayloadConverterContext(this.payloadConverter, this.workflowSerializationContext()); + } + + private failureConverterWithWorkflowContext(): FailureConverter { + return withFailureConverterContext(this.failureConverter, this.workflowSerializationContext()); } } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 0a6749f40..c9c4a5776 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -1,5 +1,6 @@ import { ActivityFunction, + ActivitySerializationContext, ActivityOptions, compileRetryPolicy, compilePriority, @@ -19,6 +20,7 @@ import { UpdateDefinition, WithWorkflowArgs, Workflow, + WorkflowSerializationContext, WorkflowResultType, WorkflowReturnType, WorkflowUpdateValidatorType, @@ -26,6 +28,7 @@ import { WorkflowDefinitionOptionsOrGetter, encodeInitialVersioningBehavior, } from '@temporalio/common'; +import { withPayloadConverterContext } from '@temporalio/common/lib/converter/serialization-context'; import { userMetadataToPayload } from '@temporalio/common/lib/user-metadata'; import { encodeUnifiedSearchAttributes, @@ -74,6 +77,36 @@ import { ChildWorkflowHandle, ExternalWorkflowHandle } from './workflow-handle'; // Avoid a circular dependency registerSleepImplementation(sleep); +function currentWorkflowSerializationContext(info: WorkflowInfo): WorkflowSerializationContext { + return { + type: 'workflow', + namespace: info.namespace, + workflowId: info.workflowId, + }; +} + +function targetWorkflowSerializationContext(info: WorkflowInfo, workflowId: string): WorkflowSerializationContext { + return { + type: 'workflow', + namespace: info.namespace, + workflowId, + }; +} + +function activitySerializationContext( + info: WorkflowInfo, + activityId: string, + isLocal: boolean +): ActivitySerializationContext { + return { + type: 'activity', + namespace: info.namespace, + activityId, + workflowId: info.workflowId, + isLocal, + }; +} + /** * Adds default values of `workflowId` and `cancellationType` to given workflow options. */ @@ -94,6 +127,10 @@ export function addDefaultWorkflowOptions( */ function timerNextHandler({ seq, durationMs, options }: TimerInput) { const activator = getActivator(); + const payloadConverter = withPayloadConverterContext( + activator.payloadConverter, + currentWorkflowSerializationContext(activator.info) + ); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -120,7 +157,7 @@ function timerNextHandler({ seq, durationMs, options }: TimerInput) { seq, startToFireTimeout: msToTs(durationMs), }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options?.summary, undefined), + userMetadata: userMetadataToPayload(payloadConverter, options?.summary, undefined), }); activator.completions.timer.set(seq, { resolve, @@ -168,6 +205,9 @@ const validateLocalActivityOptions = validateActivityOptions; function scheduleActivityNextHandler({ options, args, headers, seq, activityType }: ActivityInput): Promise { const activator = getActivator(); validateActivityOptions(options); + const activityId = options.activityId ?? `${seq}`; + const context = activitySerializationContext(activator.info, activityId, false); + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -191,9 +231,9 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType activator.pushCommand({ scheduleActivity: { seq, - activityId: options.activityId ?? `${seq}`, + activityId, activityType, - arguments: toPayloads(activator.payloadConverter, ...args), + arguments: toPayloads(payloadConverter, ...args), retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, taskQueue: options.taskQueue || activator.info.taskQueue, heartbeatTimeout: msOptionalToTs(options.heartbeatTimeout), @@ -206,11 +246,12 @@ function scheduleActivityNextHandler({ options, args, headers, seq, activityType versioningIntent: versioningIntentToProto(options.versioningIntent), // eslint-disable-line @typescript-eslint/no-deprecated priority: options.priority ? compilePriority(options.priority) : undefined, }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), + userMetadata: userMetadataToPayload(payloadConverter, options.summary, undefined), }); activator.completions.activity.set(seq, { resolve, reject, + context, }); }); } @@ -228,6 +269,9 @@ async function scheduleLocalActivityNextHandler({ originalScheduleTime, }: LocalActivityInput): Promise { const activator = getActivator(); + const activityId = `${seq}`; + const context = activitySerializationContext(activator.info, activityId, true); + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); // Eagerly fail the local activity (which will in turn fail the workflow task. // Do not fail on replay where the local activities may not be registered on the replay worker. if (!activator.info.unsafe.isReplaying && !activator.registeredActivityNames.has(activityType)) { @@ -261,9 +305,9 @@ async function scheduleLocalActivityNextHandler({ attempt, originalScheduleTime, // Intentionally not exposing activityId as an option - activityId: `${seq}`, + activityId, activityType, - arguments: toPayloads(activator.payloadConverter, ...args), + arguments: toPayloads(payloadConverter, ...args), retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, scheduleToCloseTimeout: msOptionalToTs(options.scheduleToCloseTimeout), startToCloseTimeout: msOptionalToTs(options.startToCloseTimeout), @@ -272,11 +316,12 @@ async function scheduleLocalActivityNextHandler({ headers, cancellationType: encodeActivityCancellationType(options.cancellationType), }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options.summary, undefined), + userMetadata: userMetadataToPayload(payloadConverter, options.summary, undefined), }); activator.completions.activity.set(seq, { resolve, reject, + context, }); }); } @@ -364,6 +409,8 @@ function startChildWorkflowExecutionNextHandler({ }: StartChildWorkflowExecutionInput): Promise<[Promise, Promise]> { const activator = getActivator(); const workflowId = options.workflowId ?? uuid4(); + const context = targetWorkflowSerializationContext(activator.info, workflowId); + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); const startPromise = new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -389,7 +436,7 @@ function startChildWorkflowExecutionNextHandler({ seq, workflowId, workflowType, - input: toPayloads(activator.payloadConverter, ...options.args), + input: toPayloads(payloadConverter, ...options.args), retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, taskQueue: options.taskQueue || activator.info.taskQueue, workflowExecutionTimeout: msOptionalToTs(options.workflowExecutionTimeout), @@ -405,15 +452,16 @@ function startChildWorkflowExecutionNextHandler({ options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? { indexedFields: encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) } // eslint-disable-line @typescript-eslint/no-deprecated : undefined, - memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), + memo: options.memo && mapToPayloads(payloadConverter, options.memo), versioningIntent: versioningIntentToProto(options.versioningIntent), // eslint-disable-line @typescript-eslint/no-deprecated priority: options.priority ? compilePriority(options.priority) : undefined, }, - userMetadata: userMetadataToPayload(activator.payloadConverter, options?.staticSummary, options?.staticDetails), + userMetadata: userMetadataToPayload(payloadConverter, options?.staticSummary, options?.staticDetails), }); activator.completions.childWorkflowStart.set(seq, { resolve, reject, + context, }); }); @@ -425,6 +473,7 @@ function startChildWorkflowExecutionNextHandler({ activator.completions.childWorkflowComplete.set(seq, { resolve, reject, + context, }); }); untrackPromise(startPromise); @@ -438,6 +487,9 @@ function startChildWorkflowExecutionNextHandler({ function signalWorkflowNextHandler({ seq, signalName, args, target, headers }: SignalWorkflowInput) { const activator = getActivator(); + const targetWorkflowId = target.type === 'external' ? target.workflowExecution.workflowId : target.childWorkflowId; + const context = targetWorkflowSerializationContext(activator.info, targetWorkflowId!); + const payloadConverter = withPayloadConverterContext(activator.payloadConverter, context); return new Promise((resolve, reject) => { const scope = CancellationScope.current(); if (scope.consideredCancelled) { @@ -458,7 +510,7 @@ function signalWorkflowNextHandler({ seq, signalName, args, target, headers }: S activator.pushCommand({ signalExternalWorkflowExecution: { seq, - args: toPayloads(activator.payloadConverter, ...args), + args: toPayloads(payloadConverter, ...args), headers, signalName, ...(target.type === 'external' @@ -474,7 +526,7 @@ function signalWorkflowNextHandler({ seq, signalName, args, target, headers }: S }, }); - activator.completions.signalWorkflow.set(seq, { resolve, reject }); + activator.completions.signalWorkflow.set(seq, { resolve, reject, context }); }); } @@ -730,7 +782,11 @@ export function getExternalWorkflowHandle(workflowId: string, runId?: string): E }, }, }); - activator.completions.cancelWorkflow.set(seq, { resolve, reject }); + activator.completions.cancelWorkflow.set(seq, { + resolve, + reject, + context: targetWorkflowSerializationContext(activator.info, workflowId), + }); }); }, signal(def: SignalDefinition | string, ...args: Args): Promise { @@ -1010,14 +1066,18 @@ export function makeContinueAsNewFunc( }; return (...args: Parameters): Promise => { + const payloadConverter = withPayloadConverterContext( + activator.payloadConverter, + currentWorkflowSerializationContext(info) + ); const fn = composeInterceptors(activator.interceptors.outbound, 'continueAsNew', async (input) => { const { headers, args, options } = input; throw new ContinueAsNew({ workflowType: options.workflowType, - arguments: toPayloads(activator.payloadConverter, ...args), + arguments: toPayloads(payloadConverter, ...args), headers, taskQueue: options.taskQueue, - memo: options.memo && mapToPayloads(activator.payloadConverter, options.memo), + memo: options.memo && mapToPayloads(payloadConverter, options.memo), searchAttributes: options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated ? { indexedFields: encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes) } // eslint-disable-line @typescript-eslint/no-deprecated @@ -1670,6 +1730,10 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes | Sear */ export function upsertMemo(memo: Record): void { const activator = assertInWorkflowContext('Workflow.upsertMemo(...) may only be used from a Workflow Execution.'); + const payloadConverter = withPayloadConverterContext( + activator.payloadConverter, + currentWorkflowSerializationContext(activator.info) + ); if (memo == null) { throw new Error('memo must be a non-null Record'); @@ -1679,7 +1743,7 @@ export function upsertMemo(memo: Record): void { modifyWorkflowProperties: { upsertedMemo: { fields: mapToPayloads( - activator.payloadConverter, + payloadConverter, // Convert null to undefined Object.fromEntries(Object.entries(memo).map(([k, v]) => [k, v ?? undefined])) ),