From 7764046a187dd826afe1b285a7533c3036f1f0a3 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 4 May 2026 08:35:25 -0700 Subject: [PATCH] fix(rivetkit): align destroy abort signal timing --- .../engine/rivetkit-core-internals.md | 2 +- docs-internal/engine/sleep-sequence.md | 2 +- .../rivetkit-core/src/actor/context.rs | 10 +++--- .../packages/rivetkit-core/src/actor/task.rs | 19 +++++----- .../packages/rivetkit-core/tests/queue.rs | 2 +- .../fixtures/driver-test-suite/destroy.ts | 29 +++++++++++++++ .../driver-test-suite/registry-static.ts | 7 +++- .../packages/rivetkit/src/actor/config.ts | 4 +-- .../rivetkit/src/actor/instance/mod.ts | 35 +++++++------------ .../tests/driver/actor-destroy.test.ts | 24 +++++++++++++ website/src/content/docs/actors/actions.mdx | 2 +- website/src/content/docs/actors/index.mdx | 2 +- website/src/content/docs/actors/lifecycle.mdx | 4 +-- website/src/content/docs/actors/queues.mdx | 2 +- 14 files changed, 96 insertions(+), 48 deletions(-) diff --git a/docs-internal/engine/rivetkit-core-internals.md b/docs-internal/engine/rivetkit-core-internals.md index 02cc8af296..613cecbbb9 100644 --- a/docs-internal/engine/rivetkit-core-internals.md +++ b/docs-internal/engine/rivetkit-core-internals.md @@ -85,7 +85,7 @@ Two-phase: - `SleepGrace` fires `onSleep` immediately and keeps dispatch/save timers live. - `SleepFinalize` gates dispatch, suspends alarms, and runs teardown. -Sleep grace must fire the actor abort signal on entry and wait for the run handler to exit before finalize. Destroy abort firing remains unchanged. +Sleep grace fires the actor abort signal on entry and waits for the run handler to exit before finalize. Finalize: diff --git a/docs-internal/engine/sleep-sequence.md b/docs-internal/engine/sleep-sequence.md index b4503fa08f..9f9d5c7637 100644 --- a/docs-internal/engine/sleep-sequence.md +++ b/docs-internal/engine/sleep-sequence.md @@ -39,7 +39,7 @@ Removing `preventSleep` deleted both predicate branches. Any future sleep-affect - `start_grace(reason)` fires at the start of `SleepGrace` / `DestroyGrace`. It cancels the sleep idle timer, cancels the actor abort signal (`actor_abort_signal`), installs a `SleepGraceState` with the effective grace deadline, and resets the sleep timer to arm the grace tick. - The actor abort signal is a soft signal: "shutdown has started, please wrap up." User code observes it via `c.abortSignal`. It does not force-stop work. -- For destroy, the abort signal may fire earlier than grace entry because `ctx.destroy()` cancels the abort token immediately via `mark_destroy_requested(...)`. +- Destroy requests also use the normal grace path. The actor abort signal fires when destroy grace starts. ## Grace deadline enforcement diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs index ab964f0206..f734ffd33e 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs @@ -478,7 +478,6 @@ impl ActorContext { self.flush_on_shutdown(); self.0.destroy_requested.store(true, Ordering::SeqCst); self.0.destroy_completed.store(false, Ordering::SeqCst); - self.0.abort_signal.lock().cancel(); } #[cfg(feature = "wasm-runtime")] @@ -486,11 +485,10 @@ impl ActorContext { self.cancel_sleep_timer(); self.0.destroy_requested.store(true, Ordering::SeqCst); self.0.destroy_completed.store(false, Ordering::SeqCst); - self.0.abort_signal.lock().cancel(); } #[doc(hidden)] - pub fn cancel_abort_signal_for_sleep(&self) { + pub fn cancel_actor_abort_signal(&self) { self.0.abort_signal.lock().cancel(); } @@ -500,9 +498,9 @@ impl ActorContext { return; } - // Sleep cancels the generation abort signal to break queue waits and the - // run loop out of blocking calls. A restarted actor needs a fresh signal - // so the next generation can wait normally. + // Sleep or destroy cancels the generation abort signal to break actor + // scoped waits. A restarted actor needs a fresh signal so the next + // generation can wait normally. let next_signal = CancellationToken::new(); *abort_signal = next_signal.clone(); *self.0.queue_abort_signal.lock() = next_signal; diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index 27b5ab2a9b..f76abf22b2 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -1528,9 +1528,7 @@ impl ActorTask { let grace_period = self.factory.config().effective_sleep_grace_period(); self.sleep_deadline = None; self.ctx.cancel_sleep_timer(); - // Entering grace cancels the actor abort signal so user code blocked on - // queues or other actor scoped waits can unwind and let sleep finalize. - self.ctx.cancel_abort_signal_for_sleep(); + self.ctx.cancel_actor_abort_signal(); self.sleep_grace = Some(SleepGraceState { deadline: Instant::now() + grace_period, reason, @@ -1745,15 +1743,18 @@ impl ActorTask { ShutdownKind::Sleep => LifecycleState::SleepFinalize, ShutdownKind::Destroy => LifecycleState::Destroying, }); - self.save_final_state().await?; - self.close_actor_event_channel(); - self.join_aborted_run_handle().await; - Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?; - if matches!(reason, ShutdownKind::Destroy) { + let result: Result<()> = async { + self.save_final_state().await?; + self.close_actor_event_channel(); + self.join_aborted_run_handle().await; + Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await + } + .await; + if result.is_ok() && matches!(reason, ShutdownKind::Destroy) { self.ctx.mark_destroy_completed(); } self.ctx.record_shutdown_wait(reason, started_at.elapsed()); - Ok(()) + result } async fn save_final_state(&mut self) -> Result<()> { diff --git a/rivetkit-rust/packages/rivetkit-core/tests/queue.rs b/rivetkit-rust/packages/rivetkit-core/tests/queue.rs index 6d98ad9442..f27cef1404 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/queue.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/queue.rs @@ -162,7 +162,7 @@ mod moved_tests { }); yield_now().await; - queue.mark_destroy_requested(); + queue.cancel_actor_abort_signal(); let error = wait .await diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts index 718a93aad7..c752b17d6b 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/destroy.ts @@ -87,3 +87,32 @@ export const destroyActor = actor({ }, }, }); + +export const destroyAbortSignalActor = actor({ + state: { + abortEventCount: 0, + }, + onDestroy: async (c) => { + const client = c.client(); + const observer = client.destroyObserver.getOrCreate(["observer"]); + await observer.notifyDestroyed(c.key.join("/")); + }, + actions: { + requestDestroy: (c) => { + const beforeDestroyAborted = c.aborted; + const beforeDestroySignalAborted = c.abortSignal.aborted; + c.abortSignal.addEventListener("abort", () => { + c.state.abortEventCount += 1; + }); + c.destroy(); + + return { + beforeDestroyAborted, + beforeDestroySignalAborted, + afterDestroyAborted: c.aborted, + afterDestroySignalAborted: c.abortSignal.aborted, + abortEventCountAfterDestroy: c.state.abortEventCount, + }; + }, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts index 70bc2bc079..6a940f1c42 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry-static.ts @@ -42,7 +42,11 @@ import { dbLifecycleFailing, dbLifecycleObserver, } from "./db-lifecycle"; -import { destroyActor, destroyObserver } from "./destroy"; +import { + destroyAbortSignalActor, + destroyActor, + destroyObserver, +} from "./destroy"; import { customTimeoutActor, errorHandlingActor } from "./error-handling"; import { fileSystemHibernationCleanupActor } from "./file-system-hibernation-cleanup"; import { hibernationActor, hibernationSleepWindowActor } from "./hibernation"; @@ -281,6 +285,7 @@ export const registry = setup({ // From destroy.ts destroyActor, destroyObserver, + destroyAbortSignalActor, // From hibernation.ts hibernationActor, hibernationSleepWindowActor, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 8dddf16c3c..e812375484 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -1227,7 +1227,7 @@ interface BaseActorConfig< * shutdown window (`sleepGracePeriod`) cover deferred work. * * The handler receives an abort signal via `c.abortSignal` and a - * `c.aborted` alias for loop checks. Use these to gracefully exit. + * `c.aborted` alias. Use these to gracefully exit when shutdown starts. * * If this handler exits, the actor will follow the normal idle sleep timeout * once it becomes idle. @@ -1770,7 +1770,7 @@ export const DocActorOptionsSchema = z .number() .optional() .describe( - `Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler abort wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`, + `Max time in ms for the graceful shutdown window. Covers lifecycle hooks (onSleep, onDestroy), the run handler wait, async raw WebSocket handlers, disconnect callbacks, and final state serialization. Default: ${DEFAULT_SLEEP_GRACE_PERIOD}.`, ), onDestroyTimeout: z .number() diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 30b33138e4..1db2f4f646 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -957,14 +957,7 @@ export class ActorInstance< // Scheduled events are persisted and will be re-initialized // on wake via initializeAlarms(). this.driver.cancelAlarm?.(this.#actorId); - - // Abort listeners in the canonical stop path. - // This must run for all stop modes, including sleep and remote stop. - // Destroy may have already triggered an early abort, but repeating abort - // is intentional and safe. - try { - this.#abortController.abort(); - } catch {} + this.#abortActorSignal(); // The run-handler join, lifecycle hooks, and remaining shutdown // tasks all share the single sleepGracePeriod budget. @@ -1030,11 +1023,8 @@ export class ActorInstance< } this.driver.cancelAlarm?.(this.#actorId); + this.#abortActorSignal(); this.stateManager.clearPendingSaveTimeout(); - - try { - this.#abortController.abort(); - } catch {} } finally { this.#shutdownComplete = true; await this.#cleanupDatabase(); @@ -1087,14 +1077,6 @@ export class ActorInstance< } this.#destroyCalled = true; - // Abort immediately so in flight waits can exit before the driver stop - // handshake completes. - // The onStop path will call abort again as a safety net for all stop - // modes. - try { - this.#abortController.abort(); - } catch {} - const destroy = this.driver.startDestroy.bind( this.driver, this.#actorId, @@ -1108,6 +1090,15 @@ export class ActorInstance< }); } + #abortActorSignal() { + if (this.#abortController.signal.aborted) { + return; + } + try { + this.#abortController.abort(); + } catch {} + } + // MARK: - HTTP Request Tracking beginHonoHttpRequest() { this.#activeHonoHttpRequests++; @@ -2070,7 +2061,7 @@ export class ActorInstance< if (timeoutMs <= 0) { this.#rLog.warn({ - msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully", + msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes", timeoutMs, }); return; @@ -2085,7 +2076,7 @@ export class ActorInstance< if (timedOut) { this.#rLog.warn({ - msg: "run handler did not complete in time, it may have leaked - ensure you use c.aborted (or the abort signal c.abortSignal) to exit gracefully", + msg: "run handler did not complete in time, it may have leaked - ensure long-running work settles before shutdown finalizes", timeoutMs, }); } else { diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-destroy.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-destroy.test.ts index 7bb8c72a3b..39b550a06e 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-destroy.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-destroy.test.ts @@ -104,6 +104,30 @@ describeDriverMatrix("Actor Destroy", (driverTestConfig) => { expect(newValue).toBe(0); }); + test("ctx.destroy does not synchronously abort actor signal", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const actorKey = `test-destroy-abort-signal-${crypto.randomUUID()}`; + const observer = client.destroyObserver.getOrCreate(["observer"]); + await observer.reset(); + + const result = await client.destroyAbortSignalActor + .getOrCreate([actorKey]) + .requestDestroy(); + + expect(result).toEqual({ + beforeDestroyAborted: false, + beforeDestroySignalAborted: false, + afterDestroyAborted: false, + afterDestroySignalAborted: false, + abortEventCountAfterDestroy: 0, + }); + + // Poll until onDestroy records so this action covered the real destroy path. + await vi.waitFor(async () => { + expect(await observer.wasDestroyed(actorKey)).toBe(true); + }); + }); + test("actor destroy clears ephemeral vars on same-key recreation", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); diff --git a/website/src/content/docs/actors/actions.mdx b/website/src/content/docs/actors/actions.mdx index 58216505d6..0fad4251c8 100644 --- a/website/src/content/docs/actors/actions.mdx +++ b/website/src/content/docs/actors/actions.mdx @@ -325,7 +325,7 @@ Actions have a single return value. To stream realtime data in response to an ac ## Canceling Long-Running Actions -For operations that should be cancelable on-demand, create your own `AbortController` and chain it with `c.abortSignal` for automatic cleanup on actor shutdown. +For operations that should be cancelable on-demand, create your own `AbortController`. Chain it with `c.abortSignal` so actor shutdown also cancels the operation. ```typescript import { actor } from "rivetkit"; diff --git a/website/src/content/docs/actors/index.mdx b/website/src/content/docs/actors/index.mdx index e8736dcf05..94a258d2e9 100644 --- a/website/src/content/docs/actors/index.mdx +++ b/website/src/content/docs/actors/index.mdx @@ -466,7 +466,7 @@ const userAccount = actor({ ### Lifecycle Hooks -Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and exit cleanly on shutdown with `c.aborted` or `c.abortSignal`. +Actors support hooks for initialization, background processing, connections, networking, and state changes. Use `run` for long-lived background loops, and use `c.aborted` or `c.abortSignal` for graceful shutdown. ```ts import { actor, event, queue } from "rivetkit"; diff --git a/website/src/content/docs/actors/lifecycle.mdx b/website/src/content/docs/actors/lifecycle.mdx index f10a7797ed..5a496d8b16 100644 --- a/website/src/content/docs/actors/lifecycle.mdx +++ b/website/src/content/docs/actors/lifecycle.mdx @@ -304,7 +304,7 @@ const tickActor = actor({ c.state.tickCount++; c.log.info({ msg: "tick", count: c.state.tickCount }); - // Wait 1 second, but exit early if aborted + // Wait 1 second. Final shutdown also resolves this wait. await new Promise((resolve) => { const timeout = setTimeout(resolve, 1000); c.abortSignal.addEventListener("abort", () => { @@ -876,7 +876,7 @@ When an actor sleeps or is destroyed, it enters the graceful shutdown window: 1. `c.abortSignal` fires and `c.aborted` becomes `true`. New connections and dispatch are rejected. Alarm timeouts are cancelled. On sleep, scheduled events are persisted and will be re-armed when the actor wakes. 2. `onSleep` or `onDestroy` and `onDisconnect` for each closing connection run during the same window. User `waitUntil` promises and async raw WebSocket handlers are drained. Hibernatable WebSocket connections are preserved on sleep and closed on destroy. -3. Once graceful work has completed, state is saved and the database is cleaned up. +3. Once graceful work has completed, state is saved and final cleanup runs. The entire window is bounded by `sleepGracePeriod` on both sleep and destroy. Defaults to 15 seconds. If the window is exceeded, the actor proceeds to state save anyway. diff --git a/website/src/content/docs/actors/queues.mdx b/website/src/content/docs/actors/queues.mdx index 2f319bc2ec..8155449ee9 100644 --- a/website/src/content/docs/actors/queues.mdx +++ b/website/src/content/docs/actors/queues.mdx @@ -428,7 +428,7 @@ This means you can run normal code in `run` without worrying about sleep interru - Implement connection auth in `onBeforeConnect`. See [Authentication](/docs/actors/authentication). - Route most state changes through one queue loop so ordering stays predictable. - If you need more complex multi-step run loops, consider using workflows. -- Use `c.aborted` and `c.abortSignal` for graceful shutdown and cancellation. +- Use `c.aborted` and `c.abortSignal` for actor shutdown. Use your own `AbortController` for earlier loop cancellation. - Add `timeout` when callers need bounded wait behavior. - Use `wait: true` only when the caller actually needs a response.