diff --git a/examples/kitchen-sink/frontend/App.tsx b/examples/kitchen-sink/frontend/App.tsx index bd8cee5e0f..95cd5f957d 100644 --- a/examples/kitchen-sink/frontend/App.tsx +++ b/examples/kitchen-sink/frontend/App.tsx @@ -97,6 +97,7 @@ type ActorPanelActor = { handle: { action: (request: { name: string; args: unknown[] }) => Promise; fetch: (input: RequestInfo | URL, init?: RequestInit) => Promise; + resolve: () => Promise; webSocket: () => Promise; } | null; connection: { @@ -424,6 +425,7 @@ function ActorView({
+ {stateAction && ( { + if (!actor.handle) return; + + setIsChecking(true); + const start = performance.now(); + try { + const actorId = await actor.handle.resolve(); + const url = appendEndpointPath( + endpoint, + `/gateway/${encodeURIComponent(actorId)}/health`, + ); + const response = await fetch(url); + const text = await response.text(); + const latencyMs = performance.now() - start; + setResult( + `Status ${response.status} in ${latencyMs.toFixed(0)}ms${text ? `\n${text}` : ""}`, + ); + } catch (error) { + setResult( + `Error: ${error instanceof Error ? error.message : String(error)}`, + ); + } finally { + setIsChecking(false); + } + }, [actor.handle, endpoint]); + + return ( +
+
+ Health + +
+
{result}
+
+ ); +} + // ── State Panel ─────────────────────────────────── function StatePanel({ @@ -1078,8 +1137,10 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { const [lastVerification, setLastVerification] = useState("No requests yet."); const [lastHistory, setLastHistory] = useState("No history loaded yet."); const [lastBypass, setLastBypass] = useState("No bypass requests yet."); + const [lastHealth, setLastHealth] = useState("No health checks yet."); const [isConnecting, setIsConnecting] = useState(false); const [isRunningInference, setIsRunningInference] = useState(false); + const [isCheckingHealth, setIsCheckingHealth] = useState(false); const [stats, setStats] = useState({ requests: 0, expectedRows: 0, @@ -1088,6 +1149,8 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { maxReconnectMs: 0, sleepPosts: 0, sleepErrors: 0, + httpOk: 0, + wsOk: 0, bypassHttpOk: 0, bypassWsOk: 0, actorStopping: 0, @@ -1189,6 +1252,7 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { setLastVerification("No requests yet."); setLastHistory("No history loaded yet."); setLastBypass("No bypass requests yet."); + setLastHealth("No health checks yet."); setStats({ requests: 0, expectedRows: 0, @@ -1197,6 +1261,8 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { maxReconnectMs: 0, sleepPosts: 0, sleepErrors: 0, + httpOk: 0, + wsOk: 0, bypassHttpOk: 0, bypassWsOk: 0, actorStopping: 0, @@ -1207,6 +1273,36 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { setLogs([]); }, [clearProgressTimer]); + const checkHealth = useCallback(async () => { + if (!actorId) { + addLog("error", "resolve an actor before checking health"); + return; + } + + const url = appendEndpointPath( + endpoint, + `/gateway/${encodeURIComponent(actorId)}/health`, + ); + const start = performance.now(); + setIsCheckingHealth(true); + addLog("info", "health check sent"); + try { + const response = await fetch(url, { + headers: token ? { "x-rivet-token": token } : undefined, + }); + const text = await response.text(); + const message = `health ${response.status} in ${(performance.now() - start).toFixed(0)}ms${text ? `: ${text}` : ""}`; + setLastHealth(message); + addLog(response.ok ? "ok" : "error", message); + } catch (error) { + const message = `health failed: ${error instanceof Error ? error.message : String(error)}`; + setLastHealth(message); + addLog("error", message); + } finally { + setIsCheckingHealth(false); + } + }, [actorId, addLog, endpoint, token]); + const requestHistory = useCallback(() => { if (socketRef.current?.readyState !== WebSocket.OPEN) return; socketRef.current.send(JSON.stringify({ type: "history" })); @@ -1393,7 +1489,9 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { token, encoding: "json", }); - const handle = client.mockAgenticLoop.getOrCreate([key]) as AgenticHandle; + const handle = client.mockAgenticLoop.getOrCreate([ + key, + ]) as unknown as AgenticHandle; handleRef.current = handle; const resolvedActorId = await handle.resolve(); setActorId(resolvedActorId); @@ -1527,10 +1625,10 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { } }, [actorId, addLog, endpoint, namespace, token]); - const noteActorStopping = useCallback((label: string, status: number, text: string) => { + const noteActorStopping = useCallback((message: string, text: string) => { setStats((prev) => ({ ...prev, actorStopping: prev.actorStopping + 1 })); - setLastBypass(`${label}: actor.stopping (${status})`); - addLog("warn", `${label} actor.stopping ${text}`); + setLastBypass(message); + addLog("warn", `${message} ${text}`); }, [addLog]); const testHttpBypass = useCallback(async () => { @@ -1539,18 +1637,22 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { addLog("error", "connect before testing bypass"); return; } + const start = performance.now(); + addLog("info", "http bypass sent"); try { const response = await handle.fetch("/bypass", { gateway: { bypassConnectable: true }, }); const text = await response.text(); + const latencyMs = performance.now() - start; if (!response.ok) { + const message = `http bypass ${response.status} in ${latencyMs.toFixed(0)}ms: ${text}`; if (text.includes('"code":"stopping"')) { - noteActorStopping("http bypass", response.status, text); + noteActorStopping(message, text); return; } - setLastBypass(`http bypass failed ${response.status}: ${text}`); - addLog("error", `http bypass ${response.status}: ${text}`); + setLastBypass(message); + addLog("error", message); return; } const payload = JSON.parse(text) as { @@ -1569,10 +1671,9 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { sleepProofHttp: prev.sleepProofHttp + (sleepStatus.sleepStarted ? 1 : 0), })); - setLastBypass( - `http bypass ok: sleepStarted=${sleepStatus.sleepStarted}`, - ); - addLog("ok", `http bypass sleepStarted=${sleepStatus.sleepStarted}`); + const message = `http bypass ${response.status} in ${latencyMs.toFixed(0)}ms: ok`; + setLastBypass(message); + addLog("ok", message); } catch (error) { const message = error instanceof Error ? error.message : String(error); setLastBypass(`http bypass error: ${message}`); @@ -1580,6 +1681,54 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { } }, [addLog, noteActorStopping]); + const testHttp = useCallback(async () => { + const handle = handleRef.current; + if (!handle) { + addLog("error", "connect before testing http"); + return; + } + const start = performance.now(); + addLog("info", "http sent"); + try { + const response = await handle.fetch("/bypass"); + const text = await response.text(); + const latencyMs = performance.now() - start; + if (!response.ok) { + const message = `http ${response.status} in ${latencyMs.toFixed(0)}ms: ${text}`; + if (text.includes('"code":"stopping"')) { + noteActorStopping(message, text); + return; + } + setLastBypass(message); + addLog("error", message); + return; + } + const payload = JSON.parse(text) as { + type?: string; + transport?: string; + sleepStarted?: unknown; + sleepStartedAt?: unknown; + }; + const sleepStatus = sleepStatusFromPayload("http", payload); + if (payload.type !== "bypass" || payload.transport !== "http") { + throw new Error(`unexpected body ${text}`); + } + setStats((prev) => ({ + ...prev, + httpOk: prev.httpOk + 1, + sleepProofHttp: + prev.sleepProofHttp + (sleepStatus.sleepStarted ? 1 : 0), + })); + const message = `http ${response.status} in ${latencyMs.toFixed(0)}ms: ok`; + setLastBypass(message); + addLog("ok", message); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + setLastBypass(`http error: ${message}`); + addLog("error", `http error: ${message}`); + } + }, [addLog, noteActorStopping]); + const testWebSocketBypass = useCallback(async () => { const handle = handleRef.current; if (!handle) { @@ -1657,6 +1806,81 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { } }, [addLog]); + const testWebSocket = useCallback(async () => { + const handle = handleRef.current; + if (!handle) { + addLog("error", "connect before testing ws"); + return; + } + const probeId = crypto.randomUUID(); + let socket: WebSocket | null = null; + try { + socket = await handle.webSocket("/bypass"); + await waitForSocketOpen(socket); + const result = await new Promise>( + (resolve, reject) => { + const timeout = setTimeout(() => { + cleanup(); + reject(new Error("timed out waiting for ws pong")); + }, 10_000); + const cleanup = () => { + clearTimeout(timeout); + socket?.removeEventListener("message", onMessage); + socket?.removeEventListener("close", onClose); + socket?.removeEventListener("error", onError); + }; + const onMessage = (event: MessageEvent) => { + if (typeof event.data !== "string") return; + const message = JSON.parse(event.data) as AgenticServerMessage; + if (message.type !== "pong" || message.probeId !== probeId) return; + cleanup(); + resolve(message); + }; + const onClose = (event: CloseEvent) => { + cleanup(); + reject( + new Error(`closed code=${event.code} reason=${event.reason}`), + ); + }; + const onError = () => { + cleanup(); + reject(new Error("websocket error")); + }; + socket?.addEventListener("message", onMessage); + socket?.addEventListener("close", onClose, { once: true }); + socket?.addEventListener("error", onError, { once: true }); + socket?.send(JSON.stringify({ type: "ping", probeId })); + }, + ); + const sleepStatus = sleepStatusFromPayload("ws", result); + setStats((prev) => ({ + ...prev, + wsOk: prev.wsOk + 1, + sleepProofWs: prev.sleepProofWs + (sleepStatus.sleepStarted ? 1 : 0), + })); + setLastBypass(`ws ok: sleepStarted=${sleepStatus.sleepStarted}`); + addLog("ok", `ws sleepStarted=${sleepStatus.sleepStarted}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (message.includes("actor.stopping") || message.includes("Server Error")) { + setStats((prev) => ({ ...prev, actorStopping: prev.actorStopping + 1 })); + setLastBypass(`ws transient close: ${message}`); + addLog("warn", `ws transient close: ${message}`); + } else { + setLastBypass(`ws error: ${message}`); + addLog("error", `ws error: ${message}`); + } + } finally { + if ( + socket && + (socket.readyState === WebSocket.OPEN || + socket.readyState === WebSocket.CONNECTING) + ) { + socket.close(1000, "ws probe complete"); + } + } + }, [addLog]); + useEffect(() => { return () => { if (reconnectTimerRef.current) clearTimeout(reconnectTimerRef.current); @@ -1817,6 +2041,15 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { + + + @@ -1825,6 +2058,7 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) {
{lastBypass}
+
{lastHealth}
@@ -1872,6 +2106,8 @@ function MockAgenticLoopPanel({ page }: { page: PageConfig }) { + + diff --git a/examples/kitchen-sink/src/server.ts b/examples/kitchen-sink/src/server.ts index ea5b8cb8ef..521c4ea6da 100644 --- a/examples/kitchen-sink/src/server.ts +++ b/examples/kitchen-sink/src/server.ts @@ -6,6 +6,10 @@ import * as v8 from "node:v8"; const app = new Hono(); const port = Number.parseInt(process.env.PORT ?? "3000", 10); +const serverlessMode = + process.env.RIVET_RUN_ENGINE === "1" || + process.env.RIVET_SERVERLESS_URL !== undefined || + process.env.KITCHEN_SINK_SERVERLESS_URL !== undefined; process.on("exit", (code) => { console.log(JSON.stringify({ kind: "process_exit", code, pid: process.pid })); @@ -139,25 +143,35 @@ app.post("/debug/heap-snapshot", (c) => { app.use("*", async (c, next) => { const startedAt = Date.now(); await next(); - console.log( - JSON.stringify({ - kind: "request", - method: c.req.method, - path: new URL(c.req.url).pathname, - headers: requestHeaders(c.req.raw.headers), - status: c.res.status, - durationMs: Date.now() - startedAt, - }), - ); + // console.log( + // JSON.stringify({ + // kind: "request", + // method: c.req.method, + // path: new URL(c.req.url).pathname, + // headers: requestHeaders(c.req.raw.headers), + // status: c.res.status, + // durationMs: Date.now() - startedAt, + // }), + // ); }); -app.all("/api/rivet/*", (c) => registry.handler(c.req.raw)); -app.all("/api/rivet", (c) => registry.handler(c.req.raw)); +if (serverlessMode) { + app.all("/api/rivet/*", (c) => registry.handler(c.req.raw)); + app.all("/api/rivet", (c) => registry.handler(c.req.raw)); +} else { + registry.start(); +} const server = serve({ fetch: app.fetch, port }, () => { - console.log( - `serverless RivetKit listening on http://127.0.0.1:${port}/api/rivet`, - ); + if (serverlessMode) { + console.log( + `serverless RivetKit listening on http://127.0.0.1:${port}/api/rivet`, + ); + } else { + console.log( + `kitchen sink diagnostics listening on http://127.0.0.1:${port}`, + ); + } }); const httpServer = server as unknown as HttpServer; httpServer.requestTimeout = 0;