diff --git a/packages/common/src/workflow-definition-options.ts b/packages/common/src/workflow-definition-options.ts index 536d3a54d..d8e2a3629 100644 --- a/packages/common/src/workflow-definition-options.ts +++ b/packages/common/src/workflow-definition-options.ts @@ -5,6 +5,29 @@ import type { VersioningBehavior } from './worker-deployments'; */ export interface WorkflowDefinitionOptions { versioningBehavior?: VersioningBehavior; + + /** + * The types of errors that, if thrown by the Workflow function, a signal handler, or an update + * handler, will cause the Workflow Execution or the Update to fail instead of failing the + * Workflow Task (which would result in retrying the Workflow Task until it eventually succeeds). + * + * This is a per-Workflow-type equivalent of {@link WorkerOptions.workflowFailureErrorTypes}. + * Both settings are evaluated; an error matching either will cause Workflow failure. + * Unlike the string-based {@link WorkerOptions.workflowFailureErrorTypes}, this accepts + * actual class references, enabling subclass matching via `instanceof`, but doesn't allow + * failing the workflow execution on _non-determinism errors_. Failing the workflow execution on + * non-determinism errors can only be set via {@link WorkerOptions.workflowFailureErrorTypes}. + * + * Passing the `Error` class to this setting will fail the Workflow on any error, except + * non-determinism errors. + * + * Note that {@link TemporalFailure} subclasses and cancellation errors that bubbles out + * of the Workflow always fail the Workflow Execution, regardless of either this and the + * {@link WorkerOptions.workflowFailureErrorTypes} settings. + * + * @experimental + */ + failureExceptionTypes?: Array Error>; } type AsyncFunction = (...args: Args) => Promise; diff --git a/packages/core-bridge/src/helpers/try_from_js.rs b/packages/core-bridge/src/helpers/try_from_js.rs index 62e6aff11..d7c03b4f3 100644 --- a/packages/core-bridge/src/helpers/try_from_js.rs +++ b/packages/core-bridge/src/helpers/try_from_js.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, net::SocketAddr, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + time::Duration, +}; use neon::{ handle::Handle, @@ -163,6 +167,24 @@ impl TryFromJs for Vec { } } +#[allow(clippy::implicit_hasher)] +impl TryFromJs for HashSet { + fn try_from_js<'cx, 'b>( + cx: &mut impl Context<'cx>, + js_value: Handle<'b, JsValue>, + ) -> BridgeResult { + let array = js_value.downcast::(cx)?; + let len = array.len(cx); + let mut result = Self::with_capacity(len as usize); + + for i in 0..len { + let value = array.get_value(cx, i)?; + result.insert(T::try_from_js(cx, value)?); + } + Ok(result) + } +} + #[allow(clippy::implicit_hasher)] impl TryFromJs for HashMap { fn try_from_js<'cx, 'b>( diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 979d1a74b..8a236f2de 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -487,7 +487,7 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {} //////////////////////////////////////////////////////////////////////////////////////////////////// mod config { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use std::{sync::Arc, time::Duration}; use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior; use temporalio_common::protos::temporal::api::worker::v1::PluginInfo; @@ -499,7 +499,8 @@ mod config { ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, PollerBehavior as CorePollerBehavior, ResourceBasedSlotsOptions, ResourceSlotOptions, SlotKind, SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptions, - WorkerConfig, WorkerVersioningStrategy, WorkflowSlotKind, + WorkerConfig, WorkerVersioningStrategy, WorkflowErrorType as CoreWorkflowErrorType, + WorkflowSlotKind, }; use super::custom_slot_supplier::CustomSlotSupplierOptions; @@ -532,6 +533,8 @@ mod config { max_task_queue_activities_per_second: Option, shutdown_grace_time: Option, plugins: Vec, + workflow_failure_errors: HashSet, + workflow_types_to_failure_errors: HashMap>, } #[derive(TryFromJs)] @@ -629,6 +632,10 @@ mod config { }) .collect::>(), ) + .workflow_failure_errors(into_core_workflow_error_set(self.workflow_failure_errors)) + .workflow_types_to_failure_errors(into_core_workflow_error_map_of_sets( + self.workflow_types_to_failure_errors, + )) .build() .map_err(|err| BridgeError::TypeError { message: format!("Failed to convert WorkerOptions to CoreWorkerConfig: {err}"), @@ -704,6 +711,33 @@ mod config { } } + #[derive(TryFromJs, Hash, Eq, PartialEq)] + pub enum WorkflowErrorType { + Nondeterminism, + } + + impl From for CoreWorkflowErrorType { + fn from(val: WorkflowErrorType) -> Self { + match val { + WorkflowErrorType::Nondeterminism => Self::Nondeterminism, + } + } + } + + fn into_core_workflow_error_set( + val: HashSet, + ) -> HashSet { + val.into_iter().map(Into::into).collect() + } + + fn into_core_workflow_error_map_of_sets( + val: HashMap>, + ) -> HashMap> { + val.into_iter() + .map(|(k, v)| (k, into_core_workflow_error_set(v))) + .collect() + } + #[derive(TryFromJs)] #[allow(clippy::struct_field_names)] pub(super) struct WorkerTuner { diff --git a/packages/core-bridge/ts/native.ts b/packages/core-bridge/ts/native.ts index a2a666731..97a74f373 100644 --- a/packages/core-bridge/ts/native.ts +++ b/packages/core-bridge/ts/native.ts @@ -240,6 +240,8 @@ export interface WorkerOptions { maxActivitiesPerSecond: Option; shutdownGraceTime: number; plugins: string[]; + workflowFailureErrors: WorkflowErrorType[]; + workflowTypesToFailureErrors: Record; } export type PollerBehavior = @@ -267,6 +269,8 @@ export type WorkerDeploymentVersion = { export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' }; +export type WorkflowErrorType = { type: 'nondeterminism' }; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Worker Tuner //////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/test/src/test-bridge.ts b/packages/test/src/test-bridge.ts index a0afc721d..dd4b0add9 100644 --- a/packages/test/src/test-bridge.ts +++ b/packages/test/src/test-bridge.ts @@ -313,6 +313,8 @@ const GenericConfigs = { maxActivitiesPerSecond: null, shutdownGraceTime: 1000, plugins: [], + workflowFailureErrors: [], + workflowTypesToFailureErrors: {}, } satisfies native.WorkerOptions, }, ephemeralServer: { diff --git a/packages/test/src/test-workflow-fail-on-errors-policy.ts b/packages/test/src/test-workflow-fail-on-errors-policy.ts new file mode 100644 index 000000000..293e02f19 --- /dev/null +++ b/packages/test/src/test-workflow-fail-on-errors-policy.ts @@ -0,0 +1,250 @@ +import asyncRetry from 'async-retry'; +import type { WorkflowHandle } from '@temporalio/client'; +import { WorkflowFailedError } from '@temporalio/client'; +import * as workflow from '@temporalio/workflow'; +import { ApplicationFailure } from '@temporalio/common'; +import { helpers, makeTestFunction } from './helpers-integration'; + +const test = makeTestFunction({ + workflowsPath: __filename, +}); + +//////////////////////////////////////////////////////////////////////////////// +// Test fixtures: custom error classes and workflows +//////////////////////////////////////////////////////////////////////////////// + +export class CustomWorkflowError extends Error { + constructor(message: string) { + super(message); + this.name = 'CustomWorkflowError'; + } +} + +export class CustomWorkflowSubError extends CustomWorkflowError { + constructor(message: string) { + super(message); + this.name = 'CustomWorkflowSubError'; + } +} + +export async function throwCustomError(): Promise { + throw new CustomWorkflowError('custom error'); +} + +export async function throwCustomSubError(): Promise { + throw new CustomWorkflowSubError('custom sub error'); +} + +export async function throwPlainError(): Promise { + throw new Error('plain error'); +} + +export async function throwCustomErrorWithDefinitionOptions(): Promise { + throw new CustomWorkflowError('custom error from definition options'); +} +workflow.setWorkflowOptions({ failureExceptionTypes: [CustomWorkflowError] }, throwCustomErrorWithDefinitionOptions); + +export async function throwSubErrorWithParentInDefinitionOptions(): Promise { + throw new CustomWorkflowSubError('sub error with parent in definition options'); +} +workflow.setWorkflowOptions( + { failureExceptionTypes: [CustomWorkflowError] }, + throwSubErrorWithParentInDefinitionOptions +); + +/** + * Forces a non-determinism error by branching on `unsafe.isReplaying`. The + * first execution issues a `startTimer` command; on the next WFT, replay + * (forced via `maxCachedWorkflows: 0`) takes the no-command branch, which + * mismatches the recorded history. + */ +export async function nondeterministicWorkflow(): Promise { + if (!workflow.workflowInfo().unsafe.isReplaying) { + await workflow.sleep('1ms'); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Helpers +//////////////////////////////////////////////////////////////////////////////// + +/** + * Polls workflow history until a `WorkflowTaskFailed` event referencing the + * given error type/message is observed, then terminates the workflow. This is + * how we assert "default WFT failure" behavior: the workflow remains running + * (retrying the task) rather than failing the execution outright. + */ +async function assertWftFailureAndTerminate( + handle: WorkflowHandle, + expected: { messageContains?: string; errorType?: string } +): Promise { + try { + await asyncRetry( + async () => { + const history = await handle.fetchHistory(); + const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes); + if (wftFailedEvent === undefined) { + throw new Error('No WorkflowTaskFailed event found yet'); + } + const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {}; + if (!failure) { + throw new Error('Expected `failure` in workflowTaskFailedEventAttributes'); + } + if (expected.messageContains && !failure.message?.includes(expected.messageContains)) { + throw new Error( + `Expected failure message to contain ${JSON.stringify(expected.messageContains)}, got ${JSON.stringify( + failure.message + )}` + ); + } + if (expected.errorType && failure.applicationFailureInfo?.type !== expected.errorType) { + throw new Error( + `Expected applicationFailureInfo.type=${JSON.stringify(expected.errorType)}, got ${JSON.stringify( + failure.applicationFailureInfo?.type + )}` + ); + } + }, + { minTimeout: 300, factor: 1, retries: 15 } + ); + } finally { + await handle.terminate().catch(() => { + /* ignore */ + }); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Tests for workflowFailureErrorTypes / WorkflowDefinitionOptions.failureExceptionTypes +//////////////////////////////////////////////////////////////////////////////// + +// Default behavior: non-TemporalFailure errors cause Workflow Task failure (not execution failure) +test('failureExceptionTypes - default causes WFT failure', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(throwCustomError); + await assertWftFailureAndTerminate(handle, { messageContains: 'custom error' }); + t.pass(); + }); +}); + +// WorkerOptions path: workflowFailureErrorTypes causes execution failure (exact class name match) +test('failureExceptionTypes - WorkerOptions causes WF execution failure', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker({ + workflowFailureErrorTypes: { '*': ['CustomWorkflowError'] }, + }); + await worker.runUntil(async () => { + const err = (await t.throwsAsync(executeWorkflow(throwCustomError), { + instanceOf: WorkflowFailedError, + })) as WorkflowFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.is(err.cause?.message, 'custom error'); + t.is((err.cause as ApplicationFailure).type, 'CustomWorkflowError'); + }); +}); + +// WorkerOptions path: parent class name matches subclass when using string-based +// names (lookup walks the prototype chain by `constructor.name`) +test('failureExceptionTypes - WorkerOptions parent class name matches subclass via prototype chain', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker({ + workflowFailureErrorTypes: { '*': ['CustomWorkflowError'] }, + }); + await worker.runUntil(async () => { + const err = (await t.throwsAsync(executeWorkflow(throwCustomSubError), { + instanceOf: WorkflowFailedError, + })) as WorkflowFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.is(err.cause?.message, 'custom sub error'); + }); +}); + +// WorkerOptions path: 'Error' base class matches any Error subclass +test('failureExceptionTypes - WorkerOptions Error base class matches any error', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker({ + workflowFailureErrorTypes: { '*': ['Error'] }, + }); + await worker.runUntil(async () => { + const err = (await t.throwsAsync(executeWorkflow(throwPlainError), { + instanceOf: WorkflowFailedError, + })) as WorkflowFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.is(err.cause?.message, 'plain error'); + }); +}); + +// WorkerOptions path: 'NondeterminismError' alias does NOT match an unrelated error +test('failureExceptionTypes - WorkerOptions NondeterminismError alias does not match unrelated error', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + workflowFailureErrorTypes: { '*': ['NondeterminismError'] }, + }); + await worker.runUntil(async () => { + const handle = await startWorkflow(throwCustomError); + await assertWftFailureAndTerminate(handle, { messageContains: 'custom error' }); + t.pass(); + }); +}); + +// WorkflowDefinitionOptions path: failureExceptionTypes causes execution failure +test('failureExceptionTypes - WorkflowDefinitionOptions causes WF execution failure', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const err = (await t.throwsAsync(executeWorkflow(throwCustomErrorWithDefinitionOptions), { + instanceOf: WorkflowFailedError, + })) as WorkflowFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.is(err.cause?.message, 'custom error from definition options'); + t.is((err.cause as ApplicationFailure).type, 'CustomWorkflowError'); + }); +}); + +// WorkflowDefinitionOptions path: instanceof check matches subclasses +test('failureExceptionTypes - WorkflowDefinitionOptions instanceof matches subclass', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const err = (await t.throwsAsync(executeWorkflow(throwSubErrorWithParentInDefinitionOptions), { + instanceOf: WorkflowFailedError, + })) as WorkflowFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.is(err.cause?.message, 'sub error with parent in definition options'); + t.is((err.cause as ApplicationFailure).type, 'CustomWorkflowSubError'); + }); +}); + +//////////////////////////////////////////////////////////////////////////////// +// Non-determinism handling +//////////////////////////////////////////////////////////////////////////////// + +// Default behavior: a non-determinism error causes WFT failure (workflow keeps retrying) +test('non-determinism - default causes WFT failure', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ maxCachedWorkflows: 0 }); + await worker.runUntil(async () => { + const handle = await startWorkflow(nondeterministicWorkflow); + await assertWftFailureAndTerminate(handle, { messageContains: 'Nondeterminism' }); + t.pass(); + }); +}); + +// WorkerOptions path: 'NondeterminismError' alias causes the workflow execution +// to fail when a non-determinism error is detected. +test('non-determinism - WorkerOptions NondeterminismError causes WF execution failure', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker({ + maxCachedWorkflows: 0, + workflowFailureErrorTypes: { '*': ['NondeterminismError'] }, + }); + await worker.runUntil(async () => { + const err = (await t.throwsAsync(executeWorkflow(nondeterministicWorkflow), { + instanceOf: WorkflowFailedError, + })) as WorkflowFailedError; + t.true(err.cause instanceof ApplicationFailure); + t.regex(err.cause?.message ?? '', /[Nn]ondeterminism/); + }); +}); diff --git a/packages/worker/src/worker-options.ts b/packages/worker/src/worker-options.ts index adc14aec8..6985ddb70 100644 --- a/packages/worker/src/worker-options.ts +++ b/packages/worker/src/worker-options.ts @@ -530,6 +530,37 @@ export interface WorkerOptions { */ sinks?: InjectedSinks; + /** + * The types of errors that, if thrown by a Workflow function, a signal handler, or an update + * handler, will cause the Workflow Execution or the Update to fail instead of failing the + * Workflow Task (which would result in retrying the Workflow Task until it eventually succeeds). + * + * This property expects a record of Workflow-type names to the list of error types that will + * cause that type of Workflow to fail. Uses the `'*'` key to specify a list of error types that + * applies to all Workflow types. This is a worker-level equivalent of + * {@link WorkflowDefinitionOptions.failureExceptionTypes}. Both settings are evaluated; an error + * matching either will cause Workflow failure. + * + * Unlike {@link WorkflowDefinitionOptions.failureExceptionTypes}, this setting requires error + * types to be specified as string names, not actual class references, and consequently, doesn't + * support subclass matching via `instanceof`. It however allows failing the workflow execution + * on _non-determinism errors_, by including the `NondeterminismError` type to the list of error + * types, either globally (via the `'*'` key) or per-Workflow-type. + * + * Passing the `'Error'` error type string here will result in failing the Workflow on any error, + * including non-determinism errors. + + * Note that {@link TemporalFailure} subclasses (with the exception of {@link ApplicationFailure}) + * and cancellation errors that bubbles out of the Workflow always fail the Workflow Execution, + * regardless of either this and the {@link WorkflowDefinitionOptions.failureExceptionTypes} settings. + * + * @experimental + */ + workflowFailureErrorTypes?: Record< + '*' | string, + ('NondeterminismError' | 'DeterminismViolationError' | 'Error' | (string & {}))[] + >; + /** * @deprecated SDK tracing is no longer supported. This option is ignored. */ @@ -1111,6 +1142,25 @@ function nexusServiceHandlersFromOptions(opts: WorkerOptions): Map 0; + + const workflowFailureErrors: native.WorkflowErrorType[] = []; + const workflowTypesToFailureErrors: Record = {}; + + for (const [k, v] of Object.entries(opts.workflowFailureErrorTypes ?? {})) { + const errorTypes: native.WorkflowErrorType[] = []; + + // Core only cares about Non-Determinism Error; other error types are handled by lang side + if (v.includes('NondeterminismError') || v.includes('DeterminismViolationError') || v.includes('Error')) { + errorTypes.push({ type: 'nondeterminism' }); + } + + if (k === '*') { + workflowFailureErrors.push(...errorTypes); + } else { + workflowTypesToFailureErrors[k] = errorTypes; + } + } + return { identity: opts.identity, buildId: opts.buildId, // eslint-disable-line @typescript-eslint/no-deprecated @@ -1137,6 +1187,8 @@ export function toNativeWorkerOptions(opts: CompiledWorkerOptionsWithBuildId): n maxActivitiesPerSecond: opts.maxActivitiesPerSecond ?? null, shutdownGraceTime: msToNumber(opts.shutdownGraceTime), plugins: opts.plugins?.map((p) => p.name) ?? [], + workflowFailureErrors, + workflowTypesToFailureErrors, }; } diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 5ba752f94..503e581ca 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -1572,6 +1572,7 @@ export class Worker { randomnessSeed: randomnessSeed.toBytes(), now: tsToMs(activation.timestamp), showStackTraceSources: this.options.showStackTraceSources, + failureExceptionTypeNames: resolveFailureExceptionTypeNames(workflowType, this.options.workflowFailureErrorTypes), }); this.numCachedWorkflowsSubject.next(this.numCachedWorkflowsSubject.value + 1); @@ -2257,3 +2258,23 @@ async function timeoutPromise(promise: Promise, timeout: number): Promise< async function setTimeoutUnref(timeout: number): Promise { return new Promise((resolve) => setTimeoutCallback(resolve, timeout).unref()); } + +/** + * Resolves the list of failure exception type names applicable to a given workflow type, + * combining the wildcard `'*'` entry and any entry matching the specific workflow type. + * + * `NondeterminismError` is passed through as-is (the lang-side matching treats it as an alias + * for `DeterminismViolationError`). All other type names are passed through unchanged. + */ +function resolveFailureExceptionTypeNames( + workflowType: string, + workflowFailureErrorTypes: Record | undefined +): string[] | undefined { + if (!workflowFailureErrorTypes) return undefined; + + const typeNames = new Set(); + for (const name of workflowFailureErrorTypes['*'] ?? []) typeNames.add(name); + for (const name of workflowFailureErrorTypes[workflowType] ?? []) typeNames.add(name); + + return typeNames.size > 0 ? [...typeNames] : undefined; +} diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index a77984ae0..65bcf45f1 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -646,6 +646,7 @@ export interface WorkflowCreateOptions { randomnessSeed: number[]; now: number; showStackTraceSources: boolean; + failureExceptionTypeNames?: string[]; } export interface WorkflowCreateOptionsInternal extends WorkflowCreateOptions { diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index 02f3bcb80..4644b40ce 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -268,6 +268,22 @@ export class Activator implements ActivationHandler { */ public workflowTaskError: unknown; + /** + * Error type _names_ (from {@link WorkerOptions.workflowFailureErrorTypes}) that + * should cause Workflow Execution failure rather than WFT failure. + * + * Set at workflow creation time from the worker options. + */ + public failureExceptionTypeNames: string[] = []; + + /** + * Error _types_ (from {@link WorkflowDefinitionOptions.failureExceptionTypes}) + * that should cause Workflow Execution failure rather than WFT failure. + * + * Set in `worker-interface.ts` after the workflow definition options are read. + */ + public workflowDefinitionFailureExceptionTypes: Array Error> | undefined = undefined; + /** * Set to true when running synchronous code (e.g. while processing activation jobs and when calling * `tryUnblockConditions()`). While this flag is set, it is safe to let errors bubble up. @@ -472,6 +488,7 @@ export class Activator implements ActivationHandler { randomnessSeed, registeredActivityNames, stackTracesEnabled, + failureExceptionTypeNames, }: WorkflowCreateOptionsInternal) { this.getTimeOfDay = getTimeOfDay; this.info = info; @@ -481,6 +498,7 @@ export class Activator implements ActivationHandler { this.random = alea(randomnessSeed); this.registeredActivityNames = registeredActivityNames; this.stackTracesEnabled = stackTracesEnabled; + this.failureExceptionTypeNames = failureExceptionTypeNames ?? []; } /** @@ -582,8 +600,10 @@ export class Activator implements ActivationHandler { ? this.failureConverter.failureToError(continuedFailure, this.payloadConverter) : undefined, })); - if (this.workflowDefinitionOptionsGetter) { - this.versioningBehavior = this.workflowDefinitionOptionsGetter().versioningBehavior; + const workflowDefinitionOpts = this.workflowDefinitionOptionsGetter?.(); + if (workflowDefinitionOpts) { + this.versioningBehavior = workflowDefinitionOpts.versioningBehavior; + this.workflowDefinitionFailureExceptionTypes = workflowDefinitionOpts.failureExceptionTypes; } } @@ -1171,7 +1191,7 @@ export class Activator implements ActivationHandler { this.pushCommand({ cancelWorkflowExecution: {} }, true); } else if (error instanceof ContinueAsNew) { this.pushCommand({ continueAsNewWorkflowExecution: error.command }, true); - } else if (error instanceof TemporalFailure) { + } else if (error instanceof TemporalFailure || this.isConfiguredFailureException(error)) { // Fail the workflow. We do not want to issue unfinishedHandlers warnings. To achieve that, we // mark all handlers as completed now. this.inProgressSignals.clear(); @@ -1179,7 +1199,7 @@ export class Activator implements ActivationHandler { this.pushCommand( { failWorkflowExecution: { - failure: this.errorToFailure(error), + failure: this.errorToFailure(ensureTemporalFailure(error)), }, }, true @@ -1189,6 +1209,42 @@ export class Activator implements ActivationHandler { } } + /** + * Returns true if the given error matches any of the configured failure exception types + * (from {@link WorkerOptions.workflowFailureErrorTypes} or + * {@link WorkflowDefinitionOptions.failureExceptionTypes}). + */ + private isConfiguredFailureException(error: unknown): boolean { + // Check class references from WorkflowDefinitionOptions (instanceof-based, supports subclasses) + if (this.workflowDefinitionFailureExceptionTypes) { + // We guarantee that including Error in the list will catch _any_ error. + if (this.workflowDefinitionFailureExceptionTypes.includes(Error)) return true; + + for (const errorType of this.workflowDefinitionFailureExceptionTypes) { + if (error instanceof errorType) return true; + } + } + + // Check class name strings from WorkerOptions (prototype-chain-based) + if (this.failureExceptionTypeNames.length > 0) { + // We guarantee that including 'Error' in the list will catch _any_ error. + if (this.failureExceptionTypeNames.includes('Error')) return true; + + if (typeof error === 'object' && error !== null) { + let ctor = (error as any).constructor; + while (ctor != null && ctor !== Function.prototype) { + const name = (ctor as any).name as string | undefined; + if (name) { + if (this.failureExceptionTypeNames.includes(name)) return true; + } + ctor = Object.getPrototypeOf(ctor); + } + } + } + + return false; + } + recordWorkflowTaskError(error: unknown): void { // Only keep the first error that bubbles up; subsequent errors will be ignored. if (this.workflowTaskError === undefined) this.workflowTaskError = error; diff --git a/packages/workflow/src/worker-interface.ts b/packages/workflow/src/worker-interface.ts index df59f0d00..99e515ef9 100644 --- a/packages/workflow/src/worker-interface.ts +++ b/packages/workflow/src/worker-interface.ts @@ -91,6 +91,8 @@ export function initRuntime(options: WorkflowCreateOptionsInternal): void { if (isWorkflowFunctionWithOptions(activator.workflow)) { if (typeof activator.workflow.workflowDefinitionOptions === 'object') { activator.versioningBehavior = activator.workflow.workflowDefinitionOptions.versioningBehavior; + activator.workflowDefinitionFailureExceptionTypes = + activator.workflow.workflowDefinitionOptions.failureExceptionTypes; } else { activator.workflowDefinitionOptionsGetter = activator.workflow.workflowDefinitionOptions; } diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index f762a0f15..be087eba1 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -1728,7 +1728,10 @@ export function allHandlersFinished(): boolean { * @example * For example: * ```ts - * setWorkflowOptions({ versioningBehavior: 'PINNED' }, myWorkflow); + * setWorkflowOptions({ + * versioningBehavior: 'PINNED', + * failureExceptionTypes: [CustomWorkflowError] + * }, myWorkflow); * export async function myWorkflow(): Promise { * // Workflow code here * return "hi"; @@ -1742,7 +1745,10 @@ export function allHandlersFinished(): boolean { * // Workflow code here * return "hi"; * } - * setWorkflowOptions({ versioningBehavior: 'PINNED' }, module.exports.default); + * setWorkflowOptions({ + * versioningBehavior: 'PINNED', + * failureExceptionTypes: [CustomWorkflowError] + * }, module.exports.default); * ``` * * @param options Options for the workflow defintion, or a function that returns options. If a