Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ import {
sleepRawWsDelayedSendOnSleep,
sleepWithWaitUntilInOnWake,
sleepAbortListenerVarsActor,
sleepRawWsVarsExceedsGrace,
} from "./sleep";
import {
sleepWithDb,
Expand Down Expand Up @@ -210,6 +211,7 @@ export const registry = setup({
sleepRawWsDelayedSendOnSleep,
sleepWithWaitUntilInOnWake,
sleepAbortListenerVarsActor,
sleepRawWsVarsExceedsGrace,
counterWaitUntilProbe,
// From sleep-db.ts
sleepWithDb,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { actor, event, type UniversalWebSocket } from "rivetkit";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/sleep.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
import { promiseWithResolvers } from "rivetkit/utils";
import { scheduleActorSleep } from "./schedule-sleep";

Expand Down Expand Up @@ -618,3 +618,71 @@
noSleep: true,
},
});

// Reproduces a production crash where c.vars becomes undefined after the
// grace deadline expires and clearNativeRuntimeState unrefs the NAPI
// runtime state object. An async message handler accesses c.vars after an
// await that outlasts the grace period.
//
// The close-handler variant cannot reproduce the bug because the tracked
// websocket callback region blocks can_arm_sleep_timer. Instead we use a
// message handler that starts slow async work, then the actor is told to
// sleep programmatically while the handler is still running.
export const VARS_EXCEEDS_GRACE_DELAY = 2000;
export const VARS_EXCEEDS_GRACE_PERIOD = 200;
export const VARS_EXCEEDS_GRACE_SLEEP_TIMEOUT = 100;

export const sleepRawWsVarsExceedsGrace = actor({
state: {
startCount: 0,
sleepCount: 0,
handlerStarted: 0,
handlerFinished: 0,
},
createVars: () => ({
dirty: false,
}),
onWake: (c) => {
c.state.startCount += 1;
},
onSleep: (c) => {
c.state.sleepCount += 1;
},
onWebSocket: (c, websocket: UniversalWebSocket) => {
websocket.addEventListener("message", async (event: any) => {
if (event.data !== "slow-vars-work") return;

c.state.handlerStarted += 1;
websocket.send(JSON.stringify({ type: "started" }));

// Wait longer than the grace period so the runtime state
// gets cleared while this handler is still running.
await new Promise((resolve) =>
setTimeout(resolve, VARS_EXCEEDS_GRACE_DELAY),
);
// This c.vars access crashes with TypeError in prod because
// the NAPI runtime state reference has been unreffed.
// Do NOT wrap in try/catch: c.state also breaks after cleanup,
// so the error needs to propagate to the process level.
c.vars.dirty = true;
c.state.handlerFinished += 1;
});

websocket.send(JSON.stringify({ type: "connected" }));
},
actions: {
triggerSleep: (c) => {
c.sleep();
},
getStatus: (c) => ({
startCount: c.state.startCount,
sleepCount: c.state.sleepCount,
handlerStarted: c.state.handlerStarted,
handlerFinished: c.state.handlerFinished,
}),
},
options: {
sleepTimeout: VARS_EXCEEDS_GRACE_SLEEP_TIMEOUT,
sleepGracePeriod: VARS_EXCEEDS_GRACE_PERIOD,
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -2335,7 +2335,7 @@ class TrackedWebSocketHandleAdapter implements UniversalWebSocket {
return;
}
const callbackRegionId = this.#ctx.beginWebSocketCallback();
this.#ctx.waitUntil(
this.#ctx.keepAwake(
Promise.resolve(result)
.catch((error) => {
logger().error({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
RAW_WS_HANDLER_DELAY,
RAW_WS_HANDLER_SLEEP_TIMEOUT,
SLEEP_TIMEOUT,
VARS_EXCEEDS_GRACE_DELAY,
VARS_EXCEEDS_GRACE_SLEEP_TIMEOUT,
} from "../../fixtures/driver-test-suite/sleep";
import { describeDriverMatrix } from "./shared-matrix";
import { setupDriverTest, waitFor } from "./shared-utils";
Expand Down Expand Up @@ -962,5 +964,69 @@ describeDriverMatrix("Actor Sleep", (driverTestConfig) => {
expect(startCount).toBe(2);
}
});

test(
"c.vars access in ws handler should not crash after grace deadline",
async (c) => {
const { client, getRuntimeOutput } = await setupDriverTest(
c,
driverTestConfig,
);

const actor =
client.sleepRawWsVarsExceedsGrace.getOrCreate([
"ws-vars-exceeds-grace",
]);
const ws = await connectRawWebSocket(actor);

// Send a message that starts slow async work (2000ms delay
// before accessing c.vars).
ws.send("slow-vars-work");

// Wait for the handler to confirm it started.
await new Promise<void>((resolve) => {
const onMessage = (event: MessageEvent) => {
const data = JSON.parse(String(event.data));
if (data.type === "started") {
ws.removeEventListener("message", onMessage);
resolve();
}
};
ws.addEventListener("message", onMessage);
});

// Trigger sleep while the handler is still doing slow work.
// The grace period (200ms) is much shorter than the handler
// delay (2000ms), so onSleep will clear the runtime state
// while the handler is still running.
await actor.triggerSleep();

// Wait for the handler to finish and the actor to complete
// its sleep cycle.
await waitFor(
driverTestConfig,
VARS_EXCEEDS_GRACE_DELAY +
VARS_EXCEEDS_GRACE_SLEEP_TIMEOUT +
500,
);

// Wake the actor and check what happened.
const status = await actor.getStatus();
expect(status.sleepCount).toBeGreaterThanOrEqual(1);
expect(status.handlerStarted).toBe(1);
// The runtime must not crash with TypeError when the
// handler accesses c.vars after the grace deadline.
// The deferred cleanup keeps the runtime state alive
// until the websocket callback region drains.
const output = getRuntimeOutput();
expect(output).not.toContain(
"Cannot set properties of undefined",
);
expect(output).not.toContain(
"Cannot read properties of undefined",
);
},
{ timeout: 15_000 },
);
});
});
Loading