diff --git a/examples/kitchen-sink/CLAUDE.md b/examples/kitchen-sink/CLAUDE.md index 7437a1eae9..5744d73690 100644 --- a/examples/kitchen-sink/CLAUDE.md +++ b/examples/kitchen-sink/CLAUDE.md @@ -43,7 +43,7 @@ export GW="https://api.rivet.dev/gateway" curl -s -X PUT "https://api.rivet.dev/actors?namespace=${RIVET_NS}" \ -H "Authorization: Bearer ${RIVET_TOKEN}" \ -H 'Content-Type: application/json' \ - -d '{"name":"","key":"","runner_name_selector":"default","crash_policy":"sleep"}' + -d '{"name":"","key":"","runner_name_selector":"k8s","crash_policy":"sleep"}' ``` This returns `{"actor":{"actor_id":"", ...}, "created": true/false}`. diff --git a/examples/kitchen-sink/Dockerfile b/examples/kitchen-sink/Dockerfile index fe0ebf5dd2..9082737a5c 100644 --- a/examples/kitchen-sink/Dockerfile +++ b/examples/kitchen-sink/Dockerfile @@ -2,8 +2,11 @@ FROM node:22-slim RUN corepack enable && corepack prepare pnpm@10.13.1 --activate WORKDIR /app ENV NODE_OPTIONS=--max-old-space-size=7168 +ENV SKIP_WASM_BUILD=1 +ENV SKIP_NAPI_BUILD=1 COPY package.json pnpm-lock.yaml pnpm-workspace.yaml turbo.json tsconfig.base.json tsup.base.ts ./ +COPY patches/ ./patches/ COPY examples/kitchen-sink/ ./examples/kitchen-sink/ COPY rivetkit-typescript/packages/ ./rivetkit-typescript/packages/ COPY engine/sdks/typescript/ ./engine/sdks/typescript/ @@ -17,4 +20,6 @@ COPY examples/kitchen-sink/public ./examples/kitchen-sink/dist/public WORKDIR /app/examples/kitchen-sink EXPOSE 8080 -CMD ["pnpm", "start"] +# Run the bundled server directly so node is pid1. Signals reach the rivetkit +# SIGTERM handler without going through pnpm/tsx wrappers. +CMD ["node", "--enable-source-maps", "dist-server/server.mjs"] diff --git a/examples/kitchen-sink/package.json b/examples/kitchen-sink/package.json index 701a031b90..3912e8a397 100644 --- a/examples/kitchen-sink/package.json +++ b/examples/kitchen-sink/package.json @@ -9,16 +9,17 @@ "dev:serverless": "RIVET_RUN_ENGINE=1 concurrently -n server,vite,configure \"node --import @rivetkit/sql-loader --import tsx src/server.ts\" \"vite\" \"pnpm dev:serverless:configure\"", "dev:serverless:configure": "node -e \"void (async () => { const port = process.env.PORT ?? '3000'; const url = 'http://127.0.0.1:' + port + '/api/rivet/metadata'; const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); for (let i = 0; i < 120; i++) { try { const res = await fetch(url); if (res.ok) { console.log('serverless pool configured'); return; } console.log('serverless configuration returned ' + res.status); } catch {} await sleep(1000); } throw new Error('timed out waiting for serverless configuration at ' + url); })();\"", "check-types": "echo 'skipped - workflow history types broken'", - "build": "vite build", + "build": "vite build && pnpm exec tsup --config tsup.server.config.ts", "test": "node --import tsx --test tests/*.test.ts", - "start": "node --import @rivetkit/sql-loader --import tsx src/server.ts", + "start": "node --enable-source-maps dist-server/server.mjs", "memory-soak": "tsx scripts/sqlite-memory-soak.ts", "proc-metrics": "tsx scripts/proc-metrics-report.ts", "smoke:raw-websocket-serverless": "tsx scripts/raw-websocket-serverless-smoke.ts", "smoke:on-sleep-sigterm": "tsc --noEmit --pretty false --allowImportingTsExtensions --moduleResolution bundler --module esnext --target esnext --types node --lib esnext,dom --skipLibCheck --allowJs --checkJs false scripts/on-sleep-sigterm-smoke.ts src/actors/testing/sigterm-sleep-probe.ts src/actors/state/sqlite-drizzle/drizzle/migrations.js && node --experimental-strip-types --experimental-transform-types --import @rivetkit/sql-loader scripts/on-sleep-sigterm-smoke.ts", + "smoke:on-sleep-remote": "tsc --noEmit --pretty false --allowImportingTsExtensions --moduleResolution bundler --module esnext --target esnext --types node --lib esnext,dom --skipLibCheck --allowJs --checkJs false scripts/on-sleep-remote-watch.ts && node --experimental-strip-types --experimental-transform-types --import @rivetkit/sql-loader scripts/on-sleep-remote-watch.ts", "fuzz:sleep-close": "tsx scripts/sleep-close-fuzz.ts", "mock-agentic-loop": "tsx scripts/mock-agentic-loop.ts", - "counter-latency": "tsx scripts/counter-latency.ts", + "counter-latency": "cargo run --quiet --release --manifest-path scripts/counter-latency/Cargo.toml --", "benchmark": "tsx scripts/benchmark.ts", "db:generate": "find src/actors -name drizzle.config.ts -exec drizzle-kit generate --config {} \\;" }, diff --git a/examples/kitchen-sink/scripts/bench.ts b/examples/kitchen-sink/scripts/bench.ts index 930d973da8..560ad6e5e6 100644 --- a/examples/kitchen-sink/scripts/bench.ts +++ b/examples/kitchen-sink/scripts/bench.ts @@ -29,6 +29,7 @@ const url = new URL(RAW_ENDPOINT); const NAMESPACE = url.username; const TOKEN = url.password; const HOST = `${url.protocol}//${url.host}`; +const RVT_RUNNER = process.env.RIVET_POOL ?? "k8s"; async function callAction( actorName: string, @@ -42,7 +43,7 @@ async function callAction( "rvt-key": key.join(","), "rvt-token": TOKEN, "rvt-namespace": NAMESPACE, - "rvt-runner": "default", + "rvt-runner": RVT_RUNNER, }); const actionUrl = `${HOST}/gateway/${actorName}/action/${action}?${params}`; const res = await fetch(actionUrl, { diff --git a/examples/kitchen-sink/scripts/counter-latency/Cargo.lock b/examples/kitchen-sink/scripts/counter-latency/Cargo.lock new file mode 100644 index 0000000000..7954e6a97c --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/Cargo.lock @@ -0,0 +1,1607 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bitflags" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" + +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cc" +version = "1.2.62" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "chrono" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + +[[package]] +name = "clap" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] +name = "counter-latency" +version = "0.0.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "futures-util", + "http", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite", + "url", +] + +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-core", + "futures-macro", + "futures-sink", + "futures-task", + "pin-project-lite", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "icu_collections" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" +dependencies = [ + "displaydoc", + "potential_utf", + "utf8_iter", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" + +[[package]] +name = "icu_properties" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" + +[[package]] +name = "icu_provider" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "indexmap" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" +dependencies = [ + "equivalent", + "hashbrown 0.17.1", + "serde", + "serde_core", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "js-sys" +version = "0.3.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" +dependencies = [ + "cfg-if", + "futures-util", + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + +[[package]] +name = "libc" +version = "0.2.186" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" + +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + +[[package]] +name = "litemap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" + +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "mio" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + +[[package]] +name = "openssl" +version = "0.10.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf0b434746ee2832f4f0baf10137e1cabb18cbe6912c69e2e33263c45250f542" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + +[[package]] +name = "openssl-sys" +version = "0.9.115" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "158fe5b292746440aa6e7a7e690e55aeb72d41505e2804c23c6973ad0e9c9781" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + +[[package]] +name = "potential_utf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" +dependencies = [ + "zerovec", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + +[[package]] +name = "rand" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", +] + +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "schannel" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "security-framework" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "semver" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "socket2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + "once_cell", + "rustix", + "windows-sys", +] + +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinystr" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" +dependencies = [ + "displaydoc", + "zerovec", +] + +[[package]] +name = "tokio" +version = "1.52.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" +dependencies = [ + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "native-tls", + "rand", + "sha1", + "thiserror", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasip2" +version = "1.0.3+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" +dependencies = [ + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.121" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] +name = "writeable" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" + +[[package]] +name = "yoke" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/examples/kitchen-sink/scripts/counter-latency/Cargo.toml b/examples/kitchen-sink/scripts/counter-latency/Cargo.toml new file mode 100644 index 0000000000..df75380fdd --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "counter-latency" +version = "0.0.0" +edition = "2021" +publish = false + +[[bin]] +name = "counter-latency" +path = "src/main.rs" + +[dependencies] +anyhow = "1" +chrono = "0.4" +clap = { version = "4", features = ["derive"] } +futures-util = "0.3" +http = "1" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = { version = "0.24", features = ["native-tls"] } +url = "2" + +[workspace] diff --git a/examples/kitchen-sink/scripts/counter-latency/src/args.rs b/examples/kitchen-sink/scripts/counter-latency/src/args.rs new file mode 100644 index 0000000000..54038139eb --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/args.rs @@ -0,0 +1,186 @@ +// CLI + env parsing. Uses clap derive with short flags for every option. + +use std::env; +use std::process; + +use clap::{Parser, Subcommand}; + +pub const DEFAULT_CONCURRENCY: u32 = 1_000; +pub const DEFAULT_CONCURRENT_INTERVAL_MS: u64 = 300; +pub const DEFAULT_MESSAGE_INTERVAL_MS: u64 = 1_000; +pub const DEFAULT_AGENT_MESSAGE_INTERVAL_MS: u64 = 30_000; +pub const DEFAULT_TOKENS_PER_SECOND: f64 = 20.0; +pub const DEFAULT_DURATION_MS: u64 = 5_000; +pub const MESSAGE_GAP_WARN_MS: f64 = 3_000.0; +pub const ACTOR_STOPPED_CLOSE_CODE: u16 = 1000; +pub const ACTOR_STOPPED_CLOSE_REASON: &str = "hack_force_close"; + +#[derive(Parser)] +#[command( + name = "counter-latency", + about = "Mini load-test client for Rivet kitchen-sink actors", + long_about = "Subcommands:\n \ + rtt spawn fresh counter actors and measure action RTTs\n \ + concurrent ramp persistent raw WebSocket tunnel-stress actors\n \ + agent-concurrent ramp persistent SQLite-backed agent actors\n\nEnv:\n \ + RIVET_ENDPOINT required, proto://:@host\n \ + RIVET_POOL runner pool name (default k8s)\n \ + BATCHES total workers in rtt mode (default infinite)\n \ + SERIAL 1/true to serialize rtt workers\n \ + RUN_FOR_MS stop concurrent modes after this many ms" +)] +struct Cli { + #[command(subcommand)] + command: Cmd, +} + +#[derive(Subcommand)] +enum Cmd { + /// Spawn fresh counter actors and measure action RTTs. + Rtt(RttCli), + /// Ramp persistent raw WebSocket tunnel-stress actors. + Concurrent(ConcurrentCli), + /// Ramp persistent SQLite-backed agent actors. + #[command(name = "agent-concurrent")] + AgentConcurrent(ConcurrentCli), +} + +#[derive(clap::Args, Clone)] +struct RttCli { + /// Gap in ms between worker starts (required). + #[arg(short = 'i', long)] + interval: u64, + /// Wait for actor ready before measuring (default: skip). + #[arg(short = 'w', long)] + wait_ready: bool, +} + +#[derive(clap::Args, Clone)] +struct ConcurrentCli { + /// Ramp-up gap in ms between connections. + #[arg(short = 'i', long, default_value_t = DEFAULT_CONCURRENT_INTERVAL_MS)] + interval: u64, + /// Number of persistent connections. + #[arg(short = 'c', long, default_value_t = DEFAULT_CONCURRENCY)] + concurrency: u32, + /// Gap between client messages in ms (default: 1000 concurrent / 30000 agent-concurrent). + #[arg(short = 'm', long = "message-interval-ms")] + message_interval_ms: Option, + /// SQLite token inserts per second (agent-concurrent only). + #[arg(short = 't', long, default_value_t = DEFAULT_TOKENS_PER_SECOND)] + tokens_per_second: f64, + /// Inference stream duration in ms (agent-concurrent only). + #[arg(short = 'd', long, default_value_t = DEFAULT_DURATION_MS)] + duration_ms: u64, + /// Log all received WebSocket messages. + #[arg(short = 's', long)] + show_messages: bool, + /// Wait for actor ready before connecting (default: skip). + #[arg(short = 'w', long)] + wait_ready: bool, +} + +#[derive(Clone)] +pub struct RttArgs { + pub interval: u64, + pub skip_ready_wait: bool, +} + +#[derive(Clone)] +pub struct ConcurrentArgs { + pub mode: ConcurrentMode, + pub interval: u64, + pub concurrency: u32, + pub message_interval: u64, + pub show_messages: bool, + pub skip_ready_wait: bool, + pub tokens_per_second: f64, + pub duration_ms: u64, +} + +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum ConcurrentMode { + Concurrent, + AgentConcurrent, +} + +#[derive(Clone)] +pub enum Args { + Rtt(RttArgs), + Concurrent(ConcurrentArgs), +} + +impl Args { + pub fn interval(&self) -> u64 { + match self { + Args::Rtt(a) => a.interval, + Args::Concurrent(a) => a.interval, + } + } + + pub fn skip_ready_wait(&self) -> bool { + match self { + Args::Rtt(a) => a.skip_ready_wait, + Args::Concurrent(a) => a.skip_ready_wait, + } + } +} + +pub struct EnvConfig { + pub batches: u64, + pub serial: bool, + pub run_for_ms: u64, + pub rivet_pool: String, + pub endpoint: String, +} + +impl EnvConfig { + pub fn from_env() -> Self { + let batches = env::var("BATCHES").ok().and_then(|v| v.parse().ok()).unwrap_or(0); + let serial = matches!(env::var("SERIAL").as_deref(), Ok("1") | Ok("true")); + let run_for_ms = env::var("RUN_FOR_MS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(0); + let rivet_pool = env::var("RIVET_POOL").unwrap_or_else(|_| "k8s".to_string()); + let endpoint = match env::var("RIVET_ENDPOINT") { + Ok(v) if !v.is_empty() => v, + _ => { + eprintln!("RIVET_ENDPOINT is required (proto://:@host)"); + process::exit(1); + } + }; + Self { batches, serial, run_for_ms, rivet_pool, endpoint } + } +} + +pub fn parse_cli() -> Args { + let cli = Cli::parse(); + match cli.command { + Cmd::Rtt(rtt) => Args::Rtt(RttArgs { + interval: rtt.interval, + skip_ready_wait: !rtt.wait_ready, + }), + Cmd::Concurrent(c) => Args::Concurrent(build_concurrent(ConcurrentMode::Concurrent, c)), + Cmd::AgentConcurrent(c) => { + Args::Concurrent(build_concurrent(ConcurrentMode::AgentConcurrent, c)) + } + } +} + +fn build_concurrent(mode: ConcurrentMode, cli: ConcurrentCli) -> ConcurrentArgs { + let default_message_interval = match mode { + ConcurrentMode::AgentConcurrent => DEFAULT_AGENT_MESSAGE_INTERVAL_MS, + ConcurrentMode::Concurrent => DEFAULT_MESSAGE_INTERVAL_MS, + }; + ConcurrentArgs { + mode, + interval: cli.interval, + concurrency: cli.concurrency, + message_interval: cli.message_interval_ms.unwrap_or(default_message_interval), + show_messages: cli.show_messages, + skip_ready_wait: !cli.wait_ready, + tokens_per_second: cli.tokens_per_second, + duration_ms: cli.duration_ms, + } +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/concurrent.rs b/examples/kitchen-sink/scripts/counter-latency/src/concurrent.rs new file mode 100644 index 0000000000..a80547155f --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/concurrent.rs @@ -0,0 +1,762 @@ +// Concurrent + agent-concurrent mode. Owns the per-worker WS loop with +// reconnect logic, the workload trait, and the live logging that mirrors +// scripts/counter-latency.ts. + +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Instant; + +use futures_util::{SinkExt, StreamExt}; +use tokio::sync::Mutex; +use tokio::sync::mpsc; +use tokio::time::sleep; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite::protocol::CloseFrame; +use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; + +use crate::args::{ + ACTOR_STOPPED_CLOSE_CODE, ACTOR_STOPPED_CLOSE_REASON, ConcurrentArgs, ConcurrentMode, + EnvConfig, MESSAGE_GAP_WARN_MS, +}; +use crate::endpoint::Endpoint; +use crate::log::{ + BLUE, BOLD, DIM, GREEN, RED, RESET, YELLOW, color_ms, format_actor, iso_now, pad, +}; +use crate::rtt::make_key; +use crate::stats::{State, WorkerHealth}; +use crate::ws::open_raw_ws; + +#[derive(Clone)] +pub struct ConcurrentWorkerOptions { + pub message_interval: u64, + pub show_messages: bool, + pub skip_ready_wait: bool, + pub tokens_per_second: f64, + pub duration_ms: u64, +} + +pub struct WorkloadCtx { + pub endpoint: Arc, + pub args: Arc, + pub env: Arc, + pub state: Arc, +} + +impl WorkloadCtx { + pub fn log_prefix(&self) -> String { + let ts = iso_now(); + let (pending, healthy, warning, ended) = self.state.count_worker_health(); + let width = format!("{}", self.args.concurrency).len(); + let pad_num = |n: i64| format!("{:>width$}", n, width = width); + let workers_started = self.state.workers_started(); + let concurrency_part = format!( + "c={}/{}", + pad_num(workers_started), + self.args.concurrency, + ); + let status_part = format!( + "s={}{}{}/{}{}{}/{}{}{}/{}{}{}", + BLUE, + pad_num(pending), + RESET, + GREEN, + pad_num(healthy), + RESET, + YELLOW, + pad_num(warning), + RESET, + RED, + pad_num(ended), + RESET, + ); + format!("{}{}{} [{} {}]", DIM, ts, RESET, concurrency_part, status_part) + } +} + +pub trait Workload: Send + Sync { + fn key_prefix(&self) -> &'static str; + fn suppress_generic_gap(&self) -> bool { + false + } + fn actor_name(&self) -> &'static str; + fn on_open( + &self, + _ctx: Arc, + _worker: u32, + _key: String, + _options: ConcurrentWorkerOptions, + _send_tx: mpsc::Sender, + ) -> WorkloadHooks { + WorkloadHooks::default() + } +} + +#[derive(Default)] +pub struct WorkloadHooks { + pub on_message: Option>, +} + +pub struct TunnelStressWorkload; + +impl Workload for TunnelStressWorkload { + fn key_prefix(&self) -> &'static str { + "cl-t" + } + fn actor_name(&self) -> &'static str { + "tunnelStress" + } + fn on_open( + &self, + _ctx: Arc, + _worker: u32, + _key: String, + options: ConcurrentWorkerOptions, + send_tx: mpsc::Sender, + ) -> WorkloadHooks { + tokio::spawn(async move { + let mut sequence: u64 = 0; + loop { + sleep(std::time::Duration::from_millis(options.message_interval)).await; + sequence += 1; + let payload = serde_json::json!({ + "sequence": sequence, + "timestamp": chrono::Utc::now().timestamp_millis(), + }) + .to_string(); + if send_tx.send(payload).await.is_err() { + break; + } + } + }); + WorkloadHooks::default() + } +} + +pub struct AgentWorkload; + +impl Workload for AgentWorkload { + fn key_prefix(&self) -> &'static str { + "cl-a" + } + fn suppress_generic_gap(&self) -> bool { + true + } + fn actor_name(&self) -> &'static str { + "loadTestAgent" + } + fn on_open( + &self, + ctx: Arc, + worker: u32, + key: String, + options: ConcurrentWorkerOptions, + send_tx: mpsc::Sender, + ) -> WorkloadHooks { + let pending_inference_sends: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + + // Periodic inference sender. + let pending_sends_clone = pending_inference_sends.clone(); + let key_for_send = key.clone(); + let tokens_per_second = options.tokens_per_second; + let duration_ms = options.duration_ms; + let message_interval = options.message_interval; + tokio::spawn(async move { + let mut sequence: u64 = 0; + let mut first = true; + loop { + if !first { + sleep(std::time::Duration::from_millis(message_interval)).await; + } + first = false; + sequence += 1; + let now_ms = chrono::Utc::now().timestamp_millis() as u64; + let request_id = format!( + "agent-{}-{}-{}", + worker, + to_base36(now_ms), + sequence + ); + pending_sends_clone + .lock() + .await + .insert(request_id.clone(), Instant::now()); + let payload = serde_json::json!({ + "type": "inference", + "requestId": request_id, + "tokensPerSecond": tokens_per_second, + "durationMs": duration_ms, + }) + .to_string(); + if send_tx.send(payload).await.is_err() { + break; + } + } + let _ = key_for_send; + }); + + let ctx_for_hook = ctx.clone(); + let key_for_hook = key.clone(); + let on_message: Box = Box::new(move |data: &str| { + let Ok(message) = serde_json::from_str::(data) else { + return; + }; + let ty = message.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if ty == "inference-start" { + if let Some(request_id) = message.get("requestId").and_then(|v| v.as_str()) { + let pending = pending_inference_sends.clone(); + let request_id = request_id.to_string(); + let ctx_inner = ctx_for_hook.clone(); + let key_inner = key_for_hook.clone(); + tokio::spawn(async move { + let mut map = pending.lock().await; + if let Some(sent_at) = map.remove(&request_id) { + let elapsed_ms = + sent_at.elapsed().as_secs_f64() * 1000.0; + if elapsed_ms > MESSAGE_GAP_WARN_MS { + log_message_gap( + &ctx_inner, + worker, + &key_inner, + None, + elapsed_ms, + ); + } + } + }); + } + } else if ty == "slow-sql" { + let elapsed_ms = message.get("elapsedMs").and_then(|v| v.as_f64()); + let request_id = + message.get("requestId").and_then(|v| v.as_str()).unwrap_or("?"); + let token_index = message + .get("tokenIndex") + .and_then(|v| v.as_i64()) + .map(|n| n.to_string()) + .unwrap_or_else(|| "?".to_string()); + if let Some(ms) = elapsed_ms { + let detail = format!("req={} token={}", request_id, token_index); + log_slow_sql(&ctx_for_hook, worker, &key_for_hook, None, ms, &detail); + } + } + }); + WorkloadHooks { + on_message: Some(on_message), + } + } +} + +fn to_base36(mut n: u64) -> String { + if n == 0 { + return "0".to_string(); + } + let mut s = String::new(); + const ALPHA: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz"; + while n > 0 { + s.push(ALPHA[(n % 36) as usize] as char); + n /= 36; + } + s.chars().rev().collect() +} + +pub async fn run_concurrent_mode( + args: ConcurrentArgs, + env: Arc, + endpoint: Arc, + state: Arc, +) { + let args = Arc::new(args); + let workload: Arc = match args.mode { + ConcurrentMode::AgentConcurrent => Arc::new(AgentWorkload), + ConcurrentMode::Concurrent => Arc::new(TunnelStressWorkload), + }; + let ctx = Arc::new(WorkloadCtx { + endpoint: endpoint.clone(), + args: args.clone(), + env: env.clone(), + state: state.clone(), + }); + + // Run-for-ms guard: close workers after the deadline. + if env.run_for_ms > 0 { + let state_clone = state.clone(); + let dur = std::time::Duration::from_millis(env.run_for_ms); + tokio::spawn(async move { + sleep(dur).await; + state_clone.set_stopping(); + }); + } + + let mut handles: Vec> = Vec::new(); + for i in 0..args.concurrency { + let id = i + 1; + state.set_workers_started(id as i64); + state.set_worker_health(id, WorkerHealth::Pending); + let ctx_clone = ctx.clone(); + let workload_clone = workload.clone(); + let options = ConcurrentWorkerOptions { + message_interval: args.message_interval, + show_messages: args.show_messages, + skip_ready_wait: args.skip_ready_wait, + tokens_per_second: args.tokens_per_second, + duration_ms: args.duration_ms, + }; + let handle = tokio::spawn(async move { + run_concurrent_worker(id, workload_clone, ctx_clone, options).await; + }); + handles.push(handle); + if i < args.concurrency - 1 { + sleep(std::time::Duration::from_millis(args.interval)).await; + } + } + + for h in handles { + let _ = h.await; + } + print_concurrent_summary(&ctx, "complete"); +} + +async fn run_concurrent_worker( + worker: u32, + workload: Arc, + ctx: Arc, + options: ConcurrentWorkerOptions, +) { + let key = make_key(worker, workload.key_prefix()); + let mut reconnect = false; + let actor_id: Option = None; + + while !ctx.state.stopping() { + let t0 = Instant::now(); + let url = ctx.endpoint.build_raw_ws_url( + workload.actor_name(), + &key, + options.skip_ready_wait, + ); + + let ws = match open_raw_ws(&url).await { + Ok(ws) => ws, + Err(err) => { + let elapsed = t0.elapsed().as_secs_f64() * 1000.0; + log_connect_error( + &ctx, + worker, + &key, + actor_id.as_deref(), + elapsed, + &err.to_string(), + ); + return; + } + }; + let connect_ms = t0.elapsed().as_secs_f64() * 1000.0; + log_connect(&ctx, worker, &key, actor_id.as_deref(), connect_ms, reconnect); + reconnect = false; + + let (mut sink, mut stream) = ws.split(); + let (send_tx, mut send_rx) = mpsc::channel::(64); + let hooks = + workload.on_open(ctx.clone(), worker, key.clone(), options.clone(), send_tx); + + let mut first_message_logged = false; + let mut last_message_at: Option = None; + let mut saw_websocket_error = false; + let mut close_info: Option<(u16, String)> = None; + + loop { + tokio::select! { + biased; + maybe = send_rx.recv() => { + match maybe { + Some(payload) => { + if let Err(err) = sink.send(Message::Text(payload.into())).await { + saw_websocket_error = true; + log_websocket_error(&ctx, worker, &key, actor_id.as_deref()); + let _ = err; + break; + } + } + None => {} + } + } + incoming = stream.next() => { + match incoming { + Some(Ok(Message::Text(text))) => { + let now = Instant::now(); + let data = text.as_str(); + handle_incoming_message( + &ctx, + worker, + &key, + actor_id.as_deref(), + &workload, + t0, + data, + &mut first_message_logged, + &mut last_message_at, + now, + options.show_messages, + &hooks, + ); + } + Some(Ok(Message::Binary(bin))) => { + let now = Instant::now(); + handle_incoming_binary( + &ctx, + worker, + &key, + actor_id.as_deref(), + &workload, + t0, + bin.len(), + &mut first_message_logged, + &mut last_message_at, + now, + options.show_messages, + ); + } + Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => { + continue; + } + Some(Ok(Message::Close(frame))) => { + let (code, reason) = match frame { + Some(f) => (u16::from(f.code), f.reason.to_string()), + None => (0, String::new()), + }; + close_info = Some((code, reason)); + break; + } + Some(Err(_)) => { + saw_websocket_error = true; + log_websocket_error(&ctx, worker, &key, actor_id.as_deref()); + break; + } + None => break, + } + } + } + } + + // Clean shutdown: try to send a polite close. + let _ = sink + .send(Message::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: "counter-latency complete".into(), + }))) + .await; + + let (code, reason) = close_info.unwrap_or((0, String::new())); + if !ctx.state.stopping() + && !saw_websocket_error + && code == ACTOR_STOPPED_CLOSE_CODE + && reason == ACTOR_STOPPED_CLOSE_REASON + { + log_reconnect(&ctx, worker, &key, actor_id.as_deref(), code, &reason); + reconnect = true; + } else { + let unclean = !ctx.state.stopping(); + let detail = format!("code={} reason={}", code, reason); + log_disconnect(&ctx, worker, &key, actor_id.as_deref(), &detail, unclean); + } + if saw_websocket_error { + ctx.state.set_worker_health(worker, WorkerHealth::Ended); + } + if !reconnect { + break; + } + } +} + +#[allow(clippy::too_many_arguments)] +fn handle_incoming_message( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + workload: &Arc, + t0: Instant, + data: &str, + first_message_logged: &mut bool, + last_message_at: &mut Option, + now: Instant, + show_messages: bool, + hooks: &WorkloadHooks, +) { + if !*first_message_logged { + *first_message_logged = true; + let elapsed_ms = now.duration_since(t0).as_secs_f64() * 1000.0; + log_first_message(ctx, worker, key, actor_id, elapsed_ms); + } else if !workload.suppress_generic_gap() { + if let Some(prev) = *last_message_at { + let gap_ms = now.duration_since(prev).as_secs_f64() * 1000.0; + if gap_ms > MESSAGE_GAP_WARN_MS { + log_message_gap(ctx, worker, key, actor_id, gap_ms); + } + } + } + *last_message_at = Some(now); + + if show_messages { + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} message={}", + prefix, + pad(key, 32), + format_actor(actor_id), + data, + ); + } + + if let Some(handler) = &hooks.on_message { + handler(data); + } +} + +#[allow(clippy::too_many_arguments)] +fn handle_incoming_binary( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + workload: &Arc, + t0: Instant, + bytes_len: usize, + first_message_logged: &mut bool, + last_message_at: &mut Option, + now: Instant, + show_messages: bool, +) { + if !*first_message_logged { + *first_message_logged = true; + let elapsed_ms = now.duration_since(t0).as_secs_f64() * 1000.0; + log_first_message(ctx, worker, key, actor_id, elapsed_ms); + } else if !workload.suppress_generic_gap() { + if let Some(prev) = *last_message_at { + let gap_ms = now.duration_since(prev).as_secs_f64() * 1000.0; + if gap_ms > MESSAGE_GAP_WARN_MS { + log_message_gap(ctx, worker, key, actor_id, gap_ms); + } + } + } + *last_message_at = Some(now); + + if show_messages { + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} message=", + prefix, + pad(key, 32), + format_actor(actor_id), + bytes_len, + ); + } +} + +fn log_connect( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + connect_ms: f64, + reconnect: bool, +) { + ctx.state.stats.connects.fetch_add(1, Ordering::Relaxed); + if reconnect { + ctx.state.stats.reconnects.fetch_add(1, Ordering::Relaxed); + } + ctx.state.set_worker_health(worker, WorkerHealth::Healthy); + let prefix = ctx.log_prefix(); + let label = if reconnect { "reconnect" } else { "connect" }; + crate::out!( + "{} {}{} {}={}", + prefix, + pad(key, 32), + format_actor(actor_id), + label, + color_ms(connect_ms), + ); +} + +fn log_first_message( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + first_message_ms: f64, +) { + ctx.state.stats.first_messages.fetch_add(1, Ordering::Relaxed); + let _ = worker; + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} first-message={}", + prefix, + pad(key, 32), + format_actor(actor_id), + color_ms(first_message_ms), + ); +} + +fn log_disconnect( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + reason: &str, + unclean: bool, +) { + ctx.state.stats.disconnects.fetch_add(1, Ordering::Relaxed); + if unclean { + ctx.state.stats.unclean_failures_or_disconnects.fetch_add(1, Ordering::Relaxed); + } + ctx.state.set_worker_health(worker, WorkerHealth::Ended); + let prefix = ctx.log_prefix(); + let (label_prefix, label) = if unclean { + (RED, "DISCONNECT") + } else { + (DIM, "disconnect") + }; + crate::out!( + "{} {}{} {}{} {}{}", + prefix, + pad(key, 32), + format_actor(actor_id), + label_prefix, + label, + reason, + RESET, + ); +} + +fn log_reconnect( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + code: u16, + reason: &str, +) { + ctx.state.set_worker_health(worker, WorkerHealth::Pending); + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} actor-stopped reconnect code={} reason={}", + prefix, + pad(key, 32), + format_actor(actor_id), + code, + reason, + ); +} + +fn log_message_gap( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + gap_ms: f64, +) { + ctx.state.stats.message_gaps.fetch_add(1, Ordering::Relaxed); + ctx.state.stats.unclean_failures_or_disconnects.fetch_add(1, Ordering::Relaxed); + ctx.state.flag_worker_warning(worker); + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} {}MESSAGE-GAP {}{}", + prefix, + pad(key, 32), + format_actor(actor_id), + RED, + color_ms(gap_ms), + RESET, + ); +} + +fn log_slow_sql( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + elapsed_ms: f64, + detail: &str, +) { + ctx.state.stats.slow_sql.fetch_add(1, Ordering::Relaxed); + ctx.state.flag_worker_warning(worker); + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} {}SLOW-SQL {} {}{}", + prefix, + pad(key, 32), + format_actor(actor_id), + YELLOW, + color_ms(elapsed_ms), + detail, + RESET, + ); +} + +fn log_connect_error( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, + elapsed_ms: f64, + reason: &str, +) { + ctx.state.stats.connect_errors.fetch_add(1, Ordering::Relaxed); + ctx.state.stats.unclean_failures_or_disconnects.fetch_add(1, Ordering::Relaxed); + ctx.state.set_worker_health(worker, WorkerHealth::Ended); + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} {}CONNECT-ERROR {}{} ({})", + prefix, + pad(key, 32), + format_actor(actor_id), + RED, + reason, + RESET, + color_ms(elapsed_ms), + ); +} + +fn log_websocket_error( + ctx: &Arc, + worker: u32, + key: &str, + actor_id: Option<&str>, +) { + ctx.state.stats.websocket_errors.fetch_add(1, Ordering::Relaxed); + ctx.state.stats.unclean_failures_or_disconnects.fetch_add(1, Ordering::Relaxed); + ctx.state.flag_worker_warning(worker); + let prefix = ctx.log_prefix(); + crate::out!( + "{} {}{} {}WEBSOCKET-ERROR{}", + prefix, + pad(key, 32), + format_actor(actor_id), + RED, + RESET, + ); +} + +pub fn print_concurrent_summary(ctx: &Arc, reason: &str) { + let (pending, healthy, warning, ended) = ctx.state.count_worker_health(); + crate::out!( + "{}counter-latency summary{} reason={} c={}/{} s={}{}{}/{}{}{}/{}{}{}/{}{}{} disconnects={} connect-errors={} websocket-errors={} message-gaps={} slow-sql={} connects={} reconnects={} first-messages={}", + BOLD, + RESET, + reason, + ctx.state.workers_started(), + ctx.args.concurrency, + BLUE, pending, RESET, + GREEN, healthy, RESET, + YELLOW, warning, RESET, + RED, ended, RESET, + ctx.state.stats.disconnects.load(Ordering::Relaxed), + ctx.state.stats.connect_errors.load(Ordering::Relaxed), + ctx.state.stats.websocket_errors.load(Ordering::Relaxed), + ctx.state.stats.message_gaps.load(Ordering::Relaxed), + ctx.state.stats.slow_sql.load(Ordering::Relaxed), + ctx.state.stats.connects.load(Ordering::Relaxed), + ctx.state.stats.reconnects.load(Ordering::Relaxed), + ctx.state.stats.first_messages.load(Ordering::Relaxed), + ); +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/endpoint.rs b/examples/kitchen-sink/scripts/counter-latency/src/endpoint.rs new file mode 100644 index 0000000000..0184be1faf --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/endpoint.rs @@ -0,0 +1,122 @@ +// Endpoint URL parsing + raw WebSocket URL builder. Mirrors the top-level +// constants `NAMESPACE`, `TOKEN`, `WS_ORIGIN`, `RVT_RUNNER` and +// `buildRawWebSocketUrl` in scripts/counter-latency.ts. + +use anyhow::{Context, Result, anyhow}; +use url::Url; + +pub struct Endpoint { + pub namespace: String, + pub token: String, + pub ws_origin: String, + pub display_origin: String, + pub rvt_runner: String, +} + +impl Endpoint { + pub fn parse(raw_endpoint: &str, rivet_pool: String) -> Result { + let url = Url::parse(raw_endpoint).context("invalid endpoint URL")?; + let namespace = percent_decode(url.username())?; + let token = match url.password() { + Some(p) => percent_decode(p)?, + None => String::new(), + }; + let ws_proto = if url.scheme() == "https" { "wss" } else { "ws" }; + let host = url + .host_str() + .ok_or_else(|| anyhow!("endpoint missing host"))?; + let port = url.port(); + let host_with_port = match port { + Some(p) => format!("{}:{}", host, p), + None => host.to_string(), + }; + let ws_origin = format!("{}://{}", ws_proto, host_with_port); + let display_origin = format!("{}://{}", url.scheme(), host_with_port); + Ok(Self { + namespace, + token, + ws_origin, + display_origin, + rvt_runner: rivet_pool, + }) + } + + pub fn build_raw_ws_url(&self, actor_name: &str, key: &str, skip_ready_wait: bool) -> String { + let mut params = Vec::<(String, String)>::new(); + params.push(("rvt-namespace".into(), self.namespace.clone())); + params.push(("rvt-method".into(), "getOrCreate".into())); + params.push(("rvt-runner".into(), self.rvt_runner.clone())); + params.push(("rvt-key".into(), key.into())); + params.push(("rvt-crash-policy".into(), "sleep".into())); + if !self.token.is_empty() { + params.push(("rvt-token".into(), self.token.clone())); + } + if skip_ready_wait { + params.push(("rvt-skip-ready-wait".into(), "true".into())); + } + let qs = params + .iter() + .map(|(k, v)| format!("{}={}", encode_query(k), encode_query(v))) + .collect::>() + .join("&"); + format!( + "{}/gateway/{}/websocket?{}", + self.ws_origin, + encode_path(actor_name), + qs, + ) + } +} + +fn percent_decode(s: &str) -> Result { + let decoded = urlencoding_decode(s)?; + Ok(decoded) +} + +// Minimal URL-encoding helpers using percent-encoding semantics compatible +// with `encodeURIComponent` for the characters we care about +// (alphanumerics + `-._~` left as-is; everything else percent-encoded). +fn encode_query(s: &str) -> String { + encode_uri_component(s) +} + +fn encode_path(s: &str) -> String { + encode_uri_component(s) +} + +fn encode_uri_component(s: &str) -> String { + let mut buf = String::with_capacity(s.len()); + for b in s.bytes() { + if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~') { + buf.push(b as char); + } else { + buf.push_str(&format!("%{:02X}", b)); + } + } + buf +} + +fn urlencoding_decode(s: &str) -> Result { + let bytes = s.as_bytes(); + let mut out = Vec::with_capacity(bytes.len()); + let mut i = 0; + while i < bytes.len() { + match bytes[i] { + b'%' if i + 2 < bytes.len() => { + let hex = std::str::from_utf8(&bytes[i + 1..i + 3])?; + let v = u8::from_str_radix(hex, 16).context("invalid percent-encoding")?; + out.push(v); + i += 3; + } + b'+' => { + out.push(b' '); + i += 1; + } + b => { + out.push(b); + i += 1; + } + } + } + Ok(String::from_utf8(out).context("invalid UTF-8 after decode")?) +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/log.rs b/examples/kitchen-sink/scripts/counter-latency/src/log.rs new file mode 100644 index 0000000000..eafd891e0d --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/log.rs @@ -0,0 +1,59 @@ +// ANSI helpers, gradient color, log formatting helpers. 1:1 port of the +// console.log layer in scripts/counter-latency.ts. + +use chrono::Utc; + +pub const RESET: &str = "\x1b[0m"; +pub const GREEN: &str = "\x1b[38;2;0;255;0m"; +pub const RED: &str = "\x1b[38;2;255;0;0m"; +pub const YELLOW: &str = "\x1b[38;2;255;200;0m"; +pub const BLUE: &str = "\x1b[38;2;80;160;255m"; +pub const DIM: &str = "\x1b[2m"; +pub const BOLD: &str = "\x1b[1m"; + +pub const COLOR_MIN_MS: f64 = 800.0; +pub const COLOR_MAX_MS: f64 = 2_000.0; + +pub fn gradient_color(ms: f64) -> String { + let clamped = ms.clamp(COLOR_MIN_MS, COLOR_MAX_MS); + let t = (clamped - COLOR_MIN_MS) / (COLOR_MAX_MS - COLOR_MIN_MS); + let r; + let g; + if t <= 0.5 { + r = (t * 2.0 * 255.0).round() as u32; + g = 255u32; + } else { + r = 255u32; + g = ((1.0 - (t - 0.5) * 2.0) * 255.0).round() as u32; + } + format!("\x1b[38;2;{};{};0m", r, g) +} + +pub fn color_ms(ms: f64) -> String { + let fixed = format!("{:>5}", ms.round() as i64); + format!("{}{}ms{}", gradient_color(ms), fixed, RESET) +} + +pub fn pad(s: &str, n: usize) -> String { + if s.len() >= n { + s.to_string() + } else { + let mut buf = String::with_capacity(n); + buf.push_str(s); + for _ in s.len()..n { + buf.push(' '); + } + buf + } +} + +pub fn format_actor(actor_id: Option<&str>) -> String { + match actor_id { + Some(id) if !id.is_empty() => format!(" actor={}", id), + _ => String::new(), + } +} + +pub fn iso_now() -> String { + Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/main.rs b/examples/kitchen-sink/scripts/counter-latency/src/main.rs new file mode 100644 index 0000000000..2086494c28 --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/main.rs @@ -0,0 +1,192 @@ +// counter-latency: Rust port of scripts/counter-latency.ts. +// Subcommands: +// rtt spawn fresh counter actors and measure raw-WS RTTs. +// concurrent ramp persistent raw WS tunnel-stress actors. +// agent-concurrent ramp persistent SQLite-backed agent actors. +// +// Env: +// BATCHES total workers spawned before exit in rtt mode. Default: infinite. +// SERIAL "1" / "true" to await each worker before the next in rtt mode. +// RUN_FOR_MS optional run cap for concurrent modes. +// RIVET_POOL runner pool name (default "k8s"). + +mod args; +mod concurrent; +mod endpoint; +mod log; +mod rtt; +mod stats; +mod tee; +mod ws; + +use std::sync::Arc; + +use crate::args::{Args, EnvConfig}; +use crate::concurrent::{WorkloadCtx, print_concurrent_summary}; +use crate::endpoint::Endpoint; +use crate::log::{BOLD, COLOR_MIN_MS, COLOR_MAX_MS, DIM, RESET, gradient_color}; +use crate::stats::State; + +fn main() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("tokio runtime"); + runtime.block_on(run()); +} + +async fn run() { + let parsed = args::parse_cli(); + let env_cfg = Arc::new(EnvConfig::from_env()); + + let run_id = format!( + "{}-{}", + chrono::Utc::now().format("%Y%m%dT%H%M%S"), + std::process::id(), + ); + match tee::init(&run_id) { + Ok(path) => eprintln!("counter-latency log: {}", path), + Err(err) => { + eprintln!("fatal: cannot open log file: {}", err); + std::process::exit(1); + } + } + + let endpoint = match Endpoint::parse(&env_cfg.endpoint, env_cfg.rivet_pool.clone()) { + Ok(e) => Arc::new(e), + Err(err) => { + eprintln!("fatal: {}", err); + std::process::exit(1); + } + }; + print_header(&parsed, &env_cfg, &endpoint); + + match parsed { + Args::Rtt(rtt_args) => { + rtt::run_rtt_mode(rtt_args, env_cfg.clone(), endpoint.clone()).await; + } + Args::Concurrent(concurrent_args) => { + let state = Arc::new(State::new()); + let ctx = Arc::new(WorkloadCtx { + endpoint: endpoint.clone(), + args: Arc::new(concurrent_args.clone()), + env: env_cfg.clone(), + state: state.clone(), + }); + install_signal_handlers(ctx.clone()); + concurrent::run_concurrent_mode( + concurrent_args, + env_cfg.clone(), + endpoint.clone(), + state.clone(), + ) + .await; + } + } +} + +fn install_signal_handlers(ctx: Arc) { + let ctx_int = ctx.clone(); + tokio::spawn(async move { + if tokio::signal::ctrl_c().await.is_ok() { + ctx_int.state.set_stopping(); + print_concurrent_summary(&ctx_int, "sigint"); + std::process::exit(130); + } + }); + + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + let ctx_term = ctx.clone(); + tokio::spawn(async move { + let Ok(mut sig) = signal(SignalKind::terminate()) else { + return; + }; + if sig.recv().await.is_some() { + ctx_term.state.set_stopping(); + print_concurrent_summary(&ctx_term, "sigterm"); + std::process::exit(143); + } + }); + } +} + +fn print_header(args: &Args, env: &EnvConfig, endpoint: &Endpoint) { + let mode = match args { + Args::Rtt(_) => "rtt", + Args::Concurrent(a) => match a.mode { + args::ConcurrentMode::Concurrent => "concurrent", + args::ConcurrentMode::AgentConcurrent => "agent-concurrent", + }, + }; + let header = format!( + "{}counter-latency{} endpoint={} ns={} mode={} interval={}ms", + BOLD, + RESET, + endpoint.display_origin, + endpoint.namespace, + mode, + args.interval(), + ); + match args { + Args::Rtt(_) => { + let batches = if env.batches == 0 { + "∞".to_string() + } else { + env.batches.to_string() + }; + out!( + "{} batches={} serial={} skip-ready-wait={} rvt-runner={}", + header, + batches, + env.serial, + args.skip_ready_wait(), + env.rivet_pool, + ); + } + Args::Concurrent(a) => { + let agent_part = if matches!(a.mode, args::ConcurrentMode::AgentConcurrent) { + format!( + " tokens-per-second={} duration-ms={}", + a.tokens_per_second, a.duration_ms, + ) + } else { + String::new() + }; + let run_for_part = if env.run_for_ms > 0 { + format!(" run-for-ms={}", env.run_for_ms) + } else { + String::new() + }; + out!( + "{} concurrency={} message-every={}ms show-messages={} skip-ready-wait={} rvt-runner={}{}{}", + header, + a.concurrency, + a.message_interval, + a.show_messages, + a.skip_ready_wait, + env.rivet_pool, + agent_part, + run_for_part, + ); + } + } + let mid = (COLOR_MIN_MS + COLOR_MAX_MS) / 2.0; + out!( + "{}gradient: {}{}ms{}{} -> {}{}ms{}{} -> {}{}ms{}", + DIM, + gradient_color(COLOR_MIN_MS), + COLOR_MIN_MS as i64, + RESET, + DIM, + gradient_color(mid), + mid as i64, + RESET, + DIM, + gradient_color(COLOR_MAX_MS), + COLOR_MAX_MS as i64, + RESET, + ); + out!(); +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/rtt.rs b/examples/kitchen-sink/scripts/counter-latency/src/rtt.rs new file mode 100644 index 0000000000..1a72385ee8 --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/rtt.rs @@ -0,0 +1,189 @@ +// Rtt subcommand: spawn workers that open a raw WS, send "1" + "2", and +// measure connect/first/second/total latency. + +use std::sync::Arc; +use std::time::Instant; + +use futures_util::{SinkExt, StreamExt}; +use tokio::time::sleep; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite::protocol::CloseFrame; +use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; + +use crate::args::{EnvConfig, RttArgs}; +use crate::endpoint::Endpoint; +use crate::log::{DIM, RED, RESET, color_ms, format_actor, iso_now, pad}; +use crate::ws::open_raw_ws; + +pub struct Sample { + pub worker: u32, + pub key: String, + pub connect_ms: f64, + pub first_ms: f64, + pub second_ms: f64, + pub total_ms: f64, + pub actor_id: Option, + pub error: Option, +} + +pub async fn run_rtt_mode(args: RttArgs, env: Arc, endpoint: Arc) { + let mut worker_id: u32 = 0; + let mut inflight: Vec> = Vec::new(); + loop { + if env.batches != 0 && (worker_id as u64) >= env.batches { + break; + } + worker_id += 1; + let id = worker_id; + let endpoint_clone = endpoint.clone(); + let skip_ready_wait = args.skip_ready_wait; + if env.serial { + let sample = run_rtt_worker(id, endpoint_clone, skip_ready_wait).await; + print_rtt_sample(&sample); + } else { + let handle = tokio::spawn(async move { + let sample = run_rtt_worker(id, endpoint_clone, skip_ready_wait).await; + print_rtt_sample(&sample); + }); + inflight.push(handle); + } + if env.batches == 0 || (worker_id as u64) < env.batches { + sleep(std::time::Duration::from_millis(args.interval)).await; + } + } + for h in inflight { + let _ = h.await; + } +} + +async fn run_rtt_worker(worker: u32, endpoint: Arc, skip_ready_wait: bool) -> Sample { + let key = make_key(worker, "cl"); + let actor_id: Option = None; + let url = endpoint.build_raw_ws_url("counter", &key, skip_ready_wait); + let t0 = Instant::now(); + + match open_and_run_rtt(&url, t0).await { + Ok((connect_ms, first_ms, second_ms, total_ms)) => Sample { + worker, + key, + connect_ms, + first_ms, + second_ms, + total_ms, + actor_id, + error: None, + }, + Err(err) => Sample { + worker, + key, + connect_ms: 0.0, + first_ms: 0.0, + second_ms: 0.0, + total_ms: 0.0, + actor_id, + error: Some(err.to_string()), + }, + } +} + +async fn open_and_run_rtt(url: &str, t0: Instant) -> anyhow::Result<(f64, f64, f64, f64)> { + let ws = open_raw_ws(url).await?; + let t_connect = Instant::now(); + let (mut sink, mut stream) = ws.split(); + + sink.send(Message::Text("1".into())).await?; + wait_for_echo(&mut stream).await?; + let t_first = Instant::now(); + + sink.send(Message::Text("2".into())).await?; + wait_for_echo(&mut stream).await?; + let t_second = Instant::now(); + + let _ = sink + .send(Message::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: "rtt done".into(), + }))) + .await; + + let connect_ms = elapsed_ms(t0, t_connect); + let first_ms = elapsed_ms(t_connect, t_first); + let second_ms = elapsed_ms(t_first, t_second); + let total_ms = elapsed_ms(t0, t_second); + Ok((connect_ms, first_ms, second_ms, total_ms)) +} + +async fn wait_for_echo(stream: &mut S) -> anyhow::Result<()> +where + S: futures_util::stream::Stream< + Item = Result, + > + Unpin, +{ + loop { + match stream.next().await { + Some(Ok(Message::Text(_) | Message::Binary(_))) => return Ok(()), + Some(Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_))) => continue, + Some(Ok(Message::Close(frame))) => { + let (code, reason) = match frame { + Some(f) => (u16::from(f.code), f.reason.to_string()), + None => (0, String::new()), + }; + return Err(anyhow::anyhow!( + "ws closed before echo code={} reason={}", + code, + reason + )); + } + Some(Err(e)) => return Err(anyhow::anyhow!("ws error before echo: {}", e)), + None => return Err(anyhow::anyhow!("ws stream ended before echo")), + } + } +} + +fn elapsed_ms(start: Instant, end: Instant) -> f64 { + end.duration_since(start).as_secs_f64() * 1000.0 +} + +pub fn make_key(worker: u32, prefix: &str) -> String { + let now_ms = chrono::Utc::now().timestamp_millis(); + format!("{}-{}-{}", prefix, worker, base36(now_ms as u64)) +} + +fn base36(mut n: u64) -> String { + if n == 0 { + return "0".to_string(); + } + let mut s = String::new(); + const ALPHA: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz"; + while n > 0 { + s.push(ALPHA[(n % 36) as usize] as char); + n /= 36; + } + s.chars().rev().collect() +} + +pub fn print_rtt_sample(s: &Sample) { + let prefix = format!("{}{}{}", DIM, iso_now(), RESET); + if let Some(err) = &s.error { + crate::out!( + "{} {} {}ERROR {}{} ({})", + prefix, + pad(&s.key, 32), + RED, + err, + RESET, + color_ms(s.total_ms), + ); + return; + } + crate::out!( + "{} {}{} connect={} first={} second={} total={}", + prefix, + pad(&s.key, 32), + format_actor(s.actor_id.as_deref()), + color_ms(s.connect_ms), + color_ms(s.first_ms), + color_ms(s.second_ms), + color_ms(s.total_ms), + ); +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/stats.rs b/examples/kitchen-sink/scripts/counter-latency/src/stats.rs new file mode 100644 index 0000000000..4313310e79 --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/stats.rs @@ -0,0 +1,104 @@ +// Worker health + concurrent stats counters. Mirrors the global state at +// the top of scripts/counter-latency.ts. + +use std::collections::HashMap; +use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; + +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum WorkerHealth { + Pending, + Healthy, + Warning, + Ended, +} + +pub struct ConcurrentStats { + pub connects: AtomicI64, + pub reconnects: AtomicI64, + pub first_messages: AtomicI64, + pub connect_errors: AtomicI64, + pub websocket_errors: AtomicI64, + pub disconnects: AtomicI64, + pub message_gaps: AtomicI64, + pub slow_sql: AtomicI64, + pub unclean_failures_or_disconnects: AtomicI64, +} + +impl ConcurrentStats { + pub fn new() -> Self { + Self { + connects: AtomicI64::new(0), + reconnects: AtomicI64::new(0), + first_messages: AtomicI64::new(0), + connect_errors: AtomicI64::new(0), + websocket_errors: AtomicI64::new(0), + disconnects: AtomicI64::new(0), + message_gaps: AtomicI64::new(0), + slow_sql: AtomicI64::new(0), + unclean_failures_or_disconnects: AtomicI64::new(0), + } + } +} + +pub struct State { + pub stats: ConcurrentStats, + pub workers_started: AtomicI64, + pub stopping: AtomicBool, + pub worker_health: Mutex>, +} + +impl State { + pub fn new() -> Self { + Self { + stats: ConcurrentStats::new(), + workers_started: AtomicI64::new(0), + stopping: AtomicBool::new(false), + worker_health: Mutex::new(HashMap::new()), + } + } + + pub fn set_worker_health(&self, worker: u32, state: WorkerHealth) { + self.worker_health.lock().unwrap().insert(worker, state); + } + + pub fn flag_worker_warning(&self, worker: u32) { + let mut map = self.worker_health.lock().unwrap(); + if let Some(WorkerHealth::Healthy) = map.get(&worker) { + map.insert(worker, WorkerHealth::Warning); + } + } + + pub fn count_worker_health(&self) -> (i64, i64, i64, i64) { + let map = self.worker_health.lock().unwrap(); + let mut pending = 0i64; + let mut healthy = 0i64; + let mut warning = 0i64; + let mut ended = 0i64; + for s in map.values() { + match s { + WorkerHealth::Pending => pending += 1, + WorkerHealth::Healthy => healthy += 1, + WorkerHealth::Warning => warning += 1, + WorkerHealth::Ended => ended += 1, + } + } + (pending, healthy, warning, ended) + } + + pub fn workers_started(&self) -> i64 { + self.workers_started.load(Ordering::Relaxed) + } + + pub fn set_workers_started(&self, n: i64) { + self.workers_started.store(n, Ordering::Relaxed); + } + + pub fn stopping(&self) -> bool { + self.stopping.load(Ordering::Relaxed) + } + + pub fn set_stopping(&self) { + self.stopping.store(true, Ordering::Relaxed); + } +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/tee.rs b/examples/kitchen-sink/scripts/counter-latency/src/tee.rs new file mode 100644 index 0000000000..d12c47c274 --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/tee.rs @@ -0,0 +1,47 @@ +// Mirror every stdout line written via `out!` to a per-run /tmp/counter-latency-.txt +// transcript. Initialized once at startup; all log helpers route through `out!`. + +use std::fs::File; +use std::io::{Write, stdout}; +use std::sync::{Mutex, OnceLock}; + +static LOG_FILE: OnceLock> = OnceLock::new(); +static LOG_FILE_PATH: OnceLock = OnceLock::new(); + +pub fn init(id: &str) -> std::io::Result { + let path = format!("/tmp/counter-latency-{}.txt", id); + let file = File::create(&path)?; + LOG_FILE + .set(Mutex::new(file)) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::AlreadyExists, "log file already initialized"))?; + LOG_FILE_PATH + .set(path.clone()) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::AlreadyExists, "log path already set"))?; + Ok(path) +} + +pub fn log_file_path() -> Option<&'static str> { + LOG_FILE_PATH.get().map(|s| s.as_str()) +} + +pub fn emit(line: &str) { + { + let mut out = stdout().lock(); + let _ = writeln!(out, "{}", line); + } + if let Some(file_mu) = LOG_FILE.get() { + if let Ok(mut f) = file_mu.lock() { + let _ = writeln!(f, "{}", line); + } + } +} + +#[macro_export] +macro_rules! out { + () => { + $crate::tee::emit(""); + }; + ($($arg:tt)*) => { + $crate::tee::emit(&format!($($arg)*)); + }; +} diff --git a/examples/kitchen-sink/scripts/counter-latency/src/ws.rs b/examples/kitchen-sink/scripts/counter-latency/src/ws.rs new file mode 100644 index 0000000000..f6b90b481a --- /dev/null +++ b/examples/kitchen-sink/scripts/counter-latency/src/ws.rs @@ -0,0 +1,23 @@ +// WebSocket connect helper for raw rivet gateway routing. Wraps +// tokio-tungstenite and sets the protocol subheaders the gateway +// expects (`rivet`, `rivet_encoding.json`). + +use anyhow::{Context, Result}; +use http::HeaderValue; +use tokio::net::TcpStream; +use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async}; + +pub const RIVET_PROTOCOLS: &[&str] = &["rivet", "rivet_encoding.json"]; + +pub type Ws = WebSocketStream>; + +pub async fn open_raw_ws(url: &str) -> Result { + let mut req = url.into_client_request().context("invalid websocket URL")?; + req.headers_mut().insert( + "Sec-WebSocket-Protocol", + HeaderValue::from_static("rivet, rivet_encoding.json"), + ); + let (ws, _resp) = connect_async(req).await.context("websocket connect failed")?; + Ok(ws) +} diff --git a/examples/kitchen-sink/scripts/mock-agentic-loop.ts b/examples/kitchen-sink/scripts/mock-agentic-loop.ts index 789a8f9baa..2f0709bea2 100644 --- a/examples/kitchen-sink/scripts/mock-agentic-loop.ts +++ b/examples/kitchen-sink/scripts/mock-agentic-loop.ts @@ -38,7 +38,7 @@ const NAMESPACE = const TOKEN = process.env.MOCK_AGENTIC_TOKEN ?? process.env.RIVET_TOKEN ?? "dev"; const POOL_NAME = - process.env.MOCK_AGENTIC_POOL ?? process.env.RIVET_POOL ?? "default"; + process.env.MOCK_AGENTIC_POOL ?? process.env.RIVET_POOL ?? "k8s"; const KEY_PREFIX = process.env.MOCK_AGENTIC_KEY_PREFIX ?? "mock-agentic-loop"; const DURATION_MS = numberFromEnv("MOCK_AGENTIC_DURATION_MS", 180_000); const INFERENCE_MIN_SECONDS = numberFromEnv( diff --git a/examples/kitchen-sink/scripts/on-sleep-remote-watch.ts b/examples/kitchen-sink/scripts/on-sleep-remote-watch.ts new file mode 100644 index 0000000000..4985a41692 --- /dev/null +++ b/examples/kitchen-sink/scripts/on-sleep-remote-watch.ts @@ -0,0 +1,508 @@ +// Remote onSleep watcher for a kitchen-sink envoy pool. +// +// This does not start the engine or kitchen-sink. Point it at a remote Rivet +// endpoint, then manually roll the envoy pods after the WebSocket is open. +// +// Usage: +// RIVET_ENDPOINT=https://namespace:token@... \ +// pnpm smoke:on-sleep-remote -- \ +// --pool kitchen-sink \ +// --on-sleep-duration-ms 60000 \ +// --open-delay-ms 0 \ +// --reconnect-timeout-ms 5000 + +import { createClient } from "rivetkit/client"; + +installTimestampedConsole(); + +const CLI_ARGS = parseCliArgs(process.argv.slice(2)); +const RAW_ENDPOINT = requiredStringFromConfig("endpoint", ["RIVET_ENDPOINT"]); +const POOL_NAME = stringFromConfig("pool", ["RIVET_POOL"], "default"); +const ENDPOINT = parseEndpoint(RAW_ENDPOINT, POOL_NAME); +const KEY = stringFromConfig( + "key", + ["SIGTERM_SLEEP_KEY"], + `remote-sleep-probe-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, +); +const LABEL = stringFromConfig("label", ["SIGTERM_SLEEP_LABEL"], KEY); +const ON_SLEEP_DURATION_MS = numberFromConfig( + "on-sleep-duration-ms", + ["SIGTERM_SLEEP_ON_SLEEP_DURATION_MS"], + 60_000, +); +const ON_SLEEP_TICK_MS = numberFromConfig( + "on-sleep-tick-ms", + ["SIGTERM_SLEEP_ON_SLEEP_TICK_MS"], + 1_000, +); +const OPEN_TIMEOUT_MS = numberFromConfig("open-timeout-ms", [], 15_000); +const OPEN_DELAY_MS = numberFromConfig("open-delay-ms", [], 0); +const MESSAGE_TIMEOUT_MS = numberFromConfig("message-timeout-ms", [], 5_000); +const RECONNECT_TIMEOUT_MS = numberFromConfig("reconnect-timeout-ms", [], 5_000); +const WATCH_TIMEOUT_MS = numberFromConfig( + "watch-timeout-ms", + [], + Math.max(5 * 60_000, ON_SLEEP_DURATION_MS + 2 * 60_000), +); +const CLOSE_CODE = 1000; +const CLOSE_REASON = "actor stopped"; + +type JsonRecord = Record; + +interface CloseInfo { + code: number; + reason: string; + wasClean: boolean; + at: number; +} + +interface ProofRow { + id: number; + event: string; + sleep_count: number; + detail: string | null; + created_at: number; +} + +interface Proof { + state: { + label: string; + wakeCount: number; + sleepCount: number; + onSleepDurationMs: number; + onSleepTickMs: number; + connectionCount: number; + messageCount: number; + onSleepStartedAt: number | null; + onSleepAsyncFinishedAt: number | null; + onSleepFinishedAt: number | null; + onSleepLastError: string | null; + }; + rows: ProofRow[]; +} + +function installTimestampedConsole(): void { + const originalLog = console.log.bind(console); + const originalError = console.error.bind(console); + const originalWarn = console.warn.bind(console); + console.log = (...args: unknown[]) => originalLog(`[${new Date().toISOString()}]`, ...args); + console.error = (...args: unknown[]) => + originalError(`[${new Date().toISOString()}]`, ...args); + console.warn = (...args: unknown[]) => + originalWarn(`[${new Date().toISOString()}]`, ...args); +} + +function parseCliArgs(args: string[]): Map { + const parsed = new Map(); + for (let i = 0; i < args.length; i += 1) { + const arg = args[i]; + if (arg === "--") continue; + if (!arg.startsWith("--")) { + throw new Error(`unexpected argument "${arg}". Use --name value.`); + } + + const eqIndex = arg.indexOf("="); + if (eqIndex !== -1) { + const name = arg.slice(2, eqIndex); + const value = arg.slice(eqIndex + 1); + if (!name || value === "") { + throw new Error(`invalid argument "${arg}". Use --name=value.`); + } + parsed.set(name, value); + continue; + } + + const name = arg.slice(2); + const value = args[i + 1]; + if (!name || value === undefined || value.startsWith("--")) { + throw new Error(`missing value for --${name}`); + } + parsed.set(name, value); + i += 1; + } + return parsed; +} + +function stringFromConfig( + argName: string, + envNames: string[], + fallback: string, +): string { + const arg = CLI_ARGS.get(argName); + if (arg !== undefined) return arg; + for (const envName of envNames) { + const raw = process.env[envName]; + if (raw !== undefined && raw !== "") return raw; + } + return fallback; +} + +function requiredStringFromConfig(argName: string, envNames: string[]): string { + const arg = CLI_ARGS.get(argName); + if (arg !== undefined && arg !== "") return arg; + for (const envName of envNames) { + const raw = process.env[envName]; + if (raw !== undefined && raw !== "") return raw; + } + throw new Error( + `missing required --${argName}. Set ${envNames.join(" or ")} or pass --${argName}.`, + ); +} + +function numberFromConfig( + argName: string, + envNames: string[], + fallback: number, +): number { + const raw = + CLI_ARGS.get(argName) ?? + envNames.map((envName) => process.env[envName]).find((value) => value); + if (raw === undefined || raw === "") return fallback; + const parsed = Number(raw); + if (!Number.isFinite(parsed) || parsed < 0) { + throw new Error(`--${argName} must be a finite non-negative number`); + } + return parsed; +} + +function formatError(error: unknown): string { + if (error instanceof Error) return `${error.name}: ${error.message}`; + return String(error); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function parseEndpoint(raw: string, poolName: string) { + const url = new URL(raw); + const namespace = decodeURIComponent(url.username); + const token = url.password ? decodeURIComponent(url.password) : undefined; + if (!namespace) { + throw new Error("RIVET_ENDPOINT must include namespace auth, e.g. https://namespace:token@host"); + } + url.username = ""; + url.password = ""; + const endpoint = url.toString().replace(/\/$/, ""); + const wsProtocol = url.protocol === "https:" ? "wss:" : "ws:"; + const wsOrigin = `${wsProtocol}//${url.host}`; + return { endpoint, namespace, token, poolName, wsOrigin }; +} + +function buildRawWebSocketUrl(): string { + const params = new URLSearchParams(); + params.set("rvt-namespace", ENDPOINT.namespace); + params.set("rvt-method", "getOrCreate"); + params.set("rvt-runner", ENDPOINT.poolName); + params.set("rvt-key", KEY); + params.set("rvt-crash-policy", "sleep"); + params.set("rvt-skip-ready-wait", "true"); + if (ENDPOINT.token) { + params.set("rvt-token", ENDPOINT.token); + } + return `${ENDPOINT.wsOrigin}/gateway/sigtermSleepProbe/websocket?${params}`; +} + +function client() { + return createClient({ + endpoint: RAW_ENDPOINT, + poolName: ENDPOINT.poolName, + }); +} + +async function waitForOpen(ws: WebSocket, timeoutMs: number): Promise { + if (ws.readyState === WebSocket.OPEN) return; + await new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error(`websocket open timeout after ${timeoutMs}ms`)), + timeoutMs, + ); + const cleanup = () => clearTimeout(timeout); + ws.addEventListener( + "open", + () => { + cleanup(); + resolve(); + }, + { once: true }, + ); + ws.addEventListener( + "error", + () => { + cleanup(); + reject(new Error("websocket error before open")); + }, + { once: true }, + ); + ws.addEventListener( + "close", + (event) => { + cleanup(); + reject( + new Error( + `websocket closed before open code=${event.code} reason=${event.reason}`, + ), + ); + }, + { once: true }, + ); + }); +} + +async function waitForMessage( + ws: WebSocket, + predicate: (message: JsonRecord) => boolean, + timeoutMs: number, + label: string, +): Promise { + return await new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error(`${label} message timeout after ${timeoutMs}ms`)), + timeoutMs, + ); + const cleanup = () => { + clearTimeout(timeout); + ws.removeEventListener("message", onMessage); + ws.removeEventListener("close", onClose); + }; + const onMessage = (event: MessageEvent) => { + const data = typeof event.data === "string" ? event.data : String(event.data); + console.log(`[ws:message] ${data}`); + let parsed: JsonRecord; + try { + parsed = JSON.parse(data) as JsonRecord; + } catch { + return; + } + if (!predicate(parsed)) return; + cleanup(); + resolve(parsed); + }; + const onClose = (event: CloseEvent) => { + cleanup(); + reject( + new Error( + `${label} closed while waiting code=${event.code} reason=${event.reason}`, + ), + ); + }; + ws.addEventListener("message", onMessage); + ws.addEventListener("close", onClose, { once: true }); + }); +} + +function waitForClose( + ws: WebSocket, + timeoutMs: number, + state: { + sawOnSleepStarted: boolean; + sawOnSleepFinished: boolean; + onSleepTickCount: number; + }, +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error(`websocket close timeout after ${timeoutMs}ms`)), + timeoutMs, + ); + ws.addEventListener("message", (event) => { + const data = typeof event.data === "string" ? event.data : String(event.data); + console.log(`[ws:message] ${data}`); + try { + const parsed = JSON.parse(data) as JsonRecord; + if (parsed.type === "onSleepStarted") { + state.sawOnSleepStarted = true; + } + if (parsed.type === "onSleepTick") { + state.onSleepTickCount += 1; + } + if (parsed.type === "onSleepFinished") { + state.sawOnSleepFinished = true; + } + } catch { + // Raw non-JSON messages are still logged above. + } + }); + ws.addEventListener( + "close", + (event) => { + clearTimeout(timeout); + resolve({ + code: event.code, + reason: event.reason, + wasClean: event.wasClean, + at: Date.now(), + }); + }, + { once: true }, + ); + }); +} + +async function connectAndPingPong( + label: string, + openTimeoutMs: number, + messageTimeoutMs: number, +): Promise { + const webSocketUrl = buildRawWebSocketUrl(); + console.log(`[ws] ${label} connecting url=${webSocketUrl}`); + const ws = new WebSocket(webSocketUrl, ["rivet", "rivet_encoding.json"]); + ws.addEventListener("error", () => { + console.error(`[ws:error] ${label}`); + }); + ws.addEventListener("close", (event) => { + console.log( + `[ws:close] ${label} code=${event.code} reason=${event.reason} wasClean=${event.wasClean}`, + ); + }); + + try { + await waitForOpen(ws, openTimeoutMs); + console.log(`[ws] ${label} open`); + await waitForMessage( + ws, + (message) => message.type === "welcome", + messageTimeoutMs, + `${label} welcome`, + ); + ws.send(JSON.stringify({ type: "ping", label, timestamp: Date.now() })); + await waitForMessage( + ws, + (message) => message.type === "pong", + messageTimeoutMs, + `${label} pong`, + ); + console.log(`[ws] ${label} ping pong ok`); + return ws; + } catch (error) { + if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) { + ws.close(1000, `${label} failed`); + } + throw error; + } +} + +async function reconnectAndPingPong(): Promise { + console.log(`[ws] reconnect immediate timeoutMs=${RECONNECT_TIMEOUT_MS}`); + return await connectAndPingPong( + "reconnect", + Math.min(OPEN_TIMEOUT_MS, RECONNECT_TIMEOUT_MS), + Math.min(MESSAGE_TIMEOUT_MS, RECONNECT_TIMEOUT_MS), + ); +} + +function assertClose(close: CloseInfo, startedAt: number): void { + const elapsedMs = close.at - startedAt; + if (close.code !== CLOSE_CODE || close.reason !== CLOSE_REASON) { + throw new Error( + `expected close code=${CLOSE_CODE} reason=${CLOSE_REASON}; got code=${close.code} reason=${close.reason}`, + ); + } + if (elapsedMs < ON_SLEEP_DURATION_MS) { + throw new Error( + `websocket closed too early: ${elapsedMs}ms < ${ON_SLEEP_DURATION_MS}ms`, + ); + } + if (elapsedMs > ON_SLEEP_DURATION_MS + 120_000) { + throw new Error( + `websocket closed too late: ${elapsedMs}ms > ${ON_SLEEP_DURATION_MS + 120_000}ms`, + ); + } +} + +function assertProof(proof: Proof): void { + const events = proof.rows.map((row) => row.event); + const start = proof.rows.find((row) => row.event === "on-sleep-start"); + const afterAwait = proof.rows.find( + (row) => row.event === "on-sleep-after-await", + ); + const finish = proof.rows.find((row) => row.event === "on-sleep-finish"); + const ticks = proof.rows.filter((row) => row.event === "on-sleep-tick"); + + if (proof.state.sleepCount < 1) { + throw new Error(`expected sleepCount >= 1, got ${proof.state.sleepCount}`); + } + if (proof.state.onSleepLastError !== null) { + throw new Error(`onSleep error: ${proof.state.onSleepLastError}`); + } + if (!start || !afterAwait || !finish) { + throw new Error(`missing onSleep proof rows. saw events=${events.join(",")}`); + } + + const elapsedMs = afterAwait.created_at - start.created_at; + if (elapsedMs < ON_SLEEP_DURATION_MS) { + throw new Error( + `onSleep proof delay too short: ${elapsedMs}ms < ${ON_SLEEP_DURATION_MS}ms`, + ); + } + if (finish.created_at < afterAwait.created_at) { + throw new Error("on-sleep-finish row was written before async row"); + } + + const expectedTicks = Math.ceil(ON_SLEEP_DURATION_MS / ON_SLEEP_TICK_MS); + if (ticks.length < expectedTicks) { + throw new Error( + `expected at least ${expectedTicks} on-sleep-tick rows, got ${ticks.length}`, + ); + } +} + +async function main(): Promise { + if (ON_SLEEP_DURATION_MS <= 0) { + throw new Error("--on-sleep-duration-ms must be positive"); + } + if (ON_SLEEP_TICK_MS <= 0) { + throw new Error("--on-sleep-tick-ms must be positive"); + } + + console.log( + `[config] endpoint=${ENDPOINT.endpoint} namespace=${ENDPOINT.namespace} pool=${ENDPOINT.poolName} key=${KEY} durationMs=${ON_SLEEP_DURATION_MS} tickMs=${ON_SLEEP_TICK_MS} openDelayMs=${OPEN_DELAY_MS} reconnectTimeoutMs=${RECONNECT_TIMEOUT_MS} watchTimeoutMs=${WATCH_TIMEOUT_MS}`, + ); + + const handle = client().sigtermSleepProbe.getOrCreate([KEY]); + const actorId = await handle.resolve(); + console.log(`[actor] actorId=${actorId}`); + + const prepared = await handle.prepare( + LABEL, + ON_SLEEP_DURATION_MS, + ON_SLEEP_TICK_MS, + ); + console.log(`[actor] prepared=${JSON.stringify(prepared)}`); + + if (OPEN_DELAY_MS > 0) { + console.log(`[ws] waiting before open delayMs=${OPEN_DELAY_MS}`); + await sleep(OPEN_DELAY_MS); + } + const ws = await connectAndPingPong("initial", OPEN_TIMEOUT_MS, MESSAGE_TIMEOUT_MS); + console.log("[manual] roll/restart the remote kitchen-sink envoy pods now"); + + const state = { + sawOnSleepStarted: false, + sawOnSleepFinished: false, + onSleepTickCount: 0, + }; + const startedAt = Date.now(); + const close = await waitForClose(ws, WATCH_TIMEOUT_MS, state); + const elapsedMs = close.at - startedAt; + console.log( + `[ws:close] code=${close.code} reason=${close.reason} wasClean=${close.wasClean} elapsedMs=${elapsedMs}`, + ); + console.log( + `[ws] observed sleep messages started=${state.sawOnSleepStarted} ticks=${state.onSleepTickCount} finished=${state.sawOnSleepFinished}`, + ); + + const reconnect = await reconnectAndPingPong(); + reconnect.close(1000, "remote smoke done"); + + const proof = (await handle.getProof()) as Proof; + assertClose(close, startedAt); + assertProof(proof); + console.log(`[proof] ${JSON.stringify(proof.state)}`); + console.log("[done] PASS observed shutdown close, reconnected, and verified onSleep proof"); +} + +main().catch((error) => { + console.error(`[fail] ${formatError(error)}`); + process.exitCode = 1; +}); diff --git a/examples/kitchen-sink/scripts/on-sleep-sigterm-smoke.ts b/examples/kitchen-sink/scripts/on-sleep-sigterm-smoke.ts index ea4b4eced9..977f180178 100644 --- a/examples/kitchen-sink/scripts/on-sleep-sigterm-smoke.ts +++ b/examples/kitchen-sink/scripts/on-sleep-sigterm-smoke.ts @@ -13,6 +13,7 @@ // --endpoint http://127.0.0.1:6420 // --namespace default // --pool sigterm-sleep-test +// --reconnect-open-delay-ms 0 import { spawn, @@ -100,13 +101,18 @@ const WS_MESSAGE_TIMEOUT_MS = numberFromConfig( const RECONNECT_MESSAGE_TIMEOUT_MS = numberFromConfig( "reconnect-message-timeout-ms", "SIGTERM_SLEEP_RECONNECT_MESSAGE_TIMEOUT_MS", - 250, + 5_000, ); const RECONNECT_TIMEOUT_MS = numberFromConfig( "reconnect-timeout-ms", "SIGTERM_SLEEP_RECONNECT_TIMEOUT_MS", 5_000, ); +const RECONNECT_OPEN_DELAY_MS = numberFromConfig( + "reconnect-open-delay-ms", + "SIGTERM_SLEEP_RECONNECT_OPEN_DELAY_MS", + 0, +); const CLOSE_REASON = "actor stopped"; const CLOSE_CODE = 1000; let currentPhase: @@ -343,13 +349,21 @@ function buildEnvoysUrl(): string { return url.toString(); } -function buildWebSocketUrl(actorId: string): string { - const tokenSegment = TOKEN ? `@${encodeURIComponent(TOKEN)}` : ""; +function buildWebSocketUrl(_actorId: string): string { const url = appendPath( ENDPOINT, - `/gateway/${encodeURIComponent(actorId)}${tokenSegment}/websocket`, + `/gateway/sigtermSleepProbe/websocket`, ); url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; + url.searchParams.set("rvt-namespace", NAMESPACE); + url.searchParams.set("rvt-method", "getOrCreate"); + url.searchParams.set("rvt-runner", POOL_NAME); + url.searchParams.set("rvt-key", KEY); + url.searchParams.set("rvt-crash-policy", "sleep"); + url.searchParams.set("rvt-skip-ready-wait", "true"); + if (TOKEN) { + url.searchParams.set("rvt-token", TOKEN); + } return url.toString(); } @@ -639,6 +653,10 @@ async function connectAndPingPong( messageTimeoutMs, `${label} pong`, ); + ws.addEventListener("message", (event) => { + const data = typeof event.data === "string" ? event.data : String(event.data); + console.log(`[ws:message] ${label} ${data}`); + }); console.log(`[ws] ${label} ping pong ok`); return ws; } catch (error) { @@ -654,8 +672,12 @@ async function reconnectAndPingPong( timeoutMs: number, ): Promise { console.log( - `[ws] reconnect strict timeoutMs=${timeoutMs} messageTimeoutMs=${RECONNECT_MESSAGE_TIMEOUT_MS}`, + `[ws] reconnect strict timeoutMs=${timeoutMs} messageTimeoutMs=${RECONNECT_MESSAGE_TIMEOUT_MS} openDelayMs=${RECONNECT_OPEN_DELAY_MS}`, ); + if (RECONNECT_OPEN_DELAY_MS > 0) { + console.log(`[ws] reconnect waiting before open delayMs=${RECONNECT_OPEN_DELAY_MS}`); + await sleep(RECONNECT_OPEN_DELAY_MS); + } return await connectAndPingPong( actorId, "reconnect", @@ -754,9 +776,12 @@ async function main(): Promise { if (ON_SLEEP_TICK_MS <= 0) { throw new Error("SIGTERM_SLEEP_ON_SLEEP_TICK_MS must be positive"); } + if (RECONNECT_OPEN_DELAY_MS < 0) { + throw new Error("SIGTERM_SLEEP_RECONNECT_OPEN_DELAY_MS must be non-negative"); + } console.log( - `[test] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} key=${KEY} durationMs=${ON_SLEEP_DURATION_MS} tickMs=${ON_SLEEP_TICK_MS}`, + `[test] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} key=${KEY} durationMs=${ON_SLEEP_DURATION_MS} tickMs=${ON_SLEEP_TICK_MS} reconnectOpenDelayMs=${RECONNECT_OPEN_DELAY_MS}`, ); let runner1: ChildProcessWithoutNullStreams | undefined; diff --git a/examples/kitchen-sink/scripts/raw-websocket-serverless-smoke.ts b/examples/kitchen-sink/scripts/raw-websocket-serverless-smoke.ts index 32d651de4a..651229529d 100644 --- a/examples/kitchen-sink/scripts/raw-websocket-serverless-smoke.ts +++ b/examples/kitchen-sink/scripts/raw-websocket-serverless-smoke.ts @@ -19,7 +19,7 @@ const SERVERLESS_URL = process.env.RIVET_SERVERLESS_URL; const NAMESPACE = process.env.SMOKE_NAMESPACE ?? process.env.RIVET_NAMESPACE ?? "default"; const TOKEN = process.env.SMOKE_TOKEN ?? process.env.RIVET_TOKEN ?? "dev"; -const POOL_NAME = process.env.SMOKE_POOL ?? process.env.RIVET_POOL ?? "default"; +const POOL_NAME = process.env.SMOKE_POOL ?? process.env.RIVET_POOL ?? "k8s"; const KEY = process.env.SMOKE_KEY ?? `raw-ws-serverless-smoke-${Date.now()}`; const DURATION_MS = Number(process.env.SMOKE_DURATION_MS ?? "120000"); const PARALLELISM = Number(process.env.SMOKE_PARALLELISM ?? "1"); diff --git a/examples/kitchen-sink/scripts/sqlite-cold-start-bench.ts b/examples/kitchen-sink/scripts/sqlite-cold-start-bench.ts index e4dd3a6f48..6f00b88dbc 100644 --- a/examples/kitchen-sink/scripts/sqlite-cold-start-bench.ts +++ b/examples/kitchen-sink/scripts/sqlite-cold-start-bench.ts @@ -452,7 +452,7 @@ async function configureLocalRunner(endpoint: string): Promise { const datacenter = datacentersBody.datacenters[0]?.name; if (!datacenter) throw new Error("local engine returned no datacenters"); - const response = await fetch(`${base}/runner-configs/default?namespace=default`, { + const response = await fetch(`${base}/runner-configs/k8s?namespace=default`, { method: "PUT", headers: { Authorization: "Bearer dev", @@ -468,7 +468,7 @@ async function configureLocalRunner(endpoint: string): Promise { }); if (!response.ok) { throw new Error( - `failed to configure local default runner: ${response.status} ${await response.text()}`, + `failed to configure local k8s runner: ${response.status} ${await response.text()}`, ); } } @@ -478,7 +478,7 @@ async function waitForEnvoy(endpoint: string): Promise { const deadline = Date.now() + 15_000; while (Date.now() < deadline) { - const response = await fetch(`${base}/envoys?namespace=default&name=default`, { + const response = await fetch(`${base}/envoys?namespace=default&name=k8s`, { headers: { Authorization: "Bearer dev" }, }); if (response.ok) { diff --git a/examples/kitchen-sink/scripts/sqlite-memory-soak.ts b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts index a450f09bf6..857e8ef2f6 100644 --- a/examples/kitchen-sink/scripts/sqlite-memory-soak.ts +++ b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts @@ -674,7 +674,7 @@ async function startKitchenSinkServer( RIVET_ENDPOINT: args.endpoint, RIVET_TOKEN: process.env.RIVET_TOKEN ?? "dev", RIVET_NAMESPACE: process.env.RIVET_NAMESPACE ?? "default", - RIVET_POOL: process.env.RIVET_POOL ?? "default", + RIVET_POOL: process.env.RIVET_POOL ?? "k8s", RIVET_SERVERLESS_URL: serverlessUrl, RIVET_SERVERLESS_REQUEST_LIFESPAN: args.requestLifespanSeconds.toString(), @@ -748,7 +748,7 @@ async function configureServerlessRunner( const base = args.endpoint.replace(/\/$/, ""); const namespace = process.env.RIVET_NAMESPACE ?? "default"; const token = process.env.RIVET_TOKEN ?? "dev"; - const poolName = process.env.RIVET_POOL ?? "default"; + const poolName = process.env.RIVET_POOL ?? "k8s"; const datacentersResponse = await fetch(`${base}/datacenters?namespace=${namespace}`, { headers: { Authorization: `Bearer ${token}` }, }); @@ -1912,7 +1912,7 @@ async function main(): Promise { endpoint: args.endpoint, namespace: process.env.RIVET_NAMESPACE ?? "default", token: process.env.RIVET_TOKEN ?? "dev", - poolName: process.env.RIVET_POOL ?? "default", + poolName: process.env.RIVET_POOL ?? "k8s", }); if (args.preWorkloadWaitMs > 0) { diff --git a/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts b/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts index 5fe05832c0..6f2cab877c 100644 --- a/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts +++ b/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts @@ -1048,7 +1048,7 @@ async function configureLocalRunner(endpoint: string): Promise { const datacenter = datacentersBody.datacenters[0]?.name; if (!datacenter) throw new Error("local engine returned no datacenters"); - const response = await fetch(`${base}/runner-configs/default?namespace=default`, { + const response = await fetch(`${base}/runner-configs/k8s?namespace=default`, { method: "PUT", headers: { Authorization: "Bearer dev", @@ -1064,7 +1064,7 @@ async function configureLocalRunner(endpoint: string): Promise { }); if (!response.ok) { throw new Error( - `failed to configure local default runner: ${response.status} ${await response.text()}`, + `failed to configure local k8s runner: ${response.status} ${await response.text()}`, ); } } @@ -1074,7 +1074,7 @@ async function waitForEnvoy(endpoint: string): Promise { const deadline = Date.now() + 15_000; while (Date.now() < deadline) { - const response = await fetch(`${base}/envoys?namespace=default&name=default`, { + const response = await fetch(`${base}/envoys?namespace=default&name=k8s`, { headers: { Authorization: "Bearer dev" }, }); if (response.ok) { diff --git a/examples/kitchen-sink/src/actors/counter/counter.ts b/examples/kitchen-sink/src/actors/counter/counter.ts index 0cd384f36f..a68a944b76 100644 --- a/examples/kitchen-sink/src/actors/counter/counter.ts +++ b/examples/kitchen-sink/src/actors/counter/counter.ts @@ -1,10 +1,23 @@ -import { actor, event } from "rivetkit"; +import { actor, event, type RivetMessageEvent, type UniversalWebSocket } from "rivetkit"; export const counter = actor({ + options: { + canHibernateWebSocket: false, + sleepGracePeriod: 5_000, + }, state: { count: 0 }, events: { newCount: event(), }, + onWebSocket(_c, websocket: UniversalWebSocket) { + // Plain echo for the rtt counter-latency harness. Any message in → + // the same payload back out. No state mutation, no awaits — keeps the + // echo path as close to raw WS RTT as possible. + websocket.addEventListener("message", (event: RivetMessageEvent) => { + if (websocket.readyState !== 1) return; + websocket.send(event.data as string | ArrayBuffer); + }); + }, actions: { increment: (c, x: number) => { c.state.count += x; diff --git a/examples/kitchen-sink/src/actors/testing/sigterm-sleep-probe.ts b/examples/kitchen-sink/src/actors/testing/sigterm-sleep-probe.ts index d90d742887..4aaf1666c8 100644 --- a/examples/kitchen-sink/src/actors/testing/sigterm-sleep-probe.ts +++ b/examples/kitchen-sink/src/actors/testing/sigterm-sleep-probe.ts @@ -146,6 +146,19 @@ export const sigtermSleepProbe = actor({ }); try { + for (const websocket of c.vars.websockets) { + if (websocket.readyState !== 1) continue; + websocket.send( + JSON.stringify({ + type: "onSleepStarted", + sleepCount, + onSleepDurationMs: c.state.onSleepDurationMs, + onSleepTickMs: c.state.onSleepTickMs, + timestamp: startedAt, + }), + ); + } + await c.db.execute( "INSERT INTO sigterm_sleep_log (event, sleep_count, detail, created_at) VALUES (?, ?, ?, ?)", "on-sleep-start", diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index 3f5568efdc..576f632135 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -316,7 +316,7 @@ export declare class CoreRegistry { * separately to avoid re-entrancy. */ shutdown(): Promise - actorStopThresholdMs(): Promise + actorStopThresholdMs(): Promise health(): Promise metadata(): JsRegistryRouteResponse metrics(): JsRegistryRouteResponse