diff --git a/examples/kitchen-sink/frontend/App.tsx b/examples/kitchen-sink/frontend/App.tsx index f21cf21401..9bc910a3bc 100644 --- a/examples/kitchen-sink/frontend/App.tsx +++ b/examples/kitchen-sink/frontend/App.tsx @@ -1,4 +1,5 @@ import { createRivetKit } from "@rivetkit/react"; +import { createClient } from "rivetkit/client"; import mermaid from "mermaid"; import { Highlight, themes } from "prism-react-renderer"; import { @@ -79,7 +80,8 @@ function MermaidDiagram({ chart }: { chart: string }) { } const rivetEndpoint = - import.meta.env.VITE_RIVET_ENDPOINT ?? "http://localhost:6420"; + import.meta.env.VITE_RIVET_ENDPOINT ?? + `${globalThis.location.origin}/api/rivet`; const { useActor } = createRivetKit(rivetEndpoint); @@ -242,6 +244,9 @@ function DemoPanel({ page }: { page: PageConfig }) { if (page.demo === "diagram") { return ; } + if (page.demo === "mock-agentic-loop") { + return ; + } if (page.actors.length === 0) { return ; } @@ -766,6 +771,1106 @@ function ActionRunner({ ); } +type AgenticEntry = { + request_id: string; + idx: number; + created_at: number; +}; + +type AgenticVerification = { + requestId: string; + expectedSeconds: number; + count: number; + contiguous?: boolean; + missing?: number[]; + indexes: number[]; + ok?: boolean; +}; + +type AgenticHistory = { + type: "history"; + totalRows: number; + entries: AgenticEntry[]; + timestamp: number; +}; + +type AgenticDebugEvent = { + type: "debugEvent"; + eventId: string; + name: string; + actorId: string; + connectionId: string | null; + requestId: string | null; + details: Record; + createdAt: number; + replayed: boolean; +}; + +type AgenticServerMessage = + | { type: "hello"; connectionId: string; timestamp: number } + | AgenticHistory + | AgenticDebugEvent + | { + type: "pong"; + probeId: string; + sleepStarted: boolean; + sleepStartedAt: number | null; + timestamp: number; + } + | { type: "started"; requestId: string; seconds: number; timestamp: number } + | { + type: "progress"; + requestId: string; + idx: number; + seconds: number; + createdAt: number; + } + | { + type: "done"; + requestId: string; + seconds: number; + timestamp: number; + verification: AgenticVerification; + } + | (AgenticVerification & { type: "verified" }) + | { type: "error"; message: string; timestamp: number }; + +type AgenticRequest = { + requestId: string; + seconds: number; +}; + +type AgenticHandle = { + resolve: () => Promise; + webSocket: ( + path?: string, + protocols?: string | string[], + options?: { + gateway?: { bypassConnectable?: boolean }; + }, + ) => Promise; + fetch: ( + input: string, + init?: RequestInit & { + gateway?: { bypassConnectable?: boolean }; + }, + ) => Promise; + verify: ( + requestId: string, + expectedSeconds: number, + ) => Promise; + verifyAll: (expectedRequests: AgenticRequest[]) => Promise<{ + type: "verifiedAll"; + expectedRequests: number; + expectedTotalRows: number; + totalRows: number; + unexpectedRequestIds: string[]; + requests: AgenticVerification[]; + ok: boolean; + }>; +}; + +type ActiveAgenticRequest = { + requestId: string; + seconds: number; + expectedIdx: number; + received: number[]; + lastProgressAt: number; + startedAt: number; +}; + +type AgenticLogEntry = { + id: string; + level: "ok" | "warn" | "error" | "info"; + message: string; + time: string; +}; + +function randomAgenticKey() { + return `manual-agentic-${new Date().toISOString()}-${crypto.randomUUID()}`; +} + +function nowTime() { + return new Date().toLocaleTimeString(); +} + +function appendEndpointPath(endpoint: string, path: string): URL { + const url = new URL(endpoint); + const prefix = url.pathname.replace(/\/$/, ""); + url.pathname = `${prefix}${path}`; + url.search = ""; + url.hash = ""; + return url; +} + +function waitForSocketOpen(socket: WebSocket, timeoutMs = 10_000) { + if (socket.readyState === WebSocket.OPEN) return Promise.resolve(); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + cleanup(); + reject(new Error(`websocket open timed out after ${timeoutMs}ms`)); + }, timeoutMs); + const cleanup = () => { + clearTimeout(timeout); + socket.removeEventListener("open", onOpen); + socket.removeEventListener("close", onClose); + socket.removeEventListener("error", onError); + }; + const onOpen = () => { + cleanup(); + resolve(); + }; + const onClose = (event: CloseEvent) => { + cleanup(); + reject( + new Error( + `websocket closed before open code=${event.code} reason=${event.reason}`, + ), + ); + }; + const onError = () => { + cleanup(); + reject(new Error("websocket open error")); + }; + socket.addEventListener("open", onOpen, { once: true }); + socket.addEventListener("close", onClose, { once: true }); + socket.addEventListener("error", onError, { once: true }); + }); +} + +function validateAgenticRows( + entries: AgenticEntry[], + expectedRequests: AgenticRequest[], + activeRequest?: ActiveAgenticRequest | null, +) { + const expectedByRequest = new Map( + expectedRequests.map((request) => [request.requestId, request.seconds]), + ); + const rowsByRequest = new Map(); + + for (const entry of entries) { + const rows = rowsByRequest.get(entry.request_id) ?? []; + rows.push(entry); + rowsByRequest.set(entry.request_id, rows); + } + + const problems: string[] = []; + for (const request of expectedRequests) { + const rows = rowsByRequest.get(request.requestId) ?? []; + const indexes = rows.map((row) => row.idx); + const contiguous = + rows.length === request.seconds && + indexes.every((idx, offset) => idx === offset + 1); + if (!contiguous) { + problems.push( + `${request.requestId.slice(0, 8)} expected ${request.seconds}, got [${indexes.join(", ")}]`, + ); + } + } + + if (activeRequest) { + const rows = rowsByRequest.get(activeRequest.requestId) ?? []; + const indexes = rows.map((row) => row.idx); + const contiguousPrefix = indexes.every( + (idx, offset) => idx === offset + 1, + ); + if (rows.length > activeRequest.seconds || !contiguousPrefix) { + problems.push( + `${activeRequest.requestId.slice(0, 8)} active request expected partial 1-${activeRequest.seconds}, got [${indexes.join(", ")}]`, + ); + } + } + + for (const requestId of rowsByRequest.keys()) { + if ( + !expectedByRequest.has(requestId) && + requestId !== activeRequest?.requestId + ) { + problems.push(`${requestId.slice(0, 8)} was not expected`); + } + } + + return { + ok: problems.length === 0, + problems, + expectedRows: expectedRequests.reduce( + (total, request) => total + request.seconds, + 0, + ) + (activeRequest?.received.length ?? 0), + }; + } + +function sleepStatusFromPayload( + source: string, + payload: { sleepStarted?: unknown; sleepStartedAt?: unknown }, +) { + if (typeof payload.sleepStarted !== "boolean") { + throw new Error(`${source} missing boolean sleepStarted`); + } + if (payload.sleepStarted && typeof payload.sleepStartedAt !== "number") { + throw new Error(`${source} missing numeric sleepStartedAt`); + } + if (!payload.sleepStarted && payload.sleepStartedAt !== null) { + throw new Error(`${source} expected null sleepStartedAt before sleep`); + } + return { + sleepStarted: payload.sleepStarted, + sleepStartedAt: payload.sleepStartedAt, + }; +} + +function formatDebugDetails(details: Record) { + const entries = Object.entries(details).filter( + ([, value]) => value !== undefined && value !== null, + ); + if (entries.length === 0) return ""; + + return ` ${entries + .map(([key, value]) => `${key}=${String(value)}`) + .join(" ")}`; +} + +function formatAgenticDebugEvent(event: AgenticDebugEvent) { + const actorTime = new Date(event.createdAt).toLocaleTimeString(); + const lagMs = Date.now() - event.createdAt; + const connection = event.connectionId + ? ` conn=${event.connectionId.slice(0, 8)}` + : ""; + const request = event.requestId + ? ` req=${event.requestId.slice(0, 8)}` + : ""; + const replay = event.replayed ? " replay" : ""; + + return `actor${replay} ${event.name} at ${actorTime} lagMs=${lagMs}${connection}${request}${formatDebugDetails(event.details)}`; +} + +function MockAgenticLoopPanel({ page }: { page: PageConfig }) { + const [endpoint, setEndpoint] = usePersistedState( + "kitchen-sink:mock-agentic-loop:endpoint", + rivetEndpoint, + ); + const [namespace, setNamespace] = usePersistedState( + "kitchen-sink:mock-agentic-loop:namespace", + "default", + ); + const [token, setToken] = usePersistedState( + "kitchen-sink:mock-agentic-loop:token", + "dev", + ); + const [key, setKey] = useState(randomAgenticKey); + const [actorId, setActorId] = useState(""); + const [connectionStatus, setConnectionStatus] = useState("idle"); + const [seconds, setSeconds] = useState(16); + const [progressMarginMs, setProgressMarginMs] = useState(8_000); + const [currentRequest, setCurrentRequest] = useState<{ + requestId: string; + seconds: number; + received: number[]; + } | null>(null); + const [expectedRequests, setExpectedRequests] = useState([]); + const [lastVerification, setLastVerification] = useState("No requests yet."); + const [lastHistory, setLastHistory] = useState("No history loaded yet."); + const [lastBypass, setLastBypass] = useState("No bypass requests yet."); + const [isConnecting, setIsConnecting] = useState(false); + const [isRunningInference, setIsRunningInference] = useState(false); + const [stats, setStats] = useState({ + requests: 0, + expectedRows: 0, + actualRows: 0, + reconnects: 0, + maxReconnectMs: 0, + sleepPosts: 0, + sleepErrors: 0, + bypassHttpOk: 0, + bypassWsOk: 0, + actorStopping: 0, + sleepProofHttp: 0, + sleepProofWs: 0, + validationErrors: 0, + }); + const [logs, setLogs] = useState([]); + + const handleRef = useRef(null); + const socketRef = useRef(null); + const expectedRequestsRef = useRef([]); + const activeRequestRef = useRef(null); + const reconnectTimerRef = useRef | null>(null); + const progressTimerRef = useRef | null>(null); + const reconnectStartedAtRef = useRef(null); + const mainSocketCleanupRef = useRef<(() => void) | null>(null); + const closedByUserRef = useRef(false); + + const addLog = useCallback( + (level: AgenticLogEntry["level"], message: string) => { + setLogs((prev) => [ + { + id: crypto.randomUUID(), + level, + message, + time: nowTime(), + }, + ...prev.slice(0, 159), + ]); + }, + [], + ); + + const clearProgressTimer = useCallback(() => { + if (progressTimerRef.current) { + clearTimeout(progressTimerRef.current); + progressTimerRef.current = null; + } + }, []); + + const markValidationError = useCallback((message: string) => { + setStats((prev) => ({ + ...prev, + validationErrors: prev.validationErrors + 1, + })); + setLastVerification(message); + addLog("error", message); + }, [addLog]); + + const scheduleProgressTimeout = useCallback(() => { + clearProgressTimer(); + const active = activeRequestRef.current; + if (!active) return; + const timeoutMs = 1_000 + progressMarginMs; + progressTimerRef.current = setTimeout(() => { + const latest = activeRequestRef.current; + if (!latest) return; + markValidationError( + `progress timeout for ${latest.requestId.slice(0, 8)} at idx=${latest.expectedIdx}`, + ); + }, timeoutMs); + }, [clearProgressTimer, markValidationError, progressMarginMs]); + + const resetSession = useCallback(() => { + closedByUserRef.current = true; + if (reconnectTimerRef.current) clearTimeout(reconnectTimerRef.current); + clearProgressTimer(); + mainSocketCleanupRef.current?.(); + mainSocketCleanupRef.current = null; + socketRef.current?.close(1000, "new actor"); + socketRef.current = null; + handleRef.current = null; + expectedRequestsRef.current = []; + activeRequestRef.current = null; + setKey(randomAgenticKey()); + setActorId(""); + setConnectionStatus("idle"); + setCurrentRequest(null); + setExpectedRequests([]); + setIsRunningInference(false); + setLastVerification("No requests yet."); + setLastHistory("No history loaded yet."); + setLastBypass("No bypass requests yet."); + setStats({ + requests: 0, + expectedRows: 0, + actualRows: 0, + reconnects: 0, + maxReconnectMs: 0, + sleepPosts: 0, + sleepErrors: 0, + bypassHttpOk: 0, + bypassWsOk: 0, + actorStopping: 0, + sleepProofHttp: 0, + sleepProofWs: 0, + validationErrors: 0, + }); + setLogs([]); + }, [clearProgressTimer]); + + const requestHistory = useCallback(() => { + if (socketRef.current?.readyState !== WebSocket.OPEN) return; + socketRef.current.send(JSON.stringify({ type: "history" })); + addLog("info", "history requested"); + }, [addLog]); + + const verifyAll = useCallback(async () => { + const handle = handleRef.current; + if (!handle) return; + const result = await handle.verifyAll(expectedRequestsRef.current); + if (!result.ok) { + markValidationError(`aggregate verification failed: ${formatJson(result)}`); + return; + } + setStats((prev) => ({ + ...prev, + actualRows: result.totalRows, + expectedRows: result.expectedTotalRows, + })); + addLog( + "ok", + `verified all requests=${result.expectedRequests} rows=${result.totalRows}`, + ); + }, [addLog, markValidationError]); + + const handleHistory = useCallback((message: AgenticHistory) => { + const validation = validateAgenticRows( + message.entries, + expectedRequestsRef.current, + activeRequestRef.current, + ); + setStats((prev) => ({ + ...prev, + actualRows: message.totalRows, + expectedRows: validation.expectedRows, + validationErrors: validation.ok + ? prev.validationErrors + : prev.validationErrors + 1, + })); + if (validation.ok) { + setLastHistory( + `history ok: rows=${message.totalRows}, expected=${validation.expectedRows}`, + ); + addLog("ok", `history rows=${message.totalRows}`); + } else { + const text = `history mismatch: ${validation.problems.join("; ")}`; + setLastHistory(text); + addLog("error", text); + } + }, [addLog]); + + const handleProgress = useCallback((message: Extract) => { + const active = activeRequestRef.current; + if (!active || active.requestId !== message.requestId) { + markValidationError(`unexpected progress for ${message.requestId.slice(0, 8)}`); + return; + } + const now = performance.now(); + const gapMs = now - active.lastProgressAt; + if (message.idx !== active.expectedIdx) { + markValidationError( + `expected idx=${active.expectedIdx}, got idx=${message.idx}`, + ); + } + active.received.push(message.idx); + active.expectedIdx += 1; + active.lastProgressAt = now; + setCurrentRequest({ + requestId: active.requestId, + seconds: active.seconds, + received: [...active.received], + }); + addLog( + "info", + `progress ${message.idx}/${message.seconds} gapMs=${gapMs.toFixed(0)}`, + ); + scheduleProgressTimeout(); + }, [addLog, markValidationError, scheduleProgressTimeout]); + + const handleDone = useCallback(async (message: Extract) => { + const active = activeRequestRef.current; + clearProgressTimer(); + setIsRunningInference(false); + activeRequestRef.current = null; + + if (!active || active.requestId !== message.requestId) { + markValidationError(`unexpected done for ${message.requestId.slice(0, 8)}`); + return; + } + + const contiguous = + active.received.length === active.seconds && + active.received.every((idx, offset) => idx === offset + 1); + if (!contiguous || !message.verification.ok) { + markValidationError( + `done verification failed: stream=[${active.received.join(", ")}], actor=${formatJson(message.verification)}`, + ); + return; + } + + const handle = handleRef.current; + if (handle) { + const explicit = await handle.verify(active.requestId, active.seconds); + const explicitOk = + explicit.count === active.seconds && + explicit.indexes.every((idx, offset) => idx === offset + 1); + if (!explicitOk) { + markValidationError( + `action verification failed: ${formatJson(explicit)}`, + ); + return; + } + } + + const completed = { + requestId: active.requestId, + seconds: active.seconds, + }; + expectedRequestsRef.current = [...expectedRequestsRef.current, completed]; + setExpectedRequests(expectedRequestsRef.current); + setStats((prev) => ({ + ...prev, + requests: prev.requests + 1, + expectedRows: prev.expectedRows + active.seconds, + })); + setLastVerification( + `request ${active.requestId.slice(0, 8)} ok: ${active.seconds}/${active.seconds} rows`, + ); + addLog( + "ok", + `done ${active.requestId.slice(0, 8)} rows=${active.seconds}`, + ); + await verifyAll(); + requestHistory(); + }, [addLog, clearProgressTimer, markValidationError, requestHistory, verifyAll]); + + const onSocketMessage = useCallback((event: MessageEvent) => { + if (typeof event.data !== "string") return; + const message = JSON.parse(event.data) as AgenticServerMessage; + if (message.type === "hello") { + addLog("ok", `main ws hello ${message.connectionId.slice(0, 8)}`); + return; + } + if (message.type === "history") { + handleHistory(message); + return; + } + if (message.type === "debugEvent") { + const level = + message.name === "onSleepStart" || message.name === "webSocketClose" + ? "warn" + : "info"; + addLog(level, formatAgenticDebugEvent(message)); + return; + } + if (message.type === "started") { + addLog("ok", `started ${message.requestId.slice(0, 8)} seconds=${message.seconds}`); + return; + } + if (message.type === "progress") { + handleProgress(message); + return; + } + if (message.type === "done") { + void handleDone(message); + return; + } + if (message.type === "error") { + markValidationError(`actor error: ${message.message}`); + } + }, [addLog, handleDone, handleHistory, handleProgress, markValidationError]); + + const connect = useCallback(async (countReconnect = false) => { + if (isConnecting) return; + setIsConnecting(true); + setConnectionStatus("connecting"); + closedByUserRef.current = false; + const startedAt = performance.now(); + + try { + const client = createClient({ + endpoint, + namespace, + token, + encoding: "json", + }); + const handle = client.mockAgenticLoop.getOrCreate([key]) as AgenticHandle; + handleRef.current = handle; + const resolvedActorId = await handle.resolve(); + setActorId(resolvedActorId); + + const socket = await handle.webSocket(); + await waitForSocketOpen(socket); + socketRef.current = socket; + const onClose = (event: CloseEvent) => { + if (socketRef.current === socket) socketRef.current = null; + setConnectionStatus("closed"); + const closedLocally = closedByUserRef.current; + addLog( + closedLocally ? "info" : "warn", + `${closedLocally ? "local" : "remote"} main ws close code=${event.code} reason=${event.reason}`, + ); + if (!closedLocally) { + reconnectStartedAtRef.current = performance.now(); + reconnectTimerRef.current = setTimeout(() => { + void connect(true); + }, 500); + } + }; + const onError = () => { + addLog("error", "main ws error"); + }; + socket.addEventListener("message", onSocketMessage); + socket.addEventListener("close", onClose); + socket.addEventListener("error", onError); + mainSocketCleanupRef.current = () => { + socket.removeEventListener("message", onSocketMessage); + socket.removeEventListener("close", onClose); + socket.removeEventListener("error", onError); + }; + + const elapsedMs = performance.now() - startedAt; + setConnectionStatus("connected"); + if (countReconnect || reconnectStartedAtRef.current !== null) { + const reconnectMs = reconnectStartedAtRef.current === null + ? elapsedMs + : performance.now() - reconnectStartedAtRef.current; + reconnectStartedAtRef.current = null; + setStats((prev) => ({ + ...prev, + reconnects: prev.reconnects + 1, + maxReconnectMs: Math.max(prev.maxReconnectMs, reconnectMs), + })); + addLog("ok", `reconnected in ${reconnectMs.toFixed(0)}ms`); + } else { + addLog("ok", `connected actor=${resolvedActorId}`); + } + requestHistory(); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + setConnectionStatus("error"); + addLog("error", `connect failed: ${message}`); + } finally { + setIsConnecting(false); + } + }, [addLog, endpoint, isConnecting, key, namespace, onSocketMessage, requestHistory, token]); + + const disconnect = useCallback(() => { + closedByUserRef.current = true; + if (reconnectTimerRef.current) clearTimeout(reconnectTimerRef.current); + clearProgressTimer(); + mainSocketCleanupRef.current?.(); + mainSocketCleanupRef.current = null; + socketRef.current?.close(1000, "manual disconnect"); + socketRef.current = null; + setConnectionStatus("closed"); + addLog("warn", "main ws disconnected by client"); + }, [addLog, clearProgressTimer]); + + const runInference = useCallback(() => { + const socket = socketRef.current; + if (!socket || socket.readyState !== WebSocket.OPEN) { + addLog("error", "main websocket is not connected"); + return; + } + if (activeRequestRef.current) { + addLog("warn", "inference already active"); + return; + } + const safeSeconds = Math.max(1, Math.floor(seconds)); + const requestId = crypto.randomUUID(); + activeRequestRef.current = { + requestId, + seconds: safeSeconds, + expectedIdx: 1, + received: [], + lastProgressAt: performance.now(), + startedAt: performance.now(), + }; + setCurrentRequest({ requestId, seconds: safeSeconds, received: [] }); + setIsRunningInference(true); + socket.send(JSON.stringify({ type: "infer", requestId, seconds: safeSeconds })); + addLog("info", `infer ${requestId.slice(0, 8)} seconds=${safeSeconds}`); + scheduleProgressTimeout(); + }, [addLog, scheduleProgressTimeout, seconds]); + + const forceSleep = useCallback(async () => { + if (!actorId) { + addLog("error", "resolve an actor before forcing sleep"); + return; + } + const url = appendEndpointPath( + endpoint, + `/actors/${encodeURIComponent(actorId)}/sleep`, + ); + url.searchParams.set("namespace", namespace); + setStats((prev) => ({ ...prev, sleepPosts: prev.sleepPosts + 1 })); + addLog("warn", "sleep post sent"); + try { + const response = await fetch(url, { + method: "POST", + headers: { + Authorization: token ? `Bearer ${token}` : "", + "content-type": "application/json", + }, + body: "{}", + }); + const text = await response.text(); + if (!response.ok) { + setStats((prev) => ({ ...prev, sleepErrors: prev.sleepErrors + 1 })); + addLog("error", `sleep ${response.status}: ${text}`); + return; + } + addLog("ok", `sleep ${response.status}`); + } catch (error) { + setStats((prev) => ({ ...prev, sleepErrors: prev.sleepErrors + 1 })); + addLog("error", `sleep failed: ${error instanceof Error ? error.message : String(error)}`); + } + }, [actorId, addLog, endpoint, namespace, token]); + + const noteActorStopping = useCallback((label: string, status: number, text: string) => { + setStats((prev) => ({ ...prev, actorStopping: prev.actorStopping + 1 })); + setLastBypass(`${label}: actor.stopping (${status})`); + addLog("warn", `${label} actor.stopping ${text}`); + }, [addLog]); + + const testHttpBypass = useCallback(async () => { + const handle = handleRef.current; + if (!handle) { + addLog("error", "connect before testing bypass"); + return; + } + try { + const response = await handle.fetch("/bypass", { + gateway: { bypassConnectable: true }, + }); + const text = await response.text(); + if (!response.ok) { + if (text.includes('"code":"stopping"')) { + noteActorStopping("http bypass", response.status, text); + return; + } + setLastBypass(`http bypass failed ${response.status}: ${text}`); + addLog("error", `http bypass ${response.status}: ${text}`); + return; + } + const payload = JSON.parse(text) as { + type?: string; + transport?: string; + sleepStarted?: unknown; + sleepStartedAt?: unknown; + }; + const sleepStatus = sleepStatusFromPayload("http bypass", payload); + if (payload.type !== "bypass" || payload.transport !== "http") { + throw new Error(`unexpected body ${text}`); + } + setStats((prev) => ({ + ...prev, + bypassHttpOk: prev.bypassHttpOk + 1, + sleepProofHttp: + prev.sleepProofHttp + (sleepStatus.sleepStarted ? 1 : 0), + })); + setLastBypass( + `http bypass ok: sleepStarted=${sleepStatus.sleepStarted}`, + ); + addLog("ok", `http bypass sleepStarted=${sleepStatus.sleepStarted}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + setLastBypass(`http bypass error: ${message}`); + addLog("error", `http bypass error: ${message}`); + } + }, [addLog, noteActorStopping]); + + const testWebSocketBypass = useCallback(async () => { + const handle = handleRef.current; + if (!handle) { + addLog("error", "connect before testing bypass"); + return; + } + const probeId = crypto.randomUUID(); + let socket: WebSocket | null = null; + try { + socket = await handle.webSocket("/bypass", undefined, { + gateway: { bypassConnectable: true }, + }); + await waitForSocketOpen(socket); + const result = await new Promise>( + (resolve, reject) => { + const timeout = setTimeout(() => { + cleanup(); + reject(new Error("timed out waiting for bypass pong")); + }, 10_000); + const cleanup = () => { + clearTimeout(timeout); + socket?.removeEventListener("message", onMessage); + socket?.removeEventListener("close", onClose); + socket?.removeEventListener("error", onError); + }; + const onMessage = (event: MessageEvent) => { + if (typeof event.data !== "string") return; + const message = JSON.parse(event.data) as AgenticServerMessage; + if (message.type !== "pong" || message.probeId !== probeId) return; + cleanup(); + resolve(message); + }; + const onClose = (event: CloseEvent) => { + cleanup(); + reject( + new Error(`closed code=${event.code} reason=${event.reason}`), + ); + }; + const onError = () => { + cleanup(); + reject(new Error("websocket error")); + }; + socket?.addEventListener("message", onMessage); + socket?.addEventListener("close", onClose, { once: true }); + socket?.addEventListener("error", onError, { once: true }); + socket?.send(JSON.stringify({ type: "ping", probeId })); + }, + ); + const sleepStatus = sleepStatusFromPayload("ws bypass", result); + setStats((prev) => ({ + ...prev, + bypassWsOk: prev.bypassWsOk + 1, + sleepProofWs: prev.sleepProofWs + (sleepStatus.sleepStarted ? 1 : 0), + })); + setLastBypass(`ws bypass ok: sleepStarted=${sleepStatus.sleepStarted}`); + addLog("ok", `ws bypass sleepStarted=${sleepStatus.sleepStarted}`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (message.includes("actor.stopping") || message.includes("Server Error")) { + setStats((prev) => ({ ...prev, actorStopping: prev.actorStopping + 1 })); + setLastBypass(`ws bypass transient close: ${message}`); + addLog("warn", `ws bypass transient close: ${message}`); + } else { + setLastBypass(`ws bypass error: ${message}`); + addLog("error", `ws bypass error: ${message}`); + } + } finally { + if ( + socket && + (socket.readyState === WebSocket.OPEN || + socket.readyState === WebSocket.CONNECTING) + ) { + socket.close(1000, "bypass probe complete"); + } + } + }, [addLog]); + + useEffect(() => { + return () => { + if (reconnectTimerRef.current) clearTimeout(reconnectTimerRef.current); + clearProgressTimer(); + mainSocketCleanupRef.current?.(); + mainSocketCleanupRef.current = null; + }; + }, [clearProgressTimer]); + + const currentIndexes = currentRequest?.received ?? []; + const invariantStatus = + stats.validationErrors === 0 ? "pass" : "fail"; + + return ( +
+
+
+
+

Mock Agentic Loop

+

+ Use one raw WebSocket stream, explicit actions, manual sleep, and + gateway bypass calls against the same actor. +

+
+
+ {connectionStatus} +
+
+ +
+
+ + setEndpoint(event.target.value)} + /> +
+
+ + setNamespace(event.target.value)} + /> +
+
+ + setToken(event.target.value)} + /> +
+
+ +
+
+
Key
+
{key}
+
+
+
Actor ID
+
{actorId || "not resolved"}
+
+
+ +
+ + + +
+
+ +
+
+

Inference

+
+ {stats.validationErrors === 0 ? "valid" : "invalid"} +
+
+
+
+ + setSeconds(Number(event.target.value))} + /> +
+
+ + setProgressMarginMs(Number(event.target.value))} + /> +
+
+
+ + +
+
+ {currentRequest ? ( + <> +
+ {currentRequest.requestId.slice(0, 8)} received{" "} + {currentIndexes.length}/{currentRequest.seconds} +
+
+ {Array.from({ length: currentRequest.seconds }, (_, index) => { + const idx = index + 1; + const received = currentIndexes.includes(idx); + return ( + + {idx} + + ); + })} +
+ + ) : ( +
No active inference.
+ )} +
+
+ +
+
+

Sleep and Bypass

+
+
+ + + +
+
{lastBypass}
+
+ +
+
+

Event Log

+ +
+
+ {logs.length === 0 ? ( +
No activity yet.
+ ) : ( + logs.map((entry) => ( +
+ {entry.time} + {entry.message} +
+ )) + )} +
+
+ +
+
+

Validation

+
+
+ + + + + + + + + + + + +
+
{lastVerification}
+
{lastHistory}
+
+ +
+
+ Source +
+ +
+
+ ); +} + +function AgenticStat({ + label, + value, +}: { + label: string; + value: string | number; +}) { + return ( +
+ {label} + {value} +
+ ); +} + // ── Welcome / Diagram / Config ──────────────────── function WelcomePanel() { diff --git a/examples/kitchen-sink/frontend/page-data.ts b/examples/kitchen-sink/frontend/page-data.ts index a5b9c3fc2f..aafaedb8e6 100644 --- a/examples/kitchen-sink/frontend/page-data.ts +++ b/examples/kitchen-sink/frontend/page-data.ts @@ -10,7 +10,13 @@ export type ActionTemplate = { description?: string; }; -export type DemoType = "actions" | "config" | "diagram" | "raw-http" | "raw-websocket"; +export type DemoType = + | "actions" + | "config" + | "diagram" + | "mock-agentic-loop" + | "raw-http" + | "raw-websocket"; export type PageConfig = { id: string; @@ -182,6 +188,19 @@ await actor.runCycle({ rowBytes: 16384, deleteRows: 64, retainRows: 1024, +});`, + mockAgenticLoop: `const client = createClient({ endpoint, encoding: "json" }); +const actor = client.mockAgenticLoop.getOrCreate([key]); +const ws = await actor.webSocket(); + +ws.send(JSON.stringify({ type: "infer", requestId, seconds })); + +await actor.fetch("/bypass", { + gateway: { bypassConnectable: true }, +}); + +await actor.webSocket("/bypass", undefined, { + gateway: { bypassConnectable: true }, });`, }; @@ -1282,6 +1301,16 @@ export const PAGE_GROUPS: PageGroup[] = [ T -->|action| A A -->|result| T`, }, + { + id: "mock-agentic-loop", + title: "Mock Agentic Loop", + description: + "Manually test streaming, SQLite durability, forced sleep, reconnects, and gateway bypass against one actor.", + docs: [], + actors: ["mockAgenticLoop"], + snippet: SNIPPETS.mockAgenticLoop, + demo: "mock-agentic-loop", + }, { id: "sqlite-memory-pressure", title: "SQLite Memory Pressure", diff --git a/examples/kitchen-sink/index.html b/examples/kitchen-sink/index.html index f96a17491d..f14c3cc97e 100644 --- a/examples/kitchen-sink/index.html +++ b/examples/kitchen-sink/index.html @@ -712,6 +712,214 @@ color: var(--muted); } + /* ── Mock Agentic Loop Lab ───────────── */ + + .agentic-lab { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 16px; + } + + .agentic-lab .demo-code-bottom { + grid-column: 1 / -1; + border: 1px solid var(--border-strong); + border-radius: var(--radius); + overflow: hidden; + background: var(--panel); + } + + .agentic-panel { + background: var(--panel); + border: 1px solid var(--border-strong); + border-radius: var(--radius); + padding: 16px; + display: flex; + flex-direction: column; + gap: 14px; + min-width: 0; + } + + .agentic-panel-header { + display: flex; + align-items: flex-start; + justify-content: space-between; + gap: 12px; + } + + .agentic-grid { + display: grid; + grid-template-columns: repeat(3, minmax(0, 1fr)); + gap: 10px; + } + + .agentic-grid.compact { + grid-template-columns: repeat(2, minmax(0, 1fr)); + } + + .agentic-session-row { + display: grid; + grid-template-columns: 1fr 1fr; + gap: 12px; + padding: 12px; + background: var(--panel-3); + border: 1px solid var(--border); + border-radius: 8px; + min-width: 0; + } + + .agentic-kicker { + font-size: 10px; + text-transform: uppercase; + letter-spacing: 0.08em; + color: var(--muted-2); + margin-bottom: 4px; + } + + .agentic-mono, + .agentic-result, + .agentic-empty { + font-family: ui-monospace, SFMono-Regular, "SF Mono", Consolas, monospace; + font-size: 12px; + line-height: 1.5; + overflow-wrap: anywhere; + } + + .agentic-result, + .agentic-empty { + background: var(--panel-3); + border: 1px solid var(--border); + border-radius: 8px; + padding: 10px 12px; + color: var(--muted); + } + + .agentic-status { + flex-shrink: 0; + border: 1px solid var(--border-strong); + border-radius: 999px; + padding: 4px 10px; + font-size: 12px; + color: var(--muted); + background: var(--panel-3); + } + + .agentic-status.connected, + .agentic-status.pass { + color: var(--success); + border-color: rgba(48, 209, 88, 0.45); + } + + .agentic-status.connecting { + color: var(--warning); + border-color: rgba(255, 159, 10, 0.5); + } + + .agentic-status.error, + .agentic-status.fail { + color: var(--danger); + border-color: rgba(255, 59, 48, 0.5); + } + + .agentic-stream { + min-height: 112px; + background: var(--panel-3); + border: 1px solid var(--border); + border-radius: 8px; + padding: 12px; + } + + .agentic-indexes { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(34px, 1fr)); + gap: 6px; + margin-top: 10px; + } + + .agentic-index { + height: 28px; + display: inline-flex; + align-items: center; + justify-content: center; + border: 1px solid var(--border-strong); + border-radius: 6px; + color: var(--muted); + font-size: 12px; + font-family: ui-monospace, SFMono-Regular, "SF Mono", Consolas, monospace; + } + + .agentic-index.received { + color: var(--success); + border-color: rgba(48, 209, 88, 0.45); + background: rgba(48, 209, 88, 0.1); + } + + .agentic-stat-grid { + display: grid; + grid-template-columns: repeat(4, minmax(0, 1fr)); + gap: 8px; + } + + .agentic-stat { + background: var(--panel-3); + border: 1px solid var(--border); + border-radius: 8px; + padding: 10px; + min-height: 66px; + display: flex; + flex-direction: column; + justify-content: space-between; + gap: 8px; + } + + .agentic-stat span { + color: var(--muted-2); + font-size: 11px; + } + + .agentic-stat strong { + font-size: 16px; + font-family: ui-monospace, SFMono-Regular, "SF Mono", Consolas, monospace; + overflow-wrap: anywhere; + } + + .agentic-log { + background: var(--panel-3); + border: 1px solid var(--border); + border-radius: 8px; + max-height: 360px; + overflow-y: auto; + font-family: ui-monospace, SFMono-Regular, "SF Mono", Consolas, monospace; + font-size: 12px; + } + + .agentic-log-row { + display: grid; + grid-template-columns: 88px 1fr; + gap: 8px; + padding: 8px 10px; + border-bottom: 1px solid var(--border); + } + + .agentic-log-row:last-child { + border-bottom: none; + } + + .agentic-log-row span:first-child { + color: var(--muted-2); + } + + .agentic-log-row.ok span:last-child { + color: var(--success); + } + + .agentic-log-row.warn span:last-child { + color: var(--warning); + } + + .agentic-log-row.error span:last-child { + color: var(--danger); + } + /* ── Mermaid ──────────────────────────── */ .mermaid-diagram { @@ -734,6 +942,15 @@ .actor-columns { grid-template-columns: 1fr; } + .agentic-lab { + grid-template-columns: 1fr; + } + .agentic-grid, + .agentic-grid.compact, + .agentic-session-row, + .agentic-stat-grid { + grid-template-columns: 1fr; + } .actor-controls { border-right: none; border-bottom: 1px solid var(--border-strong); diff --git a/examples/kitchen-sink/scripts/mock-agentic-loop.ts b/examples/kitchen-sink/scripts/mock-agentic-loop.ts index ed43180c4c..4f40455daf 100644 --- a/examples/kitchen-sink/scripts/mock-agentic-loop.ts +++ b/examples/kitchen-sink/scripts/mock-agentic-loop.ts @@ -61,9 +61,14 @@ const MAX_RECONNECT_MS = numberFromEnv( "MOCK_AGENTIC_MAX_RECONNECT_MS", 30_000, ); +const DEFAULT_ON_SLEEP_DELAY_MS = 15_000; +const ON_SLEEP_DELAY_MS = numberFromEnv( + "MOCK_AGENTIC_ON_SLEEP_DELAY_MS", + DEFAULT_ON_SLEEP_DELAY_MS, +); const SLEEP_CLOSE_TIMEOUT_MS = numberFromEnv( "MOCK_AGENTIC_SLEEP_CLOSE_TIMEOUT_MS", - 20_000, + ON_SLEEP_DELAY_MS + 30_000, ); const PROBE_INTERVAL_MS = numberFromEnv( "MOCK_AGENTIC_PROBE_INTERVAL_MS", @@ -97,7 +102,13 @@ type ServerMessage = entries: HistoryEntry[]; timestamp: number; } - | { type: "pong"; probeId: string; timestamp: number } + | { + type: "pong"; + probeId: string; + sleepStarted: boolean; + sleepStartedAt: number | null; + timestamp: number; + } | { type: "started"; requestId: string; seconds: number; timestamp: number } | { type: "progress"; @@ -207,9 +218,13 @@ type BypassStats = { httpSuccesses: number; beforeSleepHttpSuccesses: number; afterSleepHttpSuccesses: number; + beforeSleepHttpUnexpectedSleepStarted: number; + afterSleepHttpSleepStarted: number; webSocketSuccesses: number; beforeSleepWebSocketSuccesses: number; afterSleepWebSocketSuccesses: number; + beforeSleepWebSocketUnexpectedSleepStarted: number; + afterSleepWebSocketSleepStarted: number; timeouts: BypassObservation[]; errors: BypassObservation[]; }; @@ -219,7 +234,7 @@ type BypassHandle = { input: string, init?: RequestInit & { gateway?: { - bypassConnectable?: boolean; + skipReadyWait?: boolean; }; }, ) => Promise; @@ -228,7 +243,7 @@ type BypassHandle = { protocols?: string | string[], options?: { gateway?: { - bypassConnectable?: boolean; + skipReadyWait?: boolean; }; }, ) => Promise; @@ -520,6 +535,8 @@ async function startLocalKitchenSinkServer() { RIVET_RUN_ENGINE: "1", RIVET_ENGINE_BINARY: resolveEngineBinary(), RIVETKIT_RUNTIME: process.env.RIVETKIT_RUNTIME ?? "native", + RIVETKIT_STORAGE_PATH: + process.env.RIVETKIT_STORAGE_PATH ?? dbRoot, RIVET_SERVERLESS_URL: SERVERLESS_URL, RIVET__FILE_SYSTEM__PATH: process.env.RIVET__FILE_SYSTEM__PATH ?? join(dbRoot, "db"), @@ -1155,6 +1172,30 @@ async function runProbeLoop(webSocketUrl: string, stopAt: number) { return stats; } +function validateBypassSleepStatus( + source: string, + value: { + sleepStarted?: unknown; + sleepStartedAt?: unknown; + }, +) { + if (typeof value.sleepStarted !== "boolean") { + throw new Error(`${source} missing boolean sleepStarted`); + } + if (value.sleepStarted) { + if (typeof value.sleepStartedAt !== "number") { + throw new Error(`${source} missing numeric sleepStartedAt`); + } + } else if (value.sleepStartedAt !== null) { + throw new Error(`${source} expected null sleepStartedAt before sleep`); + } + + return { + sleepStarted: value.sleepStarted, + sleepStartedAt: value.sleepStartedAt, + }; +} + async function runBypassAttempt( handle: BypassHandle, stats: BypassStats, @@ -1180,7 +1221,7 @@ async function runBypassAttempt( method: "GET", signal: controller.signal, gateway: { - bypassConnectable: true, + skipReadyWait: true, }, }), "bypass http", @@ -1194,15 +1235,24 @@ async function runBypassAttempt( const body = (await response.json()) as { type?: string; transport?: string; + sleepStarted?: unknown; + sleepStartedAt?: unknown; }; if (body.type !== "bypass" || body.transport !== "http") { throw new Error(`unexpected bypass http body ${JSON.stringify(body)}`); } + const sleepStatus = validateBypassSleepStatus("bypass http", body); stats.httpSuccesses += 1; if (phase === "beforeSleep") { stats.beforeSleepHttpSuccesses += 1; + if (sleepStatus.sleepStarted) { + stats.beforeSleepHttpUnexpectedSleepStarted += 1; + } } else { stats.afterSleepHttpSuccesses += 1; + if (sleepStatus.sleepStarted) { + stats.afterSleepHttpSleepStarted += 1; + } } } finally { clearTimeout(abortTimeout); @@ -1211,7 +1261,7 @@ async function runBypassAttempt( const ws = await withTimeout( handle.webSocket("/bypass", undefined, { gateway: { - bypassConnectable: true, + skipReadyWait: true, }, }), "bypass websocket create", @@ -1223,6 +1273,7 @@ async function runBypassAttempt( "bypass websocket open", BYPASS_TIMEOUT_MS, ); + let webSocketSleepStarted = false; const pong = new Promise((resolve, reject) => { const timeoutHandle = setTimeout(() => { cleanup(); @@ -1240,6 +1291,10 @@ async function runBypassAttempt( if (message.type !== "pong" || message.probeId !== probeId) { return; } + webSocketSleepStarted = validateBypassSleepStatus( + "bypass websocket", + message, + ).sleepStarted; cleanup(); resolve(); }; @@ -1264,8 +1319,14 @@ async function runBypassAttempt( stats.webSocketSuccesses += 1; if (phase === "beforeSleep") { stats.beforeSleepWebSocketSuccesses += 1; + if (webSocketSleepStarted) { + stats.beforeSleepWebSocketUnexpectedSleepStarted += 1; + } } else { stats.afterSleepWebSocketSuccesses += 1; + if (webSocketSleepStarted) { + stats.afterSleepWebSocketSleepStarted += 1; + } } } finally { if ( @@ -1300,9 +1361,13 @@ async function runBypassLoop( httpSuccesses: 0, beforeSleepHttpSuccesses: 0, afterSleepHttpSuccesses: 0, + beforeSleepHttpUnexpectedSleepStarted: 0, + afterSleepHttpSleepStarted: 0, webSocketSuccesses: 0, beforeSleepWebSocketSuccesses: 0, afterSleepWebSocketSuccesses: 0, + beforeSleepWebSocketUnexpectedSleepStarted: 0, + afterSleepWebSocketSleepStarted: 0, timeouts: [], errors: [], }; @@ -1430,7 +1495,7 @@ async function runWorkload() { }; console.log( - `[start] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} actorId=${actorId} ${label} durationMs=${DURATION_MS} sleepIntervalMs=${SLEEP_INTERVAL_MS} inferenceSeconds=${INFERENCE_MIN_SECONDS}-${INFERENCE_MAX_SECONDS} jitterMs=${JITTER_MIN_MS}-${JITTER_MAX_MS} probeIntervalMs=${PROBE_INTERVAL_MS} bypassIntervalMs=${BYPASS_INTERVAL_MS}`, + `[start] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} actorId=${actorId} ${label} durationMs=${DURATION_MS} sleepIntervalMs=${SLEEP_INTERVAL_MS} onSleepDelayMs=${ON_SLEEP_DELAY_MS} sleepCloseTimeoutMs=${SLEEP_CLOSE_TIMEOUT_MS} inferenceSeconds=${INFERENCE_MIN_SECONDS}-${INFERENCE_MAX_SECONDS} jitterMs=${JITTER_MIN_MS}-${JITTER_MAX_MS} probeIntervalMs=${PROBE_INTERVAL_MS} bypassIntervalMs=${BYPASS_INTERVAL_MS}`, ); const session = new RawSession(webSocketUrl, label); @@ -1557,7 +1622,7 @@ async function runWorkload() { await verifyAll(verifier, expectedRequests); console.log( - `[done] actorId=${actorId} key=${key} requests=${requestCount} sleepPosts=${sleepResult.posts} sleepErrors=${sleepResult.errors} reconnects=${reconnectCount} maxReconnectMs=${maxReconnectMs} probeAttempts=${probeResult.attempts} probeSuccesses=${probeResult.successes} probeExpectedCloses=${probeResult.expectedCloses} bypassAttempts=${bypassResult.attempts} bypassBeforeSleepAttempts=${bypassResult.beforeSleepAttempts} bypassAfterSleepAttempts=${bypassResult.afterSleepAttempts} bypassHttpSuccesses=${bypassResult.httpSuccesses} bypassWebSocketSuccesses=${bypassResult.webSocketSuccesses} bypassBeforeSleepHttpSuccesses=${bypassResult.beforeSleepHttpSuccesses} bypassBeforeSleepWebSocketSuccesses=${bypassResult.beforeSleepWebSocketSuccesses} bypassAfterSleepHttpSuccesses=${bypassResult.afterSleepHttpSuccesses} bypassAfterSleepWebSocketSuccesses=${bypassResult.afterSleepWebSocketSuccesses} bypassTimeouts=${bypassResult.timeouts.length} bypassErrors=${bypassResult.errors.length}`, + `[done] actorId=${actorId} key=${key} requests=${requestCount} sleepPosts=${sleepResult.posts} sleepErrors=${sleepResult.errors} reconnects=${reconnectCount} maxReconnectMs=${maxReconnectMs} probeAttempts=${probeResult.attempts} probeSuccesses=${probeResult.successes} probeExpectedCloses=${probeResult.expectedCloses} bypassAttempts=${bypassResult.attempts} bypassBeforeSleepAttempts=${bypassResult.beforeSleepAttempts} bypassAfterSleepAttempts=${bypassResult.afterSleepAttempts} bypassHttpSuccesses=${bypassResult.httpSuccesses} bypassWebSocketSuccesses=${bypassResult.webSocketSuccesses} bypassBeforeSleepHttpSuccesses=${bypassResult.beforeSleepHttpSuccesses} bypassBeforeSleepWebSocketSuccesses=${bypassResult.beforeSleepWebSocketSuccesses} bypassAfterSleepHttpSuccesses=${bypassResult.afterSleepHttpSuccesses} bypassAfterSleepWebSocketSuccesses=${bypassResult.afterSleepWebSocketSuccesses} bypassAfterSleepHttpSleepStarted=${bypassResult.afterSleepHttpSleepStarted} bypassAfterSleepWebSocketSleepStarted=${bypassResult.afterSleepWebSocketSleepStarted} bypassTimeouts=${bypassResult.timeouts.length} bypassErrors=${bypassResult.errors.length}`, ); if (DURATION_MS >= SLEEP_INTERVAL_MS && sleepResult.posts === 0) { @@ -1616,9 +1681,36 @@ async function runWorkload() { `bypass loop had pre-sleep failures: ${JSON.stringify(bypassResult)}`, ); } + if ( + bypassResult.beforeSleepHttpUnexpectedSleepStarted > 0 || + bypassResult.beforeSleepWebSocketUnexpectedSleepStarted > 0 + ) { + throw new Error( + `bypass saw sleepStarted before sleep: ${JSON.stringify(bypassResult)}`, + ); + } if (sleepResult.posts > 0 && bypassResult.afterSleepAttempts === 0) { throw new Error("bypass loop did not continue after sleep request"); } + if (sleepResult.posts > 0 && bypassResult.afterSleepHttpSuccesses === 0) { + throw new Error("bypass http had no successful after-sleep actor responses"); + } + if (sleepResult.posts > 0 && bypassResult.afterSleepWebSocketSuccesses === 0) { + throw new Error("bypass websocket had no successful after-sleep actor responses"); + } + if (sleepResult.posts > 0 && bypassResult.afterSleepHttpSleepStarted === 0) { + throw new Error( + `bypass http never returned actor sleepStarted proof: ${JSON.stringify(bypassResult)}`, + ); + } + if ( + sleepResult.posts > 0 && + bypassResult.afterSleepWebSocketSleepStarted === 0 + ) { + throw new Error( + `bypass websocket never returned actor sleepStarted proof: ${JSON.stringify(bypassResult)}`, + ); + } } async function main() { diff --git a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts index 2802fd38fa..3e0a654cd0 100644 --- a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts +++ b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts @@ -18,11 +18,45 @@ type CountRow = { count: number; }; +type SleepStateRow = { + sleep_started_at: number; +}; + +type DebugEventRow = { + event_id: string; + name: string; + actor_id: string; + connection_id: string | null; + request_id: string | null; + details_json: string; + created_at: number; +}; + type ExpectedRequest = { requestId: string; seconds: number; }; +type DebugEventInput = { + name: string; + connectionId?: string; + requestId?: string; + details?: Record; + createdAt?: number; +}; + +type DebugContext = { + actorId: string; + db: { + execute: (query: string, ...params: unknown[]) => Promise; + }; + log: { + warn: (payload: unknown) => void; + }; +}; + +const debugSocketsByActorId = new Map>(); + function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } @@ -64,6 +98,96 @@ function send(websocket: UniversalWebSocket, payload: unknown) { websocket.send(JSON.stringify(payload)); } +function debugPayload(row: DebugEventRow, replayed: boolean) { + return { + type: "debugEvent", + eventId: row.event_id, + name: row.name, + actorId: row.actor_id, + connectionId: row.connection_id, + requestId: row.request_id, + details: JSON.parse(row.details_json) as Record, + createdAt: row.created_at, + replayed, + }; +} + +function publishDebugEvent(row: DebugEventRow) { + const sockets = debugSocketsByActorId.get(row.actor_id); + if (!sockets) return; + + for (const socket of sockets) { + send(socket, debugPayload(row, false)); + } +} + +function addDebugSocket(actorId: string, websocket: UniversalWebSocket) { + const sockets = debugSocketsByActorId.get(actorId) ?? new Set(); + sockets.add(websocket); + debugSocketsByActorId.set(actorId, sockets); + + return () => { + sockets.delete(websocket); + if (sockets.size === 0) { + debugSocketsByActorId.delete(actorId); + } + }; +} + +async function recordDebugEvent(c: DebugContext, input: DebugEventInput) { + const row: DebugEventRow = { + event_id: crypto.randomUUID(), + name: input.name, + actor_id: c.actorId, + connection_id: input.connectionId ?? null, + request_id: input.requestId ?? null, + details_json: JSON.stringify(input.details ?? {}), + created_at: input.createdAt ?? Date.now(), + }; + + try { + await c.db.execute( + "INSERT INTO mock_agentic_debug_events (event_id, name, actor_id, connection_id, request_id, details_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)", + row.event_id, + row.name, + row.actor_id, + row.connection_id, + row.request_id, + row.details_json, + row.created_at, + ); + publishDebugEvent(row); + } catch (error) { + c.log.warn({ + msg: "mock agentic debug event failed", + name: input.name, + err: error instanceof Error ? error.message : String(error), + }); + } +} + +async function replayDebugEvents( + database: DebugContext["db"], + websocket: UniversalWebSocket, +) { + const rows = typedRows( + await database.execute(` + SELECT event_id, name, actor_id, connection_id, request_id, details_json, created_at + FROM ( + SELECT event_id, name, actor_id, connection_id, request_id, details_json, created_at + FROM mock_agentic_debug_events + ORDER BY created_at DESC + LIMIT 200 + ) + ORDER BY created_at ASC + `), + ); + + for (const row of rows) { + send(websocket, debugPayload(row, true)); + } +} + function verifyEntryRows(rows: EntryRow[], expectedSeconds: number) { const seen = new Set(); const indexes = rows.map((row) => row.idx).sort((a, b) => a - b); @@ -153,25 +277,82 @@ export const mockAgenticLoop = actor({ await database.execute( "CREATE INDEX IF NOT EXISTS idx_mock_agentic_entries_created_at ON mock_agentic_entries(created_at)", ); + await database.execute(` + CREATE TABLE IF NOT EXISTS mock_agentic_sleep_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + sleep_started_at INTEGER NOT NULL + ) + `); + await database.execute(` + CREATE TABLE IF NOT EXISTS mock_agentic_debug_events ( + event_id TEXT PRIMARY KEY, + name TEXT NOT NULL, + actor_id TEXT NOT NULL, + connection_id TEXT, + request_id TEXT, + details_json TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + `); + await database.execute( + "CREATE INDEX IF NOT EXISTS idx_mock_agentic_debug_events_created_at ON mock_agentic_debug_events(created_at)", + ); }, }), + async onWake(c) { + await recordDebugEvent(c, { + name: "onWake", + details: { + key: c.key, + name: c.name, + }, + }); + }, async onSleep(c) { const delayMs = numberFromEnv( "MOCK_AGENTIC_ON_SLEEP_DELAY_MS", DEFAULT_ON_SLEEP_DELAY_MS, ); + const sleepStartedAt = Date.now(); + await recordDebugEvent(c, { + name: "onSleepStart", + createdAt: sleepStartedAt, + details: { + delayMs, + }, + }); + await c.db.execute( + "INSERT OR REPLACE INTO mock_agentic_sleep_state (id, sleep_started_at) VALUES (1, ?)", + sleepStartedAt, + ); c.log.info({ msg: "mock agentic loop onSleep delay", delayMs, + sleepStartedAt, }); await sleep(delayMs); + await recordDebugEvent(c, { + name: "onSleepEnd", + details: { + delayMs, + sleepStartedAt, + elapsedMs: Date.now() - sleepStartedAt, + }, + }); }, - onRequest(_c, request) { + async onRequest(c, request) { const url = new URL(request.url); if (url.pathname === "/bypass" || url.pathname === "/request/bypass") { + const [sleepState] = typedRows( + await c.db.execute( + "SELECT sleep_started_at FROM mock_agentic_sleep_state WHERE id = 1", + ), + ); return new Response(JSON.stringify({ type: "bypass", transport: "http", + sleepStarted: sleepState !== undefined, + sleepStartedAt: sleepState?.sleep_started_at ?? null, timestamp: Date.now(), }), { headers: { @@ -185,12 +366,27 @@ export const mockAgenticLoop = actor({ onWebSocket(c, websocket: UniversalWebSocket) { const connectionId = crypto.randomUUID(); let activeInference: Promise | undefined; + const removeDebugSocket = addDebugSocket(c.actorId, websocket); send(websocket, { type: "hello", connectionId, timestamp: Date.now(), }); + void (async () => { + try { + await replayDebugEvents(c.db, websocket); + } catch (error) { + c.log.warn({ + msg: "mock agentic debug replay failed", + err: error instanceof Error ? error.message : String(error), + }); + } + await recordDebugEvent(c, { + name: "webSocketOpen", + connectionId, + }); + })(); const verify = async (requestId: string, expectedSeconds: number) => { const rows = typedRows( @@ -206,6 +402,18 @@ export const mockAgenticLoop = actor({ }; }; + const sleepStatus = async () => { + const [sleepState] = typedRows( + await c.db.execute( + "SELECT sleep_started_at FROM mock_agentic_sleep_state WHERE id = 1", + ), + ); + return { + sleepStarted: sleepState !== undefined, + sleepStartedAt: sleepState?.sleep_started_at ?? null, + }; + }; + const runInference = async (requestId: string, seconds: number) => { send(websocket, { type: "started", @@ -284,6 +492,7 @@ export const mockAgenticLoop = actor({ send(websocket, { type: "pong", probeId: stringValue(message.probeId, "probeId"), + ...(await sleepStatus()), timestamp: Date.now(), }); return; @@ -318,6 +527,14 @@ export const mockAgenticLoop = actor({ message.seconds, "seconds", ); + await recordDebugEvent(c, { + name: "inferenceRequested", + connectionId, + requestId, + details: { + seconds, + }, + }); const inference = runInference( requestId, seconds, @@ -342,6 +559,14 @@ export const mockAgenticLoop = actor({ } })(); }); + + websocket.addEventListener("close", () => { + removeDebugSocket(); + void recordDebugEvent(c, { + name: "webSocketClose", + connectionId, + }); + }); }, actions: { verify: async (c, requestId: string, expectedSeconds: number) => { diff --git a/examples/kitchen-sink/src/index.ts b/examples/kitchen-sink/src/index.ts index 2378b55215..c3268e9035 100644 --- a/examples/kitchen-sink/src/index.ts +++ b/examples/kitchen-sink/src/index.ts @@ -172,6 +172,8 @@ function serverlessPoolConfig() { export const registry = setup({ configurePool: serverlessPoolConfig(), serverless: { + publicToken: + process.env.RIVET_PUBLIC_TOKEN ?? process.env.RIVET_TOKEN ?? "dev", maxStartPayloadBytes: numberFromEnv( "RIVET_SERVERLESS_MAX_START_PAYLOAD_BYTES", 16 * 1024 * 1024, diff --git a/examples/kitchen-sink/vite.config.ts b/examples/kitchen-sink/vite.config.ts index e02ede3a0b..9a992b4039 100644 --- a/examples/kitchen-sink/vite.config.ts +++ b/examples/kitchen-sink/vite.config.ts @@ -16,4 +16,13 @@ function sqlRawPlugin(): Plugin { export default defineConfig({ plugins: [react(), sqlRawPlugin()], + server: { + proxy: { + "/api/rivet": { + target: "http://127.0.0.1:3000", + changeOrigin: true, + ws: true, + }, + }, + }, }); diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index 20fecb02a9..96e2e73c7c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -51,6 +51,22 @@ export class Registry { this.#config = config; } + #ensureServerlessPoolConfigured(config: RegistryConfig): Promise | undefined { + if (!config.configurePool) return undefined; + + if (!this.#configureServerlessPoolPromise) { + this.#configureServerlessPoolPromise = configureServerlessPool(config).catch( + (error) => { + this.#configureServerlessPoolPromise = undefined; + throw error; + }, + ); + this.#configureServerlessPoolPromise.catch(() => {}); + } + + return this.#configureServerlessPoolPromise; + } + /** * Handle an incoming HTTP request for serverless deployments. * @@ -65,17 +81,42 @@ export class Registry { const config = this.parseConfig(); this.#printWelcome(config, "serverless"); - if (config.configurePool && !this.#configureServerlessPoolPromise) { - this.#configureServerlessPoolPromise = - configureServerlessPool(config); - } - if (!this.#runtimeServerlessPromise) { this.#runtimeServerlessPromise = buildConfiguredRegistry(config); } const { runtime, registry, serveConfig } = await this.#runtimeServerlessPromise; + const isStartRequest = isServerlessStartRequest( + request, + serveConfig.serverlessBasePath ?? "/api/rivet", + ); + const isMetadataRequest = isServerlessMetadataRequest( + request, + serveConfig.serverlessBasePath ?? "/api/rivet", + ); + const isEngineMetadataRequest = + request.headers.get("user-agent")?.startsWith("RivetEngine/") ?? false; + + if (isStartRequest) { + try { + await this.#ensureServerlessPoolConfigured(config); + } catch (error) { + return new Response( + JSON.stringify({ + group: "guard", + code: "service_unavailable", + message: "Serverless pool is not configured.", + metadata: null, + }), + { + status: 503, + headers: { "content-type": "application/json" }, + }, + ); + } + } + const cancelToken = runtime.createCancellationToken(); const abort = () => runtime.cancelCancellationToken(cancelToken); if (request.signal.aborted) { @@ -86,10 +127,7 @@ export class Registry { const requestBody = await request.arrayBuffer(); if ( - isServerlessStartRequest( - request, - serveConfig.serverlessBasePath ?? "/api/rivet", - ) && + isStartRequest && requestBody.byteLength > serveConfig.serverlessMaxStartPayloadBytes ) { request.signal.removeEventListener("abort", abort); @@ -202,6 +240,25 @@ export class Registry { throw err; } + if (isMetadataRequest && !isEngineMetadataRequest) { + try { + await this.#ensureServerlessPoolConfigured(config); + } catch (error) { + return new Response( + JSON.stringify({ + group: "guard", + code: "service_unavailable", + message: "Serverless pool is not configured.", + metadata: null, + }), + { + status: 503, + headers: { "content-type": "application/json" }, + }, + ); + } + } + return new Response(stream, { status: head.status, headers: head.headers, @@ -459,6 +516,14 @@ function isServerlessStartRequest(request: Request, basePath: string): boolean { return parsed.pathname === `${normalizedBase}/start`; } +function isServerlessMetadataRequest(request: Request, basePath: string): boolean { + if (request.method !== "GET") return false; + const parsed = new URL(request.url); + const normalizedBase = + basePath === "/" ? "" : `/${basePath.replace(/^\/+|\/+$/g, "")}`; + return parsed.pathname === `${normalizedBase}/metadata`; +} + export function setup( input: RegistryConfigInput, ): Registry { diff --git a/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts b/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts index 63baf10317..57aaa87fb1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts +++ b/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts @@ -3,66 +3,107 @@ import { getDatacenters, updateRunnerConfig, } from "@/engine-client/api-endpoints"; +import { stringifyError } from "@/common/utils"; import type { RegistryConfig } from "@/registry/config"; import { logger } from "@/registry/log"; +const DEFAULT_CONFIGURE_TIMEOUT_MS = 60_000; +const CONFIGURE_RETRY_DELAY_MS = 1_000; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function configureTimeoutMs() { + const value = process.env.RIVET_SERVERLESS_CONFIGURE_TIMEOUT_MS; + if (value === undefined || value === "") return DEFAULT_CONFIGURE_TIMEOUT_MS; + + const parsed = Number(value); + if (!Number.isFinite(parsed) || parsed < 0) { + throw new Error("RIVET_SERVERLESS_CONFIGURE_TIMEOUT_MS must be a finite non-negative number"); + } + + return parsed; +} + export async function configureServerlessPool( config: RegistryConfig, ): Promise { logger().debug({ msg: "configuring serverless pool" }); - try { - if (!config.namespace) { - throw new Error("namespace is required for serverless configuration"); - } - if (!config.endpoint) { - throw new Error("endpoint is required for serverless configuration"); - } - if (!config.configurePool) { - throw new Error("configurePool is required for serverless configuration"); - } + const startedAt = Date.now(); + const timeoutMs = configureTimeoutMs(); + let attempts = 0; + let lastError: unknown; - const customConfig = config.configurePool; - const clientConfig = convertRegistryConfigToClientConfig(config); - const dcsRes = await getDatacenters(clientConfig); - const poolName = customConfig.name ?? "default"; - const headers = { - ...(config.token ? { "x-rivet-token": config.token } : {}), - ...(customConfig.headers ?? {}), - }; - const serverlessConfig = { - serverless: { - url: customConfig.url, - headers, - request_lifespan: customConfig.requestLifespan ?? 15 * 60, - drain_grace_period: customConfig.drainGracePeriod, - metadata_poll_interval: - customConfig.metadataPollInterval ?? 1000, - max_runners: 100_000, - min_runners: 0, - runners_margin: 0, - slots_per_runner: 1, - }, - metadata: customConfig.metadata ?? {}, - drain_on_version_upgrade: - customConfig.drainOnVersionUpgrade ?? true, - }; + while (Date.now() - startedAt <= timeoutMs) { + attempts += 1; + try { + if (!config.namespace) { + throw new Error("namespace is required for serverless configuration"); + } + if (!config.endpoint) { + throw new Error("endpoint is required for serverless configuration"); + } + if (!config.configurePool) { + throw new Error("configurePool is required for serverless configuration"); + } - await updateRunnerConfig(clientConfig, poolName, { - datacenters: Object.fromEntries( - dcsRes.datacenters.map((dc) => [dc.name, serverlessConfig]), - ), - }); + const customConfig = config.configurePool; + const clientConfig = convertRegistryConfigToClientConfig(config); + const dcsRes = await getDatacenters(clientConfig); + const poolName = customConfig.name ?? "default"; + const serverlessToken = config.token ?? config.publicToken; + const headers = { + ...(serverlessToken ? { "x-rivet-token": serverlessToken } : {}), + ...(customConfig.headers ?? {}), + }; + const serverlessConfig = { + serverless: { + url: customConfig.url, + headers, + request_lifespan: customConfig.requestLifespan ?? 15 * 60, + drain_grace_period: customConfig.drainGracePeriod, + metadata_poll_interval: + customConfig.metadataPollInterval ?? 1000, + max_runners: 100_000, + min_runners: 0, + runners_margin: 0, + slots_per_runner: 1, + }, + metadata: customConfig.metadata ?? {}, + drain_on_version_upgrade: + customConfig.drainOnVersionUpgrade ?? true, + }; - logger().info({ - msg: "serverless pool configured successfully", - poolName, - namespace: config.namespace, - }); - } catch (error) { - logger().error({ - msg: "failed to configure serverless pool, validate endpoint is configured correctly then restart this process", - error, - }); + await updateRunnerConfig(clientConfig, poolName, { + datacenters: Object.fromEntries( + dcsRes.datacenters.map((dc) => [dc.name, serverlessConfig]), + ), + }); + + logger().info({ + msg: "serverless pool configured successfully", + poolName, + namespace: config.namespace, + attempts, + }); + return; + } catch (error) { + lastError = error; + logger().warn({ + msg: "serverless pool configuration attempt failed", + attempts, + error: stringifyError(error), + }); + await sleep(CONFIGURE_RETRY_DELAY_MS); + } } + + logger().error({ + msg: "failed to configure serverless pool, validate endpoint is configured correctly then restart this process", + attempts, + error: stringifyError(lastError), + }); + throw lastError; }