diff --git a/engine/packages/depot-client/src/database.rs b/engine/packages/depot-client/src/database.rs index 3ca29d2d5e..d68364fc4a 100644 --- a/engine/packages/depot-client/src/database.rs +++ b/engine/packages/depot-client/src/database.rs @@ -12,8 +12,8 @@ use crate::{ exec_statements, execute_single_statement, install_reader_authorizer, }, vfs::{ - NativeVfsHandle, SqliteVfs, SqliteVfsMetrics, VfsConfig, VfsPreloadHintSnapshot, - configure_connection_for_database, verify_batch_atomic_writes, + NativeVfsHandle, SqliteVfs, SqliteVfsMetrics, SqliteVfsMetricsSnapshot, VfsConfig, + VfsPreloadHintSnapshot, configure_connection_for_database, verify_batch_atomic_writes, }, }; @@ -166,6 +166,10 @@ impl NativeDatabaseHandle { self.vfs.snapshot_preload_hints() } + pub fn sqlite_vfs_metrics(&self) -> SqliteVfsMetricsSnapshot { + self.vfs.sqlite_vfs_metrics() + } + #[cfg(test)] pub(crate) fn manager(&self) -> NativeConnectionManager { self.manager.clone() diff --git a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index e407a42912..e553ec5c5e 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -4,6 +4,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::ffi::{CStr, CString, c_char, c_int, c_void}; +use std::future::Future; use std::ptr; use std::slice; use std::sync::Arc; @@ -16,7 +17,7 @@ use moka::sync::Cache; use parking_lot::{Mutex, RwLock}; use rivet_envoy_client::handle::EnvoyHandle; use rivet_envoy_protocol as protocol; -use tokio::runtime::Handle; +use tokio::runtime::{Handle, RuntimeFlavor}; use crate::optimization_flags::{SqliteOptimizationFlags, sqlite_optimization_flags}; @@ -908,7 +909,7 @@ impl VfsContext { state.seed_main_page(page); } } else { - let snapshot = runtime.block_on(storage.snapshot_pages(&actor_id)); + let snapshot = block_on_runtime(&runtime, storage.snapshot_pages(&actor_id)); if snapshot.db_size_pages > 0 { state.db_size_pages = snapshot.db_size_pages; state.page_cache.invalidate_all(); @@ -1016,17 +1017,16 @@ impl VfsContext { CommitBufferError, > { let commit = commit_buffered_pages(&self.transport, request); - let result = if let Some(timeout) = timeout { - match self - .runtime - .block_on(async { tokio::time::timeout(timeout, commit).await }) - { - Ok(result) => result, - Err(_) => return Ok(CommitWait::TimedOut), - } - } else { - self.runtime.block_on(commit) - }; + let result = if let Some(timeout) = timeout { + match block_on_runtime(&self.runtime, async { + tokio::time::timeout(timeout, commit).await + }) { + Ok(result) => result, + Err(_) => return Ok(CommitWait::TimedOut), + } + } else { + block_on_runtime(&self.runtime, commit) + }; result.map(CommitWait::Completed) } @@ -1233,14 +1233,15 @@ impl VfsContext { } let get_pages_start = Instant::now(); - let response = self - .runtime - .block_on(self.transport.get_pages(protocol::SqliteGetPagesRequest { - actor_id: self.actor_id.clone(), - pgnos: to_fetch.clone(), - expected_generation: None, - expected_head_txid: None, - })) + let response = block_on_runtime( + &self.runtime, + self.transport.get_pages(protocol::SqliteGetPagesRequest { + actor_id: self.actor_id.clone(), + pgnos: to_fetch.clone(), + expected_generation: None, + expected_head_txid: None, + }), + ) .map_err(|err| GetPagesError::Other(err.to_string()))?; if let Some(metrics) = &self.metrics { metrics.observe_get_pages_duration(get_pages_start.elapsed().as_nanos() as u64); @@ -1544,17 +1545,29 @@ fn mark_dead_from_fence_commit_error(ctx: &VfsContext, err: &CommitBufferError) } } +fn block_on_runtime(runtime: &Handle, future: F) -> F::Output { + match Handle::try_current() { + Ok(current) if current.runtime_flavor() == RuntimeFlavor::MultiThread => { + tokio::task::block_in_place(|| runtime.block_on(future)) + } + Ok(_) | Err(_) => runtime.block_on(future), + } +} + fn fetch_initial_main_page( transport: &SqliteTransport, runtime: &Handle, actor_id: &str, ) -> std::result::Result>, String> { - let response = runtime.block_on(transport.get_pages(protocol::SqliteGetPagesRequest { - actor_id: actor_id.to_string(), - pgnos: vec![1], - expected_generation: None, - expected_head_txid: None, - })); + let response = block_on_runtime( + runtime, + transport.get_pages(protocol::SqliteGetPagesRequest { + actor_id: actor_id.to_string(), + pgnos: vec![1], + expected_generation: None, + expected_head_txid: None, + }), + ); match response { Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok)) => Ok(ok @@ -2490,6 +2503,10 @@ impl SqliteVfs { self.ctx.snapshot_preload_hints() } + pub(crate) fn sqlite_vfs_metrics(&self) -> SqliteVfsMetricsSnapshot { + self.ctx.sqlite_vfs_metrics() + } + fn register_with_transport( name: &str, transport: SqliteTransport,