diff --git a/.agent/specs/sqlite-vfs-staging-cache-ttl.md b/.agent/specs/sqlite-vfs-staging-cache-ttl.md new file mode 100644 index 0000000000..6316bbcde0 --- /dev/null +++ b/.agent/specs/sqlite-vfs-staging-cache-ttl.md @@ -0,0 +1,82 @@ +# SQLite VFS Staging Cache TTL Plan + +Date: 2026-05-03 + +This plan changes the SQLite VFS page cache from a broad second-level pager cache into a short-lived staging cache for speculative pages. Demand pages fetched for `xRead` should be handed to SQLite and then forgotten by the VFS. + +## Goals + +- Avoid retaining pages in VFS memory after SQLite has already received them through `xRead`. +- Keep startup preload and read-ahead useful by retaining speculative pages briefly. +- Evict speculative pages on first successful target read so TTL is only the fallback for unused preloads. +- Keep lazy loading correct when all cache and preload features are disabled. +- Treat page 1 as staging data after `xRead` while keeping parsed page-size and database-size metadata. + +## Non-Goals + +- Do not change the remote `get_pages` protocol. +- Do not change SQLite pager settings. +- Do not add read pools back. +- Do not implement persisted preload hints in this branch. + +## Current Behavior + +- `resolve_pages` classifies fetched pages as `Target` when SQLite requested them and `Prefetch` when they were predicted. +- `fetch_initial_pages_for_registration` seeds startup pages as `Startup`. +- `should_cache_page` allows target, prefetch, and startup caching based on `SqliteVfsPageCacheMode`. +- Page 1 is always cacheable. +- Early protected pages live in `protected_page_cache`, which is an `scc::HashMap` with no TTL. + +## Proposed Behavior + +- Target pages should not be inserted into the VFS page cache by default. +- Target reads should remove speculative read pages from the cache after bytes are copied to the caller. +- Prefetch pages should be inserted into a TTL cache. +- Startup preload pages should be inserted into the same TTL cache. 
+- Commit completion should stage dirty pages in a separate TTL cache so SQLite can reread its own writes without retaining them permanently. +- Page 1 should follow the same staging rule as other pages after `xRead`. The VFS keeps parsed page-size and database-size metadata, and it can synthesize the empty page-1 header again before the first commit when depot has no database yet. +- Protected cache should no longer protect speculative pages forever. It should be removed or left unused in favor of the TTL cache. + +## Configuration + +- Add `RIVETKIT_SQLITE_OPT_VFS_STAGING_CACHE_TTL_MS`. +- Default to a short TTL such as `30000` ms. +- A value of `0` disables speculative retention while preserving lazy target fetches. +- Keep `RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MODE=off` as the stronger kill switch for all non-page-1 VFS caching. +- Do not use `RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES` to pin VFS page bytes beyond `xRead`. + +## Implementation Plan + +1. Extend `SqliteOptimizationFlags` and `VfsConfig` with a bounded staging TTL field. +2. Build `page_cache` with `time_to_live(Duration::from_millis(ttl_ms))` when TTL is nonzero. +3. Split cache insertion semantics so `PageCacheInsertKind::Target` is not retained by default. +4. Add an explicit `evict_pages_after_target_read` helper that removes every consumed page from both normal and protected speculative caches. +5. Call that helper after `io_read` copies returned bytes into SQLite's buffer. +6. Evict dirty page numbers from the staging cache after commit completion. +7. Rework `protected_page_cache` so it cannot pin speculative pages forever. +8. Keep `seed_main_page` behavior intact for parsed page 1 metadata. +9. Update metrics naming only if needed. `page_cache_entries` can continue to report retained VFS entries. 
+ +## Expected Cache Matrix + +| Page source | Retained after fetch | Evicted on target read | TTL fallback | +| --- | --- | --- | --- | +| Target `xRead` miss | No | Not needed | No | +| Read-ahead prefetch | Yes | Yes | Yes | +| Startup preload | Yes | Yes | Yes | +| Page 1 | Yes during bootstrap or preload | Yes | Yes when retained | +| Dirty write buffer | Existing behavior | Existing behavior | No | + +## Tests + +- Add a VFS test proving a target read miss does not increase retained VFS cache entries. +- Add a VFS test proving prefetch pages are retained before use and removed after target read. +- Add a VFS test proving startup preload pages are retained briefly and removed after target read. +- Add a VFS test proving `VFS_STAGING_CACHE_TTL_MS=0` still lazily fetches pages. +- Add a VFS test proving `VFS_PAGE_CACHE_MODE=off` still lazily fetches pages and does not retain non-page-1 pages. +- If practical, use Tokio time pause/advance to verify TTL expiry deterministically instead of sleeping. + +## Open Questions + +- Should target retention remain available as an explicit benchmark mode, or should we remove target caching from the shipped matrix? +- Should `VFS_PROTECTED_CACHE_PAGES` be deprecated now that VFS pages are staging-only? diff --git a/docs-internal/engine/SQLITE_OPTIMIZATIONS.md b/docs-internal/engine/SQLITE_OPTIMIZATIONS.md index f79fabb8da..145d08e4c7 100644 --- a/docs-internal/engine/SQLITE_OPTIMIZATIONS.md +++ b/docs-internal/engine/SQLITE_OPTIMIZATIONS.md @@ -11,7 +11,10 @@ Range page-read protocol details live in `.agent/specs/sqlite-range-page-read-pr ## Existing Optimizations - Actor startup can preload SQLite VFS pages through `OpenConfig.preload_pgnos`, `OpenConfig.preload_ranges`, and persisted `/PRELOAD_HINTS`; first pages, hint mechanisms, and the preload byte budget are configured through central SQLite optimization flags. 
-- The VFS keeps an in-memory page cache seeded from `sqlite_startup_data.preloaded_pages`; cache behavior is selected with `RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MODE=off|target|startup|prefetch|all`, with capacity and protected-cache budget configured separately. +- The VFS keeps a short-lived staging cache for startup preload and read-ahead pages. Direct target pages fetched for `xRead` are not retained in VFS memory. +- Any speculative page consumed by `xRead`, including page 1, is evicted from the VFS staging cache after SQLite receives it. Before the first commit, a lazy page-1 read for a missing database synthesizes the empty SQLite header again instead of retaining page bytes. Staged pages that SQLite never reads expire through `RIVETKIT_SQLITE_OPT_VFS_STAGING_CACHE_TTL_MS`. +- Commit completion stages dirty pages in a separate TTL cache so SQLite can reread its own writes without turning the VFS into a permanent second pager. +- VFS staging cache behavior is selected with `RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MODE=off|target|startup|prefetch|all`, with capacity configured separately. The protected-cache budget no longer pins VFS page bytes beyond `xRead`. - The VFS has speculative read-ahead selected with `RIVETKIT_SQLITE_OPT_READ_AHEAD_MODE=off|bounded|adaptive`; the default bounded budget is 64 pages, which reduced the cold-read benchmark from 1,249 to 368 VFS `get_pages` calls. - The VFS tracks bounded recent page hints as hot pages plus coalesced scan ranges; `NativeDatabase::snapshot_preload_hints()` exposes the in-memory plan for future flush wiring. - Actor Prometheus metrics expose VFS read counters, fetched bytes, cache hits/misses, and `get_pages` duration at `/gateway//metrics`. 
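The retention rules documented above reduce to a small per-page predicate. This is a simplified sketch (page-cache mode checks omitted, enum and function names illustrative) of the decision the VFS makes for each fetched page:

```rust
/// How a fetched page entered the VFS, mirroring the spec's cache matrix.
#[derive(Clone, Copy, PartialEq, Eq)]
enum InsertKind {
    /// SQLite asked for this page via `xRead`; hand it over and forget it.
    Target,
    /// Read-ahead predicted this page; stage it until used or TTL expiry.
    Prefetch,
    /// Startup preload seeded this page; same staging rule, except page 1
    /// is always kept so parsed header metadata stays available.
    Startup,
}

fn retain_in_staging_cache(ttl_ms: u64, kind: InsertKind, pgno: u32) -> bool {
    match kind {
        InsertKind::Target => false,
        InsertKind::Prefetch => ttl_ms > 0,
        InsertKind::Startup => pgno == 1 || ttl_ms > 0,
    }
}
```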
diff --git a/engine/packages/depot-client/Cargo.toml b/engine/packages/depot-client/Cargo.toml index d9861bf1c6..221a7164c1 100644 --- a/engine/packages/depot-client/Cargo.toml +++ b/engine/packages/depot-client/Cargo.toml @@ -23,6 +23,7 @@ depot-client-types.workspace = true depot.workspace = true moka = { version = "0.12", default-features = false, features = ["sync"] } parking_lot.workspace = true +scc.workspace = true [dev-dependencies] depot = { workspace = true, features = ["test-faults"] } @@ -31,7 +32,6 @@ gas.workspace = true rivet-config.workspace = true rivet-pools.workspace = true rivet-test-deps.workspace = true -scc.workspace = true sha2.workspace = true tempfile.workspace = true universaldb.workspace = true diff --git a/engine/packages/depot-client/src/database.rs b/engine/packages/depot-client/src/database.rs index b6d621eb39..5310583de7 100644 --- a/engine/packages/depot-client/src/database.rs +++ b/engine/packages/depot-client/src/database.rs @@ -9,7 +9,7 @@ use crate::{ vfs::{ NativeVfsHandle, SqliteTransportHandle, SqliteVfs, SqliteVfsMetrics, SqliteVfsMetricsSnapshot, VfsConfig, VfsPreloadHintSnapshot, - fetch_initial_main_page_for_registration, + fetch_initial_pages_for_registration, }, worker::SqliteWorkerHandle, }; @@ -32,17 +32,18 @@ pub async fn open_database_from_transport( metrics: Option>, ) -> Result { let vfs_name = vfs_name_for_actor_database(&actor_id, generation); - let initial_main_page = fetch_initial_main_page_for_registration(transport.clone(), &actor_id) + let config = VfsConfig::default(); + let initial_pages = fetch_initial_pages_for_registration(transport.clone(), &actor_id, &config) .await - .map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?; + .map_err(|e| anyhow!("failed to preload sqlite pages: {e}"))?; let vfs = Arc::new( - SqliteVfs::register_with_transport_and_initial_page( + SqliteVfs::register_with_transport_and_initial_pages( &vfs_name, transport, actor_id.clone(), rt_handle, - VfsConfig::default(), 
- initial_main_page, + config, + initial_pages, metrics.clone(), ) .map_err(|e| anyhow!("failed to register sqlite VFS: {e}"))?, diff --git a/engine/packages/depot-client/src/optimization_flags.rs b/engine/packages/depot-client/src/optimization_flags.rs index eb068a17bf..c93398e61f 100644 --- a/engine/packages/depot-client/src/optimization_flags.rs +++ b/engine/packages/depot-client/src/optimization_flags.rs @@ -22,6 +22,7 @@ pub const VFS_PAGE_CACHE_MODE_ENV: &str = "RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MO pub const VFS_PAGE_CACHE_CAPACITY_PAGES_ENV: &str = "RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_CAPACITY_PAGES"; pub const VFS_PROTECTED_CACHE_PAGES_ENV: &str = "RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES"; +pub const VFS_STAGING_CACHE_TTL_MS_ENV: &str = "RIVETKIT_SQLITE_OPT_VFS_STAGING_CACHE_TTL_MS"; pub const DEFAULT_STARTUP_PRELOAD_MAX_BYTES: usize = 1024 * 1024; pub const MAX_STARTUP_PRELOAD_MAX_BYTES: usize = 8 * 1024 * 1024; @@ -31,6 +32,8 @@ pub const DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 50_000; pub const MAX_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 500_000; pub const DEFAULT_VFS_PROTECTED_CACHE_PAGES: usize = 512; pub const MAX_VFS_PROTECTED_CACHE_PAGES: usize = 8_192; +pub const DEFAULT_VFS_STAGING_CACHE_TTL_MS: u64 = 30_000; +pub const MAX_VFS_STAGING_CACHE_TTL_MS: u64 = 300_000; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SqliteReadAheadMode { @@ -102,6 +105,7 @@ pub struct SqliteOptimizationFlags { pub vfs_page_cache_mode: SqliteVfsPageCacheMode, pub vfs_page_cache_capacity_pages: u64, pub vfs_protected_cache_pages: usize, + pub vfs_staging_cache_ttl_ms: u64, } impl Default for SqliteOptimizationFlags { @@ -128,6 +132,7 @@ impl Default for SqliteOptimizationFlags { vfs_page_cache_mode: SqliteVfsPageCacheMode::All, vfs_page_cache_capacity_pages: DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES, vfs_protected_cache_pages: DEFAULT_VFS_PROTECTED_CACHE_PAGES, + vfs_staging_cache_ttl_ms: DEFAULT_VFS_STAGING_CACHE_TTL_MS, } } } @@ -196,6 +201,11 @@ impl 
SqliteOptimizationFlags { DEFAULT_VFS_PROTECTED_CACHE_PAGES, MAX_VFS_PROTECTED_CACHE_PAGES, ), + vfs_staging_cache_ttl_ms: u64_bounded_by_default( + read_env(VFS_STAGING_CACHE_TTL_MS_ENV).as_deref(), + DEFAULT_VFS_STAGING_CACHE_TTL_MS, + MAX_VFS_STAGING_CACHE_TTL_MS, + ), } } } @@ -307,6 +317,7 @@ mod tests { VFS_PAGE_CACHE_MODE_ENV => Some("off".to_string()), VFS_PAGE_CACHE_CAPACITY_PAGES_ENV => Some("0".to_string()), VFS_PROTECTED_CACHE_PAGES_ENV => Some("0".to_string()), + VFS_STAGING_CACHE_TTL_MS_ENV => Some("0".to_string()), _ => None, }); @@ -327,6 +338,7 @@ mod tests { assert_eq!(flags.vfs_page_cache_mode, SqliteVfsPageCacheMode::Off); assert_eq!(flags.vfs_page_cache_capacity_pages, 0); assert_eq!(flags.vfs_protected_cache_pages, 0); + assert_eq!(flags.vfs_staging_cache_ttl_ms, 0); } #[test] @@ -336,6 +348,7 @@ mod tests { STARTUP_PRELOAD_FIRST_PAGE_COUNT_ENV => Some("nope".to_string()), VFS_PAGE_CACHE_CAPACITY_PAGES_ENV => Some("invalid".to_string()), VFS_PROTECTED_CACHE_PAGES_ENV => Some("invalid".to_string()), + VFS_STAGING_CACHE_TTL_MS_ENV => Some("invalid".to_string()), _ => None, }); assert_eq!( @@ -354,6 +367,10 @@ mod tests { invalid.vfs_protected_cache_pages, DEFAULT_VFS_PROTECTED_CACHE_PAGES ); + assert_eq!( + invalid.vfs_staging_cache_ttl_ms, + DEFAULT_VFS_STAGING_CACHE_TTL_MS + ); let clamped = SqliteOptimizationFlags::from_env_reader(|key| match key { STARTUP_PRELOAD_MAX_BYTES_ENV => Some((MAX_STARTUP_PRELOAD_MAX_BYTES + 1).to_string()), @@ -364,6 +381,7 @@ mod tests { Some((MAX_VFS_PAGE_CACHE_CAPACITY_PAGES + 1).to_string()) } VFS_PROTECTED_CACHE_PAGES_ENV => Some((MAX_VFS_PROTECTED_CACHE_PAGES + 1).to_string()), + VFS_STAGING_CACHE_TTL_MS_ENV => Some((MAX_VFS_STAGING_CACHE_TTL_MS + 1).to_string()), _ => None, }); assert_eq!( @@ -382,5 +400,9 @@ mod tests { clamped.vfs_protected_cache_pages, MAX_VFS_PROTECTED_CACHE_PAGES ); + assert_eq!( + clamped.vfs_staging_cache_ttl_ms, + MAX_VFS_STAGING_CACHE_TTL_MS + ); } } diff --git 
a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index a419739e33..287ee73638 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -16,11 +16,13 @@ use libsqlite3_sys::*; use moka::sync::Cache; use parking_lot::{Mutex, RwLock}; use rivet_envoy_protocol as protocol; +use scc::HashMap as SccHashMap; use tokio::runtime::Handle; -use crate::optimization_flags::{SqliteOptimizationFlags, sqlite_optimization_flags}; +use crate::optimization_flags::{ + SqliteOptimizationFlags, SqliteVfsPageCacheMode, sqlite_optimization_flags, +}; -const DEFAULT_CACHE_CAPACITY_PAGES: u64 = 50_000; const DEFAULT_PREFETCH_DEPTH: usize = 64; const LEGACY_PREFETCH_DEPTH: usize = 16; const DEFAULT_MAX_PREFETCH_BYTES: usize = 256 * 1024; @@ -130,11 +132,19 @@ fn sqlite_now_ms() -> Result { #[derive(Debug, Clone)] pub struct VfsConfig { pub cache_capacity_pages: u64, + pub protected_cache_pages: usize, + pub page_cache_mode: SqliteVfsPageCacheMode, + pub staging_cache_ttl_ms: u64, pub prefetch_depth: usize, pub adaptive_prefetch_depth: usize, pub max_prefetch_bytes: usize, pub adaptive_max_prefetch_bytes: usize, pub max_pages_per_stage: usize, + pub startup_preload_max_bytes: usize, + pub startup_preload_first_pages: bool, + pub startup_preload_first_page_count: u32, + pub preload_hints_on_open: bool, + pub preload_hint_early_pages: bool, pub recent_hint_page_budget: usize, pub recent_hint_range_budget: usize, pub cache_hit_predictor_training: bool, @@ -152,8 +162,20 @@ impl Default for VfsConfig { impl VfsConfig { pub fn from_optimization_flags(flags: SqliteOptimizationFlags) -> Self { + let caches_pages = flags.vfs_page_cache_mode.caches_any_pages(); Self { - cache_capacity_pages: DEFAULT_CACHE_CAPACITY_PAGES, + cache_capacity_pages: if caches_pages { + flags.vfs_page_cache_capacity_pages + } else { + 0 + }, + protected_cache_pages: 0, + page_cache_mode: flags.vfs_page_cache_mode, + staging_cache_ttl_ms: if 
caches_pages { + flags.vfs_staging_cache_ttl_ms + } else { + 0 + }, prefetch_depth: if flags.read_ahead { DEFAULT_PREFETCH_DEPTH } else { @@ -163,12 +185,17 @@ impl VfsConfig { max_prefetch_bytes: DEFAULT_MAX_PREFETCH_BYTES, adaptive_max_prefetch_bytes: DEFAULT_ADAPTIVE_MAX_PREFETCH_BYTES, max_pages_per_stage: DEFAULT_MAX_PAGES_PER_STAGE, - recent_hint_page_budget: if flags.recent_page_hints { + startup_preload_max_bytes: flags.startup_preload_max_bytes, + startup_preload_first_pages: flags.startup_preload_first_pages, + startup_preload_first_page_count: flags.startup_preload_first_page_count, + preload_hints_on_open: flags.preload_hints_on_open, + preload_hint_early_pages: flags.preload_hint_early_pages, + recent_hint_page_budget: if flags.recent_page_hints && flags.preload_hint_hot_pages { DEFAULT_RECENT_HINT_PAGE_BUDGET } else { 0 }, - recent_hint_range_budget: if flags.recent_page_hints { + recent_hint_range_budget: if flags.recent_page_hints && flags.preload_hint_scan_ranges { DEFAULT_RECENT_HINT_RANGE_BUDGET } else { 0 @@ -229,7 +256,6 @@ pub struct SqliteVfsMetricsSnapshot { pub total_ns: u64, pub commit_count: u64, pub page_cache_entries: u64, - pub page_cache_weighted_size: u64, pub page_cache_capacity_pages: u64, pub write_buffer_dirty_pages: u64, pub db_size_pages: u64, @@ -320,6 +346,8 @@ struct VfsState { db_size_pages: u32, page_size: usize, page_cache: Cache>, + committed_page_cache: Cache>, + protected_page_cache: Arc>>, write_buffer: WriteBuffer, predictor: PrefetchPredictor, read_ahead: AdaptiveReadAhead, @@ -405,6 +433,13 @@ struct AuxFileHandle { delete_on_close: bool, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PageCacheInsertKind { + Target, + Prefetch, + Startup, +} + unsafe impl Send for VfsContext {} unsafe impl Sync for VfsContext {} @@ -758,15 +793,14 @@ fn push_coalesced_range(ranges: &mut VecDeque, range: VfsPr impl VfsState { fn new(config: &VfsConfig) -> Self { - let page_cache = Cache::builder() - 
.max_capacity(config.cache_capacity_pages) - .build(); - page_cache.insert(1, empty_db_page()); - - Self { + let page_cache = build_page_cache(config); + let committed_page_cache = build_page_cache(config); + let mut state = Self { db_size_pages: 1, page_size: DEFAULT_PAGE_SIZE, page_cache, + committed_page_cache, + protected_page_cache: Arc::new(SccHashMap::new()), write_buffer: WriteBuffer::default(), predictor: PrefetchPredictor::default(), read_ahead: AdaptiveReadAhead::default(), @@ -775,20 +809,122 @@ impl VfsState { config.recent_hint_range_budget, ), dead: false, + }; + state.cache_page(config, PageCacheInsertKind::Startup, 1, empty_db_page()); + state + } + + fn cache_page( + &mut self, + config: &VfsConfig, + kind: PageCacheInsertKind, + pgno: u32, + bytes: Vec, + ) { + if !should_cache_page(config, kind, pgno) { + return; + } + cache_page( + config, + &self.page_cache, + &self.protected_page_cache, + kind, + pgno, + bytes, + ); + } + + fn cached_page(&self, config: &VfsConfig, pgno: u32) -> Option> { + if !can_read_cached_page(config, pgno) { + return None; + } + self + .committed_page_cache + .get(&pgno) + .or_else(|| self.protected_page_cache.read_sync(&pgno, |_, bytes| bytes.clone())) + .or_else(|| self.page_cache.get(&pgno)) + } + + fn cache_committed_page(&mut self, config: &VfsConfig, pgno: u32, bytes: Vec) { + if config.staging_cache_ttl_ms == 0 || !config.page_cache_mode.caches_any_pages() { + return; } + self.committed_page_cache.insert(pgno, bytes); } - fn seed_main_page(&mut self, page: Vec) { + fn evict_target_read_pages(&self, pgnos: &[u32]) { + for pgno in pgnos.iter().copied() { + self.page_cache.invalidate(&pgno); + self.protected_page_cache.remove_sync(&pgno); + } + } + + fn seed_page(&mut self, config: &VfsConfig, kind: PageCacheInsertKind, pgno: u32, page: Vec) { + if pgno == 1 { + self.seed_main_page(config, kind, page); + } else { + self.cache_page(config, kind, pgno, page); + } + } + + fn seed_main_page(&mut self, config: 
&VfsConfig, kind: PageCacheInsertKind, page: Vec) { if let Some(page_size) = sqlite_header_page_size(&page) { self.page_size = page_size; } if let Some(db_size_pages) = sqlite_header_db_size_pages(&page) { self.db_size_pages = db_size_pages; } - self.page_cache.insert(1, page); + self.cache_page(config, kind, 1, page); + } + + fn invalidate_page_cache(&mut self) { + self.page_cache.invalidate_all(); + self.committed_page_cache.invalidate_all(); + self.protected_page_cache.clear_sync(); + } +} + +fn build_page_cache(config: &VfsConfig) -> Cache> { + let mut page_cache_builder = Cache::builder().max_capacity(config.cache_capacity_pages); + if config.staging_cache_ttl_ms > 0 { + page_cache_builder = + page_cache_builder.time_to_live(Duration::from_millis(config.staging_cache_ttl_ms)); + } + page_cache_builder.build() +} + +fn cache_page( + config: &VfsConfig, + page_cache: &Cache>, + _protected_page_cache: &SccHashMap>, + kind: PageCacheInsertKind, + pgno: u32, + bytes: Vec, +) { + if !should_cache_page(config, kind, pgno) { + return; + } + page_cache.insert(pgno, bytes); +} + +fn should_cache_page(config: &VfsConfig, kind: PageCacheInsertKind, pgno: u32) -> bool { + match kind { + PageCacheInsertKind::Target => false, + PageCacheInsertKind::Prefetch => { + config.staging_cache_ttl_ms > 0 && config.page_cache_mode.caches_prefetched_pages() + } + PageCacheInsertKind::Startup => { + pgno == 1 + || (config.staging_cache_ttl_ms > 0 + && config.page_cache_mode.caches_startup_preloaded_pages()) + } } } +fn can_read_cached_page(config: &VfsConfig, pgno: u32) -> bool { + pgno == 1 || config.page_cache_mode.caches_any_pages() +} + impl VfsContext { fn new( actor_id: String, @@ -796,12 +932,12 @@ impl VfsContext { transport: SqliteTransportHandle, config: VfsConfig, io_methods: sqlite3_io_methods, - initial_main_page: Option>, + initial_pages: Vec<(u32, Vec)>, metrics: Option>, ) -> std::result::Result { let mut state = VfsState::new(&config); - if let Some(page) = 
initial_main_page { - state.seed_main_page(page); + for (pgno, page) in initial_pages { + state.seed_page(&config, PageCacheInsertKind::Startup, pgno, page); } Ok(Self { @@ -887,8 +1023,11 @@ impl VfsContext { state_update_ns: self.commit_state_update_ns.load(Ordering::Relaxed), total_ns: self.commit_duration_ns_total.load(Ordering::Relaxed), commit_count: self.commit_total.load(Ordering::Relaxed), - page_cache_entries: state.page_cache.entry_count(), - page_cache_weighted_size: state.page_cache.weighted_size(), + page_cache_entries: state + .page_cache + .entry_count() + .saturating_add(state.committed_page_cache.entry_count()) + .saturating_add(state.protected_page_cache.len() as u64), page_cache_capacity_pages: self.config.cache_capacity_pages, write_buffer_dirty_pages: state.write_buffer.dirty.len() as u64, db_size_pages: state.db_size_pages as u64, @@ -972,7 +1111,27 @@ impl VfsContext { if !self.config.recent_page_hints { return VfsPreloadHintSnapshot::default(); } - self.state.read().recent_pages.snapshot() + let state = self.state.read(); + let mut snapshot = state.recent_pages.snapshot(); + if self.config.preload_hint_early_pages { + let mut existing_pgnos = snapshot.pgnos.iter().copied().collect::>(); + let early_page_count = self + .config + .startup_preload_first_page_count + .min(state.db_size_pages); + for pgno in 1..=early_page_count { + if !snapshot + .ranges + .iter() + .any(|range| range.contains(pgno)) + && existing_pgnos.insert(pgno) + { + snapshot.pgnos.push(pgno); + } + } + snapshot.pgnos.sort_unstable(); + } + snapshot } fn resolve_pages( @@ -1006,7 +1165,7 @@ impl VfsContext { resolved.insert(pgno, Some(bytes.clone())); continue; } - if let Some(bytes) = state.page_cache.get(&pgno) { + if let Some(bytes) = state.cached_page(&self.config, pgno) { resolved.insert(pgno, Some(bytes)); continue; } @@ -1145,7 +1304,11 @@ impl VfsContext { match response { protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok) => { - let page_cache = { 
self.state.read().page_cache.clone() }; + let missing_pages = missing.iter().copied().collect::>(); + let (page_cache, protected_page_cache) = { + let state = self.state.read(); + (state.page_cache.clone(), state.protected_page_cache.clone()) + }; #[cfg(debug_assertions)] let mut returned_pgnos = HashSet::new(); #[cfg(debug_assertions)] @@ -1163,10 +1326,31 @@ impl VfsContext { } } } - if let Some(bytes) = &fetched.bytes { - page_cache.insert(fetched.pgno, bytes.clone()); + let bytes = if fetched.bytes.is_none() + && self.commit_total.load(Relaxed) == 0 + && missing_pages.contains(&fetched.pgno) + && fetched.pgno == 1 + { + Some(empty_db_page()) + } else { + fetched.bytes + }; + if let Some(bytes) = &bytes { + let kind = if missing_pages.contains(&fetched.pgno) { + PageCacheInsertKind::Target + } else { + PageCacheInsertKind::Prefetch + }; + cache_page( + &self.config, + &page_cache, + &protected_page_cache, + kind, + fetched.pgno, + bytes.clone(), + ); } - resolved.insert(fetched.pgno, fetched.bytes); + resolved.insert(fetched.pgno, bytes); } #[cfg(debug_assertions)] { @@ -1203,6 +1387,20 @@ impl VfsContext { Ok(resolved) } protocol::SqliteGetPagesResponse::SqliteErrorResponse(error) => { + if self.commit_total.load(Relaxed) == 0 + && missing.contains(&1) + && is_initial_main_page_missing(&error.message) + { + for pgno in missing { + let bytes = if pgno == 1 { + Some(empty_db_page()) + } else { + None + }; + resolved.entry(pgno).or_insert(bytes); + } + return Ok(resolved); + } Err(GetPagesError::Other(error.message)) } } @@ -1303,9 +1501,7 @@ impl VfsContext { let mut state = self.state.write(); state.db_size_pages = request.new_db_size_pages; for dirty_page in &request.dirty_pages { - state - .page_cache - .insert(dirty_page.pgno, dirty_page.bytes.clone()); + state.cache_committed_page(&self.config, dirty_page.pgno, dirty_page.bytes.clone()); } state.write_buffer.dirty.clear(); let state_update_ns = state_update_start.elapsed().as_nanos() as u64; @@ -1413,9 
+1609,7 @@ impl VfsContext { let mut state = self.state.write(); state.db_size_pages = request.new_db_size_pages; for dirty_page in &request.dirty_pages { - state - .page_cache - .insert(dirty_page.pgno, dirty_page.bytes.clone()); + state.cache_committed_page(&self.config, dirty_page.pgno, dirty_page.bytes.clone()); } state.write_buffer.dirty.clear(); state.write_buffer.in_atomic_write = false; @@ -1438,7 +1632,7 @@ impl VfsContext { .write_buffer .dirty .retain(|pgno, _| *pgno <= truncated_pages); - state.page_cache.invalidate_all(); + state.invalidate_page_cache(); } } @@ -1519,15 +1713,47 @@ pub(crate) async fn fetch_initial_main_page_for_registration( fetch_initial_main_page(transport, actor_id.to_string()).await } +pub(crate) async fn fetch_initial_pages_for_registration( + transport: SqliteTransportHandle, + actor_id: &str, + config: &VfsConfig, +) -> std::result::Result)>, String> { + if !config.startup_preload_first_pages + || !config.page_cache_mode.caches_startup_preloaded_pages() + || config.startup_preload_max_bytes < DEFAULT_PAGE_SIZE + { + return fetch_initial_main_page_for_registration(transport, actor_id) + .await + .map(|page| page.into_iter().map(|page| (1, page)).collect()); + } + + let page_count_from_bytes = config.startup_preload_max_bytes / DEFAULT_PAGE_SIZE; + let page_count = config + .startup_preload_first_page_count + .min(page_count_from_bytes as u32) + .max(1); + fetch_initial_pages(transport, actor_id.to_string(), page_count).await +} + async fn fetch_initial_main_page( transport: SqliteTransportHandle, actor_id: String, ) -> std::result::Result>, String> { + fetch_initial_pages(transport, actor_id, 1) + .await + .map(|pages| pages.into_iter().find(|(pgno, _)| *pgno == 1).map(|(_, bytes)| bytes)) +} + +async fn fetch_initial_pages( + transport: SqliteTransportHandle, + actor_id: String, + page_count: u32, +) -> std::result::Result)>, String> { let request_actor_id = actor_id.clone(); let response = transport 
.get_pages(protocol::SqliteGetPagesRequest { actor_id: request_actor_id, - pgnos: vec![1], + pgnos: (1..=page_count).collect(), expected_generation: None, expected_head_txid: None, }) @@ -1537,8 +1763,8 @@ async fn fetch_initial_main_page( Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok)) => Ok(ok .pages .into_iter() - .find(|page| page.pgno == 1) - .and_then(|page| page.bytes)), + .filter_map(|page| page.bytes.map(|bytes| (page.pgno, bytes))) + .collect()), Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse(error)) => { if !is_initial_main_page_missing(&error.message) { return Err(format!( @@ -1551,7 +1777,7 @@ async fn fetch_initial_main_page( error = %error.message, "sqlite initial page fetch did not find persisted data" ); - Ok(None) + Ok(Vec::new()) } Err(err) => Err(format!("sqlite initial page fetch failed: {err}")), } @@ -1936,6 +2162,12 @@ unsafe extern "C" fn io_read( let resolved = match ctx.resolve_pages(&requested_pages, true) { Ok(pages) => pages, Err(GetPagesError::Other(message)) => { + tracing::error!( + actor_id = %ctx.actor_id, + requested_pages = ?requested_pages, + error = %message, + "sqlite xRead failed to resolve pages" + ); ctx.mark_dead(message); return SQLITE_IOERR_READ; } @@ -1966,7 +2198,7 @@ unsafe extern "C" fn io_read( } buf.fill(0); - for pgno in requested_pages { + for pgno in requested_pages.iter().copied() { let Some(Some(bytes)) = resolved.get(&pgno) else { continue; }; @@ -1982,6 +2214,7 @@ unsafe extern "C" fn io_read( buf[dest_offset..dest_offset + copy_len] .copy_from_slice(&bytes[page_offset..page_offset + copy_len]); } + ctx.state.read().evict_target_read_pages(&requested_pages); if i_offset as usize + i_amt as usize > file_size { return SQLITE_IOERR_SHORT_READ; @@ -2510,11 +2743,18 @@ impl SqliteVfs { config: VfsConfig, metrics: Option>, ) -> std::result::Result { - Self::register_with_transport_and_initial_page( - name, transport, actor_id, runtime, config, None, metrics, + 
Self::register_with_transport_and_initial_pages( + name, + transport, + actor_id, + runtime, + config, + Vec::new(), + metrics, ) } + #[cfg(test)] pub(crate) fn register_with_transport_and_initial_page( name: &str, transport: SqliteTransportHandle, @@ -2523,6 +2763,30 @@ impl SqliteVfs { config: VfsConfig, initial_main_page: Option>, metrics: Option>, + ) -> std::result::Result { + let initial_pages = initial_main_page + .into_iter() + .map(|page| (1, page)) + .collect(); + Self::register_with_transport_and_initial_pages( + name, + transport, + actor_id, + runtime, + config, + initial_pages, + metrics, + ) + } + + pub(crate) fn register_with_transport_and_initial_pages( + name: &str, + transport: SqliteTransportHandle, + actor_id: String, + runtime: Handle, + config: VfsConfig, + initial_pages: Vec<(u32, Vec)>, + metrics: Option>, ) -> std::result::Result { let mut io_methods: sqlite3_io_methods = unsafe { std::mem::zeroed() }; io_methods.iVersion = 1; @@ -2545,7 +2809,7 @@ impl SqliteVfs { transport, config, io_methods, - initial_main_page, + initial_pages, metrics, )?); let ctx_ptr = (&mut *ctx) as *mut VfsContext; diff --git a/engine/packages/depot-client/tests/inline/fault/scenario.rs b/engine/packages/depot-client/tests/inline/fault/scenario.rs index 251b134147..08b470eeb5 100644 --- a/engine/packages/depot-client/tests/inline/fault/scenario.rs +++ b/engine/packages/depot-client/tests/inline/fault/scenario.rs @@ -642,15 +642,16 @@ impl FaultScenarioCtx { pub(crate) async fn seed_page_as_cold_ref_for_harness_test(&self, pgno: u32) -> Result<()> { let dirty_pages = self.with_database_blocking(|db| { - let state = db._vfs.ctx().state.read(); + let ctx = db._vfs.ctx(); + let state = ctx.state.read(); (1..=state.db_size_pages) .filter(|candidate_pgno| { *candidate_pgno / depot::keys::SHARD_SIZE == pgno / depot::keys::SHARD_SIZE }) .map(|candidate_pgno| { - let bytes = state.page_cache.get(&candidate_pgno).with_context(|| { + let bytes = 
state.cached_page(&ctx.config, candidate_pgno).with_context(|| { format!( - "page {candidate_pgno} should be present in strict VFS cache before cold-ref seed" + "page {candidate_pgno} should be present in VFS cache before cold-ref seed" ) })?; Ok(DirtyPage { diff --git a/engine/packages/depot-client/tests/inline/vfs.rs b/engine/packages/depot-client/tests/inline/vfs.rs index 679f114d4c..bacb876dc0 100644 --- a/engine/packages/depot-client/tests/inline/vfs.rs +++ b/engine/packages/depot-client/tests/inline/vfs.rs @@ -11,12 +11,19 @@ use std::sync::{Arc, Barrier, mpsc}; use std::thread; use std::time::Duration; +use async_trait::async_trait; use depot::cold_tier::FilesystemColdTier; use parking_lot::Mutex as SyncMutex; +use rivet_envoy_protocol as protocol; use tempfile::TempDir; use tokio::runtime::Builder; use tokio::sync::OnceCell; +use crate::optimization_flags::{ + DEFAULT_STARTUP_PRELOAD_MAX_BYTES, DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES, + DEFAULT_VFS_PROTECTED_CACHE_PAGES, DEFAULT_VFS_STAGING_CACHE_TTL_MS, + SqliteOptimizationFlags, SqliteReadAheadMode, SqliteVfsPageCacheMode, +}; use crate::query::{BindParam, ColumnValue}; use crate::vfs::SqliteVfsMetrics; @@ -24,6 +31,212 @@ use super::*; static TEST_ID: AtomicU64 = AtomicU64::new(1); +#[test] +fn vfs_config_wires_optimization_flags() { + let flags = SqliteOptimizationFlags { + read_ahead_mode: SqliteReadAheadMode::Off, + read_ahead: false, + adaptive_read_ahead: false, + recent_page_hints: true, + cache_hit_predictor_training: true, + preload_hint_flush: true, + startup_preload_max_bytes: DEFAULT_STARTUP_PRELOAD_MAX_BYTES / 2, + startup_preload_first_pages: false, + startup_preload_first_page_count: 7, + preload_hints_on_open: false, + preload_hint_hot_pages: false, + preload_hint_early_pages: false, + preload_hint_scan_ranges: true, + dedup_get_pages_meta: true, + cache_get_pages_validation: true, + range_reads: true, + batch_chunk_reads: true, + decoded_ltx_cache: true, + vfs_page_cache_mode: 
SqliteVfsPageCacheMode::Startup, + vfs_page_cache_capacity_pages: DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES / 2, + vfs_protected_cache_pages: DEFAULT_VFS_PROTECTED_CACHE_PAGES / 2, + vfs_staging_cache_ttl_ms: DEFAULT_VFS_STAGING_CACHE_TTL_MS / 2, + }; + + let config = VfsConfig::from_optimization_flags(flags); + assert_eq!(config.page_cache_mode, SqliteVfsPageCacheMode::Startup); + assert_eq!( + config.cache_capacity_pages, + DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES / 2 + ); + assert_eq!( + config.protected_cache_pages, + 0 + ); + assert_eq!( + config.staging_cache_ttl_ms, + DEFAULT_VFS_STAGING_CACHE_TTL_MS / 2 + ); + assert_eq!(config.prefetch_depth, 16); + assert!(!config.adaptive_read_ahead); + assert_eq!( + config.startup_preload_max_bytes, + DEFAULT_STARTUP_PRELOAD_MAX_BYTES / 2 + ); + assert!(!config.startup_preload_first_pages); + assert_eq!(config.startup_preload_first_page_count, 7); + assert!(!config.preload_hints_on_open); + assert!(!config.preload_hint_early_pages); + assert_eq!(config.recent_hint_page_budget, 0); + assert!(config.recent_hint_range_budget > 0); +} + +#[derive(Default)] +struct RecordingInitialPagesTransport { + requested_pgnos: SyncMutex<Vec<u32>>, +} + +#[async_trait] +impl SqliteTransport for RecordingInitialPagesTransport { + async fn get_pages( + &self, + request: protocol::SqliteGetPagesRequest, + ) -> anyhow::Result<protocol::SqliteGetPagesResponse> { + *self.requested_pgnos.lock() = request.pgnos.clone(); + Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk( + protocol::SqliteGetPagesOk { + pages: request + .pgnos + .into_iter() + .map(|pgno| protocol::SqliteFetchedPage { + pgno, + bytes: Some(vec![pgno as u8; DEFAULT_PAGE_SIZE]), + }) + .collect(), + }, + )) + } + + async fn commit( + &self, + _request: protocol::SqliteCommitRequest, + ) -> anyhow::Result { + anyhow::bail!("initial-page preload test does not commit") + } +} + +struct MissingDbTransport; + +#[async_trait] +impl SqliteTransport for MissingDbTransport { + async fn get_pages( + &self, + _request: 
protocol::SqliteGetPagesRequest, + ) -> anyhow::Result<protocol::SqliteGetPagesResponse> { + Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse( + protocol::SqliteErrorResponse { + message: "sqlite database was not found in this bucket branch".to_string(), + }, + )) + } + + async fn commit( + &self, + _request: protocol::SqliteCommitRequest, + ) -> anyhow::Result { + anyhow::bail!("missing-db transport test does not commit") + } +} + +#[test] +fn startup_initial_pages_do_not_require_preload_hints_on_open() { + let runtime = direct_runtime(); + let transport = Arc::new(RecordingInitialPagesTransport::default()); + let config = VfsConfig { + startup_preload_first_pages: true, + startup_preload_first_page_count: 4, + startup_preload_max_bytes: DEFAULT_PAGE_SIZE * 4, + preload_hints_on_open: false, + page_cache_mode: SqliteVfsPageCacheMode::Startup, + ..VfsConfig::default() + }; + + let pages = runtime + .block_on(fetch_initial_pages_for_registration( + transport.clone(), + "startup-preload-actor", + &config, + )) + .expect("initial pages should load"); + + let loaded_pgnos = pages.iter().map(|(pgno, _)| *pgno).collect::<Vec<_>>(); + assert_eq!(*transport.requested_pgnos.lock(), vec![1, 2, 3, 4]); + assert_eq!(loaded_pgnos, vec![1, 2, 3, 4]); +} + +#[test] +fn vfs_staging_cache_retains_only_speculative_pages() { + let config = VfsConfig { + page_cache_mode: SqliteVfsPageCacheMode::All, + staging_cache_ttl_ms: DEFAULT_VFS_STAGING_CACHE_TTL_MS, + ..VfsConfig::default() + }; + let mut state = VfsState::new(&config); + + state.cache_page(&config, PageCacheInsertKind::Target, 2, vec![2; DEFAULT_PAGE_SIZE]); + assert!(state.cached_page(&config, 2).is_none()); + + state.cache_page(&config, PageCacheInsertKind::Prefetch, 3, vec![3; DEFAULT_PAGE_SIZE]); + state.cache_page(&config, PageCacheInsertKind::Startup, 4, vec![4; DEFAULT_PAGE_SIZE]); + assert!(state.cached_page(&config, 3).is_some()); + assert!(state.cached_page(&config, 4).is_some()); + assert!(state.protected_page_cache.read_sync(&3, |_, _| 
()).is_none()); + + state.evict_target_read_pages(&[1, 3, 4]); + assert!(state.cached_page(&config, 1).is_none()); + assert!(state.cached_page(&config, 3).is_none()); + assert!(state.cached_page(&config, 4).is_none()); +} + +#[test] +fn vfs_staging_cache_ttl_zero_disables_speculative_retention() { + let config = VfsConfig { + page_cache_mode: SqliteVfsPageCacheMode::All, + staging_cache_ttl_ms: 0, + ..VfsConfig::default() + }; + let mut state = VfsState::new(&config); + + state.cache_page(&config, PageCacheInsertKind::Prefetch, 2, vec![2; DEFAULT_PAGE_SIZE]); + state.cache_page(&config, PageCacheInsertKind::Startup, 3, vec![3; DEFAULT_PAGE_SIZE]); + assert!(state.cached_page(&config, 1).is_some()); + assert!(state.cached_page(&config, 2).is_none()); + assert!(state.cached_page(&config, 3).is_none()); + + state.evict_target_read_pages(&[1]); + assert!(state.cached_page(&config, 1).is_none()); +} + +#[test] +fn evicted_empty_page_one_can_be_synthesized_before_first_commit() { + let runtime = direct_runtime(); + let config = VfsConfig::default(); + let ctx = VfsContext::new( + next_test_name("missing-db-actor"), + runtime.handle().clone(), + Arc::new(MissingDbTransport), + config.clone(), + unsafe { std::mem::zeroed() }, + Vec::new(), + None, + ) + .expect("vfs context should build"); + + ctx.state.read().evict_target_read_pages(&[1]); + assert!(ctx.state.read().cached_page(&config, 1).is_none()); + + let resolved = ctx + .resolve_pages(&[1], true) + .expect("missing empty database should synthesize page 1"); + assert_eq!(resolved.get(&1), Some(&Some(empty_db_page()))); + assert!(ctx.state.read().cached_page(&config, 1).is_none()); +} + fn next_test_name(prefix: &str) -> String { let id = TEST_ID.fetch_add(1, Ordering::Relaxed); format!("{prefix}-{id}") @@ -121,7 +334,7 @@ impl DirectEngineHarness { Arc::new(DirectDepotTransport::new(engine)), VfsConfig::default(), unsafe { std::mem::zeroed() }, - None, + Vec::new(), None, ) .expect("vfs context should build") @@ 
-2302,7 +2515,7 @@ fn concurrent_reader_during_commit_atomic_observes_consistent_snapshot() { transport, VfsConfig::default(), unsafe { std::mem::zeroed() }, - None, + Vec::new(), None, ) .expect("vfs context should build"); @@ -3233,7 +3446,7 @@ fn resolve_pages_surfaces_read_path_error_response() { transport, VfsConfig::default(), unsafe { std::mem::zeroed() }, - None, + Vec::new(), None, ) .expect("vfs context should build"); diff --git a/engine/packages/guard/src/routing/actor_path.rs b/engine/packages/guard/src/routing/actor_path.rs index 9d9c8f6c66..d7b3aea443 100644 --- a/engine/packages/guard/src/routing/actor_path.rs +++ b/engine/packages/guard/src/routing/actor_path.rs @@ -229,10 +229,13 @@ fn extract_rvt_params(rvt_params: &[(String, String)]) -> Result { } .build()); } - map.insert( - stripped.to_string(), - serde_json::Value::String(value.clone()), - ); + let value = match stripped { + "bypass_connectable" => parse_query_bool(value) + .map(serde_json::Value::Bool) + .unwrap_or_else(|| serde_json::Value::String(value.clone())), + _ => serde_json::Value::String(value.clone()), + }; + map.insert(stripped.to_string(), value); } serde_json::from_value(serde_json::Value::Object(map)).map_err(|e| { @@ -243,6 +246,14 @@ fn extract_rvt_params(rvt_params: &[(String, String)]) -> Result { }) } +fn parse_query_bool(value: &str) -> Option<bool> { + match value { + "true" | "1" => Some(true), + "false" | "0" => Some(false), + _ => None, + } +} + /// Split a comma-separated key string into components. /// Missing or empty key yields an empty vec. 
fn split_key(raw: Option<&str>) -> Vec<String> { diff --git a/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts b/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts index 02ea4bb491..5fe05832c0 100644 --- a/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts +++ b/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts @@ -25,8 +25,6 @@ const DEFAULT_STARTUP_PRELOAD_MAX_BYTES = 1024 * 1024; const DEFAULT_STARTUP_PRELOAD_FIRST_PAGE_COUNT = 1; const DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES = 50_000; const DEFAULT_VFS_PROTECTED_CACHE_PAGES = 512; -const DEFAULT_READ_POOL_MAX_READERS = 4; -const DEFAULT_READ_POOL_IDLE_TTL_MS = 60_000; const DEFAULT_BENCH_VFS_ROUND_TRIP_LATENCY_MS = 10; const BENCH_VFS_ROUND_TRIP_LATENCY_MS_ENV = "RIVETKIT_SQLITE_BENCH_VFS_ROUND_TRIP_LATENCY_MS"; @@ -47,19 +45,12 @@ const SQLITE_OPT_BOOLEAN_ENVS = [ "RIVETKIT_SQLITE_OPT_PRELOAD_HINT_HOT_PAGES", "RIVETKIT_SQLITE_OPT_PRELOAD_HINT_EARLY_PAGES", "RIVETKIT_SQLITE_OPT_PRELOAD_HINT_SCAN_RANGES", - "RIVETKIT_SQLITE_OPT_CACHE_GET_PAGES_VALIDATION", - "RIVETKIT_SQLITE_OPT_RANGE_READS", - "RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS", - "RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE", - "RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED", ] as const; const SQLITE_OPT_NUMERIC_ENVS = [ "RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_MAX_BYTES", "RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_FIRST_PAGE_COUNT", "RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_CAPACITY_PAGES", "RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES", - "RIVETKIT_SQLITE_OPT_READ_POOL_MAX_READERS", - "RIVETKIT_SQLITE_OPT_READ_POOL_IDLE_TTL_MS", ] as const; const WORKLOADS = [ @@ -187,9 +178,6 @@ interface MatrixScenarioReport { fetchedPages: number; cacheHits: number; cacheMisses: number; - routedReads: number; - writeFallbacks: number; - modeTransitions: number; }>; } @@ -204,7 +192,6 @@ interface BenchmarkResult { setup: SetupResult | null; main: MainResult; vfsMetrics: VfsMetricSnapshot; - readPoolMetrics: ReadPoolMetricSnapshot; } interface VfsMetricSnapshot { @@ -221,23 +208,6 @@ 
interface VfsMetricSnapshot { getPagesDurationSecondsCount: number; } -interface ReadPoolMetricSnapshot { - activeReaders: number; - idleReaders: number; - readWaitDurationSecondsSum: number; - readWaitDurationSecondsCount: number; - writeWaitDurationSecondsSum: number; - writeWaitDurationSecondsCount: number; - routedReadQueriesTotal: number; - writeFallbackQueriesTotal: number; - manualTransactionDurationSecondsSum: number; - manualTransactionDurationSecondsCount: number; - readerOpensTotal: number; - readerClosesTotal: number; - rejectedReaderMutationsTotal: number; - modeTransitionsTotal: number; -} - const WORKLOAD_SPECS: WorkloadSpec[] = [ { // Included to keep tiny actor databases honest while we optimize larger datasets. @@ -320,16 +290,14 @@ const WORKLOAD_SPECS: WorkloadSpec[] = [ description: "Selective tenant/time-range aggregate over events joined to orders.", }, { - // Included to measure future read-mode parallelism where several read-only SQLite connections overlap VFS misses. - // Today this captures the serialized baseline; after the connection manager lands, independent aggregate reads should overlap. + // Included to measure concurrent read-only aggregate pressure over the same VFS and transport. name: "parallel-read-aggregates", category: "read", sizeClass: "large", description: "Concurrent read-only aggregates over one actor-local SQLite database.", }, { - // Included to measure the read-mode to write-mode transition. - // Future write mode must wait for active readers, close them, run exactly one writable connection, then allow fresh readers. + // Included to measure read pressure queued alongside a write update. name: "parallel-read-write-transition", category: "write", sizeClass: "medium", @@ -417,7 +385,6 @@ const WORKLOAD_SPECS: WorkloadSpec[] = [ }, { // Included to model tool-like fan-out where an agent asks for recent rows, indexed rows, and aggregates concurrently. 
- // Parallel read pool routing should make these independent read-only statements overlap once TS serialization is gone. name: "chat-tool-read-fanout", category: "read", sizeClass: "large", @@ -525,11 +492,6 @@ function defaultSqliteOptimizationEnv(): Record { RIVETKIT_SQLITE_OPT_PRELOAD_HINT_HOT_PAGES: "true", RIVETKIT_SQLITE_OPT_PRELOAD_HINT_EARLY_PAGES: "true", RIVETKIT_SQLITE_OPT_PRELOAD_HINT_SCAN_RANGES: "true", - RIVETKIT_SQLITE_OPT_CACHE_GET_PAGES_VALIDATION: "true", - RIVETKIT_SQLITE_OPT_RANGE_READS: "true", - RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS: "true", - RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE: "true", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "true", RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_MAX_BYTES: DEFAULT_STARTUP_PRELOAD_MAX_BYTES.toString(), RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_FIRST_PAGE_COUNT: @@ -538,10 +500,6 @@ function defaultSqliteOptimizationEnv(): Record { DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES.toString(), RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES: DEFAULT_VFS_PROTECTED_CACHE_PAGES.toString(), - RIVETKIT_SQLITE_OPT_READ_POOL_MAX_READERS: - DEFAULT_READ_POOL_MAX_READERS.toString(), - RIVETKIT_SQLITE_OPT_READ_POOL_IDLE_TTL_MS: - DEFAULT_READ_POOL_IDLE_TTL_MS.toString(), }; } @@ -588,33 +546,14 @@ const SQLITE_OPTIMIZATION_MATRIX_SCENARIOS: MatrixScenario[] = [ }, includeInImpact: true, }, - { - id: "transport-batching-only", - label: "Transport batching only", - description: - "Range reads, chunk batching, and decoded LTX cache enabled with VFS cache, preload, read-ahead, and read pool disabled.", - env: scenarioEnv({ - ...preloadDisabledEnv(), - RIVETKIT_SQLITE_OPT_READ_AHEAD_MODE: "off", - RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MODE: "off", - RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_CAPACITY_PAGES: "0", - RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES: "0", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", - }), - includeInImpact: true, - }, { id: "vfs-cache-only", label: "VFS cache only", description: - "VFS page cache enabled without read-ahead, 
preload hints, range reads, storage decode cache, or read pool.", + "VFS page cache enabled without read-ahead or preload hints.", env: scenarioEnv({ ...preloadDisabledEnv(), RIVETKIT_SQLITE_OPT_READ_AHEAD_MODE: "off", - RIVETKIT_SQLITE_OPT_RANGE_READS: "false", - RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS: "false", - RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE: "false", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", }), includeInImpact: true, }, @@ -622,13 +561,12 @@ const SQLITE_OPTIMIZATION_MATRIX_SCENARIOS: MatrixScenario[] = [ id: "read-ahead-no-cache", label: "Read-ahead without VFS cache", description: - "Adaptive read-ahead and range reads enabled while prefetched pages are not retained in the VFS cache.", + "Adaptive read-ahead enabled while prefetched pages are not retained in the VFS cache.", env: scenarioEnv({ ...preloadDisabledEnv(), RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MODE: "off", RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_CAPACITY_PAGES: "0", RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES: "0", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", }), includeInImpact: true, }, @@ -697,37 +635,6 @@ const SQLITE_OPTIMIZATION_MATRIX_SCENARIOS: MatrixScenario[] = [ env: scenarioEnv(preloadDisabledEnv()), includeInImpact: true, }, - { - id: "no-range-reads", - label: "Default minus range reads", - description: "Current defaults with contiguous range page reads disabled.", - env: scenarioEnv({ - RIVETKIT_SQLITE_OPT_RANGE_READS: "false", - }), - includeInImpact: true, - }, - { - id: "no-storage-read-cache", - label: "Default minus storage read cache", - description: - "Current defaults with chunk batching and decoded LTX cache disabled.", - env: scenarioEnv({ - RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS: "false", - RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE: "false", - }), - includeInImpact: true, - }, - { - id: "no-read-pool", - label: "Default minus read pool", - description: "Current defaults with the SQLite read connection pool disabled.", - env: scenarioEnv({ - 
RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", - RIVETKIT_SQLITE_OPT_READ_POOL_MAX_READERS: "0", - RIVETKIT_SQLITE_OPT_READ_POOL_IDLE_TTL_MS: "0", - }), - includeInImpact: true, - }, ]; function usage(exitCode = 1): never { @@ -1293,55 +1200,6 @@ function scrapeVfsMetrics(text: string): VfsMetricSnapshot { }; } -function scrapeReadPoolMetrics(text: string): ReadPoolMetricSnapshot { - return { - activeReaders: metricValue(text, "sqlite_read_pool_active_readers"), - idleReaders: metricValue(text, "sqlite_read_pool_idle_readers"), - readWaitDurationSecondsSum: metricValue( - text, - "sqlite_read_pool_read_wait_duration_seconds_sum", - ), - readWaitDurationSecondsCount: metricValue( - text, - "sqlite_read_pool_read_wait_duration_seconds_count", - ), - writeWaitDurationSecondsSum: metricValue( - text, - "sqlite_read_pool_write_wait_duration_seconds_sum", - ), - writeWaitDurationSecondsCount: metricValue( - text, - "sqlite_read_pool_write_wait_duration_seconds_count", - ), - routedReadQueriesTotal: metricValue( - text, - "sqlite_read_pool_routed_read_queries_total", - ), - writeFallbackQueriesTotal: metricValue( - text, - "sqlite_read_pool_write_fallback_queries_total", - ), - manualTransactionDurationSecondsSum: metricValue( - text, - "sqlite_read_pool_manual_transaction_duration_seconds_sum", - ), - manualTransactionDurationSecondsCount: metricValue( - text, - "sqlite_read_pool_manual_transaction_duration_seconds_count", - ), - readerOpensTotal: metricValue(text, "sqlite_read_pool_reader_opens_total"), - readerClosesTotal: metricValue(text, "sqlite_read_pool_reader_closes_total"), - rejectedReaderMutationsTotal: metricValue( - text, - "sqlite_read_pool_rejected_reader_mutations_total", - ), - modeTransitionsTotal: metricValue( - text, - "sqlite_read_pool_mode_transitions_total", - ), - }; -} - function diffMetrics(after: T, before: T): T { return Object.fromEntries( Object.keys(after).map((key) => [ @@ -1367,25 +1225,6 @@ function emptyVfsMetrics(): VfsMetricSnapshot 
{ }; } -function emptyReadPoolMetrics(): ReadPoolMetricSnapshot { - return { - activeReaders: 0, - idleReaders: 0, - readWaitDurationSecondsSum: 0, - readWaitDurationSecondsCount: 0, - writeWaitDurationSecondsSum: 0, - writeWaitDurationSecondsCount: 0, - routedReadQueriesTotal: 0, - writeFallbackQueriesTotal: 0, - manualTransactionDurationSecondsSum: 0, - manualTransactionDurationSecondsCount: 0, - readerOpensTotal: 0, - readerClosesTotal: 0, - rejectedReaderMutationsTotal: 0, - modeTransitionsTotal: 0, - }; -} - function writeResults(outputDir: string, document: unknown): void { mkdirSync(outputDir, { recursive: true }); writeFileSync( @@ -1400,8 +1239,8 @@ function writeSummary(outputDir: string, results: BenchmarkResult[]): void { "", "Server SQLite time only. Setup time, sleep delay, wake/cold-start time, and client RTT are not included.", "", - "| workload | category | size | server_ms | routed_reads | write_fallbacks | mode_transitions | reader_opens | reader_closes | get_pages | fetched_pages | cache_hits | cache_misses | rows/ops | pages |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", + "| workload | category | size | server_ms | get_pages | fetched_pages | cache_hits | cache_misses | rows/ops | pages |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", ]; for (const result of results) { const rowsOrOps = @@ -1411,7 +1250,7 @@ function writeSummary(outputDir: string, results: BenchmarkResult[]): void { ? 
result.main.ops : ""; lines.push( - `| ${result.workload} | ${result.category} | ${fmtBytes(result.targetBytes)} | ${result.main.ms.toFixed(1)} | ${result.readPoolMetrics.routedReadQueriesTotal} | ${result.readPoolMetrics.writeFallbackQueriesTotal} | ${result.readPoolMetrics.modeTransitionsTotal} | ${result.readPoolMetrics.readerOpensTotal} | ${result.readPoolMetrics.readerClosesTotal} | ${result.vfsMetrics.getPagesTotal} | ${result.vfsMetrics.pagesFetchedTotal} | ${result.vfsMetrics.resolvePagesCacheHitsTotal} | ${result.vfsMetrics.resolvePagesCacheMissesTotal} | ${rowsOrOps} | ${result.main.pageCount} |`, + `| ${result.workload} | ${result.category} | ${fmtBytes(result.targetBytes)} | ${result.main.ms.toFixed(1)} | ${result.vfsMetrics.getPagesTotal} | ${result.vfsMetrics.pagesFetchedTotal} | ${result.vfsMetrics.resolvePagesCacheHitsTotal} | ${result.vfsMetrics.resolvePagesCacheMissesTotal} | ${rowsOrOps} | ${result.main.pageCount} |`, ); } writeFileSync(join(outputDir, "summary.md"), `${lines.join("\n")}\n`); @@ -1517,9 +1356,6 @@ function readScenarioDocument(args: Args, scenario: MatrixScenario): MatrixScena fetchedPages: result.vfsMetrics.pagesFetchedTotal, cacheHits: result.vfsMetrics.resolvePagesCacheHitsTotal, cacheMisses: result.vfsMetrics.resolvePagesCacheMissesTotal, - routedReads: result.readPoolMetrics.routedReadQueriesTotal, - writeFallbacks: result.readPoolMetrics.writeFallbackQueriesTotal, - modeTransitions: result.readPoolMetrics.modeTransitionsTotal, })), }; } @@ -1551,8 +1387,8 @@ function writeMatrixSummary(outputDir: string, scenarios: MatrixScenarioReport[] "", "Each scenario runs in a fresh process so process-wide SQLite optimization flags are read once per configuration.", "", - "| scenario | workload | server_ms | delta_vs_defaults | get_pages | fetched_pages | cache_hits | cache_misses | routed_reads | write_fallbacks |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", + "| scenario | workload | server_ms | 
delta_vs_defaults | get_pages | fetched_pages | cache_hits | cache_misses |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: |", ]; for (const scenario of scenarios) { @@ -1564,7 +1400,7 @@ function writeMatrixSummary(outputDir: string, scenarios: MatrixScenarioReport[] ? `${(((result.serverMs - base.serverMs) / base.serverMs) * 100).toFixed(1)}%` : ""; lines.push( - `| ${scenario.id} | ${result.workload} | ${result.serverMs.toFixed(1)} | ${delta} | ${result.getPages} | ${result.fetchedPages} | ${result.cacheHits} | ${result.cacheMisses} | ${result.routedReads} | ${result.writeFallbacks} |`, + `| ${scenario.id} | ${result.workload} | ${result.serverMs.toFixed(1)} | ${delta} | ${result.getPages} | ${result.fetchedPages} | ${result.cacheHits} | ${result.cacheMisses} |`, ); } } @@ -1798,12 +1634,8 @@ async function main(): Promise { scrapeVfsMetrics(afterMainMetricsText), emptyVfsMetrics(), ); - const readPoolMetrics = diffMetrics( - scrapeReadPoolMetrics(afterMainMetricsText), - emptyReadPoolMetrics(), - ); console.log( - ` server=${fmtMs(mainResult.ms)} pages=${mainResult.pageCount} routed_reads=${readPoolMetrics.routedReadQueriesTotal} write_fallbacks=${readPoolMetrics.writeFallbackQueriesTotal} mode_transitions=${readPoolMetrics.modeTransitionsTotal} get_pages=${vfsMetrics.getPagesTotal} fetched_pages=${vfsMetrics.pagesFetchedTotal}`, + ` server=${fmtMs(mainResult.ms)} pages=${mainResult.pageCount} get_pages=${vfsMetrics.getPagesTotal} fetched_pages=${vfsMetrics.pagesFetchedTotal}`, ); results.push({ @@ -1817,7 +1649,6 @@ async function main(): Promise { setup, main: mainResult, vfsMetrics, - readPoolMetrics, }); writeResults(outputDir, resultDocument); writeSummary(outputDir, results); @@ -1830,7 +1661,7 @@ async function main(): Promise { console.log("\nResults"); for (const result of results) { console.log( - ` ${result.workload}: server=${fmtMs(result.main.ms)} size=${fmtBytes(result.targetBytes)} 
routed_reads=${result.readPoolMetrics.routedReadQueriesTotal} write_fallbacks=${result.readPoolMetrics.writeFallbackQueriesTotal} mode_transitions=${result.readPoolMetrics.modeTransitionsTotal} get_pages=${result.vfsMetrics.getPagesTotal} fetched_pages=${result.vfsMetrics.pagesFetchedTotal}`, + ` ${result.workload}: server=${fmtMs(result.main.ms)} size=${fmtBytes(result.targetBytes)} get_pages=${result.vfsMetrics.getPagesTotal} fetched_pages=${result.vfsMetrics.pagesFetchedTotal}`, ); } console.log(`\nwrote ${join(outputDir, "results.json")}`); diff --git a/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts b/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts index c85505345f..0424d3e921 100644 --- a/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts +++ b/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts @@ -42,16 +42,9 @@ test("SQLite real-world benchmark includes read-mode/write-mode scenarios", () = ); assert.match(actor, /Promise\.all\(\[/); assert.match(actor, /UPDATE rw_orders SET total_cents = total_cents \+ 1/); - for (const metric of [ - "sqlite_read_pool_routed_read_queries_total", - "sqlite_read_pool_write_fallback_queries_total", - "sqlite_read_pool_mode_transitions_total", - ]) { - assert.match(runner, new RegExp(metric)); - } assert.match( runner, - /\| workload \| category \| size \| server_ms \| routed_reads \| write_fallbacks \| mode_transitions \|/, + /\| workload \| category \| size \| server_ms \| get_pages \| fetched_pages \|/, ); }); @@ -66,16 +59,12 @@ test("SQLite real-world benchmark defines an optimization impact matrix", () => for (const scenario of [ "defaults", "all-off", - "transport-batching-only", "vfs-cache-only", "read-ahead-no-cache", "cache-read-ahead-no-preload", "no-read-ahead", "no-vfs-cache", "no-preload", - "no-range-reads", - "no-storage-read-cache", - "no-read-pool", ]) { assert.match(runner, new RegExp(`id: "${scenario}"`)); } diff --git 
a/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts index a108d66ef1..193e7acdc2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-common.ts @@ -33,6 +33,7 @@ export type ActorActionFunction< export interface ActorGatewayOptions { bypassConnectable?: boolean; + skipReadyWait?: boolean; } export type ResolvedActorGatewayOptions = Required<ActorGatewayOptions>; @@ -41,9 +42,16 @@ export function resolveActorGatewayOptions( defaults: ActorGatewayOptions = {}, overrides?: ActorGatewayOptions, ): ResolvedActorGatewayOptions { + const bypassConnectable = + overrides?.bypassConnectable ?? + overrides?.skipReadyWait ?? + defaults.bypassConnectable ?? + defaults.skipReadyWait ?? + false; + return { - bypassConnectable: - overrides?.bypassConnectable ?? defaults.bypassConnectable ?? false, + bypassConnectable, + skipReadyWait: bypassConnectable, }; } diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts index 4220e25e0c..4783400d22 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-http-client.ts @@ -5,7 +5,10 @@ import { HEADER_RIVET_TARGET, HEADER_RIVET_TOKEN, } from "@/common/actor-router-consts"; -import type { GatewayRequestOptions } from "./driver"; +import { + shouldBypassConnectable, + type GatewayRequestOptions, +} from "./driver"; export interface HttpGatewayRequestOptions extends GatewayRequestOptions { directActorId?: string; @@ -79,7 +82,7 @@ function buildGuardHeaders( headers.set(HEADER_RIVET_TARGET, "actor"); headers.set(HEADER_RIVET_ACTOR, options.directActorId); } - if (options.bypassConnectable) { + if (shouldBypassConnectable(options)) { headers.set(HEADER_RIVET_BYPASS_CONNECTABLE, "1"); } 
return headers; diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts index d4ffe13a0d..9442c988ae 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/actor-websocket-client.ts @@ -18,7 +18,10 @@ import type { ActorGatewayQuery, CrashPolicy } from "@/client/query"; import type { Encoding, UniversalWebSocket } from "@/mod"; import { encodeCborCompat, uint8ArrayToBase64 } from "@/serde"; import { combineUrlPath } from "@/utils"; -import type { GatewayRequestOptions } from "./driver"; +import { + shouldBypassConnectable, + type GatewayRequestOptions, +} from "./driver"; import { logger } from "./log"; class BufferedRemoteWebSocket implements UniversalWebSocket { @@ -269,7 +272,7 @@ export function buildActorQueryGatewayUrl( if (token !== undefined) { params.append("rvt-token", token); } - if (options.bypassConnectable) { + if (shouldBypassConnectable(options)) { params.append("rvt-bypass_connectable", "true"); } @@ -392,7 +395,7 @@ export function buildWebSocketProtocols( protocols.push(`${WS_PROTOCOL_TARGET}${target.target}`); protocols.push(`${WS_PROTOCOL_ACTOR}${target.actorId}`); } - if (options.bypassConnectable) { + if (shouldBypassConnectable(options)) { protocols.push(WS_PROTOCOL_BYPASS_CONNECTABLE); } if (params) { diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts index c2c65c6264..83919c570d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/driver.ts @@ -8,6 +8,13 @@ export type GatewayTarget = { directId: string } | ActorQuery; export interface GatewayRequestOptions { bypassConnectable?: boolean; + skipReadyWait?: boolean; +} + +export function 
shouldBypassConnectable( + options: GatewayRequestOptions = {}, +): boolean { + return options.bypassConnectable === true || options.skipReadyWait === true; } export interface EngineControlClient { diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts b/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts index 5bfb7dfe5a..ef6eb41aea 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-client/mod.ts @@ -9,6 +9,7 @@ import { } from "@/common/actor-router-consts"; import { noopNext } from "@/common/utils"; import type { Actor as ApiActor } from "@/engine-api/actors"; +import { shouldBypassConnectable } from "@/engine-client/driver"; import type { ActorOutput, CreateInput, @@ -264,7 +265,7 @@ export class RemoteEngineControlClient implements EngineControlClient { ); const httpOptions = { ...options, - directActorId: options.bypassConnectable + directActorId: shouldBypassConnectable(options) ? directActorIdFromTarget(target) : undefined, }; @@ -299,7 +300,7 @@ export class RemoteEngineControlClient implements EngineControlClient { params, { ...options, - directActorId: options.bypassConnectable + directActorId: shouldBypassConnectable(options) ? 
directActorIdFromTarget(target) : undefined, }, @@ -424,7 +425,7 @@ export class RemoteEngineControlClient implements EngineControlClient { const endpoint = getEndpoint(this.#config); if ( - options.bypassConnectable && + shouldBypassConnectable(options) && directActorIdFromTarget(target) && canUseDirectBypassPath(path) ) { diff --git a/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts b/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts index e198e3b3fb..1aec332f61 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/actor-gateway-url.test.ts @@ -7,6 +7,7 @@ import { import { buildActorGatewayUrl, buildActorQueryGatewayUrl, + buildWebSocketProtocols, } from "@/engine-client/actor-websocket-client"; import { toBase64Url } from "./test-utils"; @@ -56,7 +57,30 @@ describe("gateway URL builders", () => { expect(url).not.toContain("@"); }); - test("serializes gateway bypass for query routing", () => { + test("serializes skipReadyWait for query routing", () => { + const url = buildActorQueryGatewayUrl( + "https://api.rivet.dev/manager", + "prod", + { + getForKey: { + name: "room", + key: ["alpha"], + }, + }, + undefined, + "/status", + undefined, + undefined, + undefined, + { skipReadyWait: true }, + ); + + expect(new URL(url).searchParams.get("rvt-bypass_connectable")).toBe( + "true", + ); + }); + + test("serializes bypassConnectable for query routing", () => { const url = buildActorQueryGatewayUrl( "https://api.rivet.dev/manager", "prod", @@ -79,6 +103,19 @@ describe("gateway URL builders", () => { ); }); + test("serializes bypassConnectable for websocket protocols", () => { + const protocols = buildWebSocketProtocols( + ClientConfigSchema.parse({ endpoint: "https://api.rivet.dev" }), + "json", + undefined, + undefined, + { target: "actor", actorId: "actor-1" }, + { bypassConnectable: true }, + ); + + 
expect(protocols).toContain("rivet_bypass_connectable"); + }); + test("serializes getOrCreate queries with rvt-* params", () => { const input = { hello: "world" }; const url = buildActorQueryGatewayUrl( diff --git a/website/src/content/docs/actors/lifecycle.mdx b/website/src/content/docs/actors/lifecycle.mdx index 8e0e67ba62..e565fe07ee 100644 --- a/website/src/content/docs/actors/lifecycle.mdx +++ b/website/src/content/docs/actors/lifecycle.mdx @@ -761,6 +761,12 @@ curl -X POST \ `/sleep` asks the actor to enter the normal sleep shutdown sequence. `/reschedule` asks the platform to allocate the actor again, which is useful after crashes or when you need to force a fresh placement. Both endpoints require the actor ID and namespace. +### Skip Ready Wait + +The gateway normally holds requests until the actor is ready. The actor is not ready during startup (before `onWake` finishes) or during the sleep grace period (while `onSleep` and `waitUntil` are running). Probes and readiness checks can opt out with `gateway.skipReadyWait` to reach the actor's `onRequest` or `onWebSocket` handler in either window. + +See [Skip Ready Wait](/docs/clients/javascript#skip-ready-wait) on the JavaScript client page for usage. + ### Keeping the Actor Awake RivetKit gives you two primitives for holding the actor awake across background work. Both take a `Promise` and differ in how they interact with idle sleep and the grace period. diff --git a/website/src/content/docs/actors/request-handler.mdx b/website/src/content/docs/actors/request-handler.mdx index 1755f448f4..efb62d293e 100644 --- a/website/src/content/docs/actors/request-handler.mdx +++ b/website/src/content/docs/actors/request-handler.mdx @@ -249,6 +249,12 @@ The `onRequest` handler is WinterTC compliant and will work with existing librar - Does not support streaming responses & server-sent events at the moment. See the [tracking issue](https://github.com/rivet-dev/rivet/issues/3529). 
- `OPTIONS` requests currently are handled by Rivet and are not passed to `onRequest` +## Advanced + +### Skip Ready Wait + +Requests are normally held at the gateway until the actor is ready. Pass `gateway.skipReadyWait: true` on `handle.fetch()` to deliver the request immediately, including while the actor is still starting or in the [sleep grace period](/docs/actors/lifecycle#shutdown-sequence). See [Skip Ready Wait](/docs/clients/javascript#skip-ready-wait) for details. + ## API Reference - [`RequestContext`](/typedoc/interfaces/rivetkit.mod.RequestContext.html) - Context for HTTP request handlers diff --git a/website/src/content/docs/actors/websocket-handler.mdx b/website/src/content/docs/actors/websocket-handler.mdx index 5e21c07ed5..a02dce7db7 100644 --- a/website/src/content/docs/actors/websocket-handler.mdx +++ b/website/src/content/docs/actors/websocket-handler.mdx @@ -295,6 +295,10 @@ const myActor = actor({ }); ``` +### Skip Ready Wait + +Connections are normally held at the gateway until the actor is ready. Pass `gateway.skipReadyWait: true` on `handle.webSocket()` to connect immediately, including while the actor is still starting or in the [sleep grace period](/docs/actors/lifecycle#shutdown-sequence). See [Skip Ready Wait](/docs/clients/javascript#skip-ready-wait) for details. + ### Async Handlers The `onWebSocket` handler can be async, allowing you to perform async code before setting up event listeners: diff --git a/website/src/content/docs/clients/javascript.mdx b/website/src/content/docs/clients/javascript.mdx index 4371f7a4ef..c7cd99c169 100644 --- a/website/src/content/docs/clients/javascript.mdx +++ b/website/src/content/docs/clients/javascript.mdx @@ -253,6 +253,31 @@ https://namespace:token@api.rivet.dev You can also pass the endpoint without auth and provide `RIVET_NAMESPACE` and `RIVET_TOKEN` separately. For serverless deployments, use your app's `/api/rivet` URL. See [Endpoints](/docs/general/endpoints#url-auth-syntax) for details.
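The `https://namespace:token@api.rivet.dev` URL-auth form above can be split into its parts with the standard WHATWG `URL` API. A minimal sketch — `parseEndpointAuth` is a hypothetical helper for illustration, not how RivetKit necessarily parses endpoints:

```typescript
// Hypothetical helper: splits an endpoint of the form
// https://namespace:token@api.rivet.dev into a bare endpoint
// plus credentials, mirroring the two configuration styles above.
function parseEndpointAuth(raw: string): {
  endpoint: string;
  namespace?: string;
  token?: string;
} {
  const url = new URL(raw);
  const namespace = url.username ? decodeURIComponent(url.username) : undefined;
  const token = url.password ? decodeURIComponent(url.password) : undefined;
  // Strip the credentials so the bare endpoint can be used with
  // RIVET_NAMESPACE / RIVET_TOKEN supplied separately.
  url.username = "";
  url.password = "";
  return { endpoint: url.toString(), namespace, token };
}
```

Either style resolves to the same three values, which is why the docs treat them as interchangeable.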
+## Advanced + +### Skip Ready Wait + +Requests are normally held at the gateway until the actor is ready to accept traffic. An actor is not ready while it's still starting (before `onWake` finishes) or while it's in the [sleep grace period](/docs/actors/lifecycle#shutdown-sequence) (running `onSleep`, `waitUntil`, and pending disconnects). + +Pass `gateway.skipReadyWait: true` on the [low-level HTTP and WebSocket APIs](#low-level-http--websocket) to deliver requests immediately and reach the actor's `onRequest` / `onWebSocket` handler in either window: + +```ts @nocheck +import { createClient } from "rivetkit/client"; + +const client = createClient(); +const handle = client.chatRoom.getOrCreate(["general"]); + +const response = await handle.fetch("/healthz", { + gateway: { skipReadyWait: true }, +}); + +const ws = await handle.webSocket("probe", undefined, { + gateway: { skipReadyWait: true }, +}); +``` + +Requests still return a transient `actor.stopping` lifecycle error (`{"group":"actor","code":"stopping","message":"Actor is stopping."}`) if the actor has fully stopped, that is, the sleep grace period has ended but the actor has not yet restarted. Retry once the actor is available again. + ## API Reference **Package:** [rivetkit](https://www.npmjs.com/package/rivetkit)
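Since `actor.stopping` is the documented transient case, probe code can distinguish it from permanent failures before retrying. A minimal sketch — `isActorStopping` and `RivetLifecycleError` are hypothetical names, not part of the RivetKit API; only the error shape is taken from the docs above:

```typescript
// Hypothetical type for the documented lifecycle error body:
// {"group":"actor","code":"stopping","message":"Actor is stopping."}
interface RivetLifecycleError {
  group?: string;
  code?: string;
  message?: string;
}

// Returns true when a parsed error body matches the transient
// `actor.stopping` error, signalling that a retry is appropriate.
function isActorStopping(body: unknown): boolean {
  if (typeof body !== "object" || body === null) return false;
  const err = body as RivetLifecycleError;
  return err.group === "actor" && err.code === "stopping";
}
```

A caller would check the parsed error body with this predicate and retry with backoff only on a match, surfacing all other errors immediately.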