From 31a9bd4ba0c4e9bceb4c57291cdda1c893f421bb Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 11 May 2026 05:05:09 -0700 Subject: [PATCH] feat(kitchen-sink): add load testing harness --- examples/kitchen-sink/CLAUDE.md | 28 +- examples/kitchen-sink/Dockerfile.local | 9 + examples/kitchen-sink/package.json | 1 + .../kitchen-sink/scripts/counter-latency.ts | 955 ++++++++++++++---- .../src/actors/http/tunnel-stress.ts | 62 ++ .../src/actors/testing/load-test-agent.ts | 191 ++++ examples/kitchen-sink/src/index.ts | 6 + package.json | 1 + .../docker/build-push-kitchen-sink-local.sh | 134 +++ 9 files changed, 1200 insertions(+), 187 deletions(-) create mode 100644 examples/kitchen-sink/Dockerfile.local create mode 100644 examples/kitchen-sink/src/actors/http/tunnel-stress.ts create mode 100644 examples/kitchen-sink/src/actors/testing/load-test-agent.ts create mode 100755 scripts/docker/build-push-kitchen-sink-local.sh diff --git a/examples/kitchen-sink/CLAUDE.md b/examples/kitchen-sink/CLAUDE.md index e1c9ca1ddd..7437a1eae9 100644 --- a/examples/kitchen-sink/CLAUDE.md +++ b/examples/kitchen-sink/CLAUDE.md @@ -4,29 +4,23 @@ ### Cloud Run Deploys -- Deploy the kitchen-sink to Cloud Run from an isolated temp build context that pins the published `rivetkit` preview version, so root workspace `resolutions` do not silently swap in local packages. -- Copy `examples/kitchen-sink` to a temp directory and edit that temp copy instead of building from the monorepo root. -- Pin the temp copy to the exact published preview packages you want to test, such as `rivetkit@0.0.0-pr.4667.33279e9` and `@rivetkit/react@0.0.0-pr.4667.33279e9`. -- Build and push the image from that temp context, then update the target Cloud Run service to that image. -- Do not build the repo workspace directly when validating a published preview package, because the root `package.json` `resolutions` will route the app back to local workspace packages. 
+- Deploy local kitchen-sink builds with `scripts/docker/build-push-kitchen-sink-local.sh`; it builds RivetKit packages with Turbo on the host, packs workspace packages into tarballs, installs a portable app `node_modules`, copies the built NAPI `.node`, and Docker copies only the prepared app. +- Use `PUSH=0` for local image smoke tests and default `PUSH=1` for staging deploy images. +- After building and pushing, update Cloud Run service `kitchen-sink-staging` in project `dev-projects-491221`, region `us-east4`, to the pushed image tag. +- Verify staging with `curl -fsS "$(gcloud run services describe kitchen-sink-staging --region us-east4 --project dev-projects-491221 --format='value(status.url)')/api/rivet/metadata"`. +- Only use the old temp-copy preview-package flow when explicitly validating already-published npm preview packages instead of local workspace code. Example flow: ```bash -# 1. Copy the kitchen-sink out of the workspace. -cp -R examples/kitchen-sink /tmp/kitchen-sink-cloud-run +# 1. Build and push a host-built local workspace image. +./scripts/docker/build-push-kitchen-sink-local.sh -# 2. Edit /tmp/kitchen-sink-cloud-run/package.json to pin the published preview packages. - -# 3. Build and push the image from the temp context. -docker build -t us-east4-docker.pkg.dev///: /tmp/kitchen-sink-cloud-run -docker push us-east4-docker.pkg.dev///: - -# 4. Point Cloud Run at that image. -gcloud run services update \ +# 2. Point Cloud Run at that image. +gcloud run services update kitchen-sink-staging \ --region us-east4 \ - --project \ - --image us-east4-docker.pkg.dev///: + --project dev-projects-491221 \ + --image "us-east4-docker.pkg.dev/dev-projects-491221/cloud-run-source-deploy/rivet-dev-rivet/rivet-kitchen-sink:$(git rev-parse HEAD)" ``` The kitchen-sink is deployed on Railway via Rivet Cloud. To test actors and inspect their SQLite databases, use the Rivet gateway API. 
diff --git a/examples/kitchen-sink/Dockerfile.local b/examples/kitchen-sink/Dockerfile.local new file mode 100644 index 0000000000..9818cfa3bf --- /dev/null +++ b/examples/kitchen-sink/Dockerfile.local @@ -0,0 +1,9 @@ +FROM node:22-slim + +WORKDIR /app +ENV NODE_OPTIONS=--max-old-space-size=7168 + +COPY app/ ./ + +EXPOSE 8080 +CMD ["node", "--import", "@rivetkit/sql-loader", "--import", "tsx", "src/server.ts"] diff --git a/examples/kitchen-sink/package.json b/examples/kitchen-sink/package.json index acb2ac47df..ab170bc529 100644 --- a/examples/kitchen-sink/package.json +++ b/examples/kitchen-sink/package.json @@ -17,6 +17,7 @@ "smoke:raw-websocket-serverless": "tsx scripts/raw-websocket-serverless-smoke.ts", "fuzz:sleep-close": "tsx scripts/sleep-close-fuzz.ts", "mock-agentic-loop": "tsx scripts/mock-agentic-loop.ts", + "counter-latency": "tsx scripts/counter-latency.ts", "benchmark": "tsx scripts/benchmark.ts", "db:generate": "find src/actors -name drizzle.config.ts -exec drizzle-kit generate --config {} \\;" }, diff --git a/examples/kitchen-sink/scripts/counter-latency.ts b/examples/kitchen-sink/scripts/counter-latency.ts index f60105f92b..9b3cc11ee3 100644 --- a/examples/kitchen-sink/scripts/counter-latency.ts +++ b/examples/kitchen-sink/scripts/counter-latency.ts @@ -1,130 +1,423 @@ // Counter-latency mini load test. // -// Two modes: +// Subcommands: // -// rtt Every --interval ms, spawn a background worker that: -// 1. Generates a new key -// 2. handle = client.counter.getOrCreate([key]) -// 3. connection = handle.connect({ skipReadyWait: true }) -// 4. measures connect (via no-op ws roundtrip), first increment, second increment -// Workers run concurrently by default; the interval does not wait on prior workers. -// Set SERIAL=1 to force serial execution. +// rtt Every --interval ms, spawn a background worker that: +// 1. Generates a new key +// 2. handle = client.counter.getOrCreate([key]) +// 3. connection = handle.connect() +// 4. 
measures connect, first increment, second increment +// Workers run concurrently by default. Set SERIAL=1 to force serial execution. // -// concurrent Ramps up to --concurrency persistent connections (one new connection -// every --interval ms). Each connection holds open and increments once -// every 10s. On disconnect, immediately reconnects, logging connect -// time again. Color codes by elapsed ms: < 800 green, 800-1500 orange, > 1500 red. +// concurrent Ramps up to persistent raw WebSocket tunnel-stress actors. // -// Usage: -// tsx scripts/counter-latency.ts --mode --interval [--concurrency N] -// -// --mode "rtt" or "concurrent" (required) -// --interval gap in ms between worker starts (rtt) or ramp-up between connections (concurrent) -// --concurrency number of persistent connections (concurrent mode only) +// agent-concurrent Ramps up to persistent load-test agent actors. Each worker sends an +// inference message every --message-interval-ms. The actor inserts +// tokens into SQLite and streams one token event per insert. // -// BATCHES total workers spawned before exit in rtt mode (default infinite) -// SERIAL "1" / "true" to await each worker before the next in rtt mode (default off) -// -// Examples: -// tsx scripts/counter-latency.ts --mode rtt --interval 1000 \ -// "http://default:TOKEN@34.110.160.16:80" +// Usage: +// tsx scripts/counter-latency.ts rtt -i +// tsx scripts/counter-latency.ts concurrent [options] +// tsx scripts/counter-latency.ts agent-concurrent [options] // -// tsx scripts/counter-latency.ts --mode concurrent --interval 50 --concurrency 100 \ -// "http://default:TOKEN@34.110.160.16:80" +// BATCHES total workers spawned before exit in rtt mode. Default: infinite. +// SERIAL "1" / "true" to await each worker before the next in rtt mode. 
+import { parseArgs } from "node:util"; import { createClient } from "rivetkit/client"; import type { registry } from "../src/index.ts"; -interface Args { - mode: "rtt" | "concurrent"; +interface RttArgs { + mode: "rtt"; interval: number; - concurrency?: number; endpoint: string; } -function parseArgs(argv: string[]): Args { - let mode: string | undefined; - let interval: number | undefined; - let concurrency: number | undefined; - let endpoint: string | undefined; - - for (let i = 0; i < argv.length; i++) { - const arg = argv[i]; - if (arg === "--mode") { - mode = argv[++i]; - } else if (arg === "--interval") { - interval = Number(argv[++i]); - } else if (arg === "--concurrency") { - concurrency = Number(argv[++i]); - } else if (arg === "--help" || arg === "-h") { - usage(); - process.exit(0); - } else if (arg.startsWith("--")) { - console.error(`unknown arg: ${arg}`); - usage(); - process.exit(1); - } else if (!endpoint) { - endpoint = arg; - } else { - console.error(`unexpected positional arg: ${arg}`); - usage(); - process.exit(1); - } - } +interface ConcurrentArgs { + mode: "concurrent" | "agent-concurrent"; + interval: number; + concurrency: number; + messageInterval: number; + showMessages: boolean; + tokensPerSecond: number; + durationMs: number; + endpoint: string; +} - if (!mode) { - console.error("--mode is required"); - usage(); +type Args = RttArgs | ConcurrentArgs; + +type WorkerHealth = "pending" | "healthy" | "warning" | "ended"; + +interface Sample { + worker: number; + key: string; + connectMs: number; + firstMs: number; + secondMs: number; + totalMs: number; + actorId?: string; + error?: string; +} + +interface TunnelWebSocket { + readyState: number; + send(data: string): void; + close(code?: number, reason?: string): void; + addEventListener( + type: "open" | "message" | "close" | "error", + listener: (event: any) => void, + options?: { once?: boolean }, + ): void; +} + +interface ConcurrentWorkload { + keyPrefix: string; + 
resolveActorId(handle: unknown): Promise; + openWebSocket(actorId: string, key: string): Promise; + onOpen( + ws: TunnelWebSocket, + worker: number, + key: string, + options: ConcurrentWorkerOptions, + ): () => void; +} + +interface ConcurrentWorkerOptions { + messageInterval: number; + showMessages: boolean; + tokensPerSecond: number; + durationMs: number; +} + +const DEFAULT_CONCURRENCY = 1_000; +const DEFAULT_CONCURRENT_INTERVAL_MS = 300; +const DEFAULT_MESSAGE_INTERVAL_MS = 1_000; +const DEFAULT_AGENT_MESSAGE_INTERVAL_MS = 30_000; +const DEFAULT_TOKENS_PER_SECOND = 20; +const DEFAULT_DURATION_MS = 5_000; +const MESSAGE_GAP_WARN_MS = 3_000; +const ACTOR_STOPPED_CLOSE_CODE = 1000; +const ACTOR_STOPPED_CLOSE_REASON = "hack_force_close"; + +const ANSI = { + reset: "\x1b[0m", + green: "\x1b[38;2;0;255;0m", + red: "\x1b[38;2;255;0;0m", + yellow: "\x1b[38;2;255;200;0m", + blue: "\x1b[38;2;80;160;255m", + dim: "\x1b[2m", + bold: "\x1b[1m", +}; + +const COLOR_MIN_MS = 800; +const COLOR_MAX_MS = 2_000; + +function usage(): void { + console.error( + "usage:\n" + + " tsx scripts/counter-latency.ts rtt -i \n" + + " tsx scripts/counter-latency.ts concurrent [options] \n" + + " tsx scripts/counter-latency.ts agent-concurrent [options] \n" + + "\n" + + "subcommands:\n" + + " rtt spawn fresh counter actors and measure action RTTs\n" + + " concurrent ramp persistent raw WebSocket tunnel-stress actors\n" + + " agent-concurrent ramp persistent SQLite-backed agent actors\n" + + "\n" + + " -h, --help show usage", + ); +} + +function rttUsage(): void { + console.error( + "usage: tsx scripts/counter-latency.ts rtt -i \n" + + " -i, --interval gap in ms between worker starts (required)\n" + + " -h, --help show usage", + ); +} + +function concurrentUsage(mode: "concurrent" | "agent-concurrent"): void { + const agentOptions = + mode === "agent-concurrent" + ? 
" --tokens-per-second SQLite token inserts per second (default 20)\n" + + " --duration-ms inference stream duration in ms (default 5000)\n" + : ""; + const messageIntervalDefault = + mode === "agent-concurrent" ? "30000" : "1000"; + console.error( + `usage: tsx scripts/counter-latency.ts ${mode} [options] \n` + + " -i, --interval ramp-up gap in ms between connections (default 300)\n" + + " -c, --concurrency number of persistent connections (default 1000)\n" + + ` --message-interval-ms gap between client messages (default ${messageIntervalDefault})\n` + + agentOptions + + " --show-messages log all received WebSocket messages\n" + + " -h, --help show usage", + ); +} + +function parseRequiredMs(value: string | undefined, name: string): number { + const parsed = Number(value); + if (value === undefined || !Number.isFinite(parsed) || parsed < 0) { + console.error(`${name} is required (ms, >= 0)`); process.exit(1); } - if (mode !== "rtt" && mode !== "concurrent") { - console.error(`--mode must be "rtt" or "concurrent", got: ${mode}`); + return parsed; +} + +function parseOptionalMs( + value: string | undefined, + name: string, + defaultValue: number, +): number { + if (value === undefined) return defaultValue; + const parsed = Number(value); + if (!Number.isFinite(parsed) || parsed < 0) { + console.error(`${name} must be ms, >= 0`); process.exit(1); } - if (interval === undefined || !Number.isFinite(interval) || interval < 0) { - console.error("--interval is required (ms, >= 0)"); + return parsed; +} + +function parseOptionalPositiveNumber( + value: string | undefined, + name: string, + defaultValue: number, +): number { + if (value === undefined) return defaultValue; + const parsed = Number(value); + if (!Number.isFinite(parsed) || parsed <= 0) { + console.error(`${name} must be a positive number`); process.exit(1); } - if (mode === "concurrent") { - if ( - concurrency === undefined || - !Number.isFinite(concurrency) || - concurrency < 1 - ) { - 
console.error("--concurrency is required for mode=concurrent (>= 1)"); - process.exit(1); - } + return parsed; +} + +function parseOptionalCount( + value: string | undefined, + name: string, + defaultValue: number, +): number { + if (value === undefined) return defaultValue; + const parsed = Number(value); + if (!Number.isFinite(parsed) || !Number.isInteger(parsed) || parsed < 1) { + console.error(`${name} must be an integer, >= 1`); + process.exit(1); } - if (!endpoint) { + return parsed; +} + +function parseEndpoint(positionals: string[], usageFn: () => void): string { + if (positionals.length === 0) { console.error("endpoint is required"); - usage(); + usageFn(); process.exit(1); } + if (positionals.length > 1) { + console.error( + `unexpected positional args: ${positionals.slice(1).join(" ")}`, + ); + usageFn(); + process.exit(1); + } + return positionals[0]; +} - return { mode, interval, concurrency, endpoint }; +function parseRttArgs(argv: string[]): RttArgs { + let parsed: ReturnType< + typeof parseArgs<{ + options: { + interval: { type: "string"; short: "i" }; + help: { type: "boolean"; short: "h" }; + }; + allowPositionals: true; + }> + >; + try { + parsed = parseArgs({ + args: argv, + options: { + interval: { type: "string", short: "i" }, + help: { type: "boolean", short: "h" }, + }, + allowPositionals: true, + }); + } catch (err) { + console.error(err instanceof Error ? 
err.message : String(err)); + rttUsage(); + process.exit(1); + } + + const { values, positionals } = parsed; + if (values.help) { + rttUsage(); + process.exit(0); + } + + return { + mode: "rtt", + interval: parseRequiredMs(values.interval, "--interval"), + endpoint: parseEndpoint(positionals, rttUsage), + }; } -function usage(): void { - console.error( - "usage: tsx scripts/counter-latency.ts --mode --interval [--concurrency N] ", - ); +function parseConcurrentArgs( + mode: "concurrent" | "agent-concurrent", + argv: string[], +): ConcurrentArgs { + let parsed: ReturnType< + typeof parseArgs<{ + options: { + interval: { type: "string"; short: "i" }; + concurrency: { type: "string"; short: "c" }; + "increment-interval": { type: "string" }; + "message-interval-ms": { type: "string" }; + "show-increments": { type: "boolean" }; + "show-messages": { type: "boolean" }; + "tokens-per-second": { type: "string" }; + "duration-ms": { type: "string" }; + help: { type: "boolean"; short: "h" }; + }; + allowPositionals: true; + }> + >; + try { + parsed = parseArgs({ + args: argv, + options: { + interval: { type: "string", short: "i" }, + concurrency: { type: "string", short: "c" }, + "increment-interval": { type: "string" }, + "message-interval-ms": { type: "string" }, + "show-increments": { type: "boolean" }, + "show-messages": { type: "boolean" }, + "tokens-per-second": { type: "string" }, + "duration-ms": { type: "string" }, + help: { type: "boolean", short: "h" }, + }, + allowPositionals: true, + }); + } catch (err) { + console.error(err instanceof Error ? err.message : String(err)); + concurrentUsage(mode); + process.exit(1); + } + + const { values, positionals } = parsed; + if (values.help) { + concurrentUsage(mode); + process.exit(0); + } + + const defaultMessageInterval = + mode === "agent-concurrent" + ? 
DEFAULT_AGENT_MESSAGE_INTERVAL_MS + : DEFAULT_MESSAGE_INTERVAL_MS; + + return { + mode, + interval: parseOptionalMs( + values.interval, + "--interval", + DEFAULT_CONCURRENT_INTERVAL_MS, + ), + concurrency: parseOptionalCount( + values.concurrency, + "--concurrency", + DEFAULT_CONCURRENCY, + ), + messageInterval: parseOptionalMs( + values["message-interval-ms"] ?? values["increment-interval"], + "--message-interval-ms", + defaultMessageInterval, + ), + showMessages: + values["show-messages"] === true || + values["show-increments"] === true, + tokensPerSecond: parseOptionalPositiveNumber( + values["tokens-per-second"], + "--tokens-per-second", + DEFAULT_TOKENS_PER_SECOND, + ), + durationMs: parseOptionalMs( + values["duration-ms"], + "--duration-ms", + DEFAULT_DURATION_MS, + ), + endpoint: parseEndpoint(positionals, () => concurrentUsage(mode)), + }; } -const ARGS = parseArgs(process.argv.slice(2)); -const BATCHES = Number(process.env.BATCHES ?? "0"); // 0 = infinite (rtt mode) -const SERIAL = ((v) => v === "1" || v === "true")(process.env.SERIAL ?? ""); -const INCREMENT_INTERVAL_MS = 10_000; +function parseCliArgs(argv: string[]): Args { + const [command, ...rest] = argv; -const ANSI = { - reset: "\x1b[0m", - green: "\x1b[38;2;0;255;0m", - red: "\x1b[38;2;255;0;0m", - dim: "\x1b[2m", - bold: "\x1b[1m", + if (command === undefined || command === "--help" || command === "-h") { + usage(); + process.exit(command === undefined ? 1 : 0); + } + if (command === "rtt") return parseRttArgs(rest); + if (command === "concurrent" || command === "agent-concurrent") { + return parseConcurrentArgs(command, rest); + } + + console.error(`unknown subcommand: ${command}`); + usage(); + process.exit(1); +} + +const ARGS = parseCliArgs(process.argv.slice(2)); +const BATCHES = Number(process.env.BATCHES ?? "0"); +const SERIAL = ((v) => v === "1" || v === "true")(process.env.SERIAL ?? 
""); +const RUN_FOR_MS = parseOptionalMs( + process.env.RUN_FOR_MS, + "RUN_FOR_MS", + 0, +); + +let concurrentWorkersStarted = 0; +let stoppingConcurrentWorkers = false; +const concurrentStats = { + connects: 0, + reconnects: 0, + firstMessages: 0, + connectErrors: 0, + websocketErrors: 0, + disconnects: 0, + messageGaps: 0, + uncleanFailuresOrDisconnects: 0, }; +const workerHealth = new Map(); +const workerSockets = new Set(); -const COLOR_MIN_MS = 800; -const COLOR_MAX_MS = 2000; +const client = createClient(ARGS.endpoint); + +function setWorkerHealth(worker: number, state: WorkerHealth): void { + workerHealth.set(worker, state); +} + +function flagWorkerWarning(worker: number): void { + if (workerHealth.get(worker) === "healthy") { + workerHealth.set(worker, "warning"); + } +} + +function countWorkerHealth(): { + pending: number; + healthy: number; + warning: number; + ended: number; +} { + let pending = 0; + let healthy = 0; + let warning = 0; + let ended = 0; + for (const s of workerHealth.values()) { + if (s === "pending") pending++; + else if (s === "healthy") healthy++; + else if (s === "warning") warning++; + else if (s === "ended") ended++; + } + return { pending, healthy, warning, ended }; +} function gradientColor(ms: number): string { const clamped = Math.max(COLOR_MIN_MS, Math.min(COLOR_MAX_MS, ms)); @@ -150,20 +443,37 @@ function pad(s: string, n: number): string { return s.length >= n ? s : s + " ".repeat(n - s.length); } -function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); +function formatActor(actorId: string | undefined): string { + return actorId ? 
` actor=${actorId}` : ""; } -const client = createClient(ARGS.endpoint); +function requireActorId(connection: { actorId?: string }): string { + if (!connection.actorId) { + throw new Error("connection actorId missing after connect"); + } + return connection.actorId; +} -interface Sample { - worker: number; - key: string; - connectMs: number; - firstMs: number; - secondMs: number; - totalMs: number; - error?: string; +function logPrefix(_worker: number): string { + const ts = new Date().toISOString(); + if (ARGS.mode === "rtt") { + return `${ANSI.dim}${ts}${ANSI.reset}`; + } + + const { pending, healthy, warning, ended } = countWorkerHealth(); + const width = String(ARGS.concurrency).length; + const padNumber = (n: number) => String(n).padStart(width); + const concurrencyPart = `c=${padNumber(concurrentWorkersStarted)}/${ARGS.concurrency}`; + const pendingPart = `${ANSI.blue}${padNumber(pending)}${ANSI.reset}`; + const healthyPart = `${ANSI.green}${padNumber(healthy)}${ANSI.reset}`; + const warningPart = `${ANSI.yellow}${padNumber(warning)}${ANSI.reset}`; + const endedPart = `${ANSI.red}${padNumber(ended)}${ANSI.reset}`; + const statusPart = `s=${pendingPart}/${healthyPart}/${warningPart}/${endedPart}`; + return `${ANSI.dim}${ts}${ANSI.reset} [${concurrencyPart} ${statusPart}]`; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } async function runRttWorker(worker: number): Promise { @@ -172,10 +482,8 @@ async function runRttWorker(worker: number): Promise { const t0 = performance.now(); try { const handle = client.counter.getOrCreate([key]); - const connection = handle.connect({ skipReadyWait: true }); + const connection = handle.connect(); - // Probe ws open with a no-op so connect time is measured separately - // from the first user-visible action. 
await connection.noop(); const tConnect = performance.now(); @@ -185,7 +493,7 @@ async function runRttWorker(worker: number): Promise { await connection.increment(1); const tSecond = performance.now(); - // Best-effort cleanup; do not block measurements. + const actorId = requireActorId(connection as { actorId?: string }); void connection.dispose().catch(() => {}); return { @@ -195,6 +503,7 @@ async function runRttWorker(worker: number): Promise { firstMs: tFirst - tConnect, secondMs: tSecond - tFirst, totalMs: tSecond - t0, + actorId, }; } catch (err) { const tEnd = performance.now(); @@ -211,8 +520,7 @@ async function runRttWorker(worker: number): Promise { } function printRttSample(s: Sample): void { - const ts = new Date().toISOString(); - const prefix = `${ANSI.dim}${ts}${ANSI.reset} [w=${String(s.worker).padStart(5)}]`; + const prefix = logPrefix(s.worker); if (s.error) { console.log( `${prefix} ${pad(s.key, 32)} ${ANSI.red}ERROR ${s.error}${ANSI.reset} (${colorMs(s.totalMs)})`, @@ -220,90 +528,359 @@ function printRttSample(s: Sample): void { return; } console.log( - `${prefix} ${pad(s.key, 32)} connect=${colorMs(s.connectMs)} first=${colorMs(s.firstMs)} second=${colorMs(s.secondMs)} total=${colorMs(s.totalMs)}`, + `${prefix} ${pad(s.key, 32)}${formatActor(s.actorId)} connect=${colorMs(s.connectMs)} first=${colorMs(s.firstMs)} second=${colorMs(s.secondMs)} total=${colorMs(s.totalMs)}`, ); } -function logConnect(worker: number, key: string, connectMs: number): void { - const ts = new Date().toISOString(); - const prefix = `${ANSI.dim}${ts}${ANSI.reset} [w=${String(worker).padStart(5)}]`; - console.log(`${prefix} ${pad(key, 32)} connect=${colorMs(connectMs)}`); +function logConnect( + worker: number, + key: string, + actorId: string | undefined, + connectMs: number, + reconnect: boolean, +): void { + concurrentStats.connects += 1; + if (reconnect) concurrentStats.reconnects += 1; + setWorkerHealth(worker, "healthy"); + const prefix = logPrefix(worker); + const 
label = reconnect ? "reconnect" : "connect"; + console.log( + `${prefix} ${pad(key, 32)}${formatActor(actorId)} ${label}=${colorMs(connectMs)}`, + ); } -function logIncrement(worker: number, key: string, incrementMs: number): void { - const ts = new Date().toISOString(); - const prefix = `${ANSI.dim}${ts}${ANSI.reset} [w=${String(worker).padStart(5)}]`; - console.log(`${prefix} ${pad(key, 32)} increment=${colorMs(incrementMs)}`); +function logFirstMessage( + worker: number, + key: string, + actorId: string | undefined, + firstMessageMs: number, +): void { + concurrentStats.firstMessages += 1; + const prefix = logPrefix(worker); + console.log( + `${prefix} ${pad(key, 32)}${formatActor(actorId)} first-message=${colorMs(firstMessageMs)}`, + ); } -function logDisconnect(worker: number, key: string, reason: string): void { - const ts = new Date().toISOString(); - const prefix = `${ANSI.dim}${ts}${ANSI.reset} [w=${String(worker).padStart(5)}]`; +function logDisconnect( + worker: number, + key: string, + actorId: string | undefined, + reason: string, + unclean = true, +): void { + concurrentStats.disconnects += 1; + if (unclean) { + concurrentStats.uncleanFailuresOrDisconnects += 1; + } + setWorkerHealth(worker, "ended"); + const prefix = logPrefix(worker); + const label = unclean ? 
`${ANSI.red}DISCONNECT` : `${ANSI.dim}disconnect`; + console.log( + `${prefix} ${pad(key, 32)}${formatActor(actorId)} ${label} ${reason}${ANSI.reset}`, + ); +} + +function logReconnect( + worker: number, + key: string, + actorId: string | undefined, + code: number, + reason: string, +): void { + setWorkerHealth(worker, "pending"); + const prefix = logPrefix(worker); + console.log( + `${prefix} ${pad(key, 32)}${formatActor(actorId)} actor-stopped reconnect code=${code} reason=${reason}`, + ); +} + +function logMessageGap( + worker: number, + key: string, + actorId: string | undefined, + gapMs: number, +): void { + concurrentStats.messageGaps += 1; + concurrentStats.uncleanFailuresOrDisconnects += 1; + flagWorkerWarning(worker); + const prefix = logPrefix(worker); console.log( - `${prefix} ${pad(key, 32)} ${ANSI.red}DISCONNECT ${reason}${ANSI.reset}`, + `${prefix} ${pad(key, 32)}${formatActor(actorId)} ${ANSI.red}MESSAGE-GAP ${colorMs(gapMs)}${ANSI.reset}`, ); } function logConnectError( worker: number, key: string, + actorId: string | undefined, elapsedMs: number, reason: string, ): void { - const ts = new Date().toISOString(); - const prefix = `${ANSI.dim}${ts}${ANSI.reset} [w=${String(worker).padStart(5)}]`; + concurrentStats.connectErrors += 1; + concurrentStats.uncleanFailuresOrDisconnects += 1; + setWorkerHealth(worker, "ended"); + const prefix = logPrefix(worker); + console.log( + `${prefix} ${pad(key, 32)}${formatActor(actorId)} ${ANSI.red}CONNECT-ERROR ${reason}${ANSI.reset} (${colorMs(elapsedMs)})`, + ); +} + +function logWebSocketError( + worker: number, + key: string, + actorId: string | undefined, +): void { + concurrentStats.websocketErrors += 1; + concurrentStats.uncleanFailuresOrDisconnects += 1; + flagWorkerWarning(worker); + const prefix = logPrefix(worker); console.log( - `${prefix} ${pad(key, 32)} ${ANSI.red}CONNECT-ERROR ${reason}${ANSI.reset} (${colorMs(elapsedMs)})`, + `${prefix} ${pad(key, 32)}${formatActor(actorId)} 
${ANSI.red}WEBSOCKET-ERROR${ANSI.reset}`, ); } -async function runConcurrentWorker(worker: number): Promise { - const key = `cl-c-${worker}-${Date.now().toString(36)}`; +function printConcurrentSummary(reason: string): void { + if (ARGS.mode === "rtt") return; - while (true) { + const { pending, healthy, warning, ended } = countWorkerHealth(); + console.log( + `${ANSI.bold}counter-latency summary${ANSI.reset} reason=${reason} c=${concurrentWorkersStarted}/${ARGS.concurrency} s=${ANSI.blue}${pending}${ANSI.reset}/${ANSI.green}${healthy}${ANSI.reset}/${ANSI.yellow}${warning}${ANSI.reset}/${ANSI.red}${ended}${ANSI.reset} disconnects=${concurrentStats.disconnects} connect-errors=${concurrentStats.connectErrors} websocket-errors=${concurrentStats.websocketErrors} message-gaps=${concurrentStats.messageGaps} connects=${concurrentStats.connects} reconnects=${concurrentStats.reconnects} first-messages=${concurrentStats.firstMessages}`, + ); +} + +function closeConcurrentWorkers(): void { + stoppingConcurrentWorkers = true; + for (const ws of workerSockets) { + if (ws.readyState <= 1) { + ws.close(1000, "counter-latency complete"); + } + } +} + +process.once("SIGINT", () => { + closeConcurrentWorkers(); + printConcurrentSummary("sigint"); + process.exit(130); +}); + +process.once("SIGTERM", () => { + closeConcurrentWorkers(); + printConcurrentSummary("sigterm"); + process.exit(143); +}); + +async function waitForOpen(ws: TunnelWebSocket): Promise { + if (ws.readyState === 1) return; + + await new Promise((resolve, reject) => { + ws.addEventListener("open", () => resolve(), { once: true }); + ws.addEventListener("error", () => reject(new Error("websocket error")), { + once: true, + }); + ws.addEventListener( + "close", + (event) => + reject( + new Error( + `websocket closed before open code=${event.code} reason=${event.reason}`, + ), + ), + { once: true }, + ); + }); +} + +function eventDataToString(data: unknown): string { + if (typeof data === "string") return data; + if 
	// Completes eventDataToString(), whose head lies above this chunk: renders a
	// WebSocket payload for logging. NOTE(review): the binary-payload template
	// literals were garbled in the source; `<ArrayBuffer>`/`<TypedArray>` are
	// reconstructed placeholders — confirm against the original file.
	if (data instanceof ArrayBuffer) return `<ArrayBuffer>`;
	if (ArrayBuffer.isView(data)) return `<TypedArray>`;
	return String(data);
}

// True when the close event is the engine's deliberate "actor stopped" close.
// The worker treats this as a reconnect signal rather than a failure.
function isActorStoppedClose(event: { code: number; reason?: string }): boolean {
	return (
		event.code === ACTOR_STOPPED_CLOSE_CODE &&
		event.reason === ACTOR_STOPPED_CLOSE_REASON
	);
}

// Workload against the tunnelStress actor: open a raw WebSocket per worker and
// send a sequenced JSON ping every `messageInterval` ms while the socket is open.
// NOTE(review): generic type arguments were stripped by source mangling; the
// ReturnType<typeof ...> arguments below are re-inferred — confirm.
function makeTunnelStressWorkload(): ConcurrentWorkload {
	return {
		keyPrefix: "cl-t",
		async resolveActorId(handle: unknown) {
			return await (
				handle as ReturnType<typeof client.tunnelStress.getOrCreate>
			).resolve();
		},
		async openWebSocket(actorId: string, key: string) {
			// Prefer reconnecting to the same actor by id; fall back to key lookup.
			const handle = actorId
				? client.tunnelStress.getForId(actorId)
				: client.tunnelStress.getOrCreate([key]);
			return (await handle.webSocket()) as TunnelWebSocket;
		},
		onOpen(ws, _worker, _key, options) {
			let sequence = 0;
			const interval = setInterval(() => {
				// readyState 1 === OPEN; skip ticks while (re)connecting or closing.
				if (ws.readyState !== 1) return;
				sequence += 1;
				ws.send(
					JSON.stringify({
						sequence,
						timestamp: Date.now(),
					}),
				);
			}, options.messageInterval);
			// Returned cleanup is invoked on close/error/teardown.
			return () => clearInterval(interval);
		},
	};
}

// Workload against the loadTestAgent actor: fire an "inference" request
// immediately on open, then repeat every `messageInterval` ms. Each request
// carries a unique requestId plus the token-rate/duration knobs from ARGS.
function makeAgentWorkload(): ConcurrentWorkload {
	return {
		keyPrefix: "cl-a",
		async resolveActorId(handle: unknown) {
			return await (
				handle as ReturnType<typeof client.loadTestAgent.getOrCreate>
			).resolve();
		},
		async openWebSocket(actorId: string, key: string) {
			const handle = actorId
				? client.loadTestAgent.getForId(actorId)
				: client.loadTestAgent.getOrCreate([key]);
			return (await handle.webSocket()) as TunnelWebSocket;
		},
		onOpen(ws, worker, _key, options) {
			let sequence = 0;
			const sendInference = () => {
				if (ws.readyState !== 1) return;
				sequence += 1;
				ws.send(
					JSON.stringify({
						type: "inference",
						requestId: `agent-${worker}-${Date.now().toString(36)}-${sequence}`,
						tokensPerSecond: options.tokensPerSecond,
						durationMs: options.durationMs,
					}),
				);
			};

			// Kick off one request right away so first-message latency is measured
			// from connect, then keep a steady cadence.
			sendInference();
			const interval = setInterval(sendInference, options.messageInterval);
			return () => clearInterval(interval);
		},
	};
}

/**
 * One long-lived load-test worker: resolve (once) the actor id for this
 * worker's key, open a WebSocket, drive the workload, and log
 * connect/first-message/gap/disconnect events. Reconnects in a loop when the
 * server closes with the deliberate "actor stopped" code; any other
 * close/error ends the worker. Exits when stoppingConcurrentWorkers is set.
 */
async function runConcurrentWorker(
	worker: number,
	workload: ConcurrentWorkload,
	options: ConcurrentWorkerOptions,
): Promise<void> {
	const key = `${workload.keyPrefix}-${worker}-${Date.now().toString(36)}`;
	let actorId: string | undefined;
	let reconnect = false;

	while (!stoppingConcurrentWorkers) {
		const t0 = performance.now();
		let cleanup: (() => void) | undefined;
		let sawWebSocketError = false;
		try {
			// Resolve the actor id once per worker so reconnects pin to the same actor.
			if (!actorId) {
				const handle =
					ARGS.mode === "agent-concurrent"
						? client.loadTestAgent.getOrCreate([key])
						: client.tunnelStress.getOrCreate([key]);
				actorId = await workload.resolveActorId(handle);
			}

			const ws = await workload.openWebSocket(actorId, key);
			workerSockets.add(ws);
			await waitForOpen(ws);
			const connectMs = performance.now() - t0;
			logConnect(worker, key, actorId, connectMs, reconnect);
			reconnect = false;

			// Hold the socket open until close/error; the promise settles exactly once.
			await new Promise<void>((resolve) => {
				let firstMessageLogged = false;
				let lastMessageAt = 0;
				let settled = false;
				const settle = () => {
					if (settled) return;
					settled = true;
					resolve();
				};

				cleanup = workload.onOpen(ws, worker, key, options);

				ws.addEventListener("message", (event) => {
					const now = performance.now();
					if (!firstMessageLogged) {
						firstMessageLogged = true;
						// First message latency is measured from loop start (t0).
						logFirstMessage(worker, key, actorId, now - t0);
					} else if (
						lastMessageAt > 0 &&
						now - lastMessageAt > MESSAGE_GAP_WARN_MS
					) {
						// Warn on stalls between consecutive messages.
						logMessageGap(worker, key, actorId, now - lastMessageAt);
					}
					lastMessageAt = now;
					if (options.showMessages) {
						const prefix = logPrefix(worker);
						console.log(
							`${prefix} ${pad(key, 32)}${formatActor(actorId)} message=${eventDataToString(event.data)}`,
						);
					}
				});
				ws.addEventListener(
					"close",
					(event) => {
						if (settled) return;
						workerSockets.delete(ws);
						cleanup?.();
						// A deliberate "actor stopped" close triggers a reconnect unless
						// we are shutting down or already saw a socket error.
						if (
							!stoppingConcurrentWorkers &&
							!sawWebSocketError &&
							isActorStoppedClose(event)
						) {
							logReconnect(worker, key, actorId, event.code, event.reason);
							reconnect = true;
						} else {
							logDisconnect(
								worker,
								key,
								actorId,
								`code=${event.code} reason=${event.reason}`,
								!stoppingConcurrentWorkers,
							);
						}
						settle();
					},
					{ once: true },
				);
				ws.addEventListener("error", () => {
					sawWebSocketError = true;
					logWebSocketError(worker, key, actorId);
					workerSockets.delete(ws);
					cleanup?.();
					settle();
				});
			});
			if (sawWebSocketError) {
				setWorkerHealth(worker, "ended");
			}
		} catch (err) {
			const elapsed = performance.now() - t0;
			logConnectError(
				worker,
				key,
				actorId,
				elapsed,
				err instanceof Error ? err.message : String(err),
			);
			break;
		} finally {
			// Always stop the workload's send timer, even on thrown errors.
			cleanup?.();
		}

		if (!reconnect) break;
	}
}

/**
 * Spin up ARGS.concurrency workers against the selected workload, staggering
 * starts by ARGS.interval ms. Optionally auto-stops after RUN_FOR_MS and
 * always prints the summary on exit.
 */
async function runConcurrentMode(): Promise<void> {
	if (ARGS.mode === "rtt") {
		throw new Error("concurrent mode called with rtt args");
	}
	const { concurrency, messageInterval, showMessages } = ARGS;
	const workload =
		ARGS.mode === "agent-concurrent"
			? makeAgentWorkload()
			: makeTunnelStressWorkload();
	const workers: Promise<void>[] = [];

	let stopTimer: ReturnType<typeof setTimeout> | undefined;
	if (RUN_FOR_MS > 0) {
		stopTimer = setTimeout(() => {
			closeConcurrentWorkers();
		}, RUN_FOR_MS);
	}

	try {
		for (let i = 0; i < concurrency; i++) {
			const id = i + 1;
			concurrentWorkersStarted = id;
			setWorkerHealth(id, "pending");
			workers.push(
				runConcurrentWorker(id, workload, {
					messageInterval,
					showMessages,
					tokensPerSecond: ARGS.tokensPerSecond,
					durationMs: ARGS.durationMs,
				}),
			);
			// Stagger worker start-up; no sleep after the last one.
			if (i < concurrency - 1) {
				await sleep(ARGS.interval);
			}
		}
		await Promise.all(workers);
	} finally {
		if (stopTimer) clearTimeout(stopTimer);
		printConcurrentSummary("complete");
	}
}

// Entry point: report failures via exitCode (lets pending logs flush) and
// always dispose the client so the process can exit cleanly.
main()
	.catch((err) => {
		console.error("fatal:", err);
		process.exitCode = 1;
	})
	.finally(async () => {
		await client.dispose().catch(() => undefined);
	});
import { actor, type RivetMessageEvent, type UniversalWebSocket } from "rivetkit";

// Load-test actor that stresses the WebSocket tunnel: every connection gets a
// 1s server-push heartbeat, and every inbound message is echoed back (with a
// KV write per message to exercise storage on the hot path).
export const tunnelStress = actor({
	options: {
		// Keep sockets live (no hibernation) so heartbeat timers survive.
		canHibernateWebSocket: false,
		sleepGracePeriod: 5_000,
	},
	state: {
		connectionCount: 0,
		messageCount: 0,
		heartbeatCount: 0,
	},
	onWebSocket(c, websocket: UniversalWebSocket) {
		c.state.connectionCount += 1;
		const connectionId = crypto.randomUUID();

		const sendHeartbeat = () => {
			// readyState 1 === OPEN; skip while closing/closed.
			if (websocket.readyState !== 1) return;

			c.state.heartbeatCount += 1;
			websocket.send(
				JSON.stringify({
					type: "heartbeat",
					connectionId,
					heartbeatCount: c.state.heartbeatCount,
					timestamp: Date.now(),
				}),
			);
		};

		// First heartbeat goes out immediately, then every second.
		const heartbeat = setInterval(sendHeartbeat, 1_000);
		sendHeartbeat();

		websocket.addEventListener("message", async (event: RivetMessageEvent) => {
			c.state.messageCount += 1;
			// Deliberate per-message KV write to add storage load to the echo path.
			await c.kv.put("counter", String(c.state.messageCount));
			websocket.send(
				JSON.stringify({
					type: "reply",
					connectionId,
					messageCount: c.state.messageCount,
					timestamp: Date.now(),
					received: event.data,
				}),
			);
		});

		websocket.addEventListener("close", () => {
			clearInterval(heartbeat);
			c.state.connectionCount -= 1;
		});
	},
	actions: {
		// Snapshot of cumulative counters for external verification.
		getStats(c) {
			return {
				connectionCount: c.state.connectionCount,
				messageCount: c.state.messageCount,
				heartbeatCount: c.state.heartbeatCount,
			};
		},
	},
});
setTimeout(resolve, ms); + signal.addEventListener( + "abort", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true }, + ); + }); +} + +export const loadTestAgent = actor({ + options: { + canHibernateWebSocket: false, + sleepGracePeriod: 30_000, + }, + db: db({ + onMigrate: async (db) => { + await db.execute(` + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + connection_id TEXT NOT NULL, + request_id TEXT NOT NULL, + token_index INTEGER NOT NULL, + token TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + `); + await db.execute(` + CREATE INDEX IF NOT EXISTS messages_request_idx + ON messages (request_id, token_index) + `); + }, + }), + state: { + connectionCount: 0, + inferenceCount: 0, + tokenCount: 0, + }, + onWebSocket(c, websocket: UniversalWebSocket) { + c.state.connectionCount += 1; + const connectionId = crypto.randomUUID(); + + send(websocket, { + type: "connected", + connectionId, + connectionCount: c.state.connectionCount, + timestamp: Date.now(), + }); + + websocket.addEventListener("message", async (event: RivetMessageEvent) => { + try { + const message = + typeof event.data === "string" + ? JSON.parse(event.data) + : undefined; + if (!message || message.type !== "inference") { + throw new Error("expected inference message"); + } + + const requestId = + typeof message.requestId === "string" && message.requestId + ? 
message.requestId + : crypto.randomUUID(); + const tokensPerSecond = parsePositiveNumber( + message.tokensPerSecond, + "tokensPerSecond", + DEFAULT_TOKENS_PER_SECOND, + ); + const durationMs = parsePositiveNumber( + message.durationMs, + "durationMs", + DEFAULT_DURATION_MS, + ); + const intervalMs = 1_000 / tokensPerSecond; + const targetTokens = Math.max( + 1, + Math.floor((durationMs / 1_000) * tokensPerSecond), + ); + + const inference = (async () => { + c.state.inferenceCount += 1; + send(websocket, { + type: "inference-start", + connectionId, + requestId, + tokensPerSecond, + durationMs, + targetTokens, + timestamp: Date.now(), + }); + + const startedAt = performance.now(); + for (let i = 0; i < targetTokens; i++) { + if (c.abortSignal.aborted || websocket.readyState !== 1) { + break; + } + + const tokenIndex = i + 1; + const token = `token-${tokenIndex}`; + const createdAt = Date.now(); + await c.db.execute( + "INSERT INTO messages (connection_id, request_id, token_index, token, created_at) VALUES (?, ?, ?, ?, ?)", + connectionId, + requestId, + tokenIndex, + token, + createdAt, + ); + c.state.tokenCount += 1; + + send(websocket, { + type: "token", + connectionId, + requestId, + tokenIndex, + token, + timestamp: createdAt, + }); + + const nextAt = startedAt + tokenIndex * intervalMs; + const delayMs = Math.max(0, nextAt - performance.now()); + if (delayMs > 0) { + await sleep(delayMs, c.abortSignal); + } + } + + send(websocket, { + type: "inference-complete", + connectionId, + requestId, + tokenCount: targetTokens, + timestamp: Date.now(), + }); + })(); + + await c.keepAwake(inference); + } catch (error) { + send(websocket, { + type: "error", + message: + error instanceof Error + ? 
error.message + : "unknown websocket error", + timestamp: Date.now(), + }); + } + }); + + websocket.addEventListener("close", () => { + c.state.connectionCount -= 1; + }); + }, + actions: { + getStats(c) { + return { + connectionCount: c.state.connectionCount, + inferenceCount: c.state.inferenceCount, + tokenCount: c.state.tokenCount, + }; + }, + }, +}); diff --git a/examples/kitchen-sink/src/index.ts b/examples/kitchen-sink/src/index.ts index a4d5f2fb04..673e25971d 100644 --- a/examples/kitchen-sink/src/index.ts +++ b/examples/kitchen-sink/src/index.ts @@ -4,6 +4,7 @@ import { counter } from "./actors/counter/counter.ts"; import { counterConn } from "./actors/counter/counter-conn.ts"; import { counterWithParams } from "./actors/counter/conn-params.ts"; import { counterWithLifecycle } from "./actors/counter/lifecycle.ts"; +import { pingPongCounter } from "./actors/counter/ping-pong-counter.ts"; // Actions import { inputActor } from "./actors/actions/action-inputs.ts"; import { @@ -58,6 +59,7 @@ import { import { rawFetchCounter } from "./actors/http/raw-fetch-counter.ts"; import { rawWebSocketChatRoom } from "./actors/http/raw-websocket-chat-room.ts"; import { rawWebSocketServerlessSmoke } from "./actors/http/raw-websocket-serverless-smoke.ts"; +import { tunnelStress } from "./actors/http/tunnel-stress.ts"; // Lifecycle import { runWithTicks, @@ -123,6 +125,7 @@ import { rawSqliteFuzzer } from "./actors/testing/raw-sqlite-fuzzer.ts"; import { sqliteMemoryPressure } from "./actors/testing/sqlite-memory-pressure.ts"; import { mockAgenticLoop } from "./actors/testing/mock-agentic-loop.ts"; import { sleepCloseFuzz } from "./actors/testing/sleep-close-fuzz.ts"; +import { loadTestAgent } from "./actors/testing/load-test-agent.ts"; // AI import { aiAgent } from "./actors/ai/ai-agent.ts"; @@ -186,6 +189,7 @@ export const registry = setup({ counterConn, counterWithParams, counterWithLifecycle, + pingPongCounter, // Core API inputActor, syncActionActor, @@ -226,6 +230,7 @@ 
#!/usr/bin/env bash
# Build the kitchen-sink example into a portable Docker image using locally
# built RivetKit workspace packages (packed as tarballs), then optionally push
# it to Artifact Registry. All knobs are env-overridable.
set -euo pipefail

AR_HOSTNAME=${AR_HOSTNAME:-us-east4-docker.pkg.dev}
AR_PROJECT_ID=${AR_PROJECT_ID:-dev-projects-491221}
AR_REPOSITORY=${AR_REPOSITORY:-cloud-run-source-deploy}
IMAGE_NAMESPACE=${IMAGE_NAMESPACE:-rivet-dev-rivet}
IMAGE_NAME=${IMAGE_NAME:-rivet-kitchen-sink}
IMAGE_REPO=${IMAGE_REPO:-"${AR_HOSTNAME}/${AR_PROJECT_ID}/${AR_REPOSITORY}/${IMAGE_NAMESPACE}/${IMAGE_NAME}"}

COMMIT_SHA=${COMMIT_SHA:-$(git rev-parse HEAD)}
DOCKERFILE=${DOCKERFILE:-examples/kitchen-sink/Dockerfile.local}
CONTEXT_DIR=${CONTEXT_DIR:-$(mktemp -d -t rivet-kitchen-sink-image.XXXXXX)}
KEEP_CONTEXT=${KEEP_CONTEXT:-0}
PUSH=${PUSH:-1}

ROOT_DIR=$(git rev-parse --show-toplevel)
# FIX: run from the workspace root. `pnpm build --filter=...` and
# `pnpm --filter ... pack` below are workspace-relative and fail when the
# script is invoked from outside the repo root.
cd "${ROOT_DIR}"
APP_DIR="${CONTEXT_DIR}/app"
TARBALL_DIR="${APP_DIR}/tarballs"

# Workspace packages whose locally built tarballs replace the npm versions.
PACKAGES=(
	"rivetkit"
	"@rivetkit/react"
	"@rivetkit/framework-base"
	"@rivetkit/sql-loader"
	"@rivetkit/rivetkit-napi"
	"@rivetkit/rivetkit-wasm"
	"@rivetkit/traces"
	"@rivetkit/workflow-engine"
	"@rivetkit/engine-cli"
	"@rivetkit/engine-envoy-protocol"
	"@rivetkit/virtual-websocket"
)

# Remove the temp context on exit unless KEEP_CONTEXT=1 (useful for debugging).
cleanup() {
	if [[ "${KEEP_CONTEXT}" != "1" ]]; then
		rm -rf "${CONTEXT_DIR}"
	fi
}
trap cleanup EXIT

echo "Building kitchen-sink and RivetKit packages on host"
pnpm build --filter=kitchen-sink

echo "Preparing portable Docker context at ${CONTEXT_DIR}"
rm -rf "${CONTEXT_DIR}"
mkdir -p "${APP_DIR}" "${TARBALL_DIR}"

# Copy the app sources without host-specific build artifacts.
rsync -a --delete \
	--exclude node_modules \
	--exclude .turbo \
	"${ROOT_DIR}/examples/kitchen-sink/" \
	"${APP_DIR}/"

for package in "${PACKAGES[@]}"; do
	pnpm --filter "${package}" pack --pack-destination "${TARBALL_DIR}" >/dev/null
done

# Rewrite the app's package.json so every workspace package resolves to its
# packed tarball (deps sections + pnpm overrides for transitive resolution).
APP_DIR="${APP_DIR}" TARBALL_DIR="${TARBALL_DIR}" node <<'NODE'
const { execFileSync } = require("node:child_process");
const fs = require("node:fs");
const path = require("node:path");

const appDir = process.env.APP_DIR;
const tarballDir = process.env.TARBALL_DIR;
const packagePath = path.join(appDir, "package.json");
const pkg = JSON.parse(fs.readFileSync(packagePath, "utf8"));
const tarballs = fs.readdirSync(tarballDir).filter((file) => file.endsWith(".tgz"));
const packageSpecs = new Map();

for (const tarball of tarballs) {
	const tarballPath = path.join(tarballDir, tarball);
	const packageJson = JSON.parse(
		execFileSync("tar", ["-xOf", tarballPath, "package/package.json"], {
			encoding: "utf8",
		}),
	);
	packageSpecs.set(packageJson.name, `file:./tarballs/${tarball}`);
}

for (const section of ["dependencies", "devDependencies", "optionalDependencies"]) {
	if (!pkg[section]) continue;
	for (const name of Object.keys(pkg[section])) {
		const spec = packageSpecs.get(name);
		if (spec) pkg[section][name] = spec;
	}
}

pkg.pnpm = pkg.pnpm || {};
pkg.pnpm.overrides = {
	...(pkg.pnpm.overrides || {}),
};
for (const [name, spec] of packageSpecs) {
	pkg.pnpm.overrides[name] = spec;
}

fs.writeFileSync(packagePath, `${JSON.stringify(pkg, null, "\t")}\n`);
NODE

# Install a self-contained node_modules inside the context (subshell keeps the
# script's own CWD at the repo root).
(
	cd "${APP_DIR}"
	pnpm install --no-frozen-lockfile
)

# Copy the host-built NAPI binary into the installed package (pnpm symlinks, so
# resolve the real package directory first).
NAPI_PACKAGE_DIR=$(node -e "console.log(require('node:fs').realpathSync(process.argv[1]))" "${APP_DIR}/node_modules/@rivetkit/rivetkit-napi")
shopt -s nullglob
napi_binaries=("${ROOT_DIR}"/rivetkit-typescript/packages/rivetkit-napi/*.node)
shopt -u nullglob
if (( ${#napi_binaries[@]} == 0 )); then
	echo "Missing built rivetkit-napi .node binary. Run the build on a supported host." >&2
	exit 1
fi
cp "${napi_binaries[@]}" "${NAPI_PACKAGE_DIR}/"

# Tarballs are baked into node_modules now; drop them from the image context.
rm -rf "${TARBALL_DIR}"

echo "Building ${IMAGE_REPO}:${COMMIT_SHA} and ${IMAGE_REPO}:latest"
docker build \
	-f "${ROOT_DIR}/${DOCKERFILE}" \
	-t "${IMAGE_REPO}:${COMMIT_SHA}" \
	-t "${IMAGE_REPO}:latest" \
	"${CONTEXT_DIR}"

if [[ "${PUSH}" == "1" ]]; then
	echo "Pushing ${IMAGE_REPO}:${COMMIT_SHA}"
	docker push "${IMAGE_REPO}:${COMMIT_SHA}"

	echo "Pushing ${IMAGE_REPO}:latest"
	docker push "${IMAGE_REPO}:latest"
else
	echo "Skipping push because PUSH=${PUSH}"
fi

echo "Done"