diff --git a/engine/sdks/rust/envoy-client/src/actor.rs b/engine/sdks/rust/envoy-client/src/actor.rs index c214d94c96..6c2583e6b8 100644 --- a/engine/sdks/rust/envoy-client/src/actor.rs +++ b/engine/sdks/rust/envoy-client/src/actor.rs @@ -1652,6 +1652,7 @@ mod tests { envoy_key: "test-envoy".to_string(), envoy_tx, actors: Arc::new(std::sync::Mutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())), ws_tx: Arc::new(tokio::sync::Mutex::new( diff --git a/engine/sdks/rust/envoy-client/src/context.rs b/engine/sdks/rust/envoy-client/src/context.rs index 141ac66c64..b3363daace 100644 --- a/engine/sdks/rust/envoy-client/src/context.rs +++ b/engine/sdks/rust/envoy-client/src/context.rs @@ -5,8 +5,9 @@ use std::sync::atomic::AtomicBool; use crate::async_counter::AsyncCounter; use rivet_envoy_protocol as protocol; -use tokio::sync::Mutex; use tokio::sync::mpsc; +use tokio::sync::Mutex; +use tokio::sync::Notify; use tokio::sync::watch; use crate::actor::ToActor; @@ -24,6 +25,7 @@ pub struct SharedContext { pub envoy_key: String, pub envoy_tx: mpsc::UnboundedSender, pub actors: Arc>>>, + pub actors_notify: Arc, pub live_tunnel_requests: Arc>>, pub pending_hibernation_restores: Arc>>>, diff --git a/engine/sdks/rust/envoy-client/src/envoy.rs b/engine/sdks/rust/envoy-client/src/envoy.rs index af4fbc6fab..ba040d722e 100644 --- a/engine/sdks/rust/envoy-client/src/envoy.rs +++ b/engine/sdks/rust/envoy-client/src/envoy.rs @@ -183,6 +183,8 @@ impl EnvoyContext { }, ); + self.shared.actors_notify.notify_waiters(); + if let Some(messages) = self.buffered_actor_messages.remove(&buffered_actor_id) { for message in messages { match message { @@ -216,6 +218,7 @@ impl EnvoyContext { shared.remove(actor_id); } } + self.shared.actors_notify.notify_waiters(); } pub fn get_actor(&self, actor_id: &str, generation: Option) -> Option<&ActorEntry> { @@ -297,6 +300,7 @@ fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle { envoy_key, envoy_tx: envoy_tx.clone(), actors: Arc::new(std::sync::Mutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())), ws_tx: Arc::new(tokio::sync::Mutex::new(None)), diff --git a/engine/sdks/rust/envoy-client/src/events.rs b/engine/sdks/rust/envoy-client/src/events.rs index cfdcd8118e..bc7c034c94 100644 --- a/engine/sdks/rust/envoy-client/src/events.rs +++ b/engine/sdks/rust/envoy-client/src/events.rs @@ -164,6 +164,7 @@ mod tests { envoy_key: "test-envoy".to_string(), envoy_tx, actors: Arc::new(std::sync::Mutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())), ws_tx: Arc::new(tokio::sync::Mutex::new( diff --git a/engine/sdks/rust/envoy-client/src/handle.rs b/engine/sdks/rust/envoy-client/src/handle.rs index 6d3633bd75..86bde62913 100644 --- a/engine/sdks/rust/envoy-client/src/handle.rs +++ b/engine/sdks/rust/envoy-client/src/handle.rs @@ -17,6 +17,12 @@ pub struct EnvoyHandle { pub(crate) started_rx: tokio::sync::watch::Receiver<()>, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ServerlessActorStart { + pub actor_id: String, + pub generation: u32, +} + impl EnvoyHandle { #[doc(hidden)] pub fn from_shared(shared: Arc) -> Self { @@ -85,6 +91,23 @@ impl EnvoyHandle { &self.shared.config.namespace } + pub fn active_actor_count(&self) -> usize { + let guard = self + .shared + .actors + .lock() + .expect("shared actor registry poisoned"); + guard + .values() + .map(|generations| { + generations + .values() + .filter(|actor| !actor.handle.is_closed()) + .count() + }) + .sum() + } + pub fn pool_name(&self) -> &str { &self.shared.config.pool_name } @@ -138,6 +161,40 @@ impl EnvoyHandle { rx.await.ok().flatten() } + pub async fn wait_actor_registered_then_stopped(&self, actor_id: &str, generation: u32) { + let mut registered = false; + loop { + let notified = self.shared.actors_notify.notified(); + if self.is_stopped() { + return; + } + + let actor_is_registered = { + let guard = self + .shared + .actors + .lock() + .expect("shared actor registry poisoned"); + guard + .get(actor_id) + .and_then(|generations| generations.get(&generation)) + .is_some() + }; + + if registered && !actor_is_registered { + return; + } + if actor_is_registered { + registered = true; + } + + tokio::select! { + _ = notified => {} + _ = self.wait_stopped() => return, + } + } + } + pub fn http_request_counter( &self, actor_id: &str, @@ -484,6 +541,35 @@ impl EnvoyHandle { /// Inject a serverless start payload into the envoy. /// The payload is a u16 LE protocol version followed by a serialized ToEnvoy message. pub async fn start_serverless_actor(&self, payload: &[u8]) -> anyhow::Result<()> { + let (message, _) = decode_serverless_actor_start_payload(payload)?; + + // Wait for envoy to be started before injecting + self.started().await?; + + tracing::debug!( + data = crate::stringify::stringify_to_envoy(&message), + "received serverless start" + ); + self.shared + .envoy_tx + .send(ToEnvoyMessage::ConnMessage { message }) + .map_err(|_| anyhow::anyhow!("envoy channel closed"))?; + + Ok(()) + } + + pub fn decode_serverless_actor_start( + &self, + payload: &[u8], + ) -> anyhow::Result { + let (_, actor_start) = decode_serverless_actor_start_payload(payload)?; + Ok(actor_start) + } +} + +fn decode_serverless_actor_start_payload( + payload: &[u8], +) -> anyhow::Result<(protocol::ToEnvoy, ServerlessActorStart)> { use vbare::OwnedVersionedData; if payload.len() < 2 { @@ -524,21 +610,15 @@ impl EnvoyHandle { anyhow::bail!("invalid serverless payload: expected CommandStartActor"); } - // Wait for envoy to be started before injecting - self.started().await?; - - tracing::debug!( - data = crate::stringify::stringify_to_envoy(&message), - "received serverless start" - ); - self.shared - .envoy_tx - .send(ToEnvoyMessage::ConnMessage { message }) - .map_err(|_| anyhow::anyhow!("envoy channel closed"))?; + let actor_start = ServerlessActorStart { + actor_id: commands[0].checkpoint.actor_id.clone(), + generation: commands[0].checkpoint.generation, + }; - Ok(()) + Ok((message, actor_start)) } +impl EnvoyHandle { async fn send_kv_request( &self, actor_id: String, diff --git a/engine/sdks/rust/envoy-client/src/sqlite.rs b/engine/sdks/rust/envoy-client/src/sqlite.rs index 8dfa06628a..26e92623d5 100644 --- a/engine/sdks/rust/envoy-client/src/sqlite.rs +++ b/engine/sdks/rust/envoy-client/src/sqlite.rs @@ -463,6 +463,7 @@ mod tests { envoy_key: "test-envoy".to_string(), envoy_tx, actors: Arc::new(std::sync::Mutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())), ws_tx: Arc::new(tokio::sync::Mutex::new( diff --git a/examples/kitchen-sink/src/server.ts b/examples/kitchen-sink/src/server.ts index c70fe93bec..cc9bc0aae5 100644 --- a/examples/kitchen-sink/src/server.ts +++ b/examples/kitchen-sink/src/server.ts @@ -1,20 +1,134 @@ import { registry } from "./index.ts"; import { serve } from "@hono/node-server"; import { Hono } from "hono"; +import type { Server as HttpServer } from "node:http"; +import * as v8 from "node:v8"; const app = new Hono(); +const port = Number.parseInt(process.env.PORT ?? "3000", 10); + +process.on("exit", (code) => { + console.log(JSON.stringify({ kind: "process_exit", code, pid: process.pid })); +}); +if (process.env.SQLITE_MEMORY_SOAK_DIAGNOSTICS === "1") { + for (const signal of ["SIGINT", "SIGTERM"] as const) { + process.on(signal, () => { + console.log( + JSON.stringify({ + kind: "process_signal", + signal, + pid: process.pid, + ppid: process.ppid, + timestamp: new Date().toISOString(), + }), + ); + process.exit(signal === "SIGINT" ? 130 : 143); + }); + } +} +process.on("beforeExit", (code) => { + console.log(JSON.stringify({ kind: "process_before_exit", code, pid: process.pid })); +}); +process.on("uncaughtException", (error) => { + console.error( + JSON.stringify({ + kind: "uncaught_exception", + error: error.stack ?? error.message, + }), + ); +}); +process.on("unhandledRejection", (reason) => { + console.error( + JSON.stringify({ + kind: "unhandled_rejection", + error: reason instanceof Error ? reason.stack ?? reason.message : String(reason), + }), + ); +}); + +async function memoryBreakdown(forceGc: boolean) { + const gc = (globalThis as typeof globalThis & { gc?: () => void }).gc; + if (forceGc && typeof gc === "function") gc(); + + const memory = process.memoryUsage(); + const heap = v8.getHeapStatistics(); + const spaces = v8.getHeapSpaceStatistics(); + const nativeNonV8Estimate = Math.max(0, memory.rss - heap.total_heap_size); + const registryDiagnostics = await registry.diagnostics().catch((error: unknown) => ({ + error: error instanceof Error ? error.message : String(error), + })); + + return { + pid: process.pid, + timestamp: new Date().toISOString(), + uptimeSeconds: process.uptime(), + gcRequested: forceGc, + gcAvailable: typeof gc === "function", + process: { + rssBytes: memory.rss, + heapTotalBytes: memory.heapTotal, + heapUsedBytes: memory.heapUsed, + externalBytes: memory.external, + arrayBuffersBytes: memory.arrayBuffers, + }, + v8: { + totalHeapSizeBytes: heap.total_heap_size, + usedHeapSizeBytes: heap.used_heap_size, + heapSizeLimitBytes: heap.heap_size_limit, + mallocedMemoryBytes: heap.malloced_memory, + externalMemoryBytes: heap.external_memory, + peakMallocedMemoryBytes: heap.peak_malloced_memory, + spaces: spaces.map((space) => ({ + name: space.space_name, + sizeBytes: space.space_size, + usedBytes: space.space_used_size, + availableBytes: space.space_available_size, + physicalSizeBytes: space.physical_space_size, + })), + }, + estimates: { + jsHeapResidentBytes: memory.heapTotal, + jsHeapUsedBytes: memory.heapUsed, + v8ExternalBytes: memory.external, + nativeNonV8ResidentEstimateBytes: nativeNonV8Estimate, + }, + registry: registryDiagnostics, + resourceUsage: process.resourceUsage(), + }; +} function requestHeaders(headers: Headers) { - return Object.fromEntries( - Array.from(headers.entries()).map(([key, value]) => [ + const entries: Array<[string, string]> = []; + headers.forEach((value, key) => { + entries.push([ key, key === "authorization" || key === "x-rivet-token" ? "" : value, - ]), - ); + ]); + }); + return Object.fromEntries(entries); } +app.get("/debug/memory", async (c) => { + const forceGc = c.req.query("gc") === "1"; + return c.json(await memoryBreakdown(forceGc)); +}); + +app.post("/debug/heap-snapshot", (c) => { + if (process.env.SQLITE_MEMORY_SOAK_DIAGNOSTICS !== "1") { + return c.json({ error: "disabled" }, 404); + } + + const path = c.req.query("path"); + if (!path) { + return c.json({ error: "missing path" }, 400); + } + + const writtenPath = v8.writeHeapSnapshot(path); + return c.json({ path: writtenPath }); +}); + app.use("*", async (c, next) => { const startedAt = Date.now(); await next(); @@ -33,8 +147,13 @@ app.use("*", async (c, next) => { app.all("/api/rivet/*", (c) => registry.handler(c.req.raw)); app.all("/api/rivet", (c) => registry.handler(c.req.raw)); -serve({ fetch: app.fetch, port: 3000 }, () => { +const server = serve({ fetch: app.fetch, port }, () => { console.log( - "serverless RivetKit listening on http://127.0.0.1:3000/api/rivet", + `serverless RivetKit listening on http://127.0.0.1:${port}/api/rivet`, ); }); +const httpServer = server as unknown as HttpServer; +httpServer.requestTimeout = 0; +httpServer.headersTimeout = 0; +httpServer.keepAliveTimeout = 0; +httpServer.timeout = 0; diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index e41c7c7634..0c4cb1b03a 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -970,7 +970,13 @@ impl ActorTask { ); self.log_dispatch_command_handled(command_kind, "enqueued"); let actor_id = self.ctx.actor_id().to_owned(); + let action_keep_awake = self + .ctx + .internal_keep_awake_region() + .with_log_fields("dispatch_action", Some(actor_id.clone())); + self.ctx.reset_sleep_timer(); self.ctx.wait_until(async move { + let _action_keep_awake = action_keep_awake; match tracked_reply_rx.await { Ok(result) => { tracing::info!( @@ -1791,6 +1797,7 @@ impl ActorTask { .cleanup() .await .with_context(|| format!("cleanup sqlite during {reason_label} shutdown"))?; + trim_native_allocator_after_shutdown(&actor_id, reason_label); tracing::debug!( actor_id = %actor_id, reason = reason_label, @@ -2164,6 +2171,24 @@ fn shutdown_reason_label(reason: ShutdownKind) -> &'static str { } } +#[cfg(all(unix, target_env = "gnu"))] +fn trim_native_allocator_after_shutdown(actor_id: &str, reason: &str) { + unsafe extern "C" { + fn malloc_trim(pad: usize) -> i32; + } + + let rc = unsafe { malloc_trim(0) }; + tracing::debug!( + actor_id, + reason, + rc, + "trimmed native allocator after actor shutdown" + ); +} + +#[cfg(not(all(unix, target_env = "gnu")))] +fn trim_native_allocator_after_shutdown(_actor_id: &str, _reason: &str) {} + fn clone_shutdown_result(result: &Result<()>) -> Result<()> { match result { Ok(()) => Ok(()), diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs index 2032dbcd8e..f699a65c19 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs @@ -27,6 +27,8 @@ use crate::time::{sleep, timeout}; const DEFAULT_BASE_PATH: &str = "/api/rivet"; const SSE_PING_INTERVAL: Duration = Duration::from_secs(1); +const SSE_PING_FRAME: &[u8] = b"event: ping\ndata:\n\n"; +const SSE_STOPPING_FRAME: &[u8] = b"event: stopping\ndata:\n\n"; /// Bound on `handle.shutdown_and_wait` inside teardown paths. If envoy cannot /// reach the engine (reconnect loop stuck), we fall back to immediate `Stop` /// rather than hanging indefinitely. Must stay below the outer TS grace ceiling. @@ -216,6 +218,14 @@ impl CoreServerlessRuntime { } } + pub async fn active_envoy_actor_count(&self) -> Option { + self.envoy + .lock() + .await + .as_ref() + .map(EnvoyHandle::active_actor_count) + } + pub async fn handle_request(&self, req: ServerlessRequest) -> ServerlessResponse { let cors = cors_headers(&req); match self.handle_request_inner(req).await { @@ -275,10 +285,11 @@ impl CoreServerlessRuntime { let handle = self.ensure_envoy(&headers).await?; let payload = req.body; + let actor_start = handle.decode_serverless_actor_start(&payload)?; let cancel_token = req.cancel_token; let cache_envoy = self.settings.cache_envoy; let (tx, rx) = mpsc::channel(16); - let _ = tx.try_send(Ok(b"event: ping\ndata:\n\n".to_vec())); + let _ = tx.try_send(Ok(SSE_PING_FRAME.to_vec())); RuntimeSpawner::spawn(async move { let shutdown_handle = handle.clone(); @@ -305,8 +316,12 @@ impl CoreServerlessRuntime { _ = cancel_token.cancelled() => { break; } + _ = handle.wait_actor_registered_then_stopped(&actor_start.actor_id, actor_start.generation) => { + let _ = tx.send(Ok(SSE_STOPPING_FRAME.to_vec())).await; + break; + } _ = sleep(SSE_PING_INTERVAL) => { - if tx.send(Ok(b"event: ping\ndata:\n\n".to_vec())).await.is_err() { + if tx.send(Ok(SSE_PING_FRAME.to_vec())).await.is_err() { break; } } diff --git a/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs b/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs index aaeea242dc..97e87e4a50 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/integration/counter.rs @@ -97,6 +97,7 @@ fn counter_factory() -> ActorFactory { } ActorEvent::WebSocketOpen { ws: _, + conn: _, request: _, reply, } => { diff --git a/rivetkit-rust/packages/rivetkit-core/tests/task.rs b/rivetkit-rust/packages/rivetkit-core/tests/task.rs index 4fc1b0c773..697f2f43ca 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/task.rs @@ -34,6 +34,7 @@ mod moved_tests { use crate::actor::keys::{LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY, make_connection_key}; use crate::actor::messages::{ActorEvent, SerializeStateReason, StateDelta}; use crate::actor::preload::PreloadedPersistedActor; + use crate::actor::sleep::CanSleep; use crate::actor::state::{ PersistedActor, PersistedScheduleEvent, RequestSaveOpts, decode_last_pushed_alarm, decode_persisted_actor, encode_last_pushed_alarm, encode_persisted_actor, @@ -1516,6 +1517,107 @@ mod moved_tests { .expect("destroy stop should succeed"); } + #[tokio::test] + async fn action_dispatch_blocks_idle_sleep_until_reply() { + let ctx = new_with_kv( + "actor-action-keep-awake", + "task-action-keep-awake", + Vec::new(), + "local", + new_in_memory(), + ); + let (action_started_tx, action_started_rx) = oneshot::channel(); + let action_started_tx = Arc::new(Mutex::new(Some(action_started_tx))); + let (release_action_tx, release_action_rx) = oneshot::channel(); + let release_action_rx = Arc::new(Mutex::new(Some(release_action_rx))); + let factory = Arc::new(ActorFactory::new(Default::default(), { + let action_started_tx = action_started_tx.clone(); + let release_action_rx = release_action_rx.clone(); + move |start| { + let action_started_tx = action_started_tx.clone(); + let release_action_rx = release_action_rx.clone(); + Box::pin(async move { + let mut events = start.events; + while let Some(event) = events.recv().await { + match event { + ActorEvent::Action { reply, .. } => { + if let Some(tx) = action_started_tx + .lock() + .expect("action started sender lock poisoned") + .take() + { + let _ = tx.send(()); + } + let release_rx = release_action_rx + .lock() + .expect("release action receiver lock poisoned") + .take() + .expect("release action receiver should exist"); + let _ = release_rx.await; + reply.send(Ok(vec![9])); + } + ActorEvent::BeginSleep => {} + ActorEvent::FinalizeSleep { reply } | ActorEvent::Destroy { reply } => { + reply.send(Ok(())); + break; + } + _ => {} + } + } + Ok(()) + }) + } + })); + + let mut task = new_task_with_factory(ctx.clone(), factory); + let (start_tx, start_rx) = oneshot::channel(); + task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx }) + .await; + start_rx + .await + .expect("start reply should send") + .expect("start should succeed"); + + let client_conn = ConnHandle::new("conn-keep-awake", Vec::new(), Vec::new(), false); + let (reply_tx, reply_rx) = oneshot::channel(); + task.handle_dispatch(DispatchCommand::Action { + name: "slow-action".to_owned(), + args: Vec::new(), + conn: client_conn, + reply: reply_tx, + }) + .await; + action_started_rx + .await + .expect("action should start before sleep assertion"); + + assert_eq!(ctx.internal_keep_awake_count(), 1); + assert_eq!(ctx.can_sleep().await, CanSleep::ActiveInternalKeepAwake); + release_action_tx + .send(()) + .expect("action should still be waiting"); + + assert_eq!( + reply_rx + .await + .expect("action reply should send") + .expect("action should succeed"), + vec![9] + ); + for _ in 0..10 { + if ctx.internal_keep_awake_count() == 0 { + break; + } + yield_now().await; + } + assert_eq!(ctx.internal_keep_awake_count(), 0); + assert_eq!(ctx.can_sleep().await, CanSleep::Yes); + + task.handle_stop(ShutdownKind::Destroy) + .await + .expect("destroy stop should succeed"); + } + #[tokio::test] async fn wake_start_hibernated_does_not_refire_connection_open() { let kv = new_in_memory(); diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index f55f40b2ec..f02ed6f682 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -99,6 +99,19 @@ export interface NativeExecuteResult { changes: number lastInsertRowId?: number } +export interface JsSqliteVfsMetrics { + requestBuildNs: number + serializeNs: number + transportNs: number + stateUpdateNs: number + totalNs: number + commitCount: number + pageCacheEntries: number + pageCacheWeightedSize: number + pageCacheCapacityPages: number + writeBufferDirtyPages: number + dbSizePages: number +} export interface JsQueueNextOptions { names?: Array timeoutMs?: number @@ -161,6 +174,10 @@ export interface JsServerlessResponseHead { status: number headers: Record } +export interface JsRegistryDiagnostics { + mode: string + envoyActiveActorCount?: number +} export interface JsServerlessStreamError { group: string code: string @@ -245,6 +262,7 @@ export declare class ConnHandle { } export declare class JsNativeDatabase { takeLastKvError(): string | null + metrics(): JsSqliteVfsMetrics | null run(sql: string, params?: Array | undefined | null): Promise query(sql: string, params?: Array | undefined | null): Promise execute(sql: string, params?: Array | undefined | null): Promise @@ -294,6 +312,7 @@ export declare class CoreRegistry { * separately to avoid re-entrancy. */ shutdown(): Promise + diagnostics(): Promise handleServerlessRequest(req: JsServerlessRequest, onStreamEvent: (...args: any[]) => any, cancelToken: CancellationToken, config: JsServeConfig): Promise } export declare class Schedule { diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/database.rs b/rivetkit-typescript/packages/rivetkit-napi/src/database.rs index 94ef744b3d..19fa5c3e49 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/database.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/database.rs @@ -62,6 +62,21 @@ pub struct NativeExecuteResult { pub last_insert_row_id: Option, } +#[napi(object)] +pub struct JsSqliteVfsMetrics { + pub request_build_ns: f64, + pub serialize_ns: f64, + pub transport_ns: f64, + pub state_update_ns: f64, + pub total_ns: f64, + pub commit_count: f64, + pub page_cache_entries: f64, + pub page_cache_weighted_size: f64, + pub page_cache_capacity_pages: f64, + pub write_buffer_dirty_pages: f64, + pub db_size_pages: f64, +} + #[napi] impl JsNativeDatabase { #[napi] @@ -69,6 +84,23 @@ impl JsNativeDatabase { self.db.take_last_kv_error() } + #[napi] + pub fn metrics(&self) -> Option { + self.db.metrics().map(|metrics| JsSqliteVfsMetrics { + request_build_ns: metrics.request_build_ns as f64, + serialize_ns: metrics.serialize_ns as f64, + transport_ns: metrics.transport_ns as f64, + state_update_ns: metrics.state_update_ns as f64, + total_ns: metrics.total_ns as f64, + commit_count: metrics.commit_count as f64, + page_cache_entries: 0.0, + page_cache_weighted_size: 0.0, + page_cache_capacity_pages: 0.0, + write_buffer_dirty_pages: 0.0, + db_size_pages: 0.0, + }) + } + #[napi] pub async fn run( &self, diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs index c3fe026770..5f720d3232 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs @@ -49,6 +49,12 @@ pub struct JsServerlessResponseHead { pub headers: HashMap, } +#[napi(object)] +pub struct JsRegistryDiagnostics { + pub mode: String, + pub envoy_active_actor_count: Option, +} + #[napi(object)] #[derive(Clone)] pub struct JsServerlessStreamError { @@ -235,6 +241,41 @@ impl CoreRegistry { Ok(()) } + #[napi] + pub async fn diagnostics(&self) -> napi::Result { + let guard = self.state.lock().await; + let diagnostics = match &*guard { + RegistryState::Registering(_) => JsRegistryDiagnostics { + mode: "registering".to_owned(), + envoy_active_actor_count: None, + }, + RegistryState::BuildingServerless => JsRegistryDiagnostics { + mode: "building_serverless".to_owned(), + envoy_active_actor_count: None, + }, + RegistryState::Serving => JsRegistryDiagnostics { + mode: "serving".to_owned(), + envoy_active_actor_count: None, + }, + RegistryState::Serverless(runtime) => JsRegistryDiagnostics { + mode: "serverless".to_owned(), + envoy_active_actor_count: runtime + .active_envoy_actor_count() + .await + .map(|count| count as u32), + }, + RegistryState::ShuttingDown => JsRegistryDiagnostics { + mode: "shutting_down".to_owned(), + envoy_active_actor_count: None, + }, + RegistryState::ShutDown => JsRegistryDiagnostics { + mode: "shut_down".to_owned(), + envoy_active_actor_count: None, + }, + }; + Ok(diagnostics) + } + #[napi(ts_return_type = "Promise")] pub fn handle_serverless_request( &self, diff --git a/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts b/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts index 5d44a91437..8fc8cb6895 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/database/config.ts @@ -23,6 +23,20 @@ export interface SqliteExecuteResult extends SqliteQueryResult { lastInsertRowId?: number | null; } +export interface SqliteNativeMetrics { + requestBuildNs: number; + serializeNs: number; + transportNs: number; + stateUpdateNs: number; + totalNs: number; + commitCount: number; + pageCacheEntries: number; + pageCacheWeightedSize: number; + pageCacheCapacityPages: number; + writeBufferDirtyPages: number; + dbSizePages: number; +} + export interface SqliteDatabase { exec( sql: string, @@ -35,6 +49,10 @@ export interface SqliteDatabase { run(sql: string, params?: SqliteBindings): Promise; query(sql: string, params?: SqliteBindings): Promise; writeMode(callback: () => Promise): Promise; + nativeMetrics?(): + | SqliteNativeMetrics + | Promise + | null; close(): Promise; } @@ -132,6 +150,13 @@ export type RawAccess = { * Executes a raw SQL query. */ execute: ExecuteFunction; + /** + * Returns native SQLite metrics when the active runtime supports them. + */ + nativeMetrics?: () => + | SqliteNativeMetrics + | Promise + | null; /** * Closes the database connection and releases resources. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts b/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts index 968f1e6c58..495eaf9f4f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/database/mod.ts @@ -102,6 +102,7 @@ export function db({ await db.close(); } }, + nativeMetrics: () => db.nativeMetrics?.() ?? null, __rivetWriteMode: async ( callback: () => Promise | T, ): Promise => { diff --git a/rivetkit-typescript/packages/rivetkit/src/common/database/native-database.ts b/rivetkit-typescript/packages/rivetkit/src/common/database/native-database.ts index c0b90b8f50..7b187640de 100644 --- a/rivetkit-typescript/packages/rivetkit/src/common/database/native-database.ts +++ b/rivetkit-typescript/packages/rivetkit/src/common/database/native-database.ts @@ -3,6 +3,7 @@ import type { SqliteBindings, SqliteDatabase, SqliteExecuteResult, + SqliteNativeMetrics, } from "./config"; type NativeBindNoValues = { @@ -78,6 +79,7 @@ export interface JsNativeDatabaseLike { sql: string, params?: NativeBindParam[] | null, ): Promise; + metrics?(): SqliteNativeMetrics | null; takeLastKvError?(): string | null; close(): Promise; } @@ -201,6 +203,45 @@ function toNativeBindings( }); } +function normalizeExecuteRoute(route: string): SqliteExecuteResult["route"] { + if (route === "read" || route === "write" || route === "writeFallback") { + return route; + } + throw new Error(`unsupported sqlite execute route: ${route}`); +} + +function normalizeNativeMetrics( + metrics: SqliteNativeMetrics | null | undefined, +): SqliteNativeMetrics | null { + if (!metrics) return null; + const raw = metrics as unknown as Record; + const numberField = (camel: string, snake: string) => + Number(raw[camel] ?? raw[snake] ?? 0); + + return { + requestBuildNs: numberField("requestBuildNs", "request_build_ns"), + serializeNs: numberField("serializeNs", "serialize_ns"), + transportNs: numberField("transportNs", "transport_ns"), + stateUpdateNs: numberField("stateUpdateNs", "state_update_ns"), + totalNs: numberField("totalNs", "total_ns"), + commitCount: numberField("commitCount", "commit_count"), + pageCacheEntries: numberField("pageCacheEntries", "page_cache_entries"), + pageCacheWeightedSize: numberField( + "pageCacheWeightedSize", + "page_cache_weighted_size", + ), + pageCacheCapacityPages: numberField( + "pageCacheCapacityPages", + "page_cache_capacity_pages", + ), + writeBufferDirtyPages: numberField( + "writeBufferDirtyPages", + "write_buffer_dirty_pages", + ), + dbSizePages: numberField("dbSizePages", "db_size_pages"), + }; +} + class NativeCloseGate { #active = 0; #closed = false; @@ -315,6 +356,9 @@ export function wrapJsNativeDatabase( async writeMode(callback: () => Promise): Promise { return await callback(); }, + nativeMetrics(): SqliteNativeMetrics | null { + return normalizeNativeMetrics(database.metrics?.()); + }, async close(): Promise { closePromise ??= gate.close(() => database.close()); await closePromise; diff --git a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts index 5ddad3ad3e..95b1caaa08 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts @@ -7,5 +7,6 @@ export type { RawDatabaseClient, SqliteBindings, SqliteDatabase, + SqliteNativeMetrics, SqliteQueryResult, } from "@/common/database/config"; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index 3a64d4f916..20fecb02a9 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -22,6 +22,11 @@ export interface ServerlessHandler { fetch: FetchHandler; } +export interface RegistryDiagnostics { + mode: string; + envoyActiveActorCount?: number | null; +} + export class Registry { #config: RegistryConfigInput; @@ -34,6 +39,7 @@ export class Registry { } #runtimeServePromise?: Promise; + #runtimeServeConfiguredPromise?: ReturnType; #runtimeServerlessPromise?: ReturnType; #configureServerlessPoolPromise?: Promise; #welcomePrinted = false; @@ -216,12 +222,30 @@ export class Registry { }; } + public async diagnostics(): Promise { + const candidates = [ + this.#runtimeServerlessPromise, + this.#runtimeServeConfiguredPromise, + ].filter((candidate): candidate is ReturnType => + candidate !== undefined + ); + + for (const candidate of candidates) { + const { runtime, registry } = await candidate; + const diagnostics = await runtime.registryDiagnostics?.(registry); + if (diagnostics) return diagnostics; + } + + return { mode: "not_started", envoyActiveActorCount: null }; + } + /** * Starts an actor envoy for standalone server deployments. */ #startEnvoy(config: RegistryConfig, printWelcome: boolean) { if (!this.#runtimeServePromise) { const configuredRegistryPromise = buildConfiguredRegistry(config); + this.#runtimeServeConfiguredPromise = configuredRegistryPromise; this.#runtimeServePromise = configuredRegistryPromise .then(async ({ runtime, registry, serveConfig }) => { await runtime.serveRegistry(registry, serveConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts index e179ea32ae..1fcfd25a39 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts @@ -24,6 +24,7 @@ import type { RuntimeQueueTryNextBatchOptions, RuntimeQueueWaitOptions, RuntimeRequestSaveOpts, + RuntimeRegistryDiagnostics, RuntimeServeConfig, RuntimeServerlessRequest, RuntimeServerlessResponseHead, @@ -201,6 +202,16 @@ export class NapiCoreRuntime implements CoreRuntime { await asNativeRegistry(registry).shutdown(); } + async registryDiagnostics( + registry: RegistryHandle, + ): Promise { + const diagnostics = await asNativeRegistry(registry).diagnostics(); + return { + mode: diagnostics.mode, + envoyActiveActorCount: diagnostics.envoyActiveActorCount, + }; + } + async handleServerlessRequest( registry: RegistryHandle, req: RuntimeServerlessRequest, @@ -421,6 +432,10 @@ export class NapiCoreRuntime implements CoreRuntime { return asNativeActorContext(ctx).runtimeState(); } + actorClearRuntimeState(ctx: ActorContextHandle): void { + asNativeActorContext(ctx).clearRuntimeState(); + } + actorRestartRunHandler(ctx: ActorContextHandle): void { asNativeActorContext(ctx).restartRunHandler(); } @@ -553,6 +568,10 @@ export class NapiCoreRuntime implements CoreRuntime { return await this.#actorSql(ctx).run(sql, toNapiSqlBindParams(params)); } + actorSqlMetrics(ctx: ActorContextHandle) { + return this.#actorSql(ctx).metrics?.() ?? null; + } + actorSqlTakeLastKvError(ctx: ActorContextHandle): string | null { return this.#actorSql(ctx).takeLastKvError?.() ?? null; } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts index bafb19e7d6..9f4ab17873 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/native.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/native.ts @@ -410,6 +410,10 @@ function resolveNativeDestroy(runtime: CoreRuntime, ctx: ActorContextHandle) { gate.destroyCompletion = undefined; } +function clearNativeRuntimeState(runtime: CoreRuntime, ctx: ActorContextHandle) { + callNativeSync(() => runtime.actorClearRuntimeState(ctx)); +} + function closeNativeSqlDatabase( runtime: CoreRuntime, ctx: ActorContextHandle, @@ -461,6 +465,7 @@ function getOrCreateNativeSqlDatabase( execute: (sql, params) => runtime.actorSqlExecute(ctx, sql, params), query: (sql, params) => runtime.actorSqlQuery(ctx, sql, params), run: (sql, params) => runtime.actorSqlRun(ctx, sql, params), + metrics: () => runtime.actorSqlMetrics(ctx), takeLastKvError: () => runtime.actorSqlTakeLastKvError(ctx), close: () => runtime.actorSqlClose(ctx), }); @@ -3939,52 +3944,47 @@ export function buildNativeFactory( }, ) : undefined, - onSleep: onSleep - ? wrapNativeCallback( - async ( - error: unknown, - payload: { ctx: ActorContextHandle }, - ) => { - const { ctx } = unwrapTsfnPayload(error, payload); - const actorCtx = makeActorCtx(ctx); - try { - await onSleep(actorCtx); - if (runtime.kind === "wasm") { - await runtime.actorSaveState( - ctx, - actorCtx.serializeForTick("save"), - ); - } else { - await actorCtx.saveState({ immediate: true }); - } - } finally { - await actorCtx.dispose(); + onSleep: wrapNativeCallback( + async (error: unknown, payload: { ctx: ActorContextHandle }) => { + const { ctx } = unwrapTsfnPayload(error, payload); + const actorCtx = makeActorCtx(ctx); + try { + if (onSleep) { + await onSleep(actorCtx); + if (runtime.kind === "wasm") { + // Wasm cannot use the native context save helper here because + // the runtime owns the serialized state handoff. + await runtime.actorSaveState( + ctx, + actorCtx.serializeForTick("save"), + ); + } else { + await actorCtx.saveState({ immediate: true }); } - }, - ) - : undefined, - onDestroy: - typeof config.onDestroy === "function" || - databaseProvider !== undefined - ? wrapNativeCallback( - async ( - error: unknown, - payload: { ctx: ActorContextHandle }, - ) => { - const { ctx } = unwrapTsfnPayload(error, payload); - const actorCtx = makeActorCtx(ctx); - try { - if (typeof config.onDestroy === "function") { - await config.onDestroy(actorCtx); - } - } finally { - resolveNativeDestroy(runtime, ctx); - await actorCtx.closeDatabase(); - await actorCtx.dispose(); - } - }, - ) - : undefined, + } + } finally { + await actorCtx.closeDatabase(); + clearNativeRuntimeState(runtime, ctx); + await actorCtx.dispose(); + } + }, + ), + onDestroy: wrapNativeCallback( + async (error: unknown, payload: { ctx: ActorContextHandle }) => { + const { ctx } = unwrapTsfnPayload(error, payload); + const actorCtx = makeActorCtx(ctx); + try { + if (typeof config.onDestroy === "function") { + await config.onDestroy(actorCtx); + } + } finally { + resolveNativeDestroy(runtime, ctx); + await actorCtx.closeDatabase(); + clearNativeRuntimeState(runtime, ctx); + await actorCtx.dispose(); + } + }, + ), onBeforeConnect: typeof config.onBeforeConnect === "function" ? wrapNativeCallback( diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index 3aba8b0a1f..f8e31680e3 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -1,3 +1,4 @@ +import type { SqliteNativeMetrics } from "@/common/database/config"; import type { RegistryConfig } from "./config"; declare const handleBrand: unique symbol; @@ -188,6 +189,7 @@ export interface RuntimeSqlDatabase { sql: string, params?: RuntimeSqlBindParams, ): Promise; + metrics?(): SqliteNativeMetrics | null; takeLastKvError?(): string | null; close(): Promise; } @@ -255,6 +257,11 @@ export interface RuntimeServerlessResponseHead { headers: Record; } +export interface RuntimeRegistryDiagnostics { + mode: string; + envoyActiveActorCount?: number | null; +} + export type RuntimeServerlessStreamEvent = | { kind: "chunk"; @@ -309,6 +316,9 @@ export interface CoreRuntime { cancelToken: CancellationTokenHandle, config: RuntimeServeConfig, ): Promise; + registryDiagnostics?( + registry: RegistryHandle, + ): Promise; createActorFactory( callbacks: object, config?: RuntimeActorConfig | undefined | null, @@ -384,6 +394,7 @@ export interface CoreRuntime { ): Promise; actorRegisterTask(ctx: ActorContextHandle, promise: Promise): void; actorRuntimeState(ctx: ActorContextHandle): object; + actorClearRuntimeState(ctx: ActorContextHandle): void; actorRestartRunHandler(ctx: ActorContextHandle): void; actorBeginWebsocketCallback(ctx: ActorContextHandle): number; actorEndWebsocketCallback(ctx: ActorContextHandle, regionId: number): void; @@ -446,6 +457,7 @@ export interface CoreRuntime { sql: string, params?: RuntimeSqlBindParams, ): Promise; + actorSqlMetrics(ctx: ActorContextHandle): SqliteNativeMetrics | null; actorSqlTakeLastKvError(ctx: ActorContextHandle): string | null; actorSqlClose(ctx: ActorContextHandle): Promise; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts index d9498aef7c..de455f4456 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts @@ -270,6 +270,10 @@ export class WasmCoreRuntime implements CoreRuntime { await callWasm(() => asWasmRegistry(registry).shutdown()); } + async registryDiagnostics(): Promise<{ mode: string; envoyActiveActorCount: null }> { + return { mode: "wasm", envoyActiveActorCount: null }; + } + async handleServerlessRequest( registry: RegistryHandle, req: RuntimeServerlessRequest, @@ -517,6 +521,13 @@ export class WasmCoreRuntime implements CoreRuntime { return callHandle(asWasmActorContext(ctx), "runtimeState"); } + actorClearRuntimeState(ctx: ActorContextHandle): void { + const runtimeState = this.actorRuntimeState(ctx); + for (const key of Object.keys(runtimeState)) { + delete (runtimeState as Record)[key]; + } + } + actorRestartRunHandler(ctx: ActorContextHandle): void { callHandle(asWasmActorContext(ctx), "restartRunHandler"); } @@ -658,6 +669,10 @@ export class WasmCoreRuntime implements CoreRuntime { return await callWasm(() => this.#actorSql(ctx).run(sql, params)); } + actorSqlMetrics(ctx: ActorContextHandle) { + return this.#actorSql(ctx).metrics?.() ?? null; + } + actorSqlTakeLastKvError(ctx: ActorContextHandle): string | null { return this.#actorSql(ctx).takeLastKvError?.() ?? null; }