Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion engine/packages/depot-client/tests/inline/fault/scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use anyhow::{Context, Result, bail, ensure};
use depot::{
Expand Down Expand Up @@ -55,6 +56,8 @@ type StageFuture = Pin<Box<dyn Future<Output = Result<()>>>>;
/// A boxed one-shot closure that takes a `FaultScenarioCtx` and returns a `StageFuture`.
type Stage = Box<dyn FnOnce(FaultScenarioCtx) -> StageFuture>;
/// A boxed one-shot closure that is given a `&DepotFaultController` and returns `Result<()>`.
type FaultSetup = Box<dyn FnOnce(&DepotFaultController) -> Result<()>>;

/// Process-wide lock that `FaultScenario::run` tries to acquire before doing any work.
/// `run` uses `try_lock` and bails out when the lock is already held, so a second
/// scenario started concurrently in the same process fails fast instead of waiting.
static FAULT_SCENARIO_RUN_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum FaultProfile {
Simple,
Expand Down Expand Up @@ -190,6 +193,15 @@ impl FaultScenario {
}

pub(crate) fn run(self) -> Result<()> {
// Fault scenarios install process-global workflow hooks for compaction
// workflows, then spin up and shut down workflow workers. Running multiple
// scenarios in the same test process can make one scenario observe another
// scenario's worker/debug lifecycle instead of its own force-compaction ack.
let Some(_run_guard) = FAULT_SCENARIO_RUN_LOCK.try_lock() else {
bail!(
"depot-client fault scenarios cannot run in parallel; rerun with `cargo test -p depot-client fault -- --test-threads=1`"
);
};
let runtime = Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
Expand Down Expand Up @@ -436,10 +448,18 @@ impl FaultScenarioCtx {
let manager_workflow_id = self.manager_workflow_id(database_branch_id).await?;
let test_ctx = self.inner.test_ctx.lock().await;
DepotCompactionTestDriver::new(&test_ctx)
.with_wait_timeout(self.force_compaction_wait_timeout())
.force_compaction(manager_workflow_id, database_branch_id, work)
.await
}

/// Returns the wait timeout for the scenario's fault profile:
/// 30 seconds for `FaultProfile::Simple` and 120 seconds for `FaultProfile::Chaos`.
fn force_compaction_wait_timeout(&self) -> Duration {
    let timeout_secs = match self.inner.profile {
        FaultProfile::Simple => 30,
        FaultProfile::Chaos => 120,
    };
    Duration::from_secs(timeout_secs)
}

pub(crate) async fn verify_sqlite_integrity(&self) -> Result<()> {
self.with_database_blocking(|db| NativeSqliteOracle::verify_integrity(db.as_ptr()))?;
self.inner.oracle.lock().verify_oracle_integrity()?;
Expand Down
30 changes: 21 additions & 9 deletions engine/packages/depot-client/tests/inline/fault/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
use anyhow::{Context, Result, anyhow, bail, ensure};
use depot::{
cold_tier::ColdTier,
conveyer::branch::resolve_database_branch,
keys,
ltx::{DecodedLtx, decode_ltx_v3},
types::{
BranchState, ColdShardRef, CommitRow, DatabaseBranchId, decode_cold_shard_ref,
BranchState, BucketId, ColdShardRef, CommitRow, DatabaseBranchId, decode_cold_shard_ref,
decode_commit_row, decode_compaction_root, decode_database_branch_record,
decode_database_pointer, decode_db_head, decode_db_history_pin,
decode_pitr_interval_coverage, decode_retired_cold_object, decode_sqlite_cmp_dirty,
},
};
use futures_util::TryStreamExt;
use rivet_pools::__rivet_util::Id;
use sha2::{Digest, Sha256};
use universaldb::{
RangeOption,
Expand Down Expand Up @@ -119,15 +121,22 @@
}

async fn check_database_pointer(&mut self) -> Result<Option<DatabaseBranchId>> {
let mut current = None;
let resolved = resolve_database_branch(
self.tx,
BucketId::from_gas_id(Id::nil()),
&self.database_id,
Serializable,
)
.await?;
let mut scanned_current = None;
for (key, value) in scan_prefix(self.tx, keys::database_pointer_cur_prefix()).await? {
let decoded_key = keys::decode_database_pointer_cur_key(&key);
let pointer = decode_database_pointer(&value);
match (decoded_key, pointer) {
(Ok((_bucket_branch_id, database_id)), Ok(pointer))
if database_id == self.database_id =>
{
if current.replace(pointer.current_branch).is_some() {
if scanned_current.replace(pointer.current_branch).is_some() {
self.violate("database pointer appeared more than once");
}
}
Expand All @@ -139,15 +148,18 @@
self.violate(format!("database pointer value failed to decode: {err:#}"))
}
}
}

Check warning on line 151 in engine/packages/depot-client/tests/inline/fault/verify.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/depot-client/src/../tests/inline/fault/verify.rs

if current.is_none() {
self.violate(format!(
"database pointer for {} is missing",
self.database_id
));
let Some(current) = resolved else {
self.violate(format!("database pointer for {} is missing", self.database_id));
return Ok(None);
};
if let Some(scanned_current) = scanned_current
&& scanned_current != current
{
self.violate("database pointer scan disagreed with branch resolution");
}
Ok(current)
Ok(Some(current))
}

async fn check_branch_record(&mut self, branch_id: DatabaseBranchId) -> Result<()> {
Expand Down
Loading