diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/engine-restart-serverless.md b/rivetkit-typescript/packages/rivetkit/tests/driver/engine-restart-serverless.md
new file mode 100644
index 0000000000..e51d3631a6
--- /dev/null
+++ b/rivetkit-typescript/packages/rivetkit/tests/driver/engine-restart-serverless.md
@@ -0,0 +1,276 @@
+# Engine Restart Serverless Investigation
+
+## Scope
+
+This documents the local reproduction work for serverless Rivet Actor access after restarting `rivet-engine`.
+
+The harness runs:
+
+- `tests/driver/engine-restart-serverless.ts`
+- `tests/fixtures/engine-restart-serverless-runtime.ts`
+
+The runtime is a normal Node process, not Vitest. It listens on a port with `serverless.basePath = "/api/rivet"`. The engine is started as a real `rivet-engine` binary and configured with a serverless runner config pointing at that runtime.
+
+## Main Finding
+
+The bug is not SQLite-specific.
+
+After `rivet-engine` restarts, there is a short window where gateway traffic to already-existing warmed actors can hang. This affects:
+
+- action calls through the client
+- HTTP actor requests through `/gateway/{actor}/request/...`
+- raw actor WebSockets through `/gateway/{actor}/websocket/...`
+
+New actor keys work immediately while warmed existing keys can hang, so this looks like stale or not-yet-settled gateway/serverless actor routing state for existing actors.
+
+## Timing Window
+
+Timing is measured from the moment engine `/health` returns after restart.
+
+Observed local window:
+
+- Unsafe: `0-2400ms`
+- Flaky edge: roughly `2500-3250ms`
+- Local minimum to trust: `3000ms`
+- Conservative diagnostic delay: `5000ms`
+
+This is not a crisp timer. The edge moves from run to run.
+
+## Baseline Actor Action Results
+
+These probes use the same existing actor key and a new key after engine restart.
+
+No heartbeat, immediate probe:
+
+- same-handle `getCount`: timed out
+- same-handle `tick`: timed out
+- fresh handle with same key `getCount`: timed out
+- new key `tick`: succeeded
+
+This reproduced the "actor bricked" symptom:
+
+```text
+bricked actor symptom reproduced. mode=idle failedPostRestartProbes=3 before=0
+```
+
+No heartbeat, delayed probe:
+
+- same existing key succeeded
+- new key succeeded
+
+This means the actor is not permanently broken in the local harness. It is unreachable through the existing-key gateway path during the post-restart race window.
+
+## Heartbeat Results
+
+The runtime supports `RIVETKIT_HEARTBEAT_MODE=none|sqlite|kv`.
+
+Immediate post-restart probes still reproduced the brick with:
+
+- no heartbeat
+- SQLite heartbeat
+- KV heartbeat
+
+In both SQLite and KV modes, actor-originated heartbeat traffic could recover after restart while gateway/client traffic to the same warmed existing actor key still hung.
+
+Conclusion: heartbeat success does not prove gateway readiness, and SQLite is not special.
+
+## HTTP Gateway Health Sweep
+
+The actor exposes an `onRequest` health endpoint at:
+
+```text
+/gateway/{actor}/request/health
+```
+
+The sweep warms one existing actor key per delay before restart, then probes each key once after engine `/health` returns. This avoids an early failed probe poisoning later delay measurements.
+
+Coarse sweep:
+
+```text
+0ms: timeout
+250ms: timeout
+500ms: timeout
+1000ms: timeout
+2000ms: timeout
+3000ms: 200 OK
+5000ms: 200 OK
+8000ms: 200 OK
+12000ms: 200 OK
+```
+
+Narrow sweeps:
+
+```text
+2000ms: timeout
+2250ms: timeout
+2500ms: timeout in one run, success in another
+2750ms: success in one run
+3000ms: success
+3250ms: success
+```
+
+Low repeat:
+
+```text
+1000ms: timeout
+1500ms: timeout
+1800ms: timeout
+2000ms: timeout
+2200ms: timeout
+2400ms: timeout
+```
+
+HTTP conclusion: gateway HTTP actor requests become reliably usable around `3s` locally, with `5s` as the safer diagnostic delay.
+
+## WebSocket Ping/Pong Sweep
+
+The actor exposes `onWebSocket` ping/pong at:
+
+```text
+/gateway/{actor}/websocket/ping
+```
+
+The client opens the WebSocket with Rivet gateway subprotocols:
+
+```text
+rivet
+rivet_encoding.bare
+```
+
+Then it sends:
+
+```json
+{"type":"ping","sentAt":...}
+```
+
+And waits for:
+
+```json
+{"type":"pong","sentAt":...}
+```
+
+Coarse sweep:
+
+```text
+0ms: timeout
+250ms: timeout
+500ms: timeout
+1000ms: timeout
+2000ms: timeout
+3000ms: pong
+5000ms: pong
+8000ms: pong
+12000ms: pong
+```
+
+Narrow sweeps:
+
+```text
+2000ms: timeout
+2250ms: timeout
+2500ms: timeout
+2750ms: timeout in one run
+3000ms: timeout in one run, success in another
+3250ms: pong
+```
+
+Edge repeat:
+
+```text
+2800ms: pong
+3000ms: pong
+3200ms: pong
+3400ms: pong
+3600ms: pong
+```
+
+Low repeat:
+
+```text
+2200ms: timeout
+2400ms: timeout
+2600ms: timeout
+2800ms: pong
+```
+
+WebSocket conclusion: raw actor WebSocket ping/pong has the same post-restart readiness window as HTTP gateway traffic. It is not action-specific.
+
+## Commit During Restart
+
+The harness can coordinate an actor action that starts a SQLite transaction, signals the driver, then attempts `COMMIT` after the engine is stopped.
+
+Immediate post-restart probes after this failed commit reproduced the same shape:
+
+- failed in-flight operation
+- same existing key probes timed out
+- new key succeeded
+
+Delayed post-restart probes passed.
+
+This is consistent with the gateway/serverless actor routing race, not durable SQLite session poisoning.
+
+## Important Corrections
+
+Earlier, it looked like a SQLite heartbeat caused later gateway probes to pass. That was wrong. A no-heartbeat delayed control also passed.
+
+The actual variable was delay after engine health, not SQLite activity.
+
+## Useful Logs
+
+Representative local logs:
+
+```text
+/tmp/rivet-idle-none-immediate.log
+/tmp/rivet-idle-kv-immediate.log
+/tmp/rivet-idle-sqlite-immediate.log
+/tmp/rivet-idle-none-after.log
+/tmp/rivet-idle-kv-after.log
+/tmp/rivet-idle-sqlite-after.log
+/tmp/rivet-commit-none-immediate.log
+/tmp/rivet-commit-none-after.log
+/tmp/rivet-gateway-health-sweep.log
+/tmp/rivet-gateway-health-sweep-narrow.log
+/tmp/rivet-gateway-health-sweep-edge.log
+/tmp/rivet-gateway-health-sweep-low.log
+/tmp/rivet-gateway-websocket-sweep.log
+/tmp/rivet-gateway-websocket-sweep-narrow.log
+/tmp/rivet-gateway-websocket-sweep-edge.log
+/tmp/rivet-gateway-websocket-sweep-low.log
+```
+
+## Commands
+
+Action/client repro:
+
+```bash
+RIVETKIT_ENGINE_RESTART_MODE=idle \
+RIVETKIT_HEARTBEAT_MODE=none \
+RIVETKIT_POST_RESTART_PROBE_TIMING=immediate \
+node --import tsx tests/driver/engine-restart-serverless.ts
+```
+
+HTTP health sweep:
+
+```bash
+RIVETKIT_ENGINE_RESTART_MODE=idle \
+RIVETKIT_HEARTBEAT_MODE=none \
+RIVETKIT_GATEWAY_HEALTH_DELAYS_MS=0,250,500,1000,2000,3000,5000,8000,12000 \
+node --import tsx tests/driver/engine-restart-serverless.ts
+```
+
+WebSocket ping/pong sweep:
+
+```bash
+RIVETKIT_ENGINE_RESTART_MODE=idle \
+RIVETKIT_HEARTBEAT_MODE=none \
+RIVETKIT_GATEWAY_WEBSOCKET_DELAYS_MS=0,250,500,1000,2000,3000,5000,8000,12000 \
+node --import tsx tests/driver/engine-restart-serverless.ts
+```
+
+## Current
Theory + +Engine `/health` returning does not mean gateway/serverless routing for previously warmed existing actors is fully settled. + +Requests in the first few seconds can attach to stale or incomplete actor routing/request state. Those requests hang until the caller timeout. A new key succeeds because it follows a fresh actor allocation path instead of the stale existing-key path. + +The likely fix area is the gateway/serverless/pegboard-envoy readiness and stale actor state path after engine restart, especially around existing actor generation, in-flight wake, stopped state, and pending request routing. diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/engine-restart-serverless.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/engine-restart-serverless.ts new file mode 100644 index 0000000000..07b04c276a --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/engine-restart-serverless.ts @@ -0,0 +1,1327 @@ +import { type ChildProcess, spawn } from "node:child_process"; +import { existsSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { createServer, type Server } from "node:http"; +import { tmpdir } from "node:os"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { getEnginePath } from "@rivetkit/engine-cli"; +import getPort from "get-port"; +import { createClient } from "../../src/client/mod"; + +const TEST_DIR = join(dirname(fileURLToPath(import.meta.url)), ".."); +const PACKAGE_DIR = dirname(TEST_DIR); +const REPO_ENGINE_BINARY = process.env.ENGINE_BINARY ?? 
join( + PACKAGE_DIR, + "../../../target/debug/rivet-engine", +); +const SERVERLESS_RUNTIME_PATH = join( + TEST_DIR, + "fixtures/engine-restart-serverless-runtime.ts", +); +const TOKEN = "dev"; +const HOST = "127.0.0.1"; +const ENGINE_START_TIMEOUT_MS = 90_000; +const SERVERLESS_START_TIMEOUT_MS = 30_000; +const INITIAL_COUNTER_READY_TIMEOUT_MS = 90_000; +const POST_RESTART_HEARTBEAT_OBSERVATION_MS = Number( + process.env.RIVETKIT_POST_RESTART_WAIT_MS ?? "12000", +); +const POST_RESTART_PROBE_TIMEOUT_MS = 20_000; +const GATEWAY_HEALTH_TIMEOUT_MS = Number( + process.env.RIVETKIT_GATEWAY_HEALTH_TIMEOUT_MS ?? "5000", +); +const COMMIT_FAILURE_TIMEOUT_MS = 45_000; +const RESTART_MODE = process.env.RIVETKIT_ENGINE_RESTART_MODE ?? "commit"; +const HEARTBEAT_MODE = process.env.RIVETKIT_HEARTBEAT_MODE ?? "sqlite"; +const POST_RESTART_PROBE_TIMING = + process.env.RIVETKIT_POST_RESTART_PROBE_TIMING ?? "after-heartbeat"; +const GATEWAY_HEALTH_DELAYS_MS = parseDelayList( + process.env.RIVETKIT_GATEWAY_HEALTH_DELAYS_MS, +); +const GATEWAY_WEBSOCKET_DELAYS_MS = parseDelayList( + process.env.RIVETKIT_GATEWAY_WEBSOCKET_DELAYS_MS, +); + +if (!["none", "sqlite", "kv"].includes(HEARTBEAT_MODE)) { + throw new Error("RIVETKIT_HEARTBEAT_MODE must be one of: none, sqlite, kv"); +} +if (!["immediate", "after-heartbeat"].includes(POST_RESTART_PROBE_TIMING)) { + throw new Error( + "RIVETKIT_POST_RESTART_PROBE_TIMING must be one of: immediate, after-heartbeat", + ); +} + +interface RuntimeLogs { + stdout: string; + stderr: string; +} + +interface HeartbeatStats { + ticks: number; + sqlOk: number; + sqlErr: number; + kvOk: number; + kvErr: number; + onWake: number; + onSleep: number; + abort: number; + rollbackErr: number; + lastOkCount: number | undefined; + lastError: string | undefined; +} + +interface CommitSignalServer { + url: string; + waitForSignal: Promise; + close: () => Promise; +} + +interface GatewayHealthTarget { + delayMs: number; + key: string; + url: string; +} + +interface 
GatewayWebSocketTarget { + delayMs: number; + key: string; + url: string; +} + +class OwnedEngine { + readonly dbRoot = mkdtempSync(join(tmpdir(), "rivetkit-engine-restart-")); + readonly endpoint: string; + readonly peerUrl: string; + readonly configPath = join(this.dbRoot, "config.json"); + readonly #guardPort: number; + readonly #apiPeerPort: number; + readonly #metricsPort: number; + #child: ChildProcess | undefined; + #logs: RuntimeLogs = { stdout: "", stderr: "" }; + + private constructor( + guardPort: number, + apiPeerPort: number, + metricsPort: number, + ) { + this.#guardPort = guardPort; + this.#apiPeerPort = apiPeerPort; + this.#metricsPort = metricsPort; + this.endpoint = `http://${HOST}:${guardPort}`; + this.peerUrl = `http://${HOST}:${apiPeerPort}`; + this.#writeConfig(); + } + + static async start(): Promise { + const guardPort = await getPort({ host: HOST }); + const apiPeerPort = await getPort({ host: HOST, exclude: [guardPort] }); + const metricsPort = await getPort({ + host: HOST, + exclude: [guardPort, apiPeerPort], + }); + const engine = new OwnedEngine(guardPort, apiPeerPort, metricsPort); + await engine.startProcess(); + return engine; + } + + async startProcess(): Promise { + if (isProcessRunning(this.#child)) { + return; + } + + this.#logs = { stdout: "", stderr: "" }; + const child = spawn( + resolveEngineBinaryPath(), + ["start", "--config", this.configPath], + { + env: { + ...process.env, + RIVET__GUARD__HOST: HOST, + RIVET__GUARD__PORT: this.#guardPort.toString(), + RIVET__API_PEER__HOST: HOST, + RIVET__API_PEER__PORT: this.#apiPeerPort.toString(), + RIVET__METRICS__HOST: HOST, + RIVET__METRICS__PORT: this.#metricsPort.toString(), + RIVET__FILE_SYSTEM__PATH: join(this.dbRoot, "db"), + }, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + this.#child = child; + child.stdout?.on("data", (chunk) => { + const text = chunk.toString(); + this.#logs.stdout += text; + if (process.env.DRIVER_ENGINE_LOGS === "1") { + 
process.stderr.write(`[RESTART_ENG.OUT] ${text}`); + } + }); + child.stderr?.on("data", (chunk) => { + const text = chunk.toString(); + this.#logs.stderr += text; + if (process.env.DRIVER_ENGINE_LOGS === "1") { + process.stderr.write(`[RESTART_ENG.ERR] ${text}`); + } + }); + await waitForEngineHealth( + child, + this.#logs, + this.endpoint, + ENGINE_START_TIMEOUT_MS, + ); + console.log(`engine listening at ${this.endpoint} (${this.dbRoot})`); + } + + async stopProcess(signal: NodeJS.Signals = "SIGTERM"): Promise { + const child = this.#child; + if (!isProcessRunning(child)) { + return; + } + + await stopChildProcess(child, signal); + } + + async cleanup(): Promise { + await this.stopProcess(); + rmSync(this.dbRoot, { force: true, recursive: true }); + } + + #writeConfig(): void { + writeFileSync( + this.configPath, + JSON.stringify({ + topology: { + datacenter_label: 1, + datacenters: { + default: { + datacenter_label: 1, + is_leader: true, + public_url: this.endpoint, + peer_url: this.peerUrl, + }, + }, + }, + }), + ); + } +} + +async function stopChildProcess( + child: ChildProcess, + signal: NodeJS.Signals = "SIGTERM", +): Promise { + if (!isProcessRunning(child)) { + return; + } + + await new Promise((resolve) => { + let forceKill: NodeJS.Timeout | undefined; + const finish = () => { + if (forceKill) { + clearTimeout(forceKill); + } + resolve(); + }; + child.once("exit", finish); + child.kill(signal); + forceKill = setTimeout(() => { + if (isProcessRunning(child)) { + child.kill("SIGKILL"); + } + }, 5_000); + }); +} + +class ServerlessRuntime { + readonly url: string; + readonly #port: number; + #child: ChildProcess | undefined; + #logs: RuntimeLogs = { stdout: "", stderr: "" }; + + private constructor(port: number) { + this.#port = port; + this.url = `http://${HOST}:${port}/api/rivet`; + } + + static async start(input: { + endpoint: string; + namespace: string; + poolName: string; + }): Promise { + const port = await getPort({ host: HOST }); + const runtime = 
new ServerlessRuntime(port); + await runtime.startProcess(input); + return runtime; + } + + getOutput(): string { + return childOutput(this.#logs); + } + + async startProcess(input: { + endpoint: string; + namespace: string; + poolName: string; + }): Promise { + const child = spawn( + process.execPath, + ["--import", "tsx", SERVERLESS_RUNTIME_PATH], + { + cwd: PACKAGE_DIR, + env: { + ...process.env, + RIVET_TOKEN: TOKEN, + RIVET_NAMESPACE: input.namespace, + RIVETKIT_TEST_ENDPOINT: input.endpoint, + RIVETKIT_HEARTBEAT_MODE: HEARTBEAT_MODE, + RIVETKIT_TEST_HOST: HOST, + RIVETKIT_TEST_POOL_NAME: input.poolName, + RIVETKIT_TEST_PORT: this.#port.toString(), + }, + stdio: ["ignore", "pipe", "pipe"], + }, + ); + this.#child = child; + child.stdout?.on("data", (chunk) => { + const text = chunk.toString(); + this.#logs.stdout += text; + if (process.env.DRIVER_RUNTIME_LOGS === "1") { + process.stderr.write(`[SERVERLESS_RT.OUT] ${text}`); + } + }); + child.stderr?.on("data", (chunk) => { + const text = chunk.toString(); + this.#logs.stderr += text; + if (process.env.DRIVER_RUNTIME_LOGS === "1") { + process.stderr.write(`[SERVERLESS_RT.ERR] ${text}`); + } + }); + await waitForHttpOk({ + url: `${this.url}/health`, + child, + logs: this.#logs, + timeoutMs: SERVERLESS_START_TIMEOUT_MS, + }); + console.log(`serverless runtime listening at ${this.url}`); + } + + async cleanup(): Promise { + const child = this.#child; + if (!isProcessRunning(child)) { + return; + } + + await stopChildProcess(child, "SIGTERM"); + } +} + +async function main() { + const namespace = `restart-${crypto.randomUUID()}`; + const poolName = `serverless-restart-${crypto.randomUUID()}`; + const actorKey = `sqlite-counter-${crypto.randomUUID()}`; + const engine = await OwnedEngine.start(); + let runtime: ServerlessRuntime | undefined; + let client: ReturnType | undefined; + let signalServer: CommitSignalServer | undefined; + + try { + await createNamespace(engine.endpoint, namespace); + runtime = await 
ServerlessRuntime.start({ + endpoint: engine.endpoint, + namespace, + poolName, + }); + await upsertServerlessRunnerConfig({ + endpoint: engine.endpoint, + namespace, + poolName, + serverlessUrl: runtime.url, + }); + await waitForRunnerConfigReady({ + endpoint: engine.endpoint, + namespace, + poolName, + }); + + client = createClient({ + endpoint: engine.endpoint, + namespace, + poolName, + token: TOKEN, + encoding: "bare", + disableMetadataLookup: true, + }); + + const actorHandle = client.sqliteCounter.getOrCreate([actorKey]); + const heartbeatWarmupStartedAt = Date.now(); + const countBeforeRestart = (await actorHandle.getCount()) as number; + console.log( + `restart scenario configured. restartMode=${RESTART_MODE} heartbeatMode=${HEARTBEAT_MODE} postRestartProbeTiming=${POST_RESTART_PROBE_TIMING}`, + ); + if (HEARTBEAT_MODE !== "none") { + await waitFor( + () => + heartbeatSuccessCount( + getHeartbeatStats( + runtime?.getOutput() ?? "", + heartbeatWarmupStartedAt, + ), + ) >= 2, + INITIAL_COUNTER_READY_TIMEOUT_MS, + () => { + const stats = getHeartbeatStats( + runtime?.getOutput() ?? "", + heartbeatWarmupStartedAt, + ); + return `actor heartbeat did not start. ${formatHeartbeatStats(stats)}`; + }, + ); + } + const warmupStats = getHeartbeatStats( + runtime.getOutput(), + heartbeatWarmupStartedAt, + ); + console.log( + `actor-originated heartbeat warmup finished. before=${countBeforeRestart} ${formatHeartbeatStats(warmupStats)}`, + ); + const gatewayHealthTargets = + GATEWAY_HEALTH_DELAYS_MS.length > 0 + ? await prepareGatewayHealthTargets({ + client, + baseKey: actorKey, + delaysMs: GATEWAY_HEALTH_DELAYS_MS, + }) + : []; + const gatewayWebSocketTargets = + GATEWAY_WEBSOCKET_DELAYS_MS.length > 0 + ? 
await prepareGatewayWebSocketTargets({ + client, + baseKey: actorKey, + delaysMs: GATEWAY_WEBSOCKET_DELAYS_MS, + }) + : []; + if (RESTART_MODE === "idle") { + console.log( + `restarting engine while actor is idle at count=${countBeforeRestart}`, + ); + await sleep(1_000); + const restartStartedAt = Date.now(); + await engine.stopProcess("SIGTERM"); + console.log("restarting engine"); + await engine.startProcess(); + const engineRestartedAt = Date.now(); + + await runPostRestartSequence({ + runtime, + client, + actorHandle, + actorKey, + countBeforeRestart, + mode: "idle", + restartStartedAt, + engineRestartedAt, + gatewayHealthTargets, + gatewayWebSocketTargets, + }); + return; + } + + if (RESTART_MODE !== "commit") { + throw new Error( + `unsupported RIVETKIT_ENGINE_RESTART_MODE: ${RESTART_MODE}`, + ); + } + + console.log( + `starting coordinated commit failure at count=${countBeforeRestart}`, + ); + signalServer = await createCommitSignalServer(); + const commitAttemptStartedAt = Date.now(); + const commitAttempt = withTimeout( + actorHandle.commitDuringEngineRestart({ + signalUrl: signalServer.url, + delayBeforeCommitMs: 500, + payloadBytes: 8192, + }) as Promise, + COMMIT_FAILURE_TIMEOUT_MS, + "commit attempt did not finish after engine restart signal", + ).then( + (value) => ({ ok: true as const, value }), + (error) => ({ ok: false as const, error }), + ); + + await signalServer.waitForSignal; + console.log("actor reached pre-commit point; sending engine SIGTERM"); + const restartStartedAt = Date.now(); + await engine.stopProcess("SIGTERM"); + + const commitResult = await commitAttempt; + if (commitResult.ok) { + console.warn( + `commit unexpectedly succeeded after engine shutdown in ${Date.now() - commitAttemptStartedAt}ms`, + ); + } else { + console.log( + `commit failed after engine shutdown in ${Date.now() - commitAttemptStartedAt}ms: ${stringifyError(commitResult.error)}`, + ); + } + + console.log("restarting engine"); + await engine.startProcess(); + 
const engineRestartedAt = Date.now(); + + await runPostRestartSequence({ + runtime, + client, + actorHandle, + actorKey, + countBeforeRestart, + mode: "commit", + restartStartedAt, + engineRestartedAt, + gatewayHealthTargets, + gatewayWebSocketTargets, + }); + } finally { + await signalServer?.close(); + await client?.dispose(); + await runtime?.cleanup(); + await engine.cleanup(); + } +} + +function resolveEngineBinaryPath(): string { + if (existsSync(REPO_ENGINE_BINARY)) { + return REPO_ENGINE_BINARY; + } + + return getEnginePath(); +} + +function childOutput(logs: RuntimeLogs): string { + return [logs.stdout, logs.stderr].filter(Boolean).join("\n"); +} + +function isProcessRunning( + child: ChildProcess | undefined, +): child is ChildProcess { + return ( + child !== undefined && + child.exitCode === null && + child.signalCode === null + ); +} + +async function waitForEngineHealth( + child: ChildProcess, + logs: RuntimeLogs, + endpoint: string, + timeoutMs: number, +): Promise { + await waitForHttpOk({ + child, + logs, + timeoutMs, + url: `${endpoint}/health`, + }); +} + +async function waitForHttpOk(input: { + child?: ChildProcess; + logs: RuntimeLogs; + timeoutMs: number; + url: string; +}): Promise { + const deadline = Date.now() + input.timeoutMs; + + while (Date.now() < deadline) { + if (input.child && !isProcessRunning(input.child)) { + throw new Error( + `process exited before health check passed:\n${childOutput(input.logs)}`, + ); + } + + try { + const response = await fetch(input.url); + if (response.ok) { + return; + } + } catch { } + + await sleep(500); + } + + throw new Error( + `timed out waiting for health at ${input.url}:\n${childOutput(input.logs)}`, + ); +} + +async function createNamespace( + endpoint: string, + namespace: string, +): Promise { + const response = await fetch(`${endpoint}/namespaces`, { + method: "POST", + headers: { + Authorization: `Bearer ${TOKEN}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + name: 
namespace, + display_name: `Engine restart ${namespace}`, + }), + }); + + if (!response.ok) { + throw new Error( + `failed to create namespace ${namespace}: ${response.status} ${await response.text()}`, + ); + } +} + +async function getDatacenter( + endpoint: string, + namespace: string, +): Promise { + const response = await fetch( + `${endpoint}/datacenters?namespace=${encodeURIComponent(namespace)}`, + { + headers: { + Authorization: `Bearer ${TOKEN}`, + }, + }, + ); + + if (!response.ok) { + throw new Error( + `failed to list datacenters: ${response.status} ${await response.text()}`, + ); + } + + const body = (await response.json()) as { + datacenters: Array<{ name: string }>; + }; + const datacenter = body.datacenters[0]?.name; + if (!datacenter) { + throw new Error("engine returned no datacenters"); + } + + return datacenter; +} + +async function upsertServerlessRunnerConfig(input: { + endpoint: string; + namespace: string; + poolName: string; + serverlessUrl: string; +}): Promise { + const datacenter = await getDatacenter(input.endpoint, input.namespace); + const response = await fetch( + `${input.endpoint}/runner-configs/${encodeURIComponent(input.poolName)}?namespace=${encodeURIComponent(input.namespace)}`, + { + method: "PUT", + headers: { + Authorization: `Bearer ${TOKEN}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + datacenters: { + [datacenter]: { + serverless: { + url: input.serverlessUrl, + headers: { + "x-rivet-token": TOKEN, + }, + request_lifespan: 3600, + drain_grace_period: 5, + metadata_poll_interval: 1000, + max_runners: 10, + min_runners: 0, + runners_margin: 0, + slots_per_runner: 1, + }, + metadata: {}, + drain_on_version_upgrade: true, + }, + }, + }), + }, + ); + + if (!response.ok) { + throw new Error( + `failed to upsert serverless runner config: ${response.status} ${await response.text()}`, + ); + } +} + +async function waitForRunnerConfigReady(input: { + endpoint: string; + namespace: string; + poolName: 
string; +}): Promise { + const deadline = Date.now() + 30_000; + let bodyText = ""; + + while (Date.now() < deadline) { + const response = await fetch( + `${input.endpoint}/runner-configs?namespace=${encodeURIComponent(input.namespace)}&runner_name=${encodeURIComponent(input.poolName)}`, + { + headers: { + Authorization: `Bearer ${TOKEN}`, + }, + }, + ); + bodyText = await response.text(); + if (response.ok) { + const body = JSON.parse(bodyText) as { + runner_configs?: Record< + string, + { + datacenters?: Record< + string, + { + protocol_version?: number | null; + } + >; + } + >; + }; + const config = body.runner_configs?.[input.poolName]; + const datacenters = Object.values(config?.datacenters ?? {}); + if ( + datacenters.length > 0 && + datacenters.every( + (datacenter) => datacenter.protocol_version != null, + ) + ) { + return; + } + } + + await sleep(250); + } + + throw new Error(`serverless runner config was not ready: ${bodyText}`); +} + +async function createCommitSignalServer(): Promise { + const port = await getPort({ host: HOST }); + let resolveSignal!: () => void; + const waitForSignal = new Promise((resolve) => { + resolveSignal = resolve; + }); + const server: Server = createServer((request, response) => { + if (request.method === "POST" && request.url === "/before-commit") { + resolveSignal(); + response.writeHead(204); + response.end(); + return; + } + + response.writeHead(404); + response.end(); + }); + + await new Promise((resolve) => { + server.listen(port, HOST, resolve); + }); + + return { + url: `http://${HOST}:${port}/before-commit`, + waitForSignal, + close: () => + new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }), + }; +} + +async function probeActor( + name: string, + operation: () => Promise, +): Promise<{ + name: string; + elapsedMs: number; + result: unknown; +}> { + const startedAt = Date.now(); + const result = await withTimeout( + operation(), + 
POST_RESTART_PROBE_TIMEOUT_MS, + `${name} did not complete after engine restart`, + ); + + return { + name, + elapsedMs: Date.now() - startedAt, + result, + }; +} + +async function prepareGatewayHealthTargets(input: { + client: ReturnType; + baseKey: string; + delaysMs: number[]; +}): Promise { + const targets: GatewayHealthTarget[] = []; + for (const delayMs of input.delaysMs) { + const key = `${input.baseKey}-gateway-health-${delayMs}`; + const handle = input.client.sqliteCounter.getOrCreate([key]); + await handle.getCount(); + const url = buildActorRequestUrl(await handle.getGatewayUrl(), "health"); + const response = await fetch(url, { + signal: AbortSignal.timeout(GATEWAY_HEALTH_TIMEOUT_MS), + }); + if (!response.ok) { + throw new Error( + `gateway health preflight failed for delay ${delayMs}: ${response.status} ${await response.text()}`, + ); + } + targets.push({ + delayMs, + key, + url, + }); + } + + console.log( + `gateway health targets warmed. delaysMs=${input.delaysMs.join(",")}`, + ); + return targets; +} + +async function runGatewayHealthDelaySweep(input: { + targets: GatewayHealthTarget[]; + engineRestartedAt: number; + mode: string; +}): Promise { + const startedAt = Date.now(); + console.log( + `gateway health delay sweep starting. 
mode=${input.mode} delaysMs=${input.targets.map((target) => target.delayMs).join(",")}`, + ); + + const results = await Promise.all( + input.targets.map(async (target) => { + const sleepMs = Math.max(0, input.engineRestartedAt + target.delayMs - Date.now()); + if (sleepMs > 0) { + await sleep(sleepMs); + } + + const probeStartedAt = Date.now(); + try { + const response = await fetch(target.url, { + signal: AbortSignal.timeout(GATEWAY_HEALTH_TIMEOUT_MS), + }); + const body = await response.text(); + return { + ...target, + ok: response.ok, + status: response.status, + elapsedMs: Date.now() - probeStartedAt, + startOffsetMs: probeStartedAt - input.engineRestartedAt, + body, + }; + } catch (error) { + return { + ...target, + ok: false, + status: "error", + elapsedMs: Date.now() - probeStartedAt, + startOffsetMs: probeStartedAt - input.engineRestartedAt, + body: stringifyError(error), + }; + } + }), + ); + + for (const result of results.sort((a, b) => a.delayMs - b.delayMs)) { + const body = result.body.length > 240 ? `${result.body.slice(0, 240)}...` : result.body; + console.log( + `gateway-health delayMs=${result.delayMs} startOffsetMs=${result.startOffsetMs} ok=${result.ok} status=${result.status} elapsedMs=${result.elapsedMs} key=${result.key} body=${JSON.stringify(body)}`, + ); + } + + const firstOk = results + .filter((result) => result.ok) + .sort((a, b) => a.delayMs - b.delayMs)[0]; + if (firstOk) { + console.log( + `gateway health first success. mode=${input.mode} delayMs=${firstOk.delayMs} totalSweepMs=${Date.now() - startedAt}`, + ); + } else { + console.log( + `gateway health no successes. 
mode=${input.mode} totalSweepMs=${Date.now() - startedAt}`, + ); + } +} + +function buildActorRequestUrl(gatewayUrl: string, path: string): string { + const url = new URL(gatewayUrl); + const normalizedPath = path.replace(/^\/+/, ""); + url.pathname = `${url.pathname.replace(/\/$/, "")}/request/${normalizedPath}`; + return url.toString(); +} + +async function prepareGatewayWebSocketTargets(input: { + client: ReturnType; + baseKey: string; + delaysMs: number[]; +}): Promise { + const targets: GatewayWebSocketTarget[] = []; + for (const delayMs of input.delaysMs) { + const key = `${input.baseKey}-gateway-ws-${delayMs}`; + const handle = input.client.sqliteCounter.getOrCreate([key]); + await handle.getCount(); + const url = buildActorWebSocketUrl(await handle.getGatewayUrl(), "ping"); + await openWebSocketPingPong(url); + targets.push({ + delayMs, + key, + url, + }); + } + + console.log( + `gateway websocket targets warmed. delaysMs=${input.delaysMs.join(",")}`, + ); + return targets; +} + +async function runGatewayWebSocketDelaySweep(input: { + targets: GatewayWebSocketTarget[]; + engineRestartedAt: number; + mode: string; +}): Promise { + const startedAt = Date.now(); + console.log( + `gateway websocket delay sweep starting. 
mode=${input.mode} delaysMs=${input.targets.map((target) => target.delayMs).join(",")}`, + ); + + const results = await Promise.all( + input.targets.map(async (target) => { + const sleepMs = Math.max(0, input.engineRestartedAt + target.delayMs - Date.now()); + if (sleepMs > 0) { + await sleep(sleepMs); + } + + const probeStartedAt = Date.now(); + try { + const message = await openWebSocketPingPong(target.url); + return { + ...target, + ok: true, + elapsedMs: Date.now() - probeStartedAt, + startOffsetMs: probeStartedAt - input.engineRestartedAt, + body: message, + }; + } catch (error) { + return { + ...target, + ok: false, + elapsedMs: Date.now() - probeStartedAt, + startOffsetMs: probeStartedAt - input.engineRestartedAt, + body: stringifyError(error), + }; + } + }), + ); + + for (const result of results.sort((a, b) => a.delayMs - b.delayMs)) { + const body = result.body.length > 240 ? `${result.body.slice(0, 240)}...` : result.body; + console.log( + `gateway-websocket delayMs=${result.delayMs} startOffsetMs=${result.startOffsetMs} ok=${result.ok} elapsedMs=${result.elapsedMs} key=${result.key} body=${JSON.stringify(body)}`, + ); + } + + const firstOk = results + .filter((result) => result.ok) + .sort((a, b) => a.delayMs - b.delayMs)[0]; + if (firstOk) { + console.log( + `gateway websocket first success. mode=${input.mode} delayMs=${firstOk.delayMs} totalSweepMs=${Date.now() - startedAt}`, + ); + } else { + console.log( + `gateway websocket no successes. mode=${input.mode} totalSweepMs=${Date.now() - startedAt}`, + ); + } +} + +function buildActorWebSocketUrl(gatewayUrl: string, path: string): string { + const url = new URL(gatewayUrl); + const normalizedPath = path.replace(/^\/+/, ""); + url.pathname = `${url.pathname.replace(/\/$/, "")}/websocket/${normalizedPath}`; + url.protocol = url.protocol === "https:" ? 
"wss:" : "ws:"; + return url.toString(); +} + +function openWebSocketPingPong(url: string): Promise { + return new Promise((resolve, reject) => { + let settled = false; + const websocket = new WebSocket(url, ["rivet", "rivet_encoding.bare"]); + const timeout = setTimeout(() => { + finish(new Error("websocket ping/pong timed out")); + }, GATEWAY_HEALTH_TIMEOUT_MS); + + const finish = (result: string | Error) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timeout); + try { + websocket.close(); + } catch { } + if (result instanceof Error) { + reject(result); + } else { + resolve(result); + } + }; + + websocket.addEventListener("open", () => { + websocket.send(JSON.stringify({ type: "ping", sentAt: Date.now() })); + }); + websocket.addEventListener("message", (event) => { + const data = typeof event.data === "string" ? event.data : String(event.data); + try { + const message = JSON.parse(data) as { type?: string }; + if (message.type === "pong") { + finish(data); + } + } catch { + finish(new Error(`invalid websocket message: ${data}`)); + } + }); + websocket.addEventListener("error", () => { + finish(new Error("websocket error")); + }); + websocket.addEventListener("close", (event) => { + if (!settled) { + finish( + new Error( + `websocket closed before pong: code=${event.code} reason=${event.reason}`, + ), + ); + } + }); + }); +} + +async function runPostRestartSequence(input: { + runtime: ServerlessRuntime; + client: ReturnType; + actorHandle: ReturnType< + ReturnType["sqliteCounter"]["getOrCreate"] + >; + actorKey: string; + countBeforeRestart: number; + mode: string; + restartStartedAt: number; + engineRestartedAt: number; + gatewayHealthTargets: GatewayHealthTarget[]; + gatewayWebSocketTargets: GatewayWebSocketTarget[]; +}): Promise { + if (input.gatewayWebSocketTargets.length > 0) { + await runGatewayWebSocketDelaySweep({ + targets: input.gatewayWebSocketTargets, + engineRestartedAt: input.engineRestartedAt, + mode: input.mode, + }); + 
return; + } + + if (input.gatewayHealthTargets.length > 0) { + await runGatewayHealthDelaySweep({ + targets: input.gatewayHealthTargets, + engineRestartedAt: input.engineRestartedAt, + mode: input.mode, + }); + return; + } + + if (POST_RESTART_PROBE_TIMING === "immediate") { + await runPostRestartProbes(input); + await observePostRestartHeartbeat(input); + return; + } + + await observePostRestartHeartbeat(input); + await runPostRestartProbes(input); +} + +async function runPostRestartProbes(input: { + client: ReturnType; + actorHandle: ReturnType< + ReturnType["sqliteCounter"]["getOrCreate"] + >; + actorKey: string; + countBeforeRestart: number; + mode: string; +}): Promise { + const probeResults = await Promise.allSettled([ + probeActor("same-handle-getCount", () => input.actorHandle.getCount()), + probeActor("same-handle-tick", () => input.actorHandle.tick(8192)), + probeActor("fresh-handle-getCount", () => + input.client.sqliteCounter.getOrCreate([input.actorKey]).getCount(), + ), + probeActor("new-key-tick", () => + input.client.sqliteCounter + .getOrCreate([`post-restart-${crypto.randomUUID()}`]) + .tick(8192), + ), + ]); + let postRestartProbeFailures = 0; + for (const probeResult of probeResults) { + if (probeResult.status === "fulfilled") { + console.log( + `${probeResult.value.name} post-restart probe ok elapsedMs=${probeResult.value.elapsedMs} result=${JSON.stringify(probeResult.value.result)}`, + ); + } else { + postRestartProbeFailures += 1; + console.warn( + `post-restart probe failed: ${stringifyError(probeResult.reason)}`, + ); + } + } + + if (postRestartProbeFailures > 0) { + console.log( + `bricked actor symptom reproduced. mode=${input.mode} failedPostRestartProbes=${postRestartProbeFailures} before=${input.countBeforeRestart}`, + ); + } else { + console.log( + `serverless restart scenario passed without bricking. 
mode=${input.mode} before=${input.countBeforeRestart}`, + ); + } +} + +async function observePostRestartHeartbeat(input: { + runtime: ServerlessRuntime; + mode: string; + restartStartedAt: number; + engineRestartedAt: number; +}): Promise { + await sleep(POST_RESTART_HEARTBEAT_OBSERVATION_MS); + + const output = input.runtime.getOutput(); + const duringRestart = getHeartbeatStats(output, input.restartStartedAt); + const afterEngineRestarted = getHeartbeatStats(output, input.engineRestartedAt); + + console.log( + `heartbeat observation since restart signal. mode=${input.mode} ${formatHeartbeatStats(duringRestart)}`, + ); + console.log( + `heartbeat observation after engine healthy. mode=${input.mode} ${formatHeartbeatStats(afterEngineRestarted)}`, + ); + + if (HEARTBEAT_MODE === "none") { + console.log( + `no actor-originated heartbeat configured after engine restart. mode=${input.mode}`, + ); + } else if (heartbeatSuccessCount(afterEngineRestarted) > 0) { + console.log( + `actor-originated ${HEARTBEAT_MODE} survived engine restart. mode=${input.mode}`, + ); + } else if (heartbeatErrorCount(afterEngineRestarted) > 0) { + console.log( + `actor-originated ${HEARTBEAT_MODE} is failing after engine restart. mode=${input.mode} lastError=${afterEngineRestarted.lastError}`, + ); + } else if (duringRestart.onSleep > 0 || duringRestart.abort > 0) { + console.log( + `actor heartbeat stopped because actor shutdown ran during restart. mode=${input.mode}`, + ); + } else { + console.log( + `actor heartbeat produced no post-restart ${HEARTBEAT_MODE} signal. 
mode=${input.mode}`, + ); + } +} + +function getHeartbeatStats(output: string, sinceTs: number): HeartbeatStats { + const stats: HeartbeatStats = { + ticks: 0, + sqlOk: 0, + sqlErr: 0, + kvOk: 0, + kvErr: 0, + onWake: 0, + onSleep: 0, + abort: 0, + rollbackErr: 0, + lastOkCount: undefined, + lastError: undefined, + }; + + for (const line of output.split(/\r?\n/)) { + if (!line.startsWith("{")) { + continue; + } + + let event: { + event?: string; + ts?: number; + count?: number; + error?: string; + }; + try { + event = JSON.parse(line); + } catch { + continue; + } + + if (!event.event || typeof event.ts !== "number" || event.ts < sinceTs) { + continue; + } + + switch (event.event) { + case "heartbeat_tick": + stats.ticks += 1; + break; + case "heartbeat_sql_ok": + stats.sqlOk += 1; + if (typeof event.count === "number") { + stats.lastOkCount = event.count; + } + break; + case "heartbeat_sql_err": + stats.sqlErr += 1; + stats.lastError = event.error; + break; + case "heartbeat_kv_ok": + stats.kvOk += 1; + if (typeof event.count === "number") { + stats.lastOkCount = event.count; + } + break; + case "heartbeat_kv_err": + stats.kvErr += 1; + stats.lastError = event.error; + break; + case "heartbeat_on_wake": + stats.onWake += 1; + break; + case "heartbeat_on_sleep": + stats.onSleep += 1; + break; + case "heartbeat_abort": + stats.abort += 1; + break; + case "heartbeat_rollback_err": + stats.rollbackErr += 1; + stats.lastError = event.error; + break; + default: + break; + } + } + + return stats; +} + +function formatHeartbeatStats(stats: HeartbeatStats): string { + return [ + `ticks=${stats.ticks}`, + `sqlOk=${stats.sqlOk}`, + `sqlErr=${stats.sqlErr}`, + `kvOk=${stats.kvOk}`, + `kvErr=${stats.kvErr}`, + `onWake=${stats.onWake}`, + `onSleep=${stats.onSleep}`, + `abort=${stats.abort}`, + `rollbackErr=${stats.rollbackErr}`, + `lastOkCount=${stats.lastOkCount ?? "none"}`, + `lastError=${stats.lastError ?? 
"none"}`, + ].join(" "); +} + +function heartbeatSuccessCount(stats: HeartbeatStats): number { + if (HEARTBEAT_MODE === "sqlite") { + return stats.sqlOk; + } + if (HEARTBEAT_MODE === "kv") { + return stats.kvOk; + } + return 0; +} + +function heartbeatErrorCount(stats: HeartbeatStats): number { + if (HEARTBEAT_MODE === "sqlite") { + return stats.sqlErr; + } + if (HEARTBEAT_MODE === "kv") { + return stats.kvErr; + } + return 0; +} + +function parseDelayList(value: string | undefined): number[] { + if (!value) { + return []; + } + + return value + .split(",") + .map((part) => Number(part.trim())) + .filter((delayMs) => Number.isFinite(delayMs) && delayMs >= 0) + .sort((a, b) => a - b); +} + +async function waitFor( + predicate: () => boolean, + timeoutMs: number, + makeError: () => string, +): Promise { + const deadline = Date.now() + timeoutMs; + + while (Date.now() < deadline) { + if (predicate()) { + return; + } + await sleep(250); + } + + throw new Error(makeError()); +} + +function stringifyError(error: unknown): string { + return error instanceof Error ? 
error.message : String(error); +} + +function withTimeout( + promise: Promise, + timeoutMs: number, + message: string, +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error(message)); + }, timeoutMs); + + promise.then( + (value) => { + clearTimeout(timeout); + resolve(value); + }, + (error) => { + clearTimeout(timeout); + reject(error); + }, + ); + }); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +main().catch((error) => { + console.error(error); + process.exit(1); +}); diff --git a/rivetkit-typescript/packages/rivetkit/tests/fixtures/engine-restart-serverless-runtime.ts b/rivetkit-typescript/packages/rivetkit/tests/fixtures/engine-restart-serverless-runtime.ts new file mode 100644 index 0000000000..fa3e2a9372 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/tests/fixtures/engine-restart-serverless-runtime.ts @@ -0,0 +1,472 @@ +import { serve } from "@hono/node-server"; +import { actor, setup } from "rivetkit"; + +const host = process.env.RIVETKIT_TEST_HOST ?? "127.0.0.1"; +const port = Number(process.env.RIVETKIT_TEST_PORT); +const endpoint = process.env.RIVETKIT_TEST_ENDPOINT; +const namespace = process.env.RIVET_NAMESPACE; +const token = process.env.RIVET_TOKEN ?? "dev"; +const poolName = process.env.RIVETKIT_TEST_POOL_NAME; +const heartbeatMode = process.env.RIVETKIT_HEARTBEAT_MODE ?? 
"sqlite"; + +if (!Number.isInteger(port) || port <= 0) { + throw new Error("RIVETKIT_TEST_PORT must be a positive integer"); +} +if (!endpoint) { + throw new Error("RIVETKIT_TEST_ENDPOINT is required"); +} +if (!namespace) { + throw new Error("RIVET_NAMESPACE is required"); +} +if (!poolName) { + throw new Error("RIVETKIT_TEST_POOL_NAME is required"); +} +if (!["none", "sqlite", "kv"].includes(heartbeatMode)) { + throw new Error("RIVETKIT_HEARTBEAT_MODE must be one of: none, sqlite, kv"); +} + +interface SqliteDatabase { + run(sql: string, params?: unknown[]): Promise; + query( + sql: string, + params?: unknown[], + ): Promise<{ + rows: unknown[][]; + }>; +} + +interface HeartbeatVars { + heartbeatTimer?: ReturnType; + heartbeatInFlight?: boolean; + heartbeatSeq?: number; +} + +const rawSqlDatabaseProvider = { + createClient: async () => ({ + execute: async () => [], + close: async () => {}, + }), + onMigrate: async () => {}, +}; + +async function ensureTables(database: SqliteDatabase) { + await database.run(` + CREATE TABLE IF NOT EXISTS restart_counter ( + id INTEGER PRIMARY KEY CHECK (id = 1), + count INTEGER NOT NULL + ) + `); + await database.run(` + CREATE TABLE IF NOT EXISTS restart_counter_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + count INTEGER NOT NULL, + payload TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + `); + await database.run(` + CREATE TABLE IF NOT EXISTS restart_heartbeat ( + id INTEGER PRIMARY KEY CHECK (id = 1), + count INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + `); +} + +function stringifyError(error: unknown): string { + return error instanceof Error ? 
error.message : String(error); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function logRuntimeEvent(event: string, fields: Record) { + console.log( + JSON.stringify({ + event, + ts: Date.now(), + ...fields, + }), + ); +} + +async function runHeartbeatSql(database: SqliteDatabase): Promise { + await ensureTables(database); + await database.run("BEGIN"); + try { + await database.run( + ` + INSERT INTO restart_heartbeat (id, count, updated_at) + VALUES (1, 1, ?) + ON CONFLICT(id) DO UPDATE SET + count = count + 1, + updated_at = excluded.updated_at + `, + [Date.now()], + ); + await database.run("COMMIT"); + + const rows = await database.query( + "SELECT count FROM restart_heartbeat WHERE id = ?", + [1], + ); + return Number(rows.rows[0]?.[0] ?? 0); + } catch (error) { + try { + await database.run("ROLLBACK"); + } catch (rollbackError) { + logRuntimeEvent("heartbeat_rollback_err", { + error: stringifyError(rollbackError), + }); + } + throw error; + } +} + +async function runHeartbeatKv( + kv: { + get(key: string): Promise; + put(key: string, value: string): Promise; + }, +): Promise { + const current = Number((await kv.get("heartbeat_count")) ?? 
"0"); + const next = current + 1; + await kv.put("heartbeat_count", String(next)); + return next; +} + +const sqliteCounter = actor({ + state: {}, + vars: {} as HeartbeatVars, + db: rawSqlDatabaseProvider, + onRequest: (ctx, request) => { + const url = new URL(request.url); + if (url.pathname !== "/health") { + return new Response("not found", { status: 404 }); + } + + logRuntimeEvent("gateway_health_request", { + actorId: ctx.actorId, + key: ctx.key, + }); + return new Response( + JSON.stringify({ + ok: true, + actorId: ctx.actorId, + key: ctx.key, + }), + { + headers: { "Content-Type": "application/json" }, + }, + ); + }, + onWebSocket: (ctx, websocket) => { + logRuntimeEvent("gateway_websocket_open", { + actorId: ctx.actorId, + key: ctx.key, + path: ctx.request ? new URL(ctx.request.url).pathname : "unknown", + }); + websocket.addEventListener("message", (event: { data: unknown }) => { + const data = typeof event.data === "string" ? event.data : String(event.data); + try { + const message = JSON.parse(data) as { type?: string; sentAt?: number }; + if (message.type === "ping") { + websocket.send( + JSON.stringify({ + type: "pong", + sentAt: message.sentAt, + actorId: ctx.actorId, + key: ctx.key, + }), + ); + return; + } + } catch {} + + websocket.send(data); + }); + websocket.addEventListener("close", () => { + logRuntimeEvent("gateway_websocket_close", { + actorId: ctx.actorId, + key: ctx.key, + }); + }); + }, + onWake: async (ctx) => { + const vars = ctx.vars as HeartbeatVars; + if (vars.heartbeatTimer) { + return; + } + + const database = ctx.sql as SqliteDatabase; + vars.heartbeatSeq = 0; + logRuntimeEvent("heartbeat_on_wake", { + actorId: ctx.actorId, + key: ctx.key, + mode: heartbeatMode, + }); + if (heartbeatMode === "none") { + return; + } + + const tick = async () => { + if (ctx.abortSignal.aborted || vars.heartbeatInFlight) { + return; + } + + vars.heartbeatInFlight = true; + const seq = (vars.heartbeatSeq ?? 
0) + 1; + vars.heartbeatSeq = seq; + logRuntimeEvent("heartbeat_tick", { + actorId: ctx.actorId, + key: ctx.key, + seq, + mode: heartbeatMode, + }); + + try { + if (heartbeatMode === "sqlite") { + const count = await runHeartbeatSql(database); + logRuntimeEvent("heartbeat_sql_ok", { + actorId: ctx.actorId, + key: ctx.key, + seq, + count, + }); + } else { + const count = await runHeartbeatKv(ctx.kv); + logRuntimeEvent("heartbeat_kv_ok", { + actorId: ctx.actorId, + key: ctx.key, + seq, + count, + }); + } + } catch (error) { + logRuntimeEvent( + heartbeatMode === "sqlite" ? "heartbeat_sql_err" : "heartbeat_kv_err", + { + actorId: ctx.actorId, + key: ctx.key, + seq, + error: stringifyError(error), + }, + ); + } finally { + vars.heartbeatInFlight = false; + } + }; + + vars.heartbeatTimer = setInterval(() => { + void tick(); + }, 1_000); + + ctx.abortSignal.addEventListener( + "abort", + () => { + if (vars.heartbeatTimer) { + clearInterval(vars.heartbeatTimer); + vars.heartbeatTimer = undefined; + } + logRuntimeEvent("heartbeat_abort", { + actorId: ctx.actorId, + key: ctx.key, + mode: heartbeatMode, + }); + }, + { once: true }, + ); + + await tick(); + }, + onSleep: (ctx) => { + const vars = ctx.vars as HeartbeatVars; + if (vars.heartbeatTimer) { + clearInterval(vars.heartbeatTimer); + vars.heartbeatTimer = undefined; + } + logRuntimeEvent("heartbeat_on_sleep", { + actorId: ctx.actorId, + key: ctx.key, + mode: heartbeatMode, + }); + }, + actions: { + tick: async (ctx, payloadBytes = 4096) => { + const database = ctx.sql as SqliteDatabase; + const payload = "x".repeat(Math.max(0, Math.trunc(payloadBytes))); + const now = Date.now(); + + await ensureTables(database); + await database.run("BEGIN"); + try { + await database.run( + ` + INSERT INTO restart_counter (id, count) + VALUES (1, 1) + ON CONFLICT(id) DO UPDATE SET count = count + 1 + `, + ); + const counterRows = await database.query( + "SELECT count FROM restart_counter WHERE id = ?", + [1], + ); + const count = 
Number(counterRows.rows[0]?.[0] ?? 0); + await database.run( + ` + INSERT INTO restart_counter_events (count, payload, created_at) + VALUES (?, ?, ?) + `, + [count, payload, now], + ); + await database.run( + ` + DELETE FROM restart_counter_events + WHERE id IN ( + SELECT id FROM restart_counter_events + ORDER BY id ASC + LIMIT max((SELECT COUNT(*) FROM restart_counter_events) - 200, 0) + ) + `, + ); + await database.run("COMMIT"); + + const eventRows = await database.query( + "SELECT COUNT(*) AS events FROM restart_counter_events", + ); + + return { + count, + events: Number(eventRows.rows[0]?.[0] ?? 0), + }; + } catch (error) { + await database.run("ROLLBACK"); + throw error; + } + }, + getCount: async (ctx) => { + const database = ctx.sql as SqliteDatabase; + await ensureTables(database); + const rows = await database.query( + "SELECT count FROM restart_counter WHERE id = ?", + [1], + ); + return Number(rows.rows[0]?.[0] ?? 0); + }, + commitDuringEngineRestart: async ( + ctx, + input: { + signalUrl: string; + delayBeforeCommitMs?: number; + payloadBytes?: number; + }, + ) => { + const database = ctx.sql as SqliteDatabase; + const payload = "x".repeat( + Math.max(0, Math.trunc(input.payloadBytes ?? 8192)), + ); + + await ensureTables(database); + await database.run("BEGIN"); + await database.run( + ` + INSERT INTO restart_counter (id, count) + VALUES (1, 1) + ON CONFLICT(id) DO UPDATE SET count = count + 1 + `, + ); + await database.run( + ` + INSERT INTO restart_counter_events (count, payload, created_at) + VALUES ((SELECT count FROM restart_counter WHERE id = 1), ?, ?) + `, + [payload, Date.now()], + ); + + await fetch(input.signalUrl, { method: "POST" }); + await sleep( + Math.max(0, Math.trunc(input.delayBeforeCommitMs ?? 
500)), + ); + + const commitStartedAt = Date.now(); + try { + await database.run("COMMIT"); + return { + ok: true, + commitDurationMs: Date.now() - commitStartedAt, + }; + } catch (commitError) { + let rollbackErrorMessage: string | undefined; + try { + await database.run("ROLLBACK"); + } catch (rollbackError) { + rollbackErrorMessage = stringifyError(rollbackError); + } + + console.error( + JSON.stringify({ + event: "commitDuringEngineRestartFailed", + commitDurationMs: Date.now() - commitStartedAt, + commitError: stringifyError(commitError), + rollbackError: rollbackErrorMessage, + }), + ); + + throw new Error( + `commit failed: ${stringifyError(commitError)}; rollback failed: ${rollbackErrorMessage ?? "no"}`, + ); + } + }, + }, + options: { + sleepTimeout: 300_000, + }, +}); + +const registry = setup({ + use: { + sqliteCounter, + }, + runtime: "native", + sqlite: "remote", + endpoint, + namespace, + token, + envoy: { + poolName, + }, + serverless: { + basePath: "/api/rivet", + }, + noWelcome: true, + test: { + enabled: true, + sqliteBackend: "remote", + }, +}); + +const server = serve( + { + fetch: (request) => registry.handler(request), + hostname: host, + port, + }, + () => { + console.log( + JSON.stringify({ + event: "listening", + url: `http://${host}:${port}/api/rivet`, + }), + ); + }, +); + +function shutdown() { + server.close(() => { + process.exit(0); + }); +} + +process.on("SIGTERM", shutdown); +process.on("SIGINT", shutdown);