From 870215771061934ca746c4ae099a24b247f24815 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 3 May 2026 19:13:48 -0700 Subject: [PATCH] feat(depot-client): wire sqlite optimization flags --- engine/packages/depot-client/Cargo.toml | 2 +- engine/packages/depot-client/src/database.rs | 13 +- engine/packages/depot-client/src/vfs.rs | 284 +++++++++++++++--- .../packages/depot-client/tests/inline/vfs.rs | 123 +++++++- .../scripts/sqlite-realworld-bench.ts | 193 +----------- .../tests/sqlite-realworld-bench.test.ts | 13 +- 6 files changed, 391 insertions(+), 237 deletions(-) diff --git a/engine/packages/depot-client/Cargo.toml b/engine/packages/depot-client/Cargo.toml index d9861bf1c6..221a7164c1 100644 --- a/engine/packages/depot-client/Cargo.toml +++ b/engine/packages/depot-client/Cargo.toml @@ -23,6 +23,7 @@ depot-client-types.workspace = true depot.workspace = true moka = { version = "0.12", default-features = false, features = ["sync"] } parking_lot.workspace = true +scc.workspace = true [dev-dependencies] depot = { workspace = true, features = ["test-faults"] } @@ -31,7 +32,6 @@ gas.workspace = true rivet-config.workspace = true rivet-pools.workspace = true rivet-test-deps.workspace = true -scc.workspace = true sha2.workspace = true tempfile.workspace = true universaldb.workspace = true diff --git a/engine/packages/depot-client/src/database.rs b/engine/packages/depot-client/src/database.rs index b6d621eb39..5310583de7 100644 --- a/engine/packages/depot-client/src/database.rs +++ b/engine/packages/depot-client/src/database.rs @@ -9,7 +9,7 @@ use crate::{ vfs::{ NativeVfsHandle, SqliteTransportHandle, SqliteVfs, SqliteVfsMetrics, SqliteVfsMetricsSnapshot, VfsConfig, VfsPreloadHintSnapshot, - fetch_initial_main_page_for_registration, + fetch_initial_pages_for_registration, }, worker::SqliteWorkerHandle, }; @@ -32,17 +32,18 @@ pub async fn open_database_from_transport( metrics: Option>, ) -> Result { let vfs_name = vfs_name_for_actor_database(&actor_id, generation); - let initial_main_page = fetch_initial_main_page_for_registration(transport.clone(), &actor_id) + let config = VfsConfig::default(); + let initial_pages = fetch_initial_pages_for_registration(transport.clone(), &actor_id, &config) .await - .map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?; + .map_err(|e| anyhow!("failed to preload sqlite pages: {e}"))?; let vfs = Arc::new( - SqliteVfs::register_with_transport_and_initial_page( + SqliteVfs::register_with_transport_and_initial_pages( &vfs_name, transport, actor_id.clone(), rt_handle, - VfsConfig::default(), - initial_main_page, + config, + initial_pages, metrics.clone(), ) .map_err(|e| anyhow!("failed to register sqlite VFS: {e}"))?, diff --git a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index a419739e33..b8ddc6b409 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -16,11 +16,13 @@ use libsqlite3_sys::*; use moka::sync::Cache; use parking_lot::{Mutex, RwLock}; use rivet_envoy_protocol as protocol; +use scc::HashMap as SccHashMap; use tokio::runtime::Handle; -use crate::optimization_flags::{SqliteOptimizationFlags, sqlite_optimization_flags}; +use crate::optimization_flags::{ + SqliteOptimizationFlags, SqliteVfsPageCacheMode, sqlite_optimization_flags, +}; -const DEFAULT_CACHE_CAPACITY_PAGES: u64 = 50_000; const DEFAULT_PREFETCH_DEPTH: usize = 64; const LEGACY_PREFETCH_DEPTH: usize = 16; const DEFAULT_MAX_PREFETCH_BYTES: usize = 256 * 1024; @@ -130,11 +132,18 @@ fn sqlite_now_ms() -> Result { #[derive(Debug, Clone)] pub struct VfsConfig { pub cache_capacity_pages: u64, + pub protected_cache_pages: usize, + pub page_cache_mode: SqliteVfsPageCacheMode, pub prefetch_depth: usize, pub adaptive_prefetch_depth: usize, pub max_prefetch_bytes: usize, pub adaptive_max_prefetch_bytes: usize, pub max_pages_per_stage: usize, + pub startup_preload_max_bytes: usize, + pub startup_preload_first_pages: bool, + pub startup_preload_first_page_count: u32, + pub preload_hints_on_open: bool, + pub preload_hint_early_pages: bool, pub recent_hint_page_budget: usize, pub recent_hint_range_budget: usize, pub cache_hit_predictor_training: bool, @@ -152,8 +161,19 @@ impl Default for VfsConfig { impl VfsConfig { pub fn from_optimization_flags(flags: SqliteOptimizationFlags) -> Self { + let caches_pages = flags.vfs_page_cache_mode.caches_any_pages(); Self { - cache_capacity_pages: DEFAULT_CACHE_CAPACITY_PAGES, + cache_capacity_pages: if caches_pages { + flags.vfs_page_cache_capacity_pages + } else { + 0 + }, + protected_cache_pages: if caches_pages { + flags.vfs_protected_cache_pages + } else { + 0 + }, + page_cache_mode: flags.vfs_page_cache_mode, prefetch_depth: if flags.read_ahead { DEFAULT_PREFETCH_DEPTH } else { @@ -163,12 +183,17 @@ impl VfsConfig { max_prefetch_bytes: DEFAULT_MAX_PREFETCH_BYTES, adaptive_max_prefetch_bytes: DEFAULT_ADAPTIVE_MAX_PREFETCH_BYTES, max_pages_per_stage: DEFAULT_MAX_PAGES_PER_STAGE, - recent_hint_page_budget: if flags.recent_page_hints { + startup_preload_max_bytes: flags.startup_preload_max_bytes, + startup_preload_first_pages: flags.startup_preload_first_pages, + startup_preload_first_page_count: flags.startup_preload_first_page_count, + preload_hints_on_open: flags.preload_hints_on_open, + preload_hint_early_pages: flags.preload_hint_early_pages, + recent_hint_page_budget: if flags.recent_page_hints && flags.preload_hint_hot_pages { DEFAULT_RECENT_HINT_PAGE_BUDGET } else { 0 }, - recent_hint_range_budget: if flags.recent_page_hints { + recent_hint_range_budget: if flags.recent_page_hints && flags.preload_hint_scan_ranges { DEFAULT_RECENT_HINT_RANGE_BUDGET } else { 0 @@ -229,7 +254,6 @@ pub struct SqliteVfsMetricsSnapshot { pub total_ns: u64, pub commit_count: u64, pub page_cache_entries: u64, - pub page_cache_weighted_size: u64, pub page_cache_capacity_pages: u64, pub write_buffer_dirty_pages: u64, pub db_size_pages: u64, @@ -320,6 +344,7 @@ struct VfsState { db_size_pages: u32, page_size: usize, page_cache: Cache>, + protected_page_cache: Arc>>, write_buffer: WriteBuffer, predictor: PrefetchPredictor, read_ahead: AdaptiveReadAhead, @@ -405,6 +430,13 @@ struct AuxFileHandle { delete_on_close: bool, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PageCacheInsertKind { + Target, + Prefetch, + Startup, +} + unsafe impl Send for VfsContext {} unsafe impl Sync for VfsContext {} @@ -761,12 +793,11 @@ impl VfsState { let page_cache = Cache::builder() .max_capacity(config.cache_capacity_pages) .build(); - page_cache.insert(1, empty_db_page()); - - Self { + let mut state = Self { db_size_pages: 1, page_size: DEFAULT_PAGE_SIZE, page_cache, + protected_page_cache: Arc::new(SccHashMap::new()), write_buffer: WriteBuffer::default(), predictor: PrefetchPredictor::default(), read_ahead: AdaptiveReadAhead::default(), @@ -775,20 +806,98 @@ impl VfsState { config.recent_hint_range_budget, ), dead: false, + }; + state.cache_page(config, PageCacheInsertKind::Target, 1, empty_db_page()); + state + } + + fn cache_page( + &mut self, + config: &VfsConfig, + kind: PageCacheInsertKind, + pgno: u32, + bytes: Vec, + ) { + if !should_cache_page(config, kind, pgno) { + return; + } + cache_page( + config, + &self.page_cache, + &self.protected_page_cache, + kind, + pgno, + bytes, + ); + } + + fn cached_page(&self, config: &VfsConfig, pgno: u32) -> Option> { + if !can_read_cached_page(config, pgno) { + return None; + } + self + .protected_page_cache + .read_sync(&pgno, |_, bytes| bytes.clone()) + .or_else(|| self.page_cache.get(&pgno)) + } + + fn seed_page(&mut self, config: &VfsConfig, kind: PageCacheInsertKind, pgno: u32, page: Vec) { + if pgno == 1 { + self.seed_main_page(config, kind, page); + } else { + self.cache_page(config, kind, pgno, page); } } - fn seed_main_page(&mut self, page: Vec) { + fn seed_main_page(&mut self, config: &VfsConfig, kind: PageCacheInsertKind, page: Vec) { if let Some(page_size) = sqlite_header_page_size(&page) { self.page_size = page_size; } if let Some(db_size_pages) = sqlite_header_db_size_pages(&page) { self.db_size_pages = db_size_pages; } - self.page_cache.insert(1, page); + self.cache_page(config, kind, 1, page); + } + + fn invalidate_page_cache(&mut self) { + self.page_cache.invalidate_all(); + self.protected_page_cache.clear_sync(); + } +} + +fn cache_page( + config: &VfsConfig, + page_cache: &Cache>, + protected_page_cache: &SccHashMap>, + kind: PageCacheInsertKind, + pgno: u32, + bytes: Vec, +) { + if !should_cache_page(config, kind, pgno) { + return; + } + if pgno <= config.protected_cache_pages as u32 { + let _ = protected_page_cache.upsert_sync(pgno, bytes); + } else { + page_cache.insert(pgno, bytes); + } +} + +fn should_cache_page(config: &VfsConfig, kind: PageCacheInsertKind, pgno: u32) -> bool { + if pgno == 1 { + return true; + } + match kind { + PageCacheInsertKind::Target => config.page_cache_mode.caches_target_pages(), + PageCacheInsertKind::Prefetch => config.page_cache_mode.caches_prefetched_pages(), + PageCacheInsertKind::Startup => config.page_cache_mode.caches_startup_preloaded_pages(), } } +fn can_read_cached_page(config: &VfsConfig, pgno: u32) -> bool { + pgno == 1 || config.page_cache_mode.caches_any_pages() +} + impl VfsContext { fn new( actor_id: String, @@ -796,12 +905,12 @@ impl VfsContext { transport: SqliteTransportHandle, config: VfsConfig, io_methods: sqlite3_io_methods, - initial_main_page: Option>, + initial_pages: Vec<(u32, Vec)>, metrics: Option>, ) -> std::result::Result { let mut state = VfsState::new(&config); - if let Some(page) = initial_main_page { - state.seed_main_page(page); + for (pgno, page) in initial_pages { + state.seed_page(&config, PageCacheInsertKind::Startup, pgno, page); } Ok(Self { @@ -887,8 +996,10 @@ impl VfsContext { state_update_ns: self.commit_state_update_ns.load(Ordering::Relaxed), total_ns: self.commit_duration_ns_total.load(Ordering::Relaxed), commit_count: self.commit_total.load(Ordering::Relaxed), - page_cache_entries: state.page_cache.entry_count(), - page_cache_weighted_size: state.page_cache.weighted_size(), + page_cache_entries: state + .page_cache + .entry_count() + .saturating_add(state.protected_page_cache.len() as u64), page_cache_capacity_pages: self.config.cache_capacity_pages, write_buffer_dirty_pages: state.write_buffer.dirty.len() as u64, db_size_pages: state.db_size_pages as u64, @@ -972,7 +1083,27 @@ impl VfsContext { if !self.config.recent_page_hints { return VfsPreloadHintSnapshot::default(); } - self.state.read().recent_pages.snapshot() + let state = self.state.read(); + let mut snapshot = state.recent_pages.snapshot(); + if self.config.preload_hint_early_pages { + let mut existing_pgnos = snapshot.pgnos.iter().copied().collect::>(); + let early_page_count = self + .config + .startup_preload_first_page_count + .min(state.db_size_pages); + for pgno in 1..=early_page_count { + if !snapshot + .ranges + .iter() + .any(|range| range.contains(pgno)) + && existing_pgnos.insert(pgno) + { + snapshot.pgnos.push(pgno); + } + } + snapshot.pgnos.sort_unstable(); + } + snapshot } fn resolve_pages( @@ -1006,7 +1137,7 @@ impl VfsContext { resolved.insert(pgno, Some(bytes.clone())); continue; } - if let Some(bytes) = state.page_cache.get(&pgno) { + if let Some(bytes) = state.cached_page(&self.config, pgno) { resolved.insert(pgno, Some(bytes)); continue; } @@ -1145,7 +1276,11 @@ impl VfsContext { match response { protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok) => { - let page_cache = { self.state.read().page_cache.clone() }; + let missing_pages = missing.iter().copied().collect::>(); + let (page_cache, protected_page_cache) = { + let state = self.state.read(); + (state.page_cache.clone(), state.protected_page_cache.clone()) + }; #[cfg(debug_assertions)] let mut returned_pgnos = HashSet::new(); #[cfg(debug_assertions)] @@ -1164,7 +1299,19 @@ impl VfsContext { } } if let Some(bytes) = &fetched.bytes { - page_cache.insert(fetched.pgno, bytes.clone()); + let kind = if missing_pages.contains(&fetched.pgno) { + PageCacheInsertKind::Target + } else { + PageCacheInsertKind::Prefetch + }; + cache_page( + &self.config, + &page_cache, + &protected_page_cache, + kind, + fetched.pgno, + bytes.clone(), + ); } resolved.insert(fetched.pgno, fetched.bytes); } @@ -1303,9 +1450,12 @@ impl VfsContext { let mut state = self.state.write(); state.db_size_pages = request.new_db_size_pages; for dirty_page in &request.dirty_pages { - state - .page_cache - .insert(dirty_page.pgno, dirty_page.bytes.clone()); + state.cache_page( + &self.config, + PageCacheInsertKind::Target, + dirty_page.pgno, + dirty_page.bytes.clone(), + ); } state.write_buffer.dirty.clear(); let state_update_ns = state_update_start.elapsed().as_nanos() as u64; @@ -1413,9 +1563,12 @@ impl VfsContext { let mut state = self.state.write(); state.db_size_pages = request.new_db_size_pages; for dirty_page in &request.dirty_pages { - state - .page_cache - .insert(dirty_page.pgno, dirty_page.bytes.clone()); + state.cache_page( + &self.config, + PageCacheInsertKind::Target, + dirty_page.pgno, + dirty_page.bytes.clone(), + ); } state.write_buffer.dirty.clear(); state.write_buffer.in_atomic_write = false; @@ -1438,7 +1591,7 @@ impl VfsContext { .write_buffer .dirty .retain(|pgno, _| *pgno <= truncated_pages); - state.page_cache.invalidate_all(); + state.invalidate_page_cache(); } } @@ -1519,15 +1672,47 @@ pub(crate) async fn fetch_initial_main_page_for_registration( fetch_initial_main_page(transport, actor_id.to_string()).await } +pub(crate) async fn fetch_initial_pages_for_registration( + transport: SqliteTransportHandle, + actor_id: &str, + config: &VfsConfig, +) -> std::result::Result)>, String> { + if !config.startup_preload_first_pages + || !config.page_cache_mode.caches_startup_preloaded_pages() + || config.startup_preload_max_bytes < DEFAULT_PAGE_SIZE + { + return fetch_initial_main_page_for_registration(transport, actor_id) + .await + .map(|page| page.into_iter().map(|page| (1, page)).collect()); + } + + let page_count_from_bytes = config.startup_preload_max_bytes / DEFAULT_PAGE_SIZE; + let page_count = config + .startup_preload_first_page_count + .min(page_count_from_bytes as u32) + .max(1); + fetch_initial_pages(transport, actor_id.to_string(), page_count).await +} + async fn fetch_initial_main_page( transport: SqliteTransportHandle, actor_id: String, ) -> std::result::Result>, String> { + fetch_initial_pages(transport, actor_id, 1) + .await + .map(|pages| pages.into_iter().find(|(pgno, _)| *pgno == 1).map(|(_, bytes)| bytes)) +} + +async fn fetch_initial_pages( + transport: SqliteTransportHandle, + actor_id: String, + page_count: u32, +) -> std::result::Result)>, String> { let request_actor_id = actor_id.clone(); let response = transport .get_pages(protocol::SqliteGetPagesRequest { actor_id: request_actor_id, - pgnos: vec![1], + pgnos: (1..=page_count).collect(), expected_generation: None, expected_head_txid: None, }) @@ -1537,8 +1722,8 @@ async fn fetch_initial_main_page( Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok)) => Ok(ok .pages .into_iter() - .find(|page| page.pgno == 1) - .and_then(|page| page.bytes)), + .filter_map(|page| page.bytes.map(|bytes| (page.pgno, bytes))) + .collect()), Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse(error)) => { if !is_initial_main_page_missing(&error.message) { return Err(format!( @@ -1551,7 +1736,7 @@ async fn fetch_initial_main_page( error = %error.message, "sqlite initial page fetch did not find persisted data" ); - Ok(None) + Ok(Vec::new()) } Err(err) => Err(format!("sqlite initial page fetch failed: {err}")), } @@ -2510,11 +2695,18 @@ impl SqliteVfs { config: VfsConfig, metrics: Option>, ) -> std::result::Result { - Self::register_with_transport_and_initial_page( - name, transport, actor_id, runtime, config, None, metrics, + Self::register_with_transport_and_initial_pages( + name, + transport, + actor_id, + runtime, + config, + Vec::new(), + metrics, ) } + #[cfg(test)] pub(crate) fn register_with_transport_and_initial_page( name: &str, transport: SqliteTransportHandle, @@ -2523,6 +2715,30 @@ impl SqliteVfs { config: VfsConfig, initial_main_page: Option>, metrics: Option>, + ) -> std::result::Result { + let initial_pages = initial_main_page + .into_iter() + .map(|page| (1, page)) + .collect(); + Self::register_with_transport_and_initial_pages( + name, + transport, + actor_id, + runtime, + config, + initial_pages, + metrics, + ) + } + + pub(crate) fn register_with_transport_and_initial_pages( + name: &str, + transport: SqliteTransportHandle, + actor_id: String, + runtime: Handle, + config: VfsConfig, + initial_pages: Vec<(u32, Vec)>, + metrics: Option>, ) -> std::result::Result { let mut io_methods: sqlite3_io_methods = unsafe { std::mem::zeroed() }; io_methods.iVersion = 1; @@ -2545,7 +2761,7 @@ impl SqliteVfs { transport, config, io_methods, - initial_main_page, + initial_pages, metrics, )?); let ctx_ptr = (&mut *ctx) as *mut VfsContext; diff --git a/engine/packages/depot-client/tests/inline/vfs.rs b/engine/packages/depot-client/tests/inline/vfs.rs index 679f114d4c..1cea01ec79 100644 --- a/engine/packages/depot-client/tests/inline/vfs.rs +++ b/engine/packages/depot-client/tests/inline/vfs.rs @@ -11,12 +11,19 @@ use std::sync::{Arc, Barrier, mpsc}; use std::thread; use std::time::Duration; +use async_trait::async_trait; use depot::cold_tier::FilesystemColdTier; use parking_lot::Mutex as SyncMutex; +use rivet_envoy_protocol as protocol; use tempfile::TempDir; use tokio::runtime::Builder; use tokio::sync::OnceCell; +use crate::optimization_flags::{ + DEFAULT_STARTUP_PRELOAD_MAX_BYTES, DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES, + DEFAULT_VFS_PROTECTED_CACHE_PAGES, SqliteOptimizationFlags, SqliteReadAheadMode, + SqliteVfsPageCacheMode, +}; use crate::query::{BindParam, ColumnValue}; use crate::vfs::SqliteVfsMetrics; @@ -24,6 +31,116 @@ use super::*; static TEST_ID: AtomicU64 = AtomicU64::new(1); +#[test] +fn vfs_config_wires_optimization_flags() { + let flags = SqliteOptimizationFlags { + read_ahead_mode: SqliteReadAheadMode::Off, + read_ahead: false, + adaptive_read_ahead: false, + recent_page_hints: true, + cache_hit_predictor_training: true, + preload_hint_flush: true, + startup_preload_max_bytes: DEFAULT_STARTUP_PRELOAD_MAX_BYTES / 2, + startup_preload_first_pages: false, + startup_preload_first_page_count: 7, + preload_hints_on_open: false, + preload_hint_hot_pages: false, + preload_hint_early_pages: false, + preload_hint_scan_ranges: true, + dedup_get_pages_meta: true, + cache_get_pages_validation: true, + range_reads: true, + batch_chunk_reads: true, + decoded_ltx_cache: true, + vfs_page_cache_mode: SqliteVfsPageCacheMode::Startup, + vfs_page_cache_capacity_pages: DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES / 2, + vfs_protected_cache_pages: DEFAULT_VFS_PROTECTED_CACHE_PAGES / 2, + }; + + let config = VfsConfig::from_optimization_flags(flags); + assert_eq!(config.page_cache_mode, SqliteVfsPageCacheMode::Startup); + assert_eq!( + config.cache_capacity_pages, + DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES / 2 + ); + assert_eq!( + config.protected_cache_pages, + DEFAULT_VFS_PROTECTED_CACHE_PAGES / 2 + ); + assert_eq!(config.prefetch_depth, 16); + assert!(!config.adaptive_read_ahead); + assert_eq!( + config.startup_preload_max_bytes, + DEFAULT_STARTUP_PRELOAD_MAX_BYTES / 2 + ); + assert!(!config.startup_preload_first_pages); + assert_eq!(config.startup_preload_first_page_count, 7); + assert!(!config.preload_hints_on_open); + assert!(!config.preload_hint_early_pages); + assert_eq!(config.recent_hint_page_budget, 0); + assert!(config.recent_hint_range_budget > 0); +} + +#[derive(Default)] +struct RecordingInitialPagesTransport { + requested_pgnos: SyncMutex>, +} + +#[async_trait] +impl SqliteTransport for RecordingInitialPagesTransport { + async fn get_pages( + &self, + request: protocol::SqliteGetPagesRequest, + ) -> anyhow::Result { + *self.requested_pgnos.lock() = request.pgnos.clone(); + Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk( + protocol::SqliteGetPagesOk { + pages: request + .pgnos + .into_iter() + .map(|pgno| protocol::SqliteFetchedPage { + pgno, + bytes: Some(vec![pgno as u8; DEFAULT_PAGE_SIZE]), + }) + .collect(), + }, + )) + } + + async fn commit( + &self, + _request: protocol::SqliteCommitRequest, + ) -> anyhow::Result { + anyhow::bail!("initial-page preload test does not commit") + } +} + +#[test] +fn startup_initial_pages_do_not_require_preload_hints_on_open() { + let runtime = direct_runtime(); + let transport = Arc::new(RecordingInitialPagesTransport::default()); + let config = VfsConfig { + startup_preload_first_pages: true, + startup_preload_first_page_count: 4, + startup_preload_max_bytes: DEFAULT_PAGE_SIZE * 4, + preload_hints_on_open: false, + page_cache_mode: SqliteVfsPageCacheMode::Startup, + ..VfsConfig::default() + }; + + let pages = runtime + .block_on(fetch_initial_pages_for_registration( + transport.clone(), + "startup-preload-actor", + &config, + )) + .expect("initial pages should load"); + + let loaded_pgnos = pages.iter().map(|(pgno, _)| *pgno).collect::>(); + assert_eq!(*transport.requested_pgnos.lock(), vec![1, 2, 3, 4]); + assert_eq!(loaded_pgnos, vec![1, 2, 3, 4]); +} + fn next_test_name(prefix: &str) -> String { let id = TEST_ID.fetch_add(1, Ordering::Relaxed); format!("{prefix}-{id}") @@ -121,7 +238,7 @@ impl DirectEngineHarness { Arc::new(DirectDepotTransport::new(engine)), VfsConfig::default(), unsafe { std::mem::zeroed() }, - None, + Vec::new(), None, ) .expect("vfs context should build") @@ -2302,7 +2419,7 @@ fn concurrent_reader_during_commit_atomic_observes_consistent_snapshot() { transport, VfsConfig::default(), unsafe { std::mem::zeroed() }, - None, + Vec::new(), None, ) .expect("vfs context should build"); @@ -3233,7 +3350,7 @@ fn resolve_pages_surfaces_read_path_error_response() { transport, VfsConfig::default(), unsafe { std::mem::zeroed() }, - None, + Vec::new(), None, ) .expect("vfs context should build"); diff --git a/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts b/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts index 02ea4bb491..5fe05832c0 100644 --- a/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts +++ b/examples/kitchen-sink/scripts/sqlite-realworld-bench.ts @@ -25,8 +25,6 @@ const DEFAULT_STARTUP_PRELOAD_MAX_BYTES = 1024 * 1024; const DEFAULT_STARTUP_PRELOAD_FIRST_PAGE_COUNT = 1; const DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES = 50_000; const DEFAULT_VFS_PROTECTED_CACHE_PAGES = 512; -const DEFAULT_READ_POOL_MAX_READERS = 4; -const DEFAULT_READ_POOL_IDLE_TTL_MS = 60_000; const DEFAULT_BENCH_VFS_ROUND_TRIP_LATENCY_MS = 10; const BENCH_VFS_ROUND_TRIP_LATENCY_MS_ENV = "RIVETKIT_SQLITE_BENCH_VFS_ROUND_TRIP_LATENCY_MS"; @@ -47,19 +45,12 @@ const SQLITE_OPT_BOOLEAN_ENVS = [ "RIVETKIT_SQLITE_OPT_PRELOAD_HINT_HOT_PAGES", "RIVETKIT_SQLITE_OPT_PRELOAD_HINT_EARLY_PAGES", "RIVETKIT_SQLITE_OPT_PRELOAD_HINT_SCAN_RANGES", - "RIVETKIT_SQLITE_OPT_CACHE_GET_PAGES_VALIDATION", - "RIVETKIT_SQLITE_OPT_RANGE_READS", - "RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS", - "RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE", - "RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED", ] as const; const SQLITE_OPT_NUMERIC_ENVS = [ "RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_MAX_BYTES", "RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_FIRST_PAGE_COUNT", "RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_CAPACITY_PAGES", "RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES", - "RIVETKIT_SQLITE_OPT_READ_POOL_MAX_READERS", - "RIVETKIT_SQLITE_OPT_READ_POOL_IDLE_TTL_MS", ] as const; const WORKLOADS = [ @@ -187,9 +178,6 @@ interface MatrixScenarioReport { fetchedPages: number; cacheHits: number; cacheMisses: number; - routedReads: number; - writeFallbacks: number; - modeTransitions: number; }>; } @@ -204,7 +192,6 @@ interface BenchmarkResult { setup: SetupResult | null; main: MainResult; vfsMetrics: VfsMetricSnapshot; - readPoolMetrics: ReadPoolMetricSnapshot; } interface VfsMetricSnapshot { @@ -221,23 +208,6 @@ interface VfsMetricSnapshot { getPagesDurationSecondsCount: number; } -interface ReadPoolMetricSnapshot { - activeReaders: number; - idleReaders: number; - readWaitDurationSecondsSum: number; - readWaitDurationSecondsCount: number; - writeWaitDurationSecondsSum: number; - writeWaitDurationSecondsCount: number; - routedReadQueriesTotal: number; - writeFallbackQueriesTotal: number; - manualTransactionDurationSecondsSum: number; - manualTransactionDurationSecondsCount: number; - readerOpensTotal: number; - readerClosesTotal: number; - rejectedReaderMutationsTotal: number; - modeTransitionsTotal: number; -} - const WORKLOAD_SPECS: WorkloadSpec[] = [ { // Included to keep tiny actor databases honest while we optimize larger datasets. @@ -320,16 +290,14 @@ const WORKLOAD_SPECS: WorkloadSpec[] = [ description: "Selective tenant/time-range aggregate over events joined to orders.", }, { - // Included to measure future read-mode parallelism where several read-only SQLite connections overlap VFS misses. - // Today this captures the serialized baseline; after the connection manager lands, independent aggregate reads should overlap. + // Included to measure concurrent read-only aggregate pressure over the same VFS and transport. name: "parallel-read-aggregates", category: "read", sizeClass: "large", description: "Concurrent read-only aggregates over one actor-local SQLite database.", }, { - // Included to measure the read-mode to write-mode transition. - // Future write mode must wait for active readers, close them, run exactly one writable connection, then allow fresh readers. + // Included to measure read pressure queued alongside a write update. name: "parallel-read-write-transition", category: "write", sizeClass: "medium", @@ -417,7 +385,6 @@ const WORKLOAD_SPECS: WorkloadSpec[] = [ }, { // Included to model tool-like fan-out where an agent asks for recent rows, indexed rows, and aggregates concurrently. - // Parallel read pool routing should make these independent read-only statements overlap once TS serialization is gone. name: "chat-tool-read-fanout", category: "read", sizeClass: "large", @@ -525,11 +492,6 @@ function defaultSqliteOptimizationEnv(): Record { RIVETKIT_SQLITE_OPT_PRELOAD_HINT_HOT_PAGES: "true", RIVETKIT_SQLITE_OPT_PRELOAD_HINT_EARLY_PAGES: "true", RIVETKIT_SQLITE_OPT_PRELOAD_HINT_SCAN_RANGES: "true", - RIVETKIT_SQLITE_OPT_CACHE_GET_PAGES_VALIDATION: "true", - RIVETKIT_SQLITE_OPT_RANGE_READS: "true", - RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS: "true", - RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE: "true", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "true", RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_MAX_BYTES: DEFAULT_STARTUP_PRELOAD_MAX_BYTES.toString(), RIVETKIT_SQLITE_OPT_STARTUP_PRELOAD_FIRST_PAGE_COUNT: @@ -538,10 +500,6 @@ function defaultSqliteOptimizationEnv(): Record { DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES.toString(), RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES: DEFAULT_VFS_PROTECTED_CACHE_PAGES.toString(), - RIVETKIT_SQLITE_OPT_READ_POOL_MAX_READERS: - DEFAULT_READ_POOL_MAX_READERS.toString(), - RIVETKIT_SQLITE_OPT_READ_POOL_IDLE_TTL_MS: - DEFAULT_READ_POOL_IDLE_TTL_MS.toString(), }; } @@ -588,33 +546,14 @@ const SQLITE_OPTIMIZATION_MATRIX_SCENARIOS: MatrixScenario[] = [ }, includeInImpact: true, }, - { - id: "transport-batching-only", - label: "Transport batching only", - description: - "Range reads, chunk batching, and decoded LTX cache enabled with VFS cache, preload, read-ahead, and read pool disabled.", - env: scenarioEnv({ - ...preloadDisabledEnv(), - RIVETKIT_SQLITE_OPT_READ_AHEAD_MODE: "off", - RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MODE: "off", - RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_CAPACITY_PAGES: "0", - RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES: "0", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", - }), - includeInImpact: true, - }, { id: "vfs-cache-only", label: "VFS cache only", description: - "VFS page cache enabled without read-ahead, preload hints, range reads, storage decode cache, or read pool.", + "VFS page cache enabled without read-ahead or preload hints.", env: scenarioEnv({ ...preloadDisabledEnv(), RIVETKIT_SQLITE_OPT_READ_AHEAD_MODE: "off", - RIVETKIT_SQLITE_OPT_RANGE_READS: "false", - RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS: "false", - RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE: "false", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", }), includeInImpact: true, }, @@ -622,13 +561,12 @@ const SQLITE_OPTIMIZATION_MATRIX_SCENARIOS: MatrixScenario[] = [ id: "read-ahead-no-cache", label: "Read-ahead without VFS cache", description: - "Adaptive read-ahead and range reads enabled while prefetched pages are not retained in the VFS cache.", + "Adaptive read-ahead enabled while prefetched pages are not retained in the VFS cache.", env: scenarioEnv({ ...preloadDisabledEnv(), RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_MODE: "off", RIVETKIT_SQLITE_OPT_VFS_PAGE_CACHE_CAPACITY_PAGES: "0", RIVETKIT_SQLITE_OPT_VFS_PROTECTED_CACHE_PAGES: "0", - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", }), includeInImpact: true, }, @@ -697,37 +635,6 @@ const SQLITE_OPTIMIZATION_MATRIX_SCENARIOS: MatrixScenario[] = [ env: scenarioEnv(preloadDisabledEnv()), includeInImpact: true, }, - { - id: "no-range-reads", - label: "Default minus range reads", - description: "Current defaults with contiguous range page reads disabled.", - env: scenarioEnv({ - RIVETKIT_SQLITE_OPT_RANGE_READS: "false", - }), - includeInImpact: true, - }, - { - id: "no-storage-read-cache", - label: "Default minus storage read cache", - description: - "Current defaults with chunk batching and decoded LTX cache disabled.", - env: scenarioEnv({ - RIVETKIT_SQLITE_OPT_BATCH_CHUNK_READS: "false", - RIVETKIT_SQLITE_OPT_DECODED_LTX_CACHE: "false", - }), - includeInImpact: true, - }, - { - id: "no-read-pool", - label: "Default minus read pool", - description: "Current defaults with the SQLite read connection pool disabled.", - env: scenarioEnv({ - RIVETKIT_SQLITE_OPT_READ_POOL_ENABLED: "false", - RIVETKIT_SQLITE_OPT_READ_POOL_MAX_READERS: "0", - RIVETKIT_SQLITE_OPT_READ_POOL_IDLE_TTL_MS: "0", - }), - includeInImpact: true, - }, ]; function usage(exitCode = 1): never { @@ -1293,55 +1200,6 @@ function scrapeVfsMetrics(text: string): VfsMetricSnapshot { }; } -function scrapeReadPoolMetrics(text: string): ReadPoolMetricSnapshot { - return { - activeReaders: metricValue(text, "sqlite_read_pool_active_readers"), - idleReaders: metricValue(text, "sqlite_read_pool_idle_readers"), - readWaitDurationSecondsSum: metricValue( - text, - "sqlite_read_pool_read_wait_duration_seconds_sum", - ), - readWaitDurationSecondsCount: metricValue( - text, - "sqlite_read_pool_read_wait_duration_seconds_count", - ), - writeWaitDurationSecondsSum: metricValue( - text, - "sqlite_read_pool_write_wait_duration_seconds_sum", - ), - writeWaitDurationSecondsCount: metricValue( - text, - "sqlite_read_pool_write_wait_duration_seconds_count", - ), - routedReadQueriesTotal: metricValue( - text, - "sqlite_read_pool_routed_read_queries_total", - ), - writeFallbackQueriesTotal: metricValue( - text, - "sqlite_read_pool_write_fallback_queries_total", - ), - manualTransactionDurationSecondsSum: metricValue( - text, - "sqlite_read_pool_manual_transaction_duration_seconds_sum", - ), - manualTransactionDurationSecondsCount: metricValue( - text, - "sqlite_read_pool_manual_transaction_duration_seconds_count", - ), - readerOpensTotal: metricValue(text, "sqlite_read_pool_reader_opens_total"), - readerClosesTotal: metricValue(text, "sqlite_read_pool_reader_closes_total"), - rejectedReaderMutationsTotal: metricValue( - text, - "sqlite_read_pool_rejected_reader_mutations_total", - ), - modeTransitionsTotal: metricValue( - text, - "sqlite_read_pool_mode_transitions_total", - ), - }; -} - function diffMetrics(after: T, before: T): T { return Object.fromEntries( Object.keys(after).map((key) => [ @@ -1367,25 +1225,6 @@ function emptyVfsMetrics(): VfsMetricSnapshot { }; } -function emptyReadPoolMetrics(): ReadPoolMetricSnapshot { - return { - activeReaders: 0, - idleReaders: 0, - readWaitDurationSecondsSum: 0, - readWaitDurationSecondsCount: 0, - writeWaitDurationSecondsSum: 0, - writeWaitDurationSecondsCount: 0, - routedReadQueriesTotal: 0, - writeFallbackQueriesTotal: 0, - manualTransactionDurationSecondsSum: 0, - manualTransactionDurationSecondsCount: 0, - readerOpensTotal: 0, - readerClosesTotal: 0, - rejectedReaderMutationsTotal: 0, - modeTransitionsTotal: 0, - }; -} - function writeResults(outputDir: string, document: unknown): void { mkdirSync(outputDir, { recursive: true }); writeFileSync( @@ -1400,8 +1239,8 @@ function writeSummary(outputDir: string, results: BenchmarkResult[]): void { "", "Server SQLite time only. Setup time, sleep delay, wake/cold-start time, and client RTT are not included.", "", - "| workload | category | size | server_ms | routed_reads | write_fallbacks | mode_transitions | reader_opens | reader_closes | get_pages | fetched_pages | cache_hits | cache_misses | rows/ops | pages |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", + "| workload | category | size | server_ms | get_pages | fetched_pages | cache_hits | cache_misses | rows/ops | pages |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", ]; for (const result of results) { const rowsOrOps = @@ -1411,7 +1250,7 @@ function writeSummary(outputDir: string, results: BenchmarkResult[]): void { ? result.main.ops : ""; lines.push( - `| ${result.workload} | ${result.category} | ${fmtBytes(result.targetBytes)} | ${result.main.ms.toFixed(1)} | ${result.readPoolMetrics.routedReadQueriesTotal} | ${result.readPoolMetrics.writeFallbackQueriesTotal} | ${result.readPoolMetrics.modeTransitionsTotal} | ${result.readPoolMetrics.readerOpensTotal} | ${result.readPoolMetrics.readerClosesTotal} | ${result.vfsMetrics.getPagesTotal} | ${result.vfsMetrics.pagesFetchedTotal} | ${result.vfsMetrics.resolvePagesCacheHitsTotal} | ${result.vfsMetrics.resolvePagesCacheMissesTotal} | ${rowsOrOps} | ${result.main.pageCount} |`, + `| ${result.workload} | ${result.category} | ${fmtBytes(result.targetBytes)} | ${result.main.ms.toFixed(1)} | ${result.vfsMetrics.getPagesTotal} | ${result.vfsMetrics.pagesFetchedTotal} | ${result.vfsMetrics.resolvePagesCacheHitsTotal} | ${result.vfsMetrics.resolvePagesCacheMissesTotal} | ${rowsOrOps} | ${result.main.pageCount} |`, ); } writeFileSync(join(outputDir, "summary.md"), `${lines.join("\n")}\n`); @@ -1517,9 +1356,6 @@ function readScenarioDocument(args: Args, scenario: MatrixScenario): MatrixScena fetchedPages: result.vfsMetrics.pagesFetchedTotal, cacheHits: result.vfsMetrics.resolvePagesCacheHitsTotal, cacheMisses: result.vfsMetrics.resolvePagesCacheMissesTotal, - routedReads: result.readPoolMetrics.routedReadQueriesTotal, - writeFallbacks: result.readPoolMetrics.writeFallbackQueriesTotal, - modeTransitions: result.readPoolMetrics.modeTransitionsTotal, })), }; } @@ -1551,8 +1387,8 @@ function writeMatrixSummary(outputDir: string, scenarios: MatrixScenarioReport[] "", "Each scenario runs in a fresh process so process-wide SQLite optimization flags are read once per configuration.", "", - "| scenario | workload | server_ms | delta_vs_defaults | get_pages | fetched_pages | cache_hits | cache_misses | routed_reads | write_fallbacks |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", + "| scenario | workload | server_ms | delta_vs_defaults | get_pages | fetched_pages | cache_hits | cache_misses |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: |", ]; for (const scenario of scenarios) { @@ -1564,7 +1400,7 @@ function writeMatrixSummary(outputDir: string, scenarios: MatrixScenarioReport[] ? `${(((result.serverMs - base.serverMs) / base.serverMs) * 100).toFixed(1)}%` : ""; lines.push( - `| ${scenario.id} | ${result.workload} | ${result.serverMs.toFixed(1)} | ${delta} | ${result.getPages} | ${result.fetchedPages} | ${result.cacheHits} | ${result.cacheMisses} | ${result.routedReads} | ${result.writeFallbacks} |`, + `| ${scenario.id} | ${result.workload} | ${result.serverMs.toFixed(1)} | ${delta} | ${result.getPages} | ${result.fetchedPages} | ${result.cacheHits} | ${result.cacheMisses} |`, ); } } @@ -1798,12 +1634,8 @@ async function main(): Promise { scrapeVfsMetrics(afterMainMetricsText), emptyVfsMetrics(), ); - const readPoolMetrics = diffMetrics( - scrapeReadPoolMetrics(afterMainMetricsText), - emptyReadPoolMetrics(), - ); console.log( - ` server=${fmtMs(mainResult.ms)} pages=${mainResult.pageCount} routed_reads=${readPoolMetrics.routedReadQueriesTotal} write_fallbacks=${readPoolMetrics.writeFallbackQueriesTotal} mode_transitions=${readPoolMetrics.modeTransitionsTotal} get_pages=${vfsMetrics.getPagesTotal} fetched_pages=${vfsMetrics.pagesFetchedTotal}`, + ` server=${fmtMs(mainResult.ms)} pages=${mainResult.pageCount} get_pages=${vfsMetrics.getPagesTotal} fetched_pages=${vfsMetrics.pagesFetchedTotal}`, ); results.push({ @@ -1817,7 +1649,6 @@ async function main(): Promise { setup, main: mainResult, vfsMetrics, - readPoolMetrics, }); writeResults(outputDir, resultDocument); writeSummary(outputDir, results); @@ -1830,7 +1661,7 @@ async function main(): Promise { console.log("\nResults"); for (const result of results) { console.log( - ` ${result.workload}: server=${fmtMs(result.main.ms)} size=${fmtBytes(result.targetBytes)} routed_reads=${result.readPoolMetrics.routedReadQueriesTotal} write_fallbacks=${result.readPoolMetrics.writeFallbackQueriesTotal} mode_transitions=${result.readPoolMetrics.modeTransitionsTotal} get_pages=${result.vfsMetrics.getPagesTotal} fetched_pages=${result.vfsMetrics.pagesFetchedTotal}`, + ` ${result.workload}: server=${fmtMs(result.main.ms)} size=${fmtBytes(result.targetBytes)} get_pages=${result.vfsMetrics.getPagesTotal} fetched_pages=${result.vfsMetrics.pagesFetchedTotal}`, ); } console.log(`\nwrote ${join(outputDir, "results.json")}`); diff --git a/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts b/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts index c85505345f..0424d3e921 100644 --- a/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts +++ b/examples/kitchen-sink/tests/sqlite-realworld-bench.test.ts @@ -42,16 +42,9 @@ test("SQLite real-world benchmark includes read-mode/write-mode scenarios", () = ); assert.match(actor, /Promise\.all\(\[/); assert.match(actor, /UPDATE rw_orders SET total_cents = total_cents \+ 1/); - for (const metric of [ - "sqlite_read_pool_routed_read_queries_total", - "sqlite_read_pool_write_fallback_queries_total", - "sqlite_read_pool_mode_transitions_total", - ]) { - assert.match(runner, new RegExp(metric)); - } assert.match( runner, - /\| workload \| category \| size \| server_ms \| routed_reads \| write_fallbacks \| mode_transitions \|/, + /\| workload \| category \| size \| server_ms \| get_pages \| fetched_pages \|/, ); }); @@ -66,16 +59,12 @@ test("SQLite real-world benchmark defines an optimization impact matrix", () => for (const scenario of [ "defaults", "all-off", - "transport-batching-only", "vfs-cache-only", "read-ahead-no-cache", "cache-read-ahead-no-preload", "no-read-ahead", "no-vfs-cache", "no-preload", - "no-range-reads", - "no-storage-read-cache", - "no-read-pool", ]) { assert.match(runner, new RegExp(`id: "${scenario}"`)); }