Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions packages/common/src/workflow-definition-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,29 @@ import type { VersioningBehavior } from './worker-deployments';
*/
export interface WorkflowDefinitionOptions {
versioningBehavior?: VersioningBehavior;

/**
* The types of errors that, if thrown by the Workflow function, a signal handler, or an update
* handler, will cause the Workflow Execution or the Update to fail instead of failing the
* Workflow Task (which would result in retrying the Workflow Task until it eventually succeeds).
*
* This is a per-Workflow-type equivalent of {@link WorkerOptions.workflowFailureErrorTypes}.
* Both settings are evaluated; an error matching either will cause Workflow failure.
* Unlike the string-based {@link WorkerOptions.workflowFailureErrorTypes}, this accepts
* actual class references, enabling subclass matching via `instanceof`, but doesn't allow
* failing the workflow execution on _non-determinism errors_. Failing the workflow execution on
* non-determinism errors can only be set via {@link WorkerOptions.workflowFailureErrorTypes}.
*
* Passing the `Error` class to this setting will fail the Workflow on any error, except
* non-determinism errors.
*
* Note that {@link TemporalFailure} subclasses and cancellation errors that bubbles out
* of the Workflow always fail the Workflow Execution, regardless of either this and the
* {@link WorkerOptions.workflowFailureErrorTypes} settings.
*
* @experimental
*/
failureExceptionTypes?: Array<new (...args: any[]) => Error>;
}

type AsyncFunction<Args extends any[], ReturnType> = (...args: Args) => Promise<ReturnType>;
Expand Down
24 changes: 23 additions & 1 deletion packages/core-bridge/src/helpers/try_from_js.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::HashMap, net::SocketAddr, time::Duration};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
time::Duration,
};

use neon::{
handle::Handle,
Expand Down Expand Up @@ -163,6 +167,24 @@ impl<T: TryFromJs> TryFromJs for Vec<T> {
}
}

#[allow(clippy::implicit_hasher)]
impl<T: TryFromJs + std::hash::Hash + Eq> TryFromJs for HashSet<T> {
fn try_from_js<'cx, 'b>(
cx: &mut impl Context<'cx>,
js_value: Handle<'b, JsValue>,
) -> BridgeResult<Self> {
let array = js_value.downcast::<JsArray, _>(cx)?;
let len = array.len(cx);
let mut result = Self::with_capacity(len as usize);

for i in 0..len {
let value = array.get_value(cx, i)?;
result.insert(T::try_from_js(cx, value)?);
}
Ok(result)
}
}

#[allow(clippy::implicit_hasher)]
impl<T: TryFromJs> TryFromJs for HashMap<String, T> {
fn try_from_js<'cx, 'b>(
Expand Down
38 changes: 36 additions & 2 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {}
////////////////////////////////////////////////////////////////////////////////////////////////////

mod config {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::{sync::Arc, time::Duration};
use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior;
use temporalio_common::protos::temporal::api::worker::v1::PluginInfo;
Expand All @@ -499,7 +499,8 @@ mod config {
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind,
PollerBehavior as CorePollerBehavior, ResourceBasedSlotsOptions, ResourceSlotOptions,
SlotKind, SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptions,
WorkerConfig, WorkerVersioningStrategy, WorkflowSlotKind,
WorkerConfig, WorkerVersioningStrategy, WorkflowErrorType as CoreWorkflowErrorType,
WorkflowSlotKind,
};

use super::custom_slot_supplier::CustomSlotSupplierOptions;
Expand Down Expand Up @@ -532,6 +533,8 @@ mod config {
max_task_queue_activities_per_second: Option<f64>,
shutdown_grace_time: Option<Duration>,
plugins: Vec<String>,
workflow_failure_errors: HashSet<WorkflowErrorType>,
workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
}

#[derive(TryFromJs)]
Expand Down Expand Up @@ -629,6 +632,10 @@ mod config {
})
.collect::<HashSet<_>>(),
)
.workflow_failure_errors(into_core_workflow_error_set(self.workflow_failure_errors))
.workflow_types_to_failure_errors(into_core_workflow_error_map_of_sets(
self.workflow_types_to_failure_errors,
))
.build()
.map_err(|err| BridgeError::TypeError {
message: format!("Failed to convert WorkerOptions to CoreWorkerConfig: {err}"),
Expand Down Expand Up @@ -704,6 +711,33 @@ mod config {
}
}

#[derive(TryFromJs, Hash, Eq, PartialEq)]
pub enum WorkflowErrorType {
Nondeterminism,
}

impl From<WorkflowErrorType> for CoreWorkflowErrorType {
fn from(val: WorkflowErrorType) -> Self {
match val {
WorkflowErrorType::Nondeterminism => Self::Nondeterminism,
}
}
}

fn into_core_workflow_error_set(
val: HashSet<WorkflowErrorType>,
) -> HashSet<CoreWorkflowErrorType> {
val.into_iter().map(Into::into).collect()
}

fn into_core_workflow_error_map_of_sets(
val: HashMap<String, HashSet<WorkflowErrorType>>,
) -> HashMap<String, HashSet<CoreWorkflowErrorType>> {
val.into_iter()
.map(|(k, v)| (k, into_core_workflow_error_set(v)))
.collect()
}

#[derive(TryFromJs)]
#[allow(clippy::struct_field_names)]
pub(super) struct WorkerTuner {
Expand Down
4 changes: 4 additions & 0 deletions packages/core-bridge/ts/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ export interface WorkerOptions {
maxActivitiesPerSecond: Option<number>;
shutdownGraceTime: number;
plugins: string[];
workflowFailureErrors: WorkflowErrorType[];
workflowTypesToFailureErrors: Record<string, WorkflowErrorType[]>;
}

export type PollerBehavior =
Expand Down Expand Up @@ -267,6 +269,8 @@ export type WorkerDeploymentVersion = {

export type VersioningBehavior = { type: 'pinned' } | { type: 'auto-upgrade' };

export type WorkflowErrorType = { type: 'nondeterminism' };

////////////////////////////////////////////////////////////////////////////////////////////////////
// Worker Tuner
////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions packages/test/src/test-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ const GenericConfigs = {
maxActivitiesPerSecond: null,
shutdownGraceTime: 1000,
plugins: [],
workflowFailureErrors: [],
workflowTypesToFailureErrors: {},
} satisfies native.WorkerOptions,
},
ephemeralServer: {
Expand Down
Loading
Loading