diff --git a/engine/packages/depot-client/src/database.rs b/engine/packages/depot-client/src/database.rs index 3ca29d2d5e..4f79ef3c04 100644 --- a/engine/packages/depot-client/src/database.rs +++ b/engine/packages/depot-client/src/database.rs @@ -13,7 +13,8 @@ use crate::{ }, vfs::{ NativeVfsHandle, SqliteVfs, SqliteVfsMetrics, VfsConfig, VfsPreloadHintSnapshot, - configure_connection_for_database, verify_batch_atomic_writes, + configure_connection_for_database, fetch_initial_main_page_from_envoy, + verify_batch_atomic_writes, }, }; @@ -42,12 +43,17 @@ pub async fn open_database_from_envoy( metrics: Option>, ) -> Result { let vfs_name = vfs_name_for_actor_database(&actor_id, generation); + let initial_main_page = fetch_initial_main_page_from_envoy(&handle, &actor_id) + .await + .map_err(|e| anyhow!("failed to fetch sqlite initial main page: {e}"))?; + let mut vfs_config = VfsConfig::default(); + vfs_config.initial_main_page = initial_main_page; let vfs = Arc::new(SqliteVfs::register( &vfs_name, handle, actor_id.clone(), rt_handle, - VfsConfig::default(), + vfs_config, metrics.clone(), ) .map_err(|e| anyhow!("failed to register sqlite VFS: {e}"))?); @@ -162,6 +168,10 @@ impl NativeDatabaseHandle { self.vfs.take_last_error() } + pub fn sqlite_vfs_metrics(&self) -> crate::vfs::SqliteVfsMetricsSnapshot { + self.vfs.sqlite_vfs_metrics() + } + pub fn snapshot_preload_hints(&self) -> VfsPreloadHintSnapshot { self.vfs.snapshot_preload_hints() } diff --git a/engine/packages/depot-client/src/optimization_flags.rs b/engine/packages/depot-client/src/optimization_flags.rs index 1f1b7050ca..2211600b8a 100644 --- a/engine/packages/depot-client/src/optimization_flags.rs +++ b/engine/packages/depot-client/src/optimization_flags.rs @@ -31,7 +31,7 @@ pub const DEFAULT_STARTUP_PRELOAD_MAX_BYTES: usize = 1024 * 1024; pub const MAX_STARTUP_PRELOAD_MAX_BYTES: usize = 8 * 1024 * 1024; pub const DEFAULT_STARTUP_PRELOAD_FIRST_PAGE_COUNT: u32 = 1; pub const MAX_STARTUP_PRELOAD_FIRST_PAGE_COUNT: u32 = 256; -pub const DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 50_000; +pub const DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 4_096; pub const MAX_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 500_000; pub const DEFAULT_VFS_PROTECTED_CACHE_PAGES: usize = 512; pub const MAX_VFS_PROTECTED_CACHE_PAGES: usize = 8_192; diff --git a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index e407a42912..2076b5b3c6 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -18,9 +18,10 @@ use rivet_envoy_client::handle::EnvoyHandle; use rivet_envoy_protocol as protocol; use tokio::runtime::Handle; -use crate::optimization_flags::{SqliteOptimizationFlags, sqlite_optimization_flags}; +use crate::optimization_flags::{ + SqliteOptimizationFlags, SqliteVfsPageCacheMode, sqlite_optimization_flags, +}; -const DEFAULT_CACHE_CAPACITY_PAGES: u64 = 50_000; const DEFAULT_PREFETCH_DEPTH: usize = 64; const LEGACY_PREFETCH_DEPTH: usize = 16; const DEFAULT_MAX_PREFETCH_BYTES: usize = 256 * 1024; @@ -230,7 +231,9 @@ fn sqlite_now_ms() -> Result { #[derive(Debug, Clone)] pub struct VfsConfig { + pub page_cache_mode: SqliteVfsPageCacheMode, pub cache_capacity_pages: u64, + pub initial_main_page: Option>, pub prefetch_depth: usize, pub adaptive_prefetch_depth: usize, pub max_prefetch_bytes: usize, @@ -254,35 +257,50 @@ impl Default for VfsConfig { impl VfsConfig { pub fn from_optimization_flags(flags: SqliteOptimizationFlags) -> Self { Self { - cache_capacity_pages: DEFAULT_CACHE_CAPACITY_PAGES, + page_cache_mode: flags.vfs_page_cache_mode, + cache_capacity_pages: flags.vfs_page_cache_capacity_pages, + initial_main_page: None, prefetch_depth: if flags.read_ahead { DEFAULT_PREFETCH_DEPTH } else { LEGACY_PREFETCH_DEPTH }, - adaptive_prefetch_depth: DEFAULT_ADAPTIVE_PREFETCH_DEPTH, - max_prefetch_bytes: DEFAULT_MAX_PREFETCH_BYTES, - adaptive_max_prefetch_bytes: DEFAULT_ADAPTIVE_MAX_PREFETCH_BYTES, - max_pages_per_stage: DEFAULT_MAX_PAGES_PER_STAGE, - recent_hint_page_budget: if flags.recent_page_hints { - DEFAULT_RECENT_HINT_PAGE_BUDGET - } else { - 0 - }, - recent_hint_range_budget: if flags.recent_page_hints { - DEFAULT_RECENT_HINT_RANGE_BUDGET - } else { - 0 - }, - cache_hit_predictor_training: flags.cache_hit_predictor_training, - recent_page_hints: flags.recent_page_hints, - adaptive_read_ahead: flags.adaptive_read_ahead, - #[cfg(test)] - assert_batch_atomic: true, - } + adaptive_prefetch_depth: DEFAULT_ADAPTIVE_PREFETCH_DEPTH, + max_prefetch_bytes: DEFAULT_MAX_PREFETCH_BYTES, + adaptive_max_prefetch_bytes: DEFAULT_ADAPTIVE_MAX_PREFETCH_BYTES, + max_pages_per_stage: DEFAULT_MAX_PAGES_PER_STAGE, + recent_hint_page_budget: if flags.recent_page_hints { + DEFAULT_RECENT_HINT_PAGE_BUDGET + } else { + 0 + }, + recent_hint_range_budget: if flags.recent_page_hints { + DEFAULT_RECENT_HINT_RANGE_BUDGET + } else { + 0 + }, + cache_hit_predictor_training: flags.cache_hit_predictor_training, + recent_page_hints: flags.recent_page_hints, + adaptive_read_ahead: flags.adaptive_read_ahead, + #[cfg(test)] + assert_batch_atomic: true, } } + fn caches_target_pages(&self) -> bool { + self.page_cache_mode.caches_target_pages() && self.cache_capacity_pages > 0 + } + + fn caches_prefetched_pages(&self) -> bool { + self.page_cache_mode.caches_prefetched_pages() && self.cache_capacity_pages > 0 + } + + #[cfg(test)] + fn caches_startup_preloaded_pages(&self) -> bool { + self.page_cache_mode.caches_startup_preloaded_pages() && self.cache_capacity_pages > 0 + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct VfsPreloadHintRange { pub start_pgno: u32, @@ -329,6 +347,11 @@ pub struct SqliteVfsMetricsSnapshot { pub state_update_ns: u64, pub total_ns: u64, pub commit_count: u64, + pub page_cache_entries: u64, + pub page_cache_weighted_size: u64, + pub page_cache_capacity_pages: u64, + pub write_buffer_dirty_pages: u64, + pub db_size_pages: u64, } pub trait SqliteVfsMetrics: Send + Sync { @@ -861,7 +884,7 @@ fn push_coalesced_range(ranges: &mut VecDeque, range: VfsPr impl VfsState { fn new(config: &VfsConfig) -> Self { let page_cache = Cache::builder() - .max_capacity(config.cache_capacity_pages) + .max_capacity(config.cache_capacity_pages.max(1)) .build(); page_cache.insert(1, empty_db_page()); @@ -889,6 +912,22 @@ impl VfsState { } self.page_cache.insert(1, page); } + + fn evict_pages_after_eof(&mut self) { + let db_size_pages = self.db_size_pages; + let stale_pgnos = self + .page_cache + .iter() + .filter_map(|entry| { + let pgno = *entry.0; + (pgno > db_size_pages).then_some(pgno) + }) + .collect::>(); + for pgno in stale_pgnos { + self.page_cache.invalidate(&pgno); + } + self.page_cache.run_pending_tasks(); + } } impl VfsContext { @@ -901,10 +940,15 @@ impl VfsContext { metrics: Option>, ) -> std::result::Result { let mut state = VfsState::new(&config); + if let Some(page) = config.initial_main_page.clone() { + state.seed_main_page(page); + } #[cfg(test)] if let SqliteTransportInner::Direct(storage) = &*transport.inner { if storage.is_strict_mode() { - if let Some(page) = fetch_initial_main_page(&transport, &runtime, &actor_id)? { + if config.initial_main_page.is_none() + && let Some(page) = fetch_initial_main_page(&transport, &runtime, &actor_id)? + { state.seed_main_page(page); } } else { @@ -913,15 +957,13 @@ impl VfsContext { state.db_size_pages = snapshot.db_size_pages; state.page_cache.invalidate_all(); for (pgno, bytes) in snapshot.pages { - state.page_cache.insert(pgno, bytes); + if pgno == 1 || config.caches_startup_preloaded_pages() { + state.page_cache.insert(pgno, bytes); + } } } } } - #[cfg(not(test))] - if let Some(page) = fetch_initial_main_page(&transport, &runtime, &actor_id)? { - state.seed_main_page(page); - } Ok(Self { actor_id, @@ -997,6 +1039,7 @@ impl VfsContext { } fn sqlite_vfs_metrics(&self) -> SqliteVfsMetricsSnapshot { + let state = self.state.read(); SqliteVfsMetricsSnapshot { request_build_ns: self.commit_request_build_ns.load(Ordering::Relaxed), serialize_ns: self.commit_serialize_ns.load(Ordering::Relaxed), @@ -1004,6 +1047,11 @@ impl VfsContext { state_update_ns: self.commit_state_update_ns.load(Ordering::Relaxed), total_ns: self.commit_duration_ns_total.load(Ordering::Relaxed), commit_count: self.commit_total.load(Ordering::Relaxed), + page_cache_entries: state.page_cache.entry_count(), + page_cache_weighted_size: state.page_cache.weighted_size(), + page_cache_capacity_pages: self.config.cache_capacity_pages, + write_buffer_dirty_pages: state.write_buffer.dirty.len() as u64, + db_size_pages: state.db_size_pages as u64, } } @@ -1248,10 +1296,18 @@ impl VfsContext { match response { protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok) => { + let target_missing = missing.iter().copied().collect::>(); let page_cache = { self.state.read().page_cache.clone() }; for fetched in ok.pages { if let Some(bytes) = &fetched.bytes { - page_cache.insert(fetched.pgno, bytes.clone()); + let should_cache = if target_missing.contains(&fetched.pgno) { + self.config.caches_target_pages() + } else { + self.config.caches_prefetched_pages() + }; + if should_cache { + page_cache.insert(fetched.pgno, bytes.clone()); + } } resolved.insert(fetched.pgno, fetched.bytes); } @@ -1346,10 +1402,15 @@ impl VfsContext { let mut state = self.state.write(); state.db_size_pages = request.new_db_size_pages; for dirty_page in &request.dirty_pages { - state - .page_cache - .insert(dirty_page.pgno, dirty_page.bytes.clone()); + if self.config.caches_target_pages() { + state + .page_cache + .insert(dirty_page.pgno, dirty_page.bytes.clone()); + } else { + state.page_cache.invalidate(&dirty_page.pgno); + } } + state.evict_pages_after_eof(); state.write_buffer.dirty.clear(); let state_update_ns = state_update_start.elapsed().as_nanos() as u64; self.add_commit_phase_metrics( @@ -1445,10 +1506,15 @@ impl VfsContext { let mut state = self.state.write(); state.db_size_pages = request.new_db_size_pages; for dirty_page in &request.dirty_pages { - state - .page_cache - .insert(dirty_page.pgno, dirty_page.bytes.clone()); + if self.config.caches_target_pages() { + state + .page_cache + .insert(dirty_page.pgno, dirty_page.bytes.clone()); + } else { + state.page_cache.invalidate(&dirty_page.pgno); + } } + state.evict_pages_after_eof(); state.write_buffer.dirty.clear(); state.write_buffer.in_atomic_write = false; let state_update_ns = state_update_start.elapsed().as_nanos() as u64; @@ -1471,6 +1537,7 @@ impl VfsContext { .dirty .retain(|pgno, _| *pgno <= truncated_pages); state.page_cache.invalidate_all(); + state.page_cache.run_pending_tasks(); } } @@ -1544,6 +1611,7 @@ fn mark_dead_from_fence_commit_error(ctx: &VfsContext, err: &CommitBufferError) } } +#[cfg(test)] fn fetch_initial_main_page( transport: &SqliteTransport, runtime: &Handle, @@ -1556,6 +1624,28 @@ fn fetch_initial_main_page( expected_head_txid: None, })); + initial_main_page_from_response(actor_id, response) +} + +pub async fn fetch_initial_main_page_from_envoy( + handle: &EnvoyHandle, + actor_id: &str, +) -> std::result::Result>, String> { + let response = handle + .sqlite_get_pages(protocol::SqliteGetPagesRequest { + actor_id: actor_id.to_string(), + pgnos: vec![1], + expected_generation: None, + expected_head_txid: None, + }) + .await; + initial_main_page_from_response(actor_id, response) +} + +fn initial_main_page_from_response( + actor_id: &str, + response: anyhow::Result, +) -> std::result::Result>, String> { match response { Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok)) => Ok(ok .pages @@ -2490,6 +2580,10 @@ impl SqliteVfs { self.ctx.snapshot_preload_hints() } + pub(crate) fn sqlite_vfs_metrics(&self) -> SqliteVfsMetricsSnapshot { + self.ctx.sqlite_vfs_metrics() + } + fn register_with_transport( name: &str, transport: SqliteTransport, diff --git a/engine/packages/depot-client/tests/inline/vfs.rs b/engine/packages/depot-client/tests/inline/vfs.rs index 8b9a5d7b4b..e679122dc8 100644 --- a/engine/packages/depot-client/tests/inline/vfs.rs +++ b/engine/packages/depot-client/tests/inline/vfs.rs @@ -252,6 +252,61 @@ }); } + #[test] + fn vfs_config_honors_page_cache_optimization_flags() { + let disabled = VfsConfig::from_optimization_flags(SqliteOptimizationFlags { + vfs_page_cache_mode: SqliteVfsPageCacheMode::Off, + vfs_page_cache_capacity_pages: 0, + ..SqliteOptimizationFlags::default() + }); + + assert_eq!(disabled.page_cache_mode, SqliteVfsPageCacheMode::Off); + assert_eq!(disabled.cache_capacity_pages, 0); + assert!(!disabled.caches_target_pages()); + assert!(!disabled.caches_prefetched_pages()); + assert!(!disabled.caches_startup_preloaded_pages()); + + let prefetch = VfsConfig::from_optimization_flags(SqliteOptimizationFlags { + vfs_page_cache_mode: SqliteVfsPageCacheMode::Prefetch, + vfs_page_cache_capacity_pages: 123, + ..SqliteOptimizationFlags::default() + }); + + assert_eq!(prefetch.cache_capacity_pages, 123); + assert!(prefetch.caches_target_pages()); + assert!(prefetch.caches_prefetched_pages()); + assert!(!prefetch.caches_startup_preloaded_pages()); + } + + #[test] + fn direct_engine_opens_empty_database_with_page_cache_disabled() { + let runtime = direct_runtime(); + let harness = DirectEngineHarness::new(); + let mut config = VfsConfig::default(); + config.page_cache_mode = SqliteVfsPageCacheMode::Off; + config.cache_capacity_pages = 0; + + let engine = runtime.block_on(harness.open_engine()); + let db = harness.open_db_on_engine(&runtime, engine, &harness.actor_id, config); + + sqlite_exec( + db.as_ptr(), + "CREATE TABLE items (id INTEGER PRIMARY KEY, value TEXT NOT NULL);", + ) + .expect("create table should succeed with page cache disabled"); + sqlite_step_statement( + db.as_ptr(), + "INSERT INTO items (id, value) VALUES (1, 'alpha');", + ) + .expect("insert should succeed with page cache disabled"); + assert_eq!( + sqlite_query_text(db.as_ptr(), "SELECT value FROM items WHERE id = 1;") + .expect("select should succeed with page cache disabled"), + "alpha" + ); + assert_eq!(db.sqlite_vfs_metrics().page_cache_capacity_pages, 0); + } + #[test] fn direct_engine_supports_create_insert_select_and_user_version() { let runtime = direct_runtime(); @@ -613,6 +668,21 @@ let shrunk_pages = sqlite_query_i64(db.as_ptr(), "PRAGMA page_count;") .expect("shrunk page_count should succeed"); assert!(shrunk_pages < grown_pages); + let ctx = direct_vfs_ctx(&db); + let state = ctx.state.read(); + assert_eq!(state.db_size_pages, shrunk_pages as u32); + assert!( + state + .page_cache + .iter() + .all(|entry| *entry.0 <= state.db_size_pages), + "VFS cache should not retain pages beyond the vacuumed EOF" + ); + assert!( + state.page_cache.entry_count() <= state.db_size_pages as u64, + "VFS cache should drain invalidated pages after vacuum" + ); + drop(state); for _ in 0..8 { sqlite_step_statement( diff --git a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs index 3c811f2693..5f7527376e 100644 --- a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs @@ -9,9 +9,7 @@ use crate::{ options::{ConflictRangeType, MutationType}, tx_ops::Operation, value::{KeyValue, Slice, Values}, - versionstamp::{ - generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete, - }, + versionstamp::{generate_versionstamp, substitute_raw_versionstamp}, }; pub enum TransactionCommand { @@ -386,9 +384,6 @@ impl TransactionTask { for op in operations { match op { Operation::Set { key, value } => { - // TODO: versionstamps need to be calculated on the sql side, not in rust - let value = substitute_versionstamp_if_incomplete(value.clone(), 0); - let query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2"; let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; diff --git a/engine/packages/universaldb/src/driver/rocksdb/database.rs b/engine/packages/universaldb/src/driver/rocksdb/database.rs index 65da99e451..9ac5d38971 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/database.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/database.rs @@ -22,6 +22,11 @@ use super::{ transaction::RocksDbTransactionDriver, transaction_conflict_tracker::TransactionConflictTracker, }; +const ROCKSDB_WRITE_BUFFER_SIZE_BYTES: usize = 4 * 1024 * 1024; +const ROCKSDB_DB_WRITE_BUFFER_SIZE_BYTES: usize = 16 * 1024 * 1024; +const ROCKSDB_MAX_WRITE_BUFFER_NUMBER: i32 = 2; +const ROCKSDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE: i32 = 1; + pub struct RocksDbDatabaseDriver { db: Arc, max_retries: AtomicI32, @@ -41,7 +46,11 @@ impl RocksDbDatabaseDriver { opts.set_max_open_files(10000); opts.set_keep_log_file_num(10); opts.set_max_total_wal_size(64 * 1024 * 1024); // 64MiB - opts.set_write_buffer_size(256 * 1024 * 1024); // 256MiB for conflict detection + opts.set_write_buffer_size(ROCKSDB_WRITE_BUFFER_SIZE_BYTES); + opts.set_db_write_buffer_size(ROCKSDB_DB_WRITE_BUFFER_SIZE_BYTES); + opts.set_max_write_buffer_number(ROCKSDB_MAX_WRITE_BUFFER_NUMBER); + opts.set_min_write_buffer_number_to_merge(ROCKSDB_MIN_WRITE_BUFFER_NUMBER_TO_MERGE); + opts.set_max_write_buffer_size_to_maintain(0); // Open the OptimisticTransactionDB tracing::debug!(path=%db_path.display(), "opening rocksdb"); diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs index 4d4d7fb3e3..cbf1902ed2 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs @@ -14,9 +14,7 @@ use crate::{ options::{ConflictRangeType, MutationType}, tx_ops::Operation, value::{KeyValue, Slice, Values}, - versionstamp::{ - generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete, - }, + versionstamp::{generate_versionstamp, substitute_raw_versionstamp}, }; pub enum TransactionCommand { @@ -322,11 +320,6 @@ impl TransactionTask { for op in operations { match op { Operation::Set { key, value } => { - // Substitute versionstamp if incomplete - // For now, just use the simple substitution - we can improve this later - // to ensure all versionstamps in a transaction have the same base timestamp - let value = substitute_versionstamp_if_incomplete(value.clone(), 0); - txn.put(key, &value) .context("failed to set key in rocksdb")?; } diff --git a/engine/packages/universaldb/tests/rocksdb.rs b/engine/packages/universaldb/tests/rocksdb.rs index 46cdcc77e6..73384fa4e0 100644 --- a/engine/packages/universaldb/tests/rocksdb.rs +++ b/engine/packages/universaldb/tests/rocksdb.rs @@ -15,7 +15,11 @@ use rocksdb::{OptimisticTransactionDB, Options, WriteOptions}; use universaldb::{ Database, prelude::*, - utils::{calculate_tx_retry_backoff, end_of_key_range}, + tuple::{Element, Versionstamp, pack_with_versionstamp}, + utils::{ + Subspace, calculate_tx_retry_backoff, end_of_key_range, + keys::{ACTIVE2, GASOLINE, KV, METRIC, RIVET, WORKER}, + }, }; use uuid::Uuid; @@ -272,3 +276,168 @@ async fn rocksdb_udb() { } } } + +#[tokio::test] +async fn rocksdb_normal_set_preserves_versionstamp_like_bytes() { + let _ = tracing_subscriber::fmt() + .with_env_filter("debug") + .with_test_writer() + .try_init(); + + let test_id = Uuid::new_v4(); + let (db_config, _docker_config) = TestDatabase::FileSystem.config(test_id, 1).await.unwrap(); + + let rivet_config::config::Database::FileSystem(fs_config) = db_config else { + unreachable!() + }; + + let driver = universaldb::driver::RocksDbDatabaseDriver::new(fs_config.path) + .await + .unwrap(); + let db = Database::new(Arc::new(driver)); + + let key = b"versionstamp-like-binary".to_vec(); + let value = pack_with_versionstamp(&vec![ + Element::Bytes(b"prefix".to_vec().into()), + Element::Versionstamp(Versionstamp::incomplete(0)), + Element::Bytes(b"suffix".to_vec().into()), + ]); + + db.run(|tx| { + let key = key.clone(); + let value = value.clone(); + + async move { + tx.set(&key, &value); + Ok(()) + } + }) + .await + .unwrap(); + + let stored = db + .run(|tx| { + let key = key.clone(); + + async move { Ok(tx.get(&key, Serializable).await?) } + }) + .await + .unwrap() + .unwrap(); + + assert_eq!(Vec::::from(stored), value); +} + +#[tokio::test] +async fn rocksdb_empty_range_before_next_keyspace_returns_empty() { + let _ = tracing_subscriber::fmt() + .with_env_filter("debug") + .with_test_writer() + .try_init(); + + let test_id = Uuid::new_v4(); + let (db_config, _docker_config) = TestDatabase::FileSystem.config(test_id, 1).await.unwrap(); + + let rivet_config::config::Database::FileSystem(fs_config) = db_config else { + unreachable!() + }; + + let driver = universaldb::driver::RocksDbDatabaseDriver::new(fs_config.path) + .await + .unwrap(); + let db = Database::new(Arc::new(driver)); + + let next_keyspace_key = b"b/metric".to_vec(); + db.run(|tx| { + let next_keyspace_key = next_keyspace_key.clone(); + + async move { + tx.set(&next_keyspace_key, b"value"); + Ok(()) + } + }) + .await + .unwrap(); + + let entries = db + .run(|tx| async move { + tx.get_ranges_keyvalues( + RangeOption { + begin: KeySelector::first_greater_or_equal(b"a/worker/active/after".to_vec()), + end: KeySelector::first_greater_or_equal(b"a/worker/active/\xff".to_vec()), + mode: StreamingMode::WantAll, + ..RangeOption::default() + }, + Serializable, + ) + .try_collect::>() + .await + }) + .await + .unwrap(); + + assert!(entries.is_empty(), "range leaked entries: {entries:?}"); +} + +#[tokio::test] +async fn rocksdb_empty_tuple_subspace_range_before_metric_key_returns_empty() { + let _ = tracing_subscriber::fmt() + .with_env_filter("debug") + .with_test_writer() + .try_init(); + + let test_id = Uuid::new_v4(); + let (db_config, _docker_config) = TestDatabase::FileSystem.config(test_id, 1).await.unwrap(); + + let rivet_config::config::Database::FileSystem(fs_config) = db_config else { + unreachable!() + }; + + let driver = universaldb::driver::RocksDbDatabaseDriver::new(fs_config.path) + .await + .unwrap(); + let db = Database::new(Arc::new(driver)); + + let gasoline_subspace = Subspace::new(&(RIVET, GASOLINE, KV)); + let metric_key = gasoline_subspace.pack(&(METRIC, 1_i64, "epoxy_replica_v2")); + db.run(|tx| { + let metric_key = metric_key.clone(); + + async move { + tx.set(&metric_key, b"value"); + Ok(()) + } + }) + .await + .unwrap(); + + let active_worker_start = gasoline_subspace.pack(&(WORKER, ACTIVE2, 1_762_049_020_000_i64)); + let active_worker_end = gasoline_subspace + .subspace(&(WORKER, ACTIVE2)) + .range() + .1; + + let entries = db + .run(|tx| { + let active_worker_start = active_worker_start.clone(); + let active_worker_end = active_worker_end.clone(); + + async move { + tx.get_ranges_keyvalues( + RangeOption { + begin: KeySelector::first_greater_or_equal(active_worker_start), + end: KeySelector::first_greater_or_equal(active_worker_end), + mode: StreamingMode::WantAll, + ..RangeOption::default() + }, + Serializable, + ) + .try_collect::>() + .await + } + }) + .await + .unwrap(); + + assert!(entries.is_empty(), "range leaked entries: {entries:?}"); +} diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs index 3bd384fcfb..181fbd437f 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs @@ -2,8 +2,6 @@ use std::collections::HashSet; use std::io::Cursor; #[cfg(feature = "sqlite-local")] use std::sync::Arc; -#[cfg(feature = "sqlite-local")] -use std::time::Duration; use anyhow::{Context, Result}; #[cfg(feature = "sqlite-local")] @@ -21,18 +19,13 @@ use serde_json::{Map as JsonMap, Value as JsonValue}; use tokio::sync::Mutex as AsyncMutex; #[cfg(feature = "sqlite-local")] use tokio::task::JoinHandle; -#[cfg(feature = "sqlite-local")] -use tokio::time::{interval, timeout}; -#[cfg(feature = "sqlite-local")] -use tracing::Instrument; use crate::error::SqliteRuntimeError; #[cfg(feature = "sqlite-local")] use depot_client::{ database::{NativeDatabaseHandle, open_database_from_envoy}, - optimization_flags::sqlite_optimization_flags, - vfs::{SqliteVfsMetrics, SqliteVfsMetricsSnapshot, VfsPreloadHintSnapshot}, + vfs::{SqliteVfsMetrics, SqliteVfsMetricsSnapshot}, }; #[cfg(not(feature = "sqlite-local"))] @@ -44,13 +37,13 @@ pub struct SqliteVfsMetricsSnapshot { pub state_update_ns: u64, pub total_ns: u64, pub commit_count: u64, + pub page_cache_entries: u64, + pub page_cache_weighted_size: u64, + pub page_cache_capacity_pages: u64, + pub write_buffer_dirty_pages: u64, + pub db_size_pages: u64, } -#[cfg(feature = "sqlite-local")] -const PRELOAD_HINT_FLUSH_INTERVAL: Duration = Duration::from_secs(30); -#[cfg(feature = "sqlite-local")] -const PRELOAD_HINT_FLUSH_TIMEOUT: Duration = Duration::from_secs(5); - #[derive(Clone)] pub struct SqliteRuntimeConfig { pub handle: EnvoyHandle, @@ -366,44 +359,6 @@ impl SqliteDb { #[cfg(feature = "sqlite-local")] fn ensure_preload_hint_flush_task(&self) -> Result<()> { - if !sqlite_optimization_flags().preload_hint_flush { - return Ok(()); - } - - let config = self.runtime_config()?; - let Some(generation) = config.generation else { - return Ok(()); - }; - if self.db.lock().is_none() { - return Ok(()); - } - - let mut task_guard = self.preload_hint_flush_task.lock(); - if task_guard.is_some() { - return Ok(()); - } - - let db = self.db.clone(); - let handle = config.handle; - let actor_id = config.actor_id; - *task_guard = Some(tokio::spawn( - async move { - let mut tick = interval(PRELOAD_HINT_FLUSH_INTERVAL); - tick.tick().await; - loop { - tick.tick().await; - flush_preload_hints_best_effort( - db.clone(), - handle.clone(), - actor_id.clone(), - generation, - "periodic", - ) - .await; - } - } - .in_current_span(), - )); Ok(()) } @@ -416,24 +371,6 @@ impl SqliteDb { #[cfg(feature = "sqlite-local")] async fn flush_preload_hints_before_close(&self) { - if !sqlite_optimization_flags().preload_hint_flush { - return; - } - - let Ok(config) = self.runtime_config() else { - return; - }; - let Some(generation) = config.generation else { - return; - }; - - enqueue_preload_hint_flush_best_effort( - self.db.clone(), - config.handle, - config.actor_id, - generation, - ) - .await; } pub fn take_last_kv_error(&self) -> Option { @@ -621,175 +558,6 @@ impl SqliteDb { } } -#[cfg(feature = "sqlite-local")] -async fn enqueue_preload_hint_flush_best_effort( - db: Arc>>, - handle: EnvoyHandle, - actor_id: String, - generation: u64, -) { - let snapshot = match snapshot_preload_hints(db).await { - Ok(Some(snapshot)) => snapshot, - Ok(None) => return, - Err(error) => { - tracing::warn!( - actor_id = %actor_id, - ?error, - reason = "shutdown", - "sqlite preload hint snapshot failed" - ); - return; - } - }; - if snapshot.pgnos.is_empty() && snapshot.ranges.is_empty() { - return; - } - - let hint_count = snapshot.pgnos.len() + snapshot.ranges.len(); - let request = protocol::SqlitePersistPreloadHintsRequest { - actor_id: actor_id.clone(), - generation, - hints: protocol_preload_hints(snapshot), - }; - match handle.sqlite_persist_preload_hints_fire_and_forget(request) { - Ok(()) => { - tracing::debug!( - actor_id = %actor_id, - generation, - reason = "shutdown", - hint_count, - "sqlite preload hint flush queued" - ); - } - Err(error) => { - tracing::warn!( - actor_id = %actor_id, - generation, - reason = "shutdown", - hint_count, - ?error, - "sqlite preload hint flush queue failed" - ); - } - } -} - -#[cfg(feature = "sqlite-local")] -async fn flush_preload_hints_best_effort( - db: Arc>>, - handle: EnvoyHandle, - actor_id: String, - generation: u64, - reason: &'static str, -) { - let snapshot = match snapshot_preload_hints(db).await { - Ok(Some(snapshot)) => snapshot, - Ok(None) => return, - Err(error) => { - tracing::warn!( - actor_id = %actor_id, - ?error, - reason, - "sqlite preload hint snapshot failed" - ); - return; - } - }; - if snapshot.pgnos.is_empty() && snapshot.ranges.is_empty() { - return; - } - - let hint_count = snapshot.pgnos.len() + snapshot.ranges.len(); - let request = protocol::SqlitePersistPreloadHintsRequest { - actor_id: actor_id.clone(), - generation, - hints: protocol_preload_hints(snapshot), - }; - let response = timeout( - PRELOAD_HINT_FLUSH_TIMEOUT, - handle.sqlite_persist_preload_hints(request), - ) - .await; - match response { - Ok(Ok(protocol::SqlitePersistPreloadHintsResponse::SqlitePersistPreloadHintsOk)) => { - tracing::debug!( - actor_id = %actor_id, - generation, - reason, - hint_count, - "sqlite preload hints flushed" - ); - } - Ok(Ok(protocol::SqlitePersistPreloadHintsResponse::SqliteFenceMismatch(mismatch))) => { - tracing::debug!( - actor_id = %actor_id, - generation, - reason, - hint_count, - fence_reason = %mismatch.reason, - "sqlite preload hint flush skipped after fence mismatch" - ); - } - Ok(Ok(protocol::SqlitePersistPreloadHintsResponse::SqliteErrorResponse(error))) => { - tracing::warn!( - actor_id = %actor_id, - generation, - reason, - hint_count, - error = %error.message, - "sqlite preload hint flush failed" - ); - } - Ok(Err(error)) => { - tracing::warn!( - actor_id = %actor_id, - generation, - reason, - hint_count, - ?error, - "sqlite preload hint flush failed" - ); - } - Err(_) => { - tracing::warn!( - actor_id = %actor_id, - generation, - reason, - hint_count, - timeout_ms = PRELOAD_HINT_FLUSH_TIMEOUT.as_millis() as u64, - "sqlite preload hint flush timed out" - ); - } - } -} - -#[cfg(feature = "sqlite-local")] -async fn snapshot_preload_hints( - db: Arc>>, -) -> Result> { - tokio::task::spawn_blocking(move || { - let guard = db.lock(); - Ok(guard.as_ref().map(NativeDatabaseHandle::snapshot_preload_hints)) - }) - .await - .context("join sqlite preload hint snapshot task")? -} - -#[cfg(feature = "sqlite-local")] -fn protocol_preload_hints(snapshot: VfsPreloadHintSnapshot) -> protocol::SqlitePreloadHints { - protocol::SqlitePreloadHints { - pgnos: snapshot.pgnos, - ranges: snapshot - .ranges - .into_iter() - .map(|range| protocol::SqlitePreloadHintRange { - start_pgno: range.start_pgno, - page_count: range.page_count, - }) - .collect(), - } -} - struct RemoteSqliteConfig { handle: EnvoyHandle, namespace_id: String,