diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index f82717ec6f..b2e2a88aec 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -26,6 +26,7 @@ export const DEFAULT_SLEEP_GRACE_PERIOD = 15_000; export const ACTOR_CONTEXT_INTERNAL_SYMBOL = Symbol( "rivetkit.actor_context_internal", ); +export const RAW_STATE_SYMBOL = Symbol("rivetkit.raw_state"); export const CONN_DRIVER_SYMBOL = Symbol("rivetkit.conn_driver"); export const CONN_STATE_MANAGER_SYMBOL = Symbol("rivetkit.conn_state_manager"); @@ -310,6 +311,8 @@ export interface ActorContext< TQueues extends QueueSchemaConfig = Record, > { [ACTOR_CONTEXT_INTERNAL_SYMBOL]?: unknown; + /** Returns the raw unwrapped state without the write-through proxy. */ + [RAW_STATE_SYMBOL](): TState; state: TState; vars: TVars; readonly kv: ActorKv; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index 49a2044756..3059e3f37d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -2,6 +2,7 @@ import { VirtualWebSocket } from "@rivetkit/virtual-websocket"; import { ACTOR_CONTEXT_INTERNAL_SYMBOL, CONN_STATE_MANAGER_SYMBOL, + RAW_STATE_SYMBOL, getRunFunction, getRunInspectorConfig, type WorkflowInspectorConfig, @@ -1162,6 +1163,10 @@ class NativeConnAdapter { ); } + [RAW_STATE_SYMBOL](): unknown { + return this.#readState(); + } + get state(): unknown { const nextState = this.#readState(); return createWriteThroughProxy(nextState, (nextValue) => { @@ -2455,6 +2460,13 @@ export class ActorContextHandleAdapter { throw databaseClientNotReadyError(); } + [RAW_STATE_SYMBOL](): unknown { + if (!this.#stateEnabled) { + throw stateNotEnabledError(); + } + return this.#readState(); + } + get state(): unknown { if (!this.#stateEnabled) { throw stateNotEnabledError(); diff --git a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts index adac7fa055..5e3af11bbb 100644 --- a/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts +++ b/rivetkit-typescript/packages/rivetkit/src/workflow/context.ts @@ -1,5 +1,5 @@ // @ts-nocheck -import type { RunContext } from "@/actor/config"; +import { RAW_STATE_SYMBOL, type RunContext } from "@/actor/config"; import type { QueueFilterName, QueueNextBatchOptions, @@ -247,14 +247,14 @@ export class ActorWorkflowContext< } return await this.#wrapActive(() => this.#inner.step(nameOrConfig, () => - this.#withActorAccess(run), + this.#withActorAccessAndStateRollback(run), ), ); } const stepConfig = nameOrConfig as StepConfig; const config: StepConfig = { ...stepConfig, - run: () => this.#withActorAccess(stepConfig.run), + run: () => this.#withActorAccessAndStateRollback(stepConfig.run), }; return await this.#wrapActive(() => this.#inner.step(config)); } @@ -271,14 +271,14 @@ export class ActorWorkflowContext< } return await this.#wrapActive(() => this.#inner.tryStep(nameOrConfig, () => - this.#withActorAccess(run), + this.#withActorAccessAndStateRollback(run), ), ); } const stepConfig = nameOrConfig as TryStepConfig; const config: TryStepConfig = { ...stepConfig, - run: () => this.#withActorAccess(stepConfig.run), + run: () => this.#withActorAccessAndStateRollback(stepConfig.run), }; return await this.#wrapActive(() => this.#inner.tryStep(config)); } @@ -612,6 +612,20 @@ export class ActorWorkflowContext< } } + async #withActorAccessAndStateRollback( + run: () => Promise, + ): Promise { + const stateSnapshot = structuredClone(this.#runCtx[RAW_STATE_SYMBOL]()); + const varsSnapshot = structuredClone(this.#runCtx.vars); + try { + return await this.#withActorAccess(run); + } catch (error) { + this.#runCtx.state = stateSnapshot; + this.#runCtx.vars = varsSnapshot; + throw error; + } + } + #ensureActorAccess(feature: string): void { if (!this.#allowActorAccess) { this.#guardViolation = true; diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts index af193fbdfe..d23f820bc7 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/context.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -825,10 +825,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { const maxRetries = config.maxRetries ?? DEFAULT_MAX_RETRIES; if (metadata.attempts > maxRetries) { - // Prefer step history error, but fall back to metadata since - // driver implementations may persist metadata without the history - // entry error (e.g. partial writes/crashes between attempts). - const lastError = stepData.error ?? metadata.error; + const lastError = metadata.error; const exhaustedError = new StepExhaustedError( config.name, lastError, @@ -941,15 +938,15 @@ export class WorkflowContextImpl implements WorkflowContextInterface { }); return output; } catch (error) { + if (entry.kind.type === "step") { + entry.kind.data.error = String(error); + } + entry.dirty = true; + // Timeout errors are treated as critical (no retry) if (error instanceof StepTimeoutError) { - if (entry.kind.type === "step") { - entry.kind.data.error = String(error); - } - entry.dirty = true; metadata.status = "exhausted"; metadata.error = String(error); - await this.flushStorage(); await this.notifyStepError(config, metadata.attempts, error, { willRetry: false, }); @@ -967,13 +964,8 @@ export class WorkflowContextImpl implements WorkflowContextInterface { error instanceof CriticalError || error instanceof RollbackError ) { - if (entry.kind.type === "step") { - entry.kind.data.error = String(error); - } - entry.dirty = true; metadata.status = "exhausted"; metadata.error = String(error); - await this.flushStorage(); await this.notifyStepError(config, metadata.attempts, error, { willRetry: false, }); @@ -990,15 +982,10 @@ export class WorkflowContextImpl implements WorkflowContextInterface { ); } - if (entry.kind.type === "step") { - entry.kind.data.error = String(error); - } - entry.dirty = true; const willRetry = metadata.attempts <= maxRetries; metadata.status = willRetry ? "failed" : "exhausted"; metadata.error = String(error); - await this.flushStorage(); if (willRetry) { const retryDelay = calculateBackoff( metadata.attempts, diff --git a/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts index 60817bb007..2a18dbebf2 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts @@ -597,8 +597,8 @@ for (const mode of modes) { } // Crash at iteration 3 during first run. State was - // persisted at iteration 2 (deferred) and awaited at - // the start of iteration 3, so state should be saved. + // persisted at the end of iteration 2 after + // Loop.continue, so state should be saved. if (state.count === 3 && firstRun) { throw new Error("Crash after state save"); } @@ -628,8 +628,8 @@ for (const mode of modes) { expect(result.state).toBe("completed"); expect(result.output).toBe(5); - // Should resume from saved state at iteration 2, not from 0 - expect(iterationsExecuted[0]).toBe(2); + // Should resume from saved state at iteration 3, not from 0 + expect(iterationsExecuted[0]).toBe(3); }); it("should handle loop that breaks before first prune interval", async () => { diff --git a/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts index 403d0f53a5..0383255ded 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/steps.test.ts @@ -10,8 +10,8 @@ import { type WorkflowErrorEvent, type WorkflowContextInterface, } from "../src/testing.js"; -import { buildHistoryPrefixAll, keyStartsWith } from "../src/keys.js"; -import { deserializeEntry, serializeEntry } from "../schemas/serde.js"; +import { buildHistoryPrefixAll } from "../src/keys.js"; +import { deserializeEntry } from "../schemas/serde.js"; const modes = ["yield", "live"] as const; @@ -26,30 +26,6 @@ class CountingDriver extends InMemoryDriver { } } -class StripStepHistoryErrorDriver extends InMemoryDriver { - override async batch( - writes: { key: Uint8Array; value: Uint8Array }[], - ): Promise { - const historyPrefix = buildHistoryPrefixAll(); - const rewritten = writes.map((write) => { - if (!keyStartsWith(write.key, historyPrefix)) { - return write; - } - - const entry = deserializeEntry(write.value); - if (entry.kind.type === "step") { - // Simulate a driver/crash scenario where the step error is not persisted - // to the history entry, even though retries/exhaustion metadata is. - entry.kind.data.error = undefined; - return { key: write.key, value: serializeEntry(entry) }; - } - - return write; - }); - - return await super.batch(rewritten); - } -} for (const mode of modes) { describe(`Workflow Engine Steps (${mode})`, { sequential: true }, () => { @@ -485,43 +461,6 @@ for (const mode of modes) { ).rejects.toThrow(StepExhaustedError); }); - it("should surface the last error even if step history is missing the error", async () => { - const driver = new StripStepHistoryErrorDriver(); - driver.latency = 0; - - const workflow = async (ctx: WorkflowContextInterface) => { - return await ctx.step({ - name: "always-fails", - maxRetries: 1, - retryBackoffBase: 0, - retryBackoffMax: 0, - run: async () => { - throw new Error("Always fails"); - }, - }); - }; - - if (mode === "yield") { - const firstResult = await runWorkflow( - "wf-1", - workflow, - undefined, - driver, - { mode }, - ).result; - expect(firstResult.state).toBe("sleeping"); - } - - await expect( - runWorkflow("wf-1", workflow, undefined, driver, { mode }) - .result, - ).rejects.toThrow(StepExhaustedError); - await expect( - runWorkflow("wf-1", workflow, undefined, driver, { mode }) - .result, - ).rejects.toThrow(/Always fails/); - }); - it("should recover exhausted retries", async () => { let attempts = 0; @@ -634,5 +573,40 @@ for (const mode of modes) { expect(result.output).toBe("done"); expect(driver.batchCalls).toBe(2); }); + + it("should not commit step error data to entry on failure", async () => { + const workflow = async (ctx: WorkflowContextInterface) => { + return await ctx.step({ + name: "fail-once", + maxRetries: 3, + retryBackoffBase: 0, + retryBackoffMax: 0, + run: async () => { + throw new Error("step failed"); + }, + }); + }; + + // Run once so the step fails and state is flushed + try { + await runWorkflow("wf-1", workflow, undefined, driver, { + mode, + }).result; + } catch {} + + // Inspect all history entries in KV + const historyPrefix = buildHistoryPrefixAll(); + const entries = await driver.list(historyPrefix); + + for (const kv of entries) { + const entry = deserializeEntry(kv.value); + if (entry.kind.type === "step") { + // The step entry should have no output committed on failure. + expect(entry.kind.data.output).toBeUndefined(); + // The error should be committed for inspection. + expect(entry.kind.data.error).toBe("Error: step failed"); + } + } + }); }); }