Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { z } from "zod/v4";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit/src/actor/config.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
import type { UniversalWebSocket } from "@/common/websocket-interface";
import type {
AnyDatabaseProvider,
Expand Down Expand Up @@ -26,6 +26,7 @@
/**
 * Symbol key for an optional, `unknown`-typed internal slot on actor
 * context objects.
 */
export const ACTOR_CONTEXT_INTERNAL_SYMBOL = Symbol(
"rivetkit.actor_context_internal",
);
/**
 * Symbol key for the context method that returns the raw state value,
 * bypassing the write-through proxy that the `state` getter hands out.
 */
export const RAW_STATE_SYMBOL = Symbol("rivetkit.raw_state");
// NOTE(review): the two symbols below presumably key connection internals
// (driver and state manager); their usage is not visible here — confirm at
// the call sites.
export const CONN_DRIVER_SYMBOL = Symbol("rivetkit.conn_driver");
export const CONN_STATE_MANAGER_SYMBOL = Symbol("rivetkit.conn_state_manager");

Expand Down Expand Up @@ -310,6 +311,8 @@
TQueues extends QueueSchemaConfig = Record<never, never>,
> {
[ACTOR_CONTEXT_INTERNAL_SYMBOL]?: unknown;
/** Returns the raw unwrapped state without the write-through proxy. */
[RAW_STATE_SYMBOL](): TState;
state: TState;
vars: TVars;
readonly kv: ActorKv;
Expand Down
12 changes: 12 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/registry/native.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { VirtualWebSocket } from "@rivetkit/virtual-websocket";
import {
ACTOR_CONTEXT_INTERNAL_SYMBOL,
CONN_STATE_MANAGER_SYMBOL,
RAW_STATE_SYMBOL,
getRunFunction,
getRunInspectorConfig,
type WorkflowInspectorConfig,
Expand Down Expand Up @@ -1162,6 +1163,10 @@ class NativeConnAdapter {
);
}

/**
 * Hands back the value produced by `#readState()` as-is, without wrapping
 * it in the write-through proxy that the `state` getter applies.
 *
 * @returns The unwrapped state value.
 */
[RAW_STATE_SYMBOL](): unknown {
	const rawState = this.#readState();
	return rawState;
}

get state(): unknown {
const nextState = this.#readState();
return createWriteThroughProxy(nextState, (nextValue) => {
Expand Down Expand Up @@ -2455,6 +2460,13 @@ export class ActorContextHandleAdapter {
throw databaseClientNotReadyError();
}

/**
 * Returns the unwrapped state read via `#readState()`, skipping the
 * write-through proxy that the `state` getter applies.
 *
 * @returns The raw state value.
 * @throws The error built by `stateNotEnabledError()` when state is not
 *   enabled for this context.
 */
[RAW_STATE_SYMBOL](): unknown {
	if (this.#stateEnabled) {
		return this.#readState();
	}
	throw stateNotEnabledError();
}

get state(): unknown {
if (!this.#stateEnabled) {
throw stateNotEnabledError();
Expand Down
24 changes: 19 additions & 5 deletions rivetkit-typescript/packages/rivetkit/src/workflow/context.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// @ts-nocheck
import type { RunContext } from "@/actor/config";
import { RAW_STATE_SYMBOL, type RunContext } from "@/actor/config";
import type {
QueueFilterName,
QueueNextBatchOptions,
Expand Down Expand Up @@ -247,14 +247,14 @@ export class ActorWorkflowContext<
}
return await this.#wrapActive(() =>
this.#inner.step(nameOrConfig, () =>
this.#withActorAccess(run),
this.#withActorAccessAndStateRollback(run),
),
);
}
const stepConfig = nameOrConfig as StepConfig<T>;
const config: StepConfig<T> = {
...stepConfig,
run: () => this.#withActorAccess(stepConfig.run),
run: () => this.#withActorAccessAndStateRollback(stepConfig.run),
};
return await this.#wrapActive(() => this.#inner.step(config));
}
Expand All @@ -271,14 +271,14 @@ export class ActorWorkflowContext<
}
return await this.#wrapActive(() =>
this.#inner.tryStep(nameOrConfig, () =>
this.#withActorAccess(run),
this.#withActorAccessAndStateRollback(run),
),
);
}
const stepConfig = nameOrConfig as TryStepConfig<T>;
const config: TryStepConfig<T> = {
...stepConfig,
run: () => this.#withActorAccess(stepConfig.run),
run: () => this.#withActorAccessAndStateRollback(stepConfig.run),
};
return await this.#wrapActive(() => this.#inner.tryStep(config));
}
Expand Down Expand Up @@ -612,6 +612,20 @@ export class ActorWorkflowContext<
}
}

/**
 * Runs a step body with actor access enabled and, if it throws, restores
 * the actor's state and vars to deep copies captured before the body ran.
 *
 * Capturing a snapshot is best-effort. The raw-state accessor can throw
 * (the native context adapter throws when state is not enabled), and
 * `structuredClone` throws a `DataCloneError` for values it cannot copy
 * (functions, sockets, ...). A piece that cannot be captured is simply not
 * rolled back; failing to snapshot must never prevent the step itself from
 * running.
 *
 * @param run - Step body to execute.
 * @returns Whatever `run` resolves with.
 * @throws Re-throws the error raised while running `run`, after rollback.
 */
async #withActorAccessAndStateRollback<T>(
	run: () => Promise<T>,
): Promise<T> {
	// Only the pieces that were successfully captured get a restore action.
	const rollbacks: Array<() => void> = [];

	try {
		// Read through RAW_STATE_SYMBOL so the clone sees the plain value
		// rather than the write-through proxy returned by `state`.
		const stateSnapshot = structuredClone(
			this.#runCtx[RAW_STATE_SYMBOL](),
		);
		rollbacks.push(() => {
			this.#runCtx.state = stateSnapshot;
		});
	} catch {
		// State is unavailable or not cloneable: nothing to roll back.
	}

	try {
		const varsSnapshot = structuredClone(this.#runCtx.vars);
		rollbacks.push(() => {
			this.#runCtx.vars = varsSnapshot;
		});
	} catch {
		// NOTE(review): vars presumably may hold non-cloneable runtime
		// objects; in that case they are left untouched on failure.
	}

	try {
		return await this.#withActorAccess(run);
	} catch (error) {
		for (const rollback of rollbacks) {
			rollback();
		}
		throw error;
	}
}

#ensureActorAccess(feature: string): void {
if (!this.#allowActorAccess) {
this.#guardViolation = true;
Expand Down
25 changes: 6 additions & 19 deletions rivetkit-typescript/packages/workflow-engine/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
const maxRetries = config.maxRetries ?? DEFAULT_MAX_RETRIES;

if (metadata.attempts > maxRetries) {
// Prefer step history error, but fall back to metadata since
// driver implementations may persist metadata without the history
// entry error (e.g. partial writes/crashes between attempts).
const lastError = stepData.error ?? metadata.error;
const lastError = metadata.error;
const exhaustedError = new StepExhaustedError(
config.name,
lastError,
Expand Down Expand Up @@ -941,15 +938,15 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
});
return output;
} catch (error) {
if (entry.kind.type === "step") {
entry.kind.data.error = String(error);
}
entry.dirty = true;

// Timeout errors are treated as critical (no retry)
if (error instanceof StepTimeoutError) {
if (entry.kind.type === "step") {
entry.kind.data.error = String(error);
}
entry.dirty = true;
metadata.status = "exhausted";
metadata.error = String(error);
await this.flushStorage();
await this.notifyStepError(config, metadata.attempts, error, {
willRetry: false,
});
Expand All @@ -967,13 +964,8 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
error instanceof CriticalError ||
error instanceof RollbackError
) {
if (entry.kind.type === "step") {
entry.kind.data.error = String(error);
}
entry.dirty = true;
metadata.status = "exhausted";
metadata.error = String(error);
await this.flushStorage();
await this.notifyStepError(config, metadata.attempts, error, {
willRetry: false,
});
Expand All @@ -990,15 +982,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
);
}

if (entry.kind.type === "step") {
entry.kind.data.error = String(error);
}
entry.dirty = true;
const willRetry = metadata.attempts <= maxRetries;
metadata.status = willRetry ? "failed" : "exhausted";
metadata.error = String(error);

await this.flushStorage();
if (willRetry) {
const retryDelay = calculateBackoff(
metadata.attempts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ for (const mode of modes) {
}

// Crash at iteration 3 during first run. State was
// persisted at iteration 2 (deferred) and awaited at
// the start of iteration 3, so state should be saved.
// persisted at the end of iteration 2 after
// Loop.continue, so state should be saved.
if (state.count === 3 && firstRun) {
throw new Error("Crash after state save");
}
Expand Down Expand Up @@ -628,8 +628,8 @@ for (const mode of modes) {
expect(result.state).toBe("completed");
expect(result.output).toBe(5);

// Should resume from saved state at iteration 2, not from 0
expect(iterationsExecuted[0]).toBe(2);
// Should resume from saved state at iteration 3, not from 0
expect(iterationsExecuted[0]).toBe(3);
});

it("should handle loop that breaks before first prune interval", async () => {
Expand Down
100 changes: 37 additions & 63 deletions rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import {
type WorkflowErrorEvent,
type WorkflowContextInterface,
} from "../src/testing.js";
import { buildHistoryPrefixAll, keyStartsWith } from "../src/keys.js";
import { deserializeEntry, serializeEntry } from "../schemas/serde.js";
import { buildHistoryPrefixAll } from "../src/keys.js";
import { deserializeEntry } from "../schemas/serde.js";

const modes = ["yield", "live"] as const;

Expand All @@ -26,30 +26,6 @@ class CountingDriver extends InMemoryDriver {
}
}

/**
 * In-memory test driver that erases the `error` field of every step history
 * entry before it is written, while leaving all other writes untouched.
 *
 * Simulates a driver/crash scenario where the step error never reaches the
 * history entry even though retry/exhaustion metadata is persisted.
 */
class StripStepHistoryErrorDriver extends InMemoryDriver {
	/**
	 * Rewrites the batch so step history entries carry no error, then
	 * forwards it to the base driver.
	 *
	 * @param writes - Key/value pairs to persist.
	 */
	override async batch(
		writes: { key: Uint8Array; value: Uint8Array }[],
	): Promise<void> {
		const historyPrefix = buildHistoryPrefixAll();
		const sanitized: { key: Uint8Array; value: Uint8Array }[] = [];

		for (const write of writes) {
			if (keyStartsWith(write.key, historyPrefix)) {
				const decoded = deserializeEntry(write.value);
				if (decoded.kind.type === "step") {
					// Drop the error and re-encode; the key stays the same.
					decoded.kind.data.error = undefined;
					sanitized.push({
						key: write.key,
						value: serializeEntry(decoded),
					});
					continue;
				}
			}
			// Non-history writes and non-step entries pass through as-is.
			sanitized.push(write);
		}

		return await super.batch(sanitized);
	}
}

for (const mode of modes) {
describe(`Workflow Engine Steps (${mode})`, { sequential: true }, () => {
Expand Down Expand Up @@ -485,43 +461,6 @@ for (const mode of modes) {
).rejects.toThrow(StepExhaustedError);
});

// Exhaustion must still report the step's last error message when the
// driver has stripped the error from the persisted step history entry.
it("should surface the last error even if step history is missing the error", async () => {
	const driver = new StripStepHistoryErrorDriver();
	driver.latency = 0;

	const workflow = async (ctx: WorkflowContextInterface) => {
		return await ctx.step({
			name: "always-fails",
			maxRetries: 1,
			retryBackoffBase: 0,
			retryBackoffMax: 0,
			run: async () => {
				throw new Error("Always fails");
			},
		});
	};

	// Each call starts a fresh run of the same workflow id on the same driver.
	const runOnce = () =>
		runWorkflow("wf-1", workflow, undefined, driver, { mode }).result;

	if (mode === "yield") {
		// In yield mode the first failed attempt parks the workflow.
		const firstResult = await runOnce();
		expect(firstResult.state).toBe("sleeping");
	}

	await expect(runOnce()).rejects.toThrow(StepExhaustedError);
	await expect(runOnce()).rejects.toThrow(/Always fails/);
});

it("should recover exhausted retries", async () => {
let attempts = 0;

Expand Down Expand Up @@ -634,5 +573,40 @@ for (const mode of modes) {
expect(result.output).toBe("done");
expect(driver.batchCalls).toBe(2);
});

// A failed step must persist its error on the history entry (so it can be
// inspected) but must not persist any output.
// NOTE(review): `driver` is not declared in this test; presumably it comes
// from the enclosing describe scope — confirm.
it("should commit step error but not output to entry on failure", async () => {
	const workflow = async (ctx: WorkflowContextInterface) => {
		return await ctx.step({
			name: "fail-once",
			maxRetries: 3,
			retryBackoffBase: 0,
			retryBackoffMax: 0,
			run: async () => {
				throw new Error("step failed");
			},
		});
	};

	// Run once so the step fails and state is flushed. The run may reject
	// or resolve depending on mode; only the persisted entries matter here,
	// so a rejection is deliberately ignored.
	try {
		await runWorkflow("wf-1", workflow, undefined, driver, {
			mode,
		}).result;
	} catch {}

	// Inspect all history entries in KV
	const historyPrefix = buildHistoryPrefixAll();
	const entries = await driver.list(historyPrefix);

	// NOTE(review): this loop passes vacuously if no step entry is listed;
	// consider also asserting that at least one step entry was inspected.
	for (const kv of entries) {
		const entry = deserializeEntry(kv.value);
		if (entry.kind.type === "step") {
			// The step entry should have no output committed on failure.
			expect(entry.kind.data.output).toBeUndefined();
			// The error should be committed for inspection.
			expect(entry.kind.data.error).toBe("Error: step failed");
		}
	}
});
});
}
Loading