diff --git a/engine/packages/depot-client/tests/inline/fault/scenario.rs b/engine/packages/depot-client/tests/inline/fault/scenario.rs index 1489b5d224..251b134147 100644 --- a/engine/packages/depot-client/tests/inline/fault/scenario.rs +++ b/engine/packages/depot-client/tests/inline/fault/scenario.rs @@ -3,7 +3,8 @@ use std::future::Future; use std::path::Path; use std::pin::Pin; use std::ptr; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; +use std::time::Duration; use anyhow::{Context, Result, bail, ensure}; use depot::{ @@ -55,6 +56,8 @@ type StageFuture = Pin>>>; type Stage = Box StageFuture>; type FaultSetup = Box Result<()>>; +static FAULT_SCENARIO_RUN_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub(crate) enum FaultProfile { Simple, @@ -190,6 +193,15 @@ impl FaultScenario { } pub(crate) fn run(self) -> Result<()> { + // Fault scenarios install process-global workflow hooks for compaction + // workflows, then spin and shut down workflow workers. Running multiple + // scenarios in the same test process can make one scenario observe another + // scenario's worker/debug lifecycle instead of its own force-compaction ack. + let Some(_run_guard) = FAULT_SCENARIO_RUN_LOCK.try_lock() else { + bail!( + "depot-client fault scenarios cannot run in parallel; rerun with `cargo test -p depot-client fault -- --test-threads=1`" + ); + }; let runtime = Builder::new_multi_thread() .worker_threads(2) .enable_all() @@ -436,10 +448,18 @@ impl FaultScenarioCtx { let manager_workflow_id = self.manager_workflow_id(database_branch_id).await?; let test_ctx = self.inner.test_ctx.lock().await; DepotCompactionTestDriver::new(&test_ctx) + .with_wait_timeout(self.force_compaction_wait_timeout()) .force_compaction(manager_workflow_id, database_branch_id, work) .await } + fn force_compaction_wait_timeout(&self) -> Duration { + match self.inner.profile { + FaultProfile::Simple => Duration::from_secs(30), + FaultProfile::Chaos => Duration::from_secs(120), + } + } + pub(crate) async fn verify_sqlite_integrity(&self) -> Result<()> { self.with_database_blocking(|db| NativeSqliteOracle::verify_integrity(db.as_ptr()))?; self.inner.oracle.lock().verify_oracle_integrity()?; diff --git a/engine/packages/depot-client/tests/inline/fault/verify.rs b/engine/packages/depot-client/tests/inline/fault/verify.rs index 6261930722..eef4db0625 100644 --- a/engine/packages/depot-client/tests/inline/fault/verify.rs +++ b/engine/packages/depot-client/tests/inline/fault/verify.rs @@ -4,16 +4,18 @@ use std::sync::Arc; use anyhow::{Context, Result, anyhow, bail, ensure}; use depot::{ cold_tier::ColdTier, + conveyer::branch::resolve_database_branch, keys, ltx::{DecodedLtx, decode_ltx_v3}, types::{ - BranchState, ColdShardRef, CommitRow, DatabaseBranchId, decode_cold_shard_ref, + BranchState, BucketId, ColdShardRef, CommitRow, DatabaseBranchId, decode_cold_shard_ref, decode_commit_row, decode_compaction_root, decode_database_branch_record, decode_database_pointer, decode_db_head, decode_db_history_pin, decode_pitr_interval_coverage, decode_retired_cold_object, decode_sqlite_cmp_dirty, }, }; use futures_util::TryStreamExt; +use rivet_pools::__rivet_util::Id; use sha2::{Digest, Sha256}; use universaldb::{ RangeOption, @@ -119,7 +121,14 @@ impl<'a> InvariantScan<'a> { } async fn check_database_pointer(&mut self) -> Result> { - let mut current = None; + let resolved = resolve_database_branch( + self.tx, + BucketId::from_gas_id(Id::nil()), + &self.database_id, + Serializable, + ) + .await?; + let mut scanned_current = None; for (key, value) in scan_prefix(self.tx, keys::database_pointer_cur_prefix()).await? { let decoded_key = keys::decode_database_pointer_cur_key(&key); let pointer = decode_database_pointer(&value); @@ -127,7 +136,7 @@ impl<'a> InvariantScan<'a> { (Ok((_bucket_branch_id, database_id)), Ok(pointer)) if database_id == self.database_id => { - if current.replace(pointer.current_branch).is_some() { + if scanned_current.replace(pointer.current_branch).is_some() { self.violate("database pointer appeared more than once"); } } @@ -141,13 +150,16 @@ impl<'a> InvariantScan<'a> { } } - if current.is_none() { - self.violate(format!( - "database pointer for {} is missing", - self.database_id - )); + let Some(current) = resolved else { + self.violate(format!("database pointer for {} is missing", self.database_id)); + return Ok(None); + }; + if let Some(scanned_current) = scanned_current + && scanned_current != current + { + self.violate("database pointer scan disagreed with branch resolution"); } - Ok(current) + Ok(Some(current)) } async fn check_branch_record(&mut self, branch_id: DatabaseBranchId) -> Result<()> {