diff --git a/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs index e22ce4bdfd..2f7d710afe 100644 --- a/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-gateway/src/tunnel_to_ws_task.rs @@ -7,7 +7,7 @@ use anyhow::Result; use gas::prelude::*; use rivet_guard_core::{ WebSocketHandle, - errors::{WebSocketServiceHibernate, WebSocketServiceTimeout, WebSocketServiceUnavailable}, + errors::{WebSocketServiceHibernate, WebSocketServiceTimeout}, }; use rivet_runner_protocol as protocol; use tokio::sync::{mpsc, watch}; @@ -83,7 +83,11 @@ pub async fn task( if can_hibernate { return Err(WebSocketServiceHibernate.build()); } else { - return Err(WebSocketServiceUnavailable.build()); + return Ok(LifecycleResult::ServerClose(protocol::mk2::ToServerWebSocketClose { + code: Some(1000), + reason: Some("actor.stopped".to_owned()), + hibernate: false, + })); } } _ = drop_rx.changed() => { diff --git a/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs index 69080a5aa5..0c0993ffc4 100644 --- a/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-gateway2/src/tunnel_to_ws_task.rs @@ -83,7 +83,11 @@ pub async fn task( if can_hibernate { return Err(WebSocketServiceHibernate.build()); } else { - return Err(WebSocketServiceUnavailable.build()); + return Ok(LifecycleResult::ServerClose(protocol::ToRivetWebSocketClose { + code: Some(1000), + reason: Some("actor.stopped".to_owned()), + hibernate: false, + })); } } _ = drop_rx.changed() => { diff --git a/examples/kitchen-sink-vercel/package.json b/examples/kitchen-sink-vercel/package.json index a01f0b3adf..60fadcec78 100644 --- a/examples/kitchen-sink-vercel/package.json +++ b/examples/kitchen-sink-vercel/package.json @@ -6,6 +6,7 @@ "scripts": { "dev": "vercel dev", "build": "vite build", + "smoke:raw-websocket-serverless": "tsx scripts/raw-websocket-serverless-smoke.ts", "check-types": "tsc --noEmit" }, "dependencies": { diff --git a/examples/kitchen-sink-vercel/scripts/raw-websocket-serverless-smoke.ts b/examples/kitchen-sink-vercel/scripts/raw-websocket-serverless-smoke.ts new file mode 100644 index 0000000000..44027a24ec --- /dev/null +++ b/examples/kitchen-sink-vercel/scripts/raw-websocket-serverless-smoke.ts @@ -0,0 +1,323 @@ +// Raw onWebSocket serverless smoke test. +// +// Usage: +// RIVET_ENDPOINT=http://127.0.0.1:6420 \ +// RIVET_SERVERLESS_URL=http://127.0.0.1:3000/api/rivet \ +// SMOKE_PARALLELISM=4 \ +// SMOKE_STAGGER_MS=1000 \ +// pnpm --filter kitchen-sink-vercel smoke:raw-websocket-serverless + +import { createClient } from "rivetkit/client"; +import type { registry } from "../src/index.ts"; + +const ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://127.0.0.1:6420"; +const SERVERLESS_URL = process.env.RIVET_SERVERLESS_URL; +const NAMESPACE = + process.env.SMOKE_NAMESPACE ?? process.env.RIVET_NAMESPACE ?? "default"; +const TOKEN = process.env.SMOKE_TOKEN ?? process.env.RIVET_TOKEN ?? "dev"; +const POOL_NAME = process.env.SMOKE_POOL ?? process.env.RIVET_POOL ?? "default"; +const KEY = process.env.SMOKE_KEY ?? `raw-ws-serverless-smoke-${Date.now()}`; +const DURATION_MS = Number(process.env.SMOKE_DURATION_MS ?? "120000"); +const PARALLELISM = Number(process.env.SMOKE_PARALLELISM ?? "1"); +const SHARED_KEY = process.env.SMOKE_SHARED_KEY === "1"; +const LOG_MESSAGES = process.env.SMOKE_LOG_MESSAGES !== "0"; +const GAP_WARN_MS = Number(process.env.SMOKE_GAP_WARN_MS ?? "3000"); +const STALE_TIMEOUT_MS = Number( + process.env.SMOKE_STALE_TIMEOUT_MS ?? String(GAP_WARN_MS * 2), +); +const SLEEP_INTERVAL_MS = Number( + process.env.SMOKE_SLEEP_INTERVAL_MS ?? "15000", +); +const STAGGER_MS = Number(process.env.SMOKE_STAGGER_MS ?? "1000"); +const POST_SLEEP = process.env.SMOKE_POST_SLEEP !== "0"; +const CONNECT_ERROR_DELAY_MS = Number( + process.env.SMOKE_CONNECT_ERROR_DELAY_MS ?? "250", +); + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function formatError(error: unknown): string { + if (error instanceof Error) return `${error.name}: ${error.message}`; + return String(error); +} + +function eventDataToString(data: unknown): string { + if (typeof data === "string") return data; + if (data instanceof ArrayBuffer) { + return ``; + } + if (ArrayBuffer.isView(data)) { + return ``; + } + return String(data); +} + +function appendPath(endpoint: string, path: string): URL { + const url = new URL(endpoint); + const prefix = url.pathname.replace(/\/$/, ""); + url.pathname = `${prefix}${path}`; + url.search = ""; + url.hash = ""; + return url; +} + +function buildSleepUrl(actorId: string): string { + const url = appendPath( + ENDPOINT, + `/actors/${encodeURIComponent(actorId)}/sleep`, + ); + url.searchParams.set("namespace", NAMESPACE); + return url.toString(); +} + +function buildWebSocketUrl(actorId: string): string { + const tokenSegment = TOKEN ? `@${encodeURIComponent(TOKEN)}` : ""; + const url = appendPath( + ENDPOINT, + `/gateway/${encodeURIComponent(actorId)}${tokenSegment}/websocket`, + ); + url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; + return url.toString(); +} + +async function waitForOpen(ws: WebSocket): Promise { + if (ws.readyState === WebSocket.OPEN) return; + + await new Promise((resolve, reject) => { + ws.addEventListener("open", () => resolve(), { once: true }); + ws.addEventListener("error", () => reject(new Error("websocket error")), { + once: true, + }); + ws.addEventListener( + "close", + (event) => + reject( + new Error( + `websocket closed before open code=${event.code} reason=${event.reason}`, + ), + ), + { once: true }, + ); + }); +} + +async function triggerServerlessConfiguration() { + if (!SERVERLESS_URL) return; + + const url = `${SERVERLESS_URL.replace(/\/$/, "")}/metadata`; + console.log(`[configure] hitting ${url}`); + const response = await fetch(url); + console.log(`[configure] status=${response.status}`); +} + +async function postSleep(actorId: string, label: string, stopAt: number) { + if (!POST_SLEEP || SLEEP_INTERVAL_MS <= 0) { + return { sleepPosts: 0, sleepErrors: 0 }; + } + + let sleepPosts = 0; + let sleepErrors = 0; + const sleepUrl = buildSleepUrl(actorId); + + while (Date.now() < stopAt) { + await sleep(Math.min(SLEEP_INTERVAL_MS, Math.max(0, stopAt - Date.now()))); + if (Date.now() >= stopAt) break; + + try { + sleepPosts += 1; + console.log(`[sleep] ${label} post=${sleepPosts} url=${sleepUrl}`); + const response = await fetch(sleepUrl, { + method: "POST", + headers: { + Authorization: TOKEN ? `Bearer ${TOKEN}` : "", + "content-type": "application/json", + }, + body: "{}", + }); + const body = await response.text(); + console.log( + `[sleep] ${label} post=${sleepPosts} status=${response.status} body=${body}`, + ); + } catch (error) { + sleepErrors += 1; + console.error( + `[sleep-error] ${label} post=${sleepPosts} ${formatError(error)}`, + ); + } + } + + return { sleepPosts, sleepErrors }; +} + +async function runWorker(workerIndex: number, stopAt: number) { + const startDelayMs = workerIndex * STAGGER_MS; + if (startDelayMs > 0) { + await sleep(startDelayMs); + } + + const key = SHARED_KEY ? KEY : `${KEY}-${workerIndex}`; + const label = `worker=${workerIndex} key=${key}`; + const client = createClient({ + endpoint: ENDPOINT, + namespace: NAMESPACE, + token: TOKEN, + poolName: POOL_NAME, + }); + const handle = client.rawWebSocketServerlessSmoke.getOrCreate([key]); + const actorId = await handle.resolve(); + const webSocketUrl = buildWebSocketUrl(actorId); + let current: WebSocket | undefined; + let attempt = 0; + let messageCount = 0; + let gapCount = 0; + let staleReconnects = 0; + let lastGlobalMessageAt = 0; + const sleepResultPromise = postSleep(actorId, label, stopAt); + + while (Date.now() < stopAt) { + attempt += 1; + const openedAt = Date.now(); + let lastMessageAt = 0; + console.log( + `[connect] ${label} actorId=${actorId} attempt=${attempt} url=${webSocketUrl}`, + ); + if (lastGlobalMessageAt > 0) { + const reconnectGapMs = Date.now() - lastGlobalMessageAt; + if (reconnectGapMs > GAP_WARN_MS) { + gapCount += 1; + console.error( + `[gap] ${label} attempt=${attempt} reconnectGapMs=${reconnectGapMs} thresholdMs=${GAP_WARN_MS}`, + ); + } + } + + try { + const ws = new WebSocket(webSocketUrl, ["rivet", "rivet_encoding.json"]); + current = ws; + await waitForOpen(ws); + console.log(`[open] ${label} attempt=${attempt}`); + + await new Promise((resolve) => { + const timeout = setTimeout( + () => { + ws.close(1000, "smoke complete"); + }, + Math.max(0, stopAt - Date.now()), + ); + const staleWatchdog = + STALE_TIMEOUT_MS > 0 + ? setInterval(() => { + if ( + ws.readyState === WebSocket.OPEN && + lastMessageAt > 0 && + Date.now() - lastMessageAt > STALE_TIMEOUT_MS + ) { + staleReconnects += 1; + console.error( + `[stale] ${label} attempt=${attempt} lastMessageAgeMs=${Date.now() - lastMessageAt}`, + ); + ws.close(4000, "stale smoke connection"); + } + }, Math.min(1000, STALE_TIMEOUT_MS)) + : undefined; + + ws.addEventListener("message", (event) => { + const now = Date.now(); + const gapMs = lastMessageAt > 0 ? now - lastMessageAt : 0; + if (gapMs > GAP_WARN_MS) { + gapCount += 1; + console.error( + `[gap] ${label} attempt=${attempt} gapMs=${gapMs} thresholdMs=${GAP_WARN_MS}`, + ); + } + lastMessageAt = now; + lastGlobalMessageAt = now; + messageCount += 1; + if (LOG_MESSAGES) { + console.log( + `[message] ${label} count=${messageCount} attempt=${attempt} data=${eventDataToString(event.data)}`, + ); + } + }); + ws.addEventListener( + "close", + (event) => { + clearTimeout(timeout); + if (staleWatchdog) clearInterval(staleWatchdog); + console.log( + `[close] ${label} attempt=${attempt} code=${event.code} reason=${event.reason} openMs=${Date.now() - openedAt}`, + ); + resolve(); + }, + { once: true }, + ); + ws.addEventListener("error", () => { + console.error(`[error] ${label} attempt=${attempt}`); + }); + }); + } catch (error) { + console.error( + `[connect-error] ${label} attempt=${attempt} ${formatError(error)}`, + ); + await sleep(CONNECT_ERROR_DELAY_MS); + } finally { + current = undefined; + } + } + + current?.close(1000, "smoke complete"); + const sleepResult = await sleepResultPromise; + console.log( + `[done] ${label} actorId=${actorId} attempts=${attempt} messages=${messageCount} gaps=${gapCount} staleReconnects=${staleReconnects} sleepPosts=${sleepResult.sleepPosts} sleepErrors=${sleepResult.sleepErrors}`, + ); + return { + workerIndex, + attempts: attempt, + messages: messageCount, + gaps: gapCount, + staleReconnects, + sleepPosts: sleepResult.sleepPosts, + sleepErrors: sleepResult.sleepErrors, + }; +} + +async function main() { + if (!Number.isInteger(PARALLELISM) || PARALLELISM < 1) { + throw new Error("SMOKE_PARALLELISM must be a positive integer"); + } + + console.log( + `[smoke] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} key=${KEY} durationMs=${DURATION_MS} parallelism=${PARALLELISM} sharedKey=${SHARED_KEY} staggerMs=${STAGGER_MS} gapWarnMs=${GAP_WARN_MS} staleTimeoutMs=${STALE_TIMEOUT_MS} sleepIntervalMs=${SLEEP_INTERVAL_MS} postSleep=${POST_SLEEP}`, + ); + await triggerServerlessConfiguration(); + + const stopAt = Date.now() + DURATION_MS; + const results = await Promise.all( + Array.from({ length: PARALLELISM }, (_, i) => runWorker(i, stopAt)), + ); + const attempts = results.reduce((sum, result) => sum + result.attempts, 0); + const messages = results.reduce((sum, result) => sum + result.messages, 0); + const gaps = results.reduce((sum, result) => sum + result.gaps, 0); + const staleReconnects = results.reduce( + (sum, result) => sum + result.staleReconnects, + 0, + ); + const sleepPosts = results.reduce( + (sum, result) => sum + result.sleepPosts, + 0, + ); + const sleepErrors = results.reduce( + (sum, result) => sum + result.sleepErrors, + 0, + ); + console.log( + `[summary] workers=${PARALLELISM} attempts=${attempts} messages=${messages} gaps=${gaps} staleReconnects=${staleReconnects} sleepPosts=${sleepPosts} sleepErrors=${sleepErrors}`, + ); +} + +main().catch((error) => { + console.error(`[fatal] ${formatError(error)}`); + process.exit(1); +}); diff --git a/examples/kitchen-sink-vercel/src/actors/http/raw-websocket-serverless-smoke.ts b/examples/kitchen-sink-vercel/src/actors/http/raw-websocket-serverless-smoke.ts new file mode 100644 index 0000000000..4872579e83 --- /dev/null +++ b/examples/kitchen-sink-vercel/src/actors/http/raw-websocket-serverless-smoke.ts @@ -0,0 +1,98 @@ +import { actor, type RivetMessageEvent, type UniversalWebSocket } from "rivetkit"; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export const rawWebSocketServerlessSmoke = actor({ + options: { + canHibernateWebSocket: false, + sleepGracePeriod: 5_000, + }, + state: { + connectionCount: 0, + sleepCount: 0, + totalTickCount: 0, + totalMessageCount: 0, + }, + async onSleep(c) { + const delayMs = 10 + Math.floor(Math.random() * 1_991); + c.state.sleepCount += 1; + c.log.info({ + msg: "raw websocket serverless smoke onSleep delay", + delayMs, + sleepCount: c.state.sleepCount, + }); + await sleep(delayMs); + }, + onWebSocket(c, websocket: UniversalWebSocket) { + c.state.connectionCount += 1; + const connectionId = crypto.randomUUID(); + let index = 0; + + const sendTick = () => { + if (websocket.readyState !== 1) return; + + const timestamp = Date.now(); + const message = { + type: "tick", + connectionId, + index, + timestamp, + iso: new Date(timestamp).toISOString(), + totalTickCount: c.state.totalTickCount, + }; + + c.state.totalTickCount += 1; + index += 1; + websocket.send(JSON.stringify(message)); + }; + + c.log.info({ + msg: "raw websocket serverless smoke connected", + connectionId, + connectionCount: c.state.connectionCount, + }); + + sendTick(); + const interval = setInterval(sendTick, 1_000); + + websocket.addEventListener("message", (event: RivetMessageEvent) => { + c.state.totalMessageCount += 1; + c.log.info({ + msg: "raw websocket serverless smoke received message", + connectionId, + totalMessageCount: c.state.totalMessageCount, + }); + websocket.send( + JSON.stringify({ + type: "ack", + connectionId, + index, + timestamp: Date.now(), + received: event.data, + }), + ); + }); + + websocket.addEventListener("close", () => { + clearInterval(interval); + c.state.connectionCount -= 1; + c.log.info({ + msg: "raw websocket serverless smoke disconnected", + connectionId, + connectionCount: c.state.connectionCount, + }); + }); + }, + actions: { + getStats(c) { + return { + connectionCount: c.state.connectionCount, + sleepCount: c.state.sleepCount, + totalTickCount: c.state.totalTickCount, + totalMessageCount: c.state.totalMessageCount, + }; + }, + }, +}); diff --git a/examples/kitchen-sink-vercel/src/index.ts b/examples/kitchen-sink-vercel/src/index.ts index 8bfeaebaa9..a6ec2a22de 100644 --- a/examples/kitchen-sink-vercel/src/index.ts +++ b/examples/kitchen-sink-vercel/src/index.ts @@ -57,6 +57,7 @@ import { } from "./actors/http/raw-websocket.ts"; import { rawFetchCounter } from "./actors/http/raw-fetch-counter.ts"; import { rawWebSocketChatRoom } from "./actors/http/raw-websocket-chat-room.ts"; +import { rawWebSocketServerlessSmoke } from "./actors/http/raw-websocket-serverless-smoke.ts"; // Lifecycle import { runWithTicks, @@ -119,7 +120,46 @@ import { testSqliteBench } from "./actors/testing/test-sqlite-bench.ts"; // AI import { aiAgent } from "./actors/ai/ai-agent.ts"; +function numberFromEnv(name: string, fallback: number): number { + const value = process.env[name]; + if (value === undefined || value === "") return fallback; + + const parsed = Number(value); + if (!Number.isFinite(parsed)) { + throw new Error(`${name} must be a finite number`); + } + + return parsed; +} + +function serverlessPoolConfig() { + const url = + process.env.RIVET_SERVERLESS_URL ?? + process.env.KITCHEN_SINK_SERVERLESS_URL ?? + (process.env.RIVET_RUN_ENGINE === "1" + ? "http://127.0.0.1:3000/api/rivet" + : undefined); + + if (!url) return undefined; + + return { + name: process.env.RIVET_POOL, + url, + requestLifespan: numberFromEnv("RIVET_SERVERLESS_REQUEST_LIFESPAN", 30), + drainGracePeriod: numberFromEnv("RIVET_SERVERLESS_DRAIN_GRACE_PERIOD", 5), + metadataPollInterval: numberFromEnv( + "RIVET_SERVERLESS_METADATA_POLL_INTERVAL_MS", + 1000, + ), + metadata: { + source: "kitchen-sink-vercel", + smoke: "raw-websocket-serverless", + }, + }; +} + export const registry = setup({ + configurePool: serverlessPoolConfig(), use: { // Overview + state basics counter, @@ -165,6 +205,7 @@ export const registry = setup({ rawWebSocketBinaryActor, rawFetchCounter, rawWebSocketChatRoom, + rawWebSocketServerlessSmoke, // Lifecycle and scheduling runWithTicks, runWithQueueConsumer, diff --git a/examples/kitchen-sink/package.json b/examples/kitchen-sink/package.json index 27b2157681..9670e86a60 100644 --- a/examples/kitchen-sink/package.json +++ b/examples/kitchen-sink/package.json @@ -9,6 +9,7 @@ "check-types": "echo 'skipped - workflow history types broken'", "build": "vite build", "start": "node --import @rivetkit/sql-loader --import tsx src/server.ts", + "smoke:raw-websocket-serverless": "tsx scripts/raw-websocket-serverless-smoke.ts", "benchmark": "tsx scripts/benchmark.ts", "db:generate": "find src/actors -name drizzle.config.ts -exec drizzle-kit generate --config {} \\;" }, diff --git a/examples/kitchen-sink/scripts/raw-websocket-serverless-smoke.ts b/examples/kitchen-sink/scripts/raw-websocket-serverless-smoke.ts new file mode 100644 index 0000000000..32d651de4a --- /dev/null +++ b/examples/kitchen-sink/scripts/raw-websocket-serverless-smoke.ts @@ -0,0 +1,327 @@ +// Raw onWebSocket serverless smoke test. +// +// Usage: +// RIVET_ENDPOINT=http://127.0.0.1:6420 \ +// RIVET_SERVERLESS_URL=http://127.0.0.1:3000/api/rivet \ +// SMOKE_PARALLELISM=4 \ +// SMOKE_STAGGER_MS=1000 \ +// pnpm --filter kitchen-sink smoke:raw-websocket-serverless +// +// The serverless pool is configured by the kitchen-sink server when +// RIVET_SERVERLESS_URL is set. It uses a 30s request lifespan and a 5s drain +// grace period by default. + +import { createClient } from "rivetkit/client"; +import type { registry } from "../src/index.ts"; + +const ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://127.0.0.1:6420"; +const SERVERLESS_URL = process.env.RIVET_SERVERLESS_URL; +const NAMESPACE = + process.env.SMOKE_NAMESPACE ?? process.env.RIVET_NAMESPACE ?? "default"; +const TOKEN = process.env.SMOKE_TOKEN ?? process.env.RIVET_TOKEN ?? "dev"; +const POOL_NAME = process.env.SMOKE_POOL ?? process.env.RIVET_POOL ?? "default"; +const KEY = process.env.SMOKE_KEY ?? `raw-ws-serverless-smoke-${Date.now()}`; +const DURATION_MS = Number(process.env.SMOKE_DURATION_MS ?? "120000"); +const PARALLELISM = Number(process.env.SMOKE_PARALLELISM ?? "1"); +const SHARED_KEY = process.env.SMOKE_SHARED_KEY === "1"; +const LOG_MESSAGES = process.env.SMOKE_LOG_MESSAGES !== "0"; +const GAP_WARN_MS = Number(process.env.SMOKE_GAP_WARN_MS ?? "3000"); +const STALE_TIMEOUT_MS = Number( + process.env.SMOKE_STALE_TIMEOUT_MS ?? String(GAP_WARN_MS * 2), +); +const SLEEP_INTERVAL_MS = Number( + process.env.SMOKE_SLEEP_INTERVAL_MS ?? "15000", +); +const STAGGER_MS = Number(process.env.SMOKE_STAGGER_MS ?? "1000"); +const POST_SLEEP = process.env.SMOKE_POST_SLEEP !== "0"; +const CONNECT_ERROR_DELAY_MS = Number( + process.env.SMOKE_CONNECT_ERROR_DELAY_MS ?? "250", +); + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function formatError(error: unknown): string { + if (error instanceof Error) return `${error.name}: ${error.message}`; + return String(error); +} + +function eventDataToString(data: unknown): string { + if (typeof data === "string") return data; + if (data instanceof ArrayBuffer) { + return ``; + } + if (ArrayBuffer.isView(data)) { + return ``; + } + return String(data); +} + +function appendPath(endpoint: string, path: string): URL { + const url = new URL(endpoint); + const prefix = url.pathname.replace(/\/$/, ""); + url.pathname = `${prefix}${path}`; + url.search = ""; + url.hash = ""; + return url; +} + +function buildSleepUrl(actorId: string): string { + const url = appendPath( + ENDPOINT, + `/actors/${encodeURIComponent(actorId)}/sleep`, + ); + url.searchParams.set("namespace", NAMESPACE); + return url.toString(); +} + +function buildWebSocketUrl(actorId: string): string { + const tokenSegment = TOKEN ? `@${encodeURIComponent(TOKEN)}` : ""; + const url = appendPath( + ENDPOINT, + `/gateway/${encodeURIComponent(actorId)}${tokenSegment}/websocket`, + ); + url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; + return url.toString(); +} + +async function waitForOpen(ws: WebSocket): Promise { + if (ws.readyState === WebSocket.OPEN) return; + + await new Promise((resolve, reject) => { + ws.addEventListener("open", () => resolve(), { once: true }); + ws.addEventListener("error", () => reject(new Error("websocket error")), { + once: true, + }); + ws.addEventListener( + "close", + (event) => + reject( + new Error( + `websocket closed before open code=${event.code} reason=${event.reason}`, + ), + ), + { once: true }, + ); + }); +} + +async function triggerServerlessConfiguration() { + if (!SERVERLESS_URL) return; + + const url = `${SERVERLESS_URL.replace(/\/$/, "")}/metadata`; + console.log(`[configure] hitting ${url}`); + const response = await fetch(url); + console.log(`[configure] status=${response.status}`); +} + +async function postSleep(actorId: string, label: string, stopAt: number) { + if (!POST_SLEEP || SLEEP_INTERVAL_MS <= 0) { + return { sleepPosts: 0, sleepErrors: 0 }; + } + + let sleepPosts = 0; + let sleepErrors = 0; + const sleepUrl = buildSleepUrl(actorId); + + while (Date.now() < stopAt) { + await sleep(Math.min(SLEEP_INTERVAL_MS, Math.max(0, stopAt - Date.now()))); + if (Date.now() >= stopAt) break; + + try { + sleepPosts += 1; + console.log(`[sleep] ${label} post=${sleepPosts} url=${sleepUrl}`); + const response = await fetch(sleepUrl, { + method: "POST", + headers: { + Authorization: TOKEN ? `Bearer ${TOKEN}` : "", + "content-type": "application/json", + }, + body: "{}", + }); + const body = await response.text(); + console.log( + `[sleep] ${label} post=${sleepPosts} status=${response.status} body=${body}`, + ); + } catch (error) { + sleepErrors += 1; + console.error( + `[sleep-error] ${label} post=${sleepPosts} ${formatError(error)}`, + ); + } + } + + return { sleepPosts, sleepErrors }; +} + +async function runWorker(workerIndex: number, stopAt: number) { + const startDelayMs = workerIndex * STAGGER_MS; + if (startDelayMs > 0) { + await sleep(startDelayMs); + } + + const key = SHARED_KEY ? KEY : `${KEY}-${workerIndex}`; + const label = `worker=${workerIndex} key=${key}`; + const client = createClient({ + endpoint: ENDPOINT, + namespace: NAMESPACE, + token: TOKEN, + poolName: POOL_NAME, + }); + const handle = client.rawWebSocketServerlessSmoke.getOrCreate([key]); + const actorId = await handle.resolve(); + const webSocketUrl = buildWebSocketUrl(actorId); + let current: WebSocket | undefined; + let attempt = 0; + let messageCount = 0; + let gapCount = 0; + let staleReconnects = 0; + let lastGlobalMessageAt = 0; + const sleepResultPromise = postSleep(actorId, label, stopAt); + + while (Date.now() < stopAt) { + attempt += 1; + const openedAt = Date.now(); + let lastMessageAt = 0; + console.log( + `[connect] ${label} actorId=${actorId} attempt=${attempt} url=${webSocketUrl}`, + ); + if (lastGlobalMessageAt > 0) { + const reconnectGapMs = Date.now() - lastGlobalMessageAt; + if (reconnectGapMs > GAP_WARN_MS) { + gapCount += 1; + console.error( + `[gap] ${label} attempt=${attempt} reconnectGapMs=${reconnectGapMs} thresholdMs=${GAP_WARN_MS}`, + ); + } + } + + try { + const ws = new WebSocket(webSocketUrl, ["rivet", "rivet_encoding.json"]); + current = ws; + await waitForOpen(ws); + console.log(`[open] ${label} attempt=${attempt}`); + + await new Promise((resolve) => { + const timeout = setTimeout( + () => { + ws.close(1000, "smoke complete"); + }, + Math.max(0, stopAt - Date.now()), + ); + const staleWatchdog = + STALE_TIMEOUT_MS > 0 + ? setInterval(() => { + if ( + ws.readyState === WebSocket.OPEN && + lastMessageAt > 0 && + Date.now() - lastMessageAt > STALE_TIMEOUT_MS + ) { + staleReconnects += 1; + console.error( + `[stale] ${label} attempt=${attempt} lastMessageAgeMs=${Date.now() - lastMessageAt}`, + ); + ws.close(4000, "stale smoke connection"); + } + }, Math.min(1000, STALE_TIMEOUT_MS)) + : undefined; + + ws.addEventListener("message", (event) => { + const now = Date.now(); + const gapMs = lastMessageAt > 0 ? now - lastMessageAt : 0; + if (gapMs > GAP_WARN_MS) { + gapCount += 1; + console.error( + `[gap] ${label} attempt=${attempt} gapMs=${gapMs} thresholdMs=${GAP_WARN_MS}`, + ); + } + lastMessageAt = now; + lastGlobalMessageAt = now; + messageCount += 1; + if (LOG_MESSAGES) { + console.log( + `[message] ${label} count=${messageCount} attempt=${attempt} data=${eventDataToString(event.data)}`, + ); + } + }); + ws.addEventListener( + "close", + (event) => { + clearTimeout(timeout); + if (staleWatchdog) clearInterval(staleWatchdog); + console.log( + `[close] ${label} attempt=${attempt} code=${event.code} reason=${event.reason} openMs=${Date.now() - openedAt}`, + ); + resolve(); + }, + { once: true }, + ); + ws.addEventListener("error", () => { + console.error(`[error] ${label} attempt=${attempt}`); + }); + }); + } catch (error) { + console.error( + `[connect-error] ${label} attempt=${attempt} ${formatError(error)}`, + ); + await sleep(CONNECT_ERROR_DELAY_MS); + } finally { + current = undefined; + } + } + + current?.close(1000, "smoke complete"); + const sleepResult = await sleepResultPromise; + console.log( + `[done] ${label} actorId=${actorId} attempts=${attempt} messages=${messageCount} gaps=${gapCount} staleReconnects=${staleReconnects} sleepPosts=${sleepResult.sleepPosts} sleepErrors=${sleepResult.sleepErrors}`, + ); + return { + workerIndex, + attempts: attempt, + messages: messageCount, + gaps: gapCount, + staleReconnects, + sleepPosts: sleepResult.sleepPosts, + sleepErrors: sleepResult.sleepErrors, + }; +} + +async function main() { + if (!Number.isInteger(PARALLELISM) || PARALLELISM < 1) { + throw new Error("SMOKE_PARALLELISM must be a positive integer"); + } + + console.log( + `[smoke] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} key=${KEY} durationMs=${DURATION_MS} parallelism=${PARALLELISM} sharedKey=${SHARED_KEY} staggerMs=${STAGGER_MS} gapWarnMs=${GAP_WARN_MS} staleTimeoutMs=${STALE_TIMEOUT_MS} sleepIntervalMs=${SLEEP_INTERVAL_MS} postSleep=${POST_SLEEP}`, + ); + await triggerServerlessConfiguration(); + + const stopAt = Date.now() + DURATION_MS; + const results = await Promise.all( + Array.from({ length: PARALLELISM }, (_, i) => runWorker(i, stopAt)), + ); + const attempts = results.reduce((sum, result) => sum + result.attempts, 0); + const messages = results.reduce((sum, result) => sum + result.messages, 0); + const gaps = results.reduce((sum, result) => sum + result.gaps, 0); + const staleReconnects = results.reduce( + (sum, result) => sum + result.staleReconnects, + 0, + ); + const sleepPosts = results.reduce( + (sum, result) => sum + result.sleepPosts, + 0, + ); + const sleepErrors = results.reduce( + (sum, result) => sum + result.sleepErrors, + 0, + ); + console.log( + `[summary] workers=${PARALLELISM} attempts=${attempts} messages=${messages} gaps=${gaps} staleReconnects=${staleReconnects} sleepPosts=${sleepPosts} sleepErrors=${sleepErrors}`, + ); +} + +main().catch((error) => { + console.error(`[fatal] ${formatError(error)}`); + process.exit(1); +}); diff --git a/examples/kitchen-sink/src/actors/http/raw-websocket-serverless-smoke.ts b/examples/kitchen-sink/src/actors/http/raw-websocket-serverless-smoke.ts new file mode 100644 index 0000000000..4872579e83 --- /dev/null +++ b/examples/kitchen-sink/src/actors/http/raw-websocket-serverless-smoke.ts @@ -0,0 +1,98 @@ +import { actor, type RivetMessageEvent, type UniversalWebSocket } from "rivetkit"; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export const rawWebSocketServerlessSmoke = actor({ + options: { + canHibernateWebSocket: false, + sleepGracePeriod: 5_000, + }, + state: { + connectionCount: 0, + sleepCount: 0, + totalTickCount: 0, + totalMessageCount: 0, + }, + async onSleep(c) { + const delayMs = 10 + Math.floor(Math.random() * 1_991); + c.state.sleepCount += 1; + c.log.info({ + msg: "raw websocket serverless smoke onSleep delay", + delayMs, + sleepCount: c.state.sleepCount, + }); + await sleep(delayMs); + }, + onWebSocket(c, websocket: UniversalWebSocket) { + c.state.connectionCount += 1; + const connectionId = crypto.randomUUID(); + let index = 0; + + const sendTick = () => { + if (websocket.readyState !== 1) return; + + const timestamp = Date.now(); + const message = { + type: "tick", + connectionId, + index, + timestamp, + iso: new Date(timestamp).toISOString(), + totalTickCount: c.state.totalTickCount, + }; + + c.state.totalTickCount += 1; + index += 1; + websocket.send(JSON.stringify(message)); + }; + + c.log.info({ + msg: "raw websocket serverless smoke connected", + connectionId, + connectionCount: c.state.connectionCount, + }); + + sendTick(); + const interval = setInterval(sendTick, 1_000); + + websocket.addEventListener("message", (event: RivetMessageEvent) => { + c.state.totalMessageCount += 1; + c.log.info({ + msg: "raw websocket serverless smoke received message", + connectionId, + totalMessageCount: c.state.totalMessageCount, + }); + websocket.send( + JSON.stringify({ + type: "ack", + connectionId, + index, + timestamp: Date.now(), + received: event.data, + }), + ); + }); + + websocket.addEventListener("close", () => { + clearInterval(interval); + c.state.connectionCount -= 1; + c.log.info({ + msg: "raw websocket serverless smoke disconnected", + connectionId, + connectionCount: c.state.connectionCount, + }); + }); + }, + actions: { + getStats(c) { + return { + connectionCount: c.state.connectionCount, + sleepCount: c.state.sleepCount, + totalTickCount: c.state.totalTickCount, + totalMessageCount: c.state.totalMessageCount, + }; + }, + }, +}); diff --git a/examples/kitchen-sink/src/index.ts b/examples/kitchen-sink/src/index.ts index dd08cf4d18..dedd52c4f1 100644 --- a/examples/kitchen-sink/src/index.ts +++ b/examples/kitchen-sink/src/index.ts @@ -57,6 +57,7 @@ import { } from "./actors/http/raw-websocket.ts"; import { rawFetchCounter } from "./actors/http/raw-fetch-counter.ts"; import { rawWebSocketChatRoom } from "./actors/http/raw-websocket-chat-room.ts"; +import { rawWebSocketServerlessSmoke } from "./actors/http/raw-websocket-serverless-smoke.ts"; // Lifecycle import { runWithTicks, @@ -120,7 +121,46 @@ import { rawSqliteFuzzer } from "./actors/testing/raw-sqlite-fuzzer.ts"; // AI import { aiAgent } from "./actors/ai/ai-agent.ts"; +function numberFromEnv(name: string, fallback: number): number { + const value = process.env[name]; + if (value === undefined || value === "") return fallback; + + const parsed = Number(value); + if (!Number.isFinite(parsed)) { + throw new Error(`${name} must be a finite number`); + } + + return parsed; +} + +function serverlessPoolConfig() { + const url = + process.env.RIVET_SERVERLESS_URL ?? + process.env.KITCHEN_SINK_SERVERLESS_URL ?? + (process.env.RIVET_RUN_ENGINE === "1" + ? "http://127.0.0.1:3000/api/rivet" + : undefined); + + if (!url) return undefined; + + return { + name: process.env.RIVET_POOL, + url, + requestLifespan: numberFromEnv("RIVET_SERVERLESS_REQUEST_LIFESPAN", 30), + drainGracePeriod: numberFromEnv("RIVET_SERVERLESS_DRAIN_GRACE_PERIOD", 5), + metadataPollInterval: numberFromEnv( + "RIVET_SERVERLESS_METADATA_POLL_INTERVAL_MS", + 1000, + ), + metadata: { + source: "kitchen-sink", + smoke: "raw-websocket-serverless", + }, + }; +} + export const registry = setup({ + configurePool: serverlessPoolConfig(), use: { // Overview + state basics counter, @@ -166,6 +206,7 @@ export const registry = setup({ rawWebSocketBinaryActor, rawFetchCounter, rawWebSocketChatRoom, + rawWebSocketServerlessSmoke, // Lifecycle and scheduling runWithTicks, runWithQueueConsumer, diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts index 1c7254dcea..244cd998a2 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/raw-websocket.ts @@ -169,6 +169,10 @@ export const rawWebSocketBinaryActor = actor({ }); export const rawWebSocketAsyncOpenActor = actor({ + options: { + canHibernateWebSocket: false, + sleepGracePeriod: 2_000, + }, state: { openCount: 0, }, diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/raw-websocket.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/raw-websocket.test.ts index 5242f55c04..a1e850ca9c 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver/raw-websocket.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/raw-websocket.test.ts @@ -5,6 +5,7 @@ import { getHibernatableWebSocketAckState } from "@/common/websocket-test-hooks" import { setupDriverTest } from "./shared-utils"; const HIBERNATABLE_ACK_SETTLE_TIMEOUT_MS = 12_000; +const DRIVER_API_TOKEN = "dev"; async function waitForJsonMessage( ws: WebSocket, @@ -471,6 +472,52 @@ describeDriverMatrix("Raw Websocket", (driverTestConfig) => { ws.close(); }); + test("force sleep disconnects non-hibernatable raw websocket", async (c) => { + const { client, endpoint, namespace } = await setupDriverTest( + c, + driverTestConfig, + ); + const actor = client.rawWebSocketAsyncOpenActor.getOrCreate([ + "force-sleep-disconnect", + ]); + const ws = await actor.webSocket(); + const ready = await waitForJsonMessage(ws, 5_000); + expect(ready?.type).toBe("async-open"); + const actorId = await actor.resolve(); + + const closePromise = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("timed out waiting for websocket close")); + }, 5_000); + ws.addEventListener("close", (event) => { + clearTimeout(timeout); + resolve(event); + }, { + once: true, + }); + }); + const response = await fetch( + `${endpoint}/actors/${encodeURIComponent(actorId)}/sleep?namespace=${encodeURIComponent(namespace)}`, + { + method: "POST", + headers: { + Authorization: `Bearer ${DRIVER_API_TOKEN}`, + "Content-Type": "application/json", + }, + body: "{}", + }, + ); + + if (!response.ok) { + throw new Error( + `failed to force actor sleep: ${response.status} ${await response.text()}`, + ); + } + const closeEvent = await closePromise; + expect(closeEvent.code).not.toBe(1006); + expect(ws.readyState).toBe(WebSocket.CLOSED); + }); + test("should properly handle onWebSocket open and close events", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const actor = client.rawWebSocketActor.getOrCreate([ diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/shared-utils.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/shared-utils.ts index 3ef7faf17b..845282b5c5 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver/shared-utils.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/shared-utils.ts @@ -29,6 +29,7 @@ export async function setupDriverTest( ): Promise<{ client: Client; endpoint: string; + namespace: string; hardCrashActor?: (actorId: string) => Promise; hardCrashPreservesData: boolean; getRuntimeOutput: () => string; @@ -83,6 +84,7 @@ export async function setupDriverTest( return { client, endpoint, + namespace, hardCrashActor, hardCrashPreservesData: hardCrashPreservesData ?? false, getRuntimeOutput: getRuntimeOutput ?? (() => ""),