diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index 574a4aa251..f934ba54f7 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -159,7 +159,7 @@ pub struct Pegboard { impl Pegboard { pub fn base_retry_timeout(&self) -> usize { - self.base_retry_timeout.unwrap_or(2000) + self.base_retry_timeout.unwrap_or(2_000) } pub fn actor_allocation_threshold(&self) -> i64 { @@ -177,7 +177,7 @@ impl Pegboard { } pub fn actor_retry_duration_threshold(&self) -> i64 { - self.actor_retry_duration_threshold.unwrap_or(300_000) + self.actor_retry_duration_threshold.unwrap_or(5 * 60 * 1000) } pub fn retry_reset_duration(&self) -> i64 { @@ -202,7 +202,7 @@ impl Pegboard { } pub fn serverless_base_retry_timeout(&self) -> usize { - self.serverless_base_retry_timeout.unwrap_or(2000) + self.serverless_base_retry_timeout.unwrap_or(2_000) } pub fn serverless_retry_reset_duration(&self) -> i64 { @@ -237,7 +237,7 @@ impl Pegboard { pub fn gateway_response_start_timeout_ms(&self) -> u64 { self.gateway_response_start_timeout_ms - .unwrap_or(5 * 60 * 1000) // 5 minutes + .unwrap_or(5 * 60 * 1000) } pub fn gateway_update_ping_interval_ms(&self) -> u64 { diff --git a/examples/kitchen-sink/Dockerfile.local b/examples/kitchen-sink/Dockerfile.local index 9818cfa3bf..90c4ca5f65 100644 --- a/examples/kitchen-sink/Dockerfile.local +++ b/examples/kitchen-sink/Dockerfile.local @@ -1,4 +1,4 @@ -FROM node:22-slim +FROM node:22-trixie-slim WORKDIR /app ENV NODE_OPTIONS=--max-old-space-size=7168 diff --git a/examples/kitchen-sink/package.json b/examples/kitchen-sink/package.json index ab170bc529..0767c462e2 100644 --- a/examples/kitchen-sink/package.json +++ b/examples/kitchen-sink/package.json @@ -5,7 +5,7 @@ "type": "module", "packageManager": "pnpm@10.13.1", "scripts": { - "dev": "VITE_RIVET_ENDPOINT=http://127.0.0.1:6420 concurrently -n server,vite \"node --import @rivetkit/sql-loader --import tsx src/server.ts\" \"vite\"", + "dev": "RIVET_KITCHEN_SINK_MODE=serverful VITE_RIVET_ENDPOINT=http://127.0.0.1:6420 concurrently -n server,vite \"node --import @rivetkit/sql-loader --import tsx src/server.ts\" \"vite\"", "dev:serverless": "RIVET_RUN_ENGINE=1 concurrently -n server,vite,configure \"node --import @rivetkit/sql-loader --import tsx src/server.ts\" \"vite\" \"pnpm dev:serverless:configure\"", "dev:serverless:configure": "node -e \"void (async () => { const port = process.env.PORT ?? '3000'; const url = 'http://127.0.0.1:' + port + '/api/rivet/metadata'; const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); for (let i = 0; i < 120; i++) { try { const res = await fetch(url); if (res.ok) { console.log('serverless pool configured'); return; } console.log('serverless configuration returned ' + res.status); } catch {} await sleep(1000); } throw new Error('timed out waiting for serverless configuration at ' + url); })();\"", "check-types": "echo 'skipped - workflow history types broken'", diff --git a/examples/kitchen-sink/src/index.ts b/examples/kitchen-sink/src/index.ts index 673e25971d..e1122e1f3d 100644 --- a/examples/kitchen-sink/src/index.ts +++ b/examples/kitchen-sink/src/index.ts @@ -1,4 +1,5 @@ import { setup } from "rivetkit"; +import { resolveMode } from "./mode.ts"; // Counter import { counter } from "./actors/counter/counter.ts"; import { counterConn } from "./actors/counter/counter-conn.ts"; @@ -142,14 +143,16 @@ function numberFromEnv(name: string, fallback: number): number { } function serverlessPoolConfig() { + // Only the local serverless mode self-registers its pool with the engine. + // In the deployed `serverless` mode the pool is configured externally on + // the engine cluster, and the `serverful` mode uses a long-lived runner + // connection rather than a serverless pool. + if (resolveMode() !== "serverless-local") return undefined; + const url = process.env.RIVET_SERVERLESS_URL ?? process.env.KITCHEN_SINK_SERVERLESS_URL ?? - (process.env.RIVET_RUN_ENGINE === "1" - ? "http://127.0.0.1:3000/api/rivet" - : undefined); - - if (!url) return undefined; + "http://127.0.0.1:3000/api/rivet"; return { name: process.env.RIVET_POOL, diff --git a/examples/kitchen-sink/src/mode.ts b/examples/kitchen-sink/src/mode.ts new file mode 100644 index 0000000000..bfb5f284bf --- /dev/null +++ b/examples/kitchen-sink/src/mode.ts @@ -0,0 +1,25 @@ +export type KitchenSinkMode = "serverless" | "serverful" | "serverless-local"; + +export function resolveMode(): KitchenSinkMode { + const explicit = process.env.RIVET_KITCHEN_SINK_MODE; + if ( + explicit === "serverless" || + explicit === "serverful" || + explicit === "serverless-local" + ) { + return explicit; + } + if (explicit !== undefined && explicit !== "") { + throw new Error( + `RIVET_KITCHEN_SINK_MODE must be one of "serverless", "serverful", or "serverless-local" (got "${explicit}")`, + ); + } + + if (process.env.RIVET_RUN_ENGINE === "1") return "serverless-local"; + if (process.env.RIVET_SERVERLESS_URL !== undefined) return "serverless-local"; + if (process.env.KITCHEN_SINK_SERVERLESS_URL !== undefined) { + return "serverless-local"; + } + + return "serverless"; +} diff --git a/examples/kitchen-sink/src/server.ts b/examples/kitchen-sink/src/server.ts index a1317e0841..297a23f397 100644 --- a/examples/kitchen-sink/src/server.ts +++ b/examples/kitchen-sink/src/server.ts @@ -1,4 +1,5 @@ import { registry } from "./index.ts"; +import { resolveMode } from "./mode.ts"; import { serve } from "@hono/node-server"; import { Hono } from "hono"; import type { Server as HttpServer } from "node:http"; @@ -6,10 +7,7 @@ import * as v8 from "node:v8"; const app = new Hono(); const port = Number.parseInt(process.env.PORT ?? "3000", 10); -const serverlessMode = - process.env.RIVET_RUN_ENGINE === "1" || - process.env.RIVET_SERVERLESS_URL !== undefined || - process.env.KITCHEN_SINK_SERVERLESS_URL !== undefined; +const mode = resolveMode(); process.on("exit", (code) => { console.log(JSON.stringify({ kind: "process_exit", code, pid: process.pid })); @@ -150,21 +148,25 @@ app.use("*", async (c, next) => { // ); }); -if (serverlessMode) { +// Only wire the serverless handler in serverless modes. In `serverful` mode +// the runner connects via `registry.start()` and any stray hit on +// `/api/rivet/start` would spin up a second envoy alongside the persistent +// one. +if (mode === "serverful") { + registry.start(); +} else { app.all("/api/rivet/*", (c) => registry.handler(c.req.raw)); app.all("/api/rivet", (c) => registry.handler(c.req.raw)); -} else { - registry.start(); } const server = serve({ fetch: app.fetch, port }, () => { - if (serverlessMode) { + if (mode === "serverful") { console.log( - `serverless RivetKit listening on http://127.0.0.1:${port}/api/rivet`, + `kitchen sink (serverful) listening on http://127.0.0.1:${port}`, ); } else { console.log( - `kitchen sink diagnostics listening on http://127.0.0.1:${port}`, + `kitchen sink (${mode}) listening on http://127.0.0.1:${port}/api/rivet`, ); } }); diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index b4ec06bd3b..95dfa03814 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -74,11 +74,6 @@ mod websocket; use inspector::build_actor_inspector; use websocket::is_actor_connect_path; -/// Bound on `handle.shutdown_and_wait` inside `serve_with_config` teardown. -/// Protects against indefinite hangs if the envoy reconnect loop is stuck; -/// the TS/outer-host grace period is the ultimate backstop. -const SHUTDOWN_DRAIN_TIMEOUT: Duration = Duration::from_secs(20); - #[derive(Debug, Default)] pub struct CoreRegistry { factories: HashMap>, @@ -530,10 +525,22 @@ impl CoreRegistry { // trip the `shutdown` token instead. shutdown.cancelled().await; + // TODO: Move into envoy-client since timing out has to do with protocol compliance + // Read threshold from protocol metadata, fall back to 30 min + let stop_threshold = handle + .get_protocol_metadata + .await + .map(|x| x.actor_stop_threshold) + .unwrap_or(30 * 60 * 1000); // Bounded drain. If envoy cannot reach the engine (reconnect loop stuck), // we fall back to immediate `Stop` rather than hanging indefinitely. // The outer host (TS signal handler / Rust binary) is the backstop. - match timeout(SHUTDOWN_DRAIN_TIMEOUT, handle.shutdown_and_wait(false)).await { + match timeout( + Duration::from_millis(SHUTDOWN_DRAIN_TIMEOUT as u64), + handle.shutdown_and_wait(false), + ) + .await + { Ok(()) => {} Err(_) => { tracing::warn!("envoy shutdown drain exceeded timeout; forcing immediate stop"); @@ -718,11 +725,8 @@ impl RegistryDispatcher { let (start_tx, start_rx) = oneshot::channel(); let result: Result> = async { - try_send_lifecycle_command( - &lifecycle_tx, - LifecycleCommand::Start { reply: start_tx }, - ) - .context("send actor task start command")?; + try_send_lifecycle_command(&lifecycle_tx, LifecycleCommand::Start { reply: start_tx }) + .context("send actor task start command")?; start_rx .await .context("receive actor task start reply")? diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index 8ad23ac0ae..04d76e10ed 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -314,7 +314,7 @@ export declare class CoreRegistry { * Idempotent. Safe to call when neither mode has been activated. * Does not block on the `serve()` future; TS awaits that promise * separately to avoid re-entrancy. - */ + */ shutdown(): Promise health(): Promise metadata(): JsRegistryRouteResponse