Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ zebra-rpc = "6.0.2"
# Runtime
tokio = { version = "1.38", features = ["full"] }
tokio-stream = "0.1"
tokio-util = "0.7"

# CLI
clap = "4.0"
Expand Down
3 changes: 0 additions & 3 deletions integration-tests/.config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ default-filter = """
test(/^zcashd::get::lightd_info$/) |
test(/^zcashd::get::mining_info$/) |
test(/^zcashd::get::peer_info$/) |
test(/^zcashd::get::subtree_roots$/) |
test(/^zcashd::validation::validate_address$/) |
test(/^zcashd::zcash_indexer::get_best_blockhash$/) |
test(/^zcashd::zcash_indexer::get_block_count$/) |
Expand All @@ -180,9 +179,7 @@ default-filter = """
test(/^zebrad::fetch_service::connect_to_node_get_info$/) |
test(/^zebrad::get::best_blockhash$/) |
test(/^zebrad::get::block_count$/) |
test(/^zebrad::get::block_object$/) |
test(/^zebrad::get::block_range_nullifiers$/) |
test(/^zebrad::get::difficulty$/) |
test(/^zebrad::get::get_network_sol_ps$/) |
test(/^zebrad::get::latest_block$/) |
test(/^zebrad::get::subtree_roots$/) |
Expand Down
1 change: 1 addition & 0 deletions packages/zaino-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ chrono = { workspace = true }
indexmap = { workspace = true }
hex = { workspace = true, features = ["serde"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
futures = { workspace = true }
tonic = { workspace = true }
dashmap = { workspace = true }
Expand Down
199 changes: 191 additions & 8 deletions packages/zaino-state/src/chain_index/finalised_state/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,125 @@ use crate::{
config::BlockCacheConfig,
error::FinalisedStateError,
BlockHash, BlockHeaderData, CommitmentTreeData, CompactBlockStream, Height, IndexedBlock,
OrchardCompactTx, OrchardTxList, SaplingCompactTx, SaplingTxList, StatusType,
TransparentCompactTx, TransparentTxList, TxLocation, TxidList,
NamedAtomicStatus, OrchardCompactTx, OrchardTxList, SaplingCompactTx, SaplingTxList,
StatusType, TransparentCompactTx, TransparentTxList, TxLocation, TxidList,
};

#[cfg(feature = "transparent_address_history_experimental")]
use crate::{chain_index::finalised_state::capability::TransparentHistExt, AddrScript, Outpoint};

use async_trait::async_trait;
use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};
use lmdb::{Database, DatabaseFlags, Environment};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{
task::JoinHandle,
time::{interval, sleep, MissedTickBehavior},
};
use tokio_util::sync::CancellationToken;
use tracing::warn;

use super::capability::Capability;

/// Lifecycle scaffolding shared by every `DbVx` finalised-state backend.
///
/// Implementors expose the four shared struct fields via required getters;
/// provided methods cover the duplicated `status()`, `wait_until_ready()`,
/// `shutdown()`, `clean_trailing()`, and the background task's per-iteration
/// `zaino_db_handler_sleep()`.
#[async_trait]
pub(super) trait DbLifecycle: Sync {
    /// Shared LMDB environment handle for this backend.
    fn env(&self) -> &Arc<Environment>;
    /// Slot holding the background maintenance task's handle; `shutdown()`
    /// takes it out of the slot exactly once.
    fn db_handler_slot(&self) -> &Mutex<Option<JoinHandle<()>>>;
    /// Cancellation token used to signal shutdown to every background waiter.
    fn cancel_token(&self) -> &CancellationToken;
    /// Shared status cell read by `status()` and `wait_until_ready()`.
    fn status_atomic(&self) -> &NamedAtomicStatus;

    /// Snapshot of the backend's current status.
    fn status(&self) -> StatusType {
        self.status_atomic().load()
    }

    /// Polls the status atomic every 100ms until it reads `Ready`.
    ///
    /// `MissedTickBehavior::Delay` keeps the polls evenly spaced even if the
    /// task is starved — missed ticks are not burst-replayed.
    async fn wait_until_ready(&self) {
        let mut ticker = interval(Duration::from_millis(100));
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            ticker.tick().await;
            if self.status_atomic().load() == StatusType::Ready {
                break;
            }
        }
    }

    /// Opens and immediately drops a read-only LMDB transaction.
    ///
    /// NOTE(review): presumably this forces LMDB to reap stale reader slots /
    /// trailing state — the visible code only begins and drops the txn;
    /// confirm the intended side effect with the LMDB semantics.
    async fn clean_trailing(&self) -> Result<(), FinalisedStateError> {
        let txn = self.env().begin_ro_txn()?;
        drop(txn);
        Ok(())
    }

    /// One iteration of the background task's idle wait.
    ///
    /// Returns when the first of three events fires: a 5-second sleep elapses,
    /// the maintenance interval ticks (running `clean_trailing()`, logging a
    /// warning on failure), or the cancel token fires. Callers are expected to
    /// check the token after return to decide whether to exit their loop.
    async fn zaino_db_handler_sleep(&self, maintenance: &mut tokio::time::Interval) {
        tokio::select! {
            _ = sleep(Duration::from_secs(5)) => {},
            _ = maintenance.tick() => {
                if let Err(e) = self.clean_trailing().await {
                    warn!("clean_trailing failed: {}", e);
                }
            }
            _ = self.cancel_token().cancelled() => {},
        }
    }

    /// Graceful shutdown: mark `Closing`, cancel all waiters, join (or after
    /// 5s abort) the background task, then best-effort cleanup and fsync.
    async fn shutdown(&self) -> Result<(), FinalisedStateError> {
        self.status_atomic().store(StatusType::Closing);
        // CancellationToken::cancel wakes every current `cancelled()` waiter
        // and is sticky for late subscribers.
        self.cancel_token().cancel();

        // Take the handle out of the slot; the std Mutex guard is dropped
        // before any await point below.
        let taken = self
            .db_handler_slot()
            .lock()
            .expect("db_handler mutex poisoned")
            .take();
        if let Some(mut handle) = taken {
            let timeout = sleep(Duration::from_secs(5));
            tokio::pin!(timeout);

            tokio::select! {
                res = &mut handle => {
                    match res {
                        Ok(_) => {}
                        // A cancelled task is the expected shutdown path.
                        Err(e) if e.is_cancelled() => {}
                        Err(e) => warn!("background task ended with error: {e:?}"),
                    }
                }
                _ = &mut timeout => {
                    warn!("background task didn't exit in time – aborting");
                    handle.abort();
                }
            }
        }

        // Best-effort: cleanup failure is ignored, fsync failure only warned —
        // shutdown still reports success in both cases.
        let _ = self.clean_trailing().await;
        if let Err(e) = self.env().sync(true) {
            warn!("LMDB fsync before close failed: {e}");
        }
        Ok(())
    }
}

/// Open an LMDB database if present, otherwise create it.
pub(super) async fn open_or_create_db(
env: &Environment,
name: &str,
flags: DatabaseFlags,
) -> Result<Database, FinalisedStateError> {
match env.open_db(Some(name)) {
Ok(db) => Ok(db),
Err(lmdb::Error::NotFound) => env
.create_db(Some(name), flags)
.map_err(FinalisedStateError::LmdbError),
Err(e) => Err(FinalisedStateError::LmdbError(e)),
}
}

/// Version subdirectory names for versioned on-disk layouts.
///
/// This list defines the supported major-version directory names under a per-network directory.
Expand Down Expand Up @@ -190,8 +296,8 @@ impl DbCore for DbBackend {
/// This is a thin delegation wrapper over the concrete implementation.
fn status(&self) -> StatusType {
match self {
Self::V0(db) => db.status(),
Self::V1(db) => db.status(),
Self::V0(db) => DbCore::status(db),
Self::V1(db) => DbCore::status(db),
}
}

Expand All @@ -200,8 +306,8 @@ impl DbCore for DbBackend {
/// This is a thin delegation wrapper over the concrete implementation.
async fn shutdown(&self) -> Result<(), FinalisedStateError> {
match self {
Self::V0(db) => db.shutdown().await,
Self::V1(db) => db.shutdown().await,
Self::V0(db) => DbCore::shutdown(db).await,
Self::V1(db) => DbCore::shutdown(db).await,
}
}
}
Expand Down Expand Up @@ -626,3 +732,80 @@ impl TransparentHistExt for DbBackend {
}
}
}

#[cfg(test)]
mod shutdown {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::{sync::Barrier, time::timeout};

    /// Minimal `DbLifecycle` implementor backed by a throwaway LMDB env,
    /// used to exercise the trait's provided `shutdown()` in isolation.
    struct FakeDb {
        env: Arc<Environment>,
        db_handler: Mutex<Option<JoinHandle<()>>>,
        cancel_token: CancellationToken,
        status: NamedAtomicStatus,
    }

    impl DbLifecycle for FakeDb {
        fn env(&self) -> &Arc<Environment> {
            &self.env
        }
        fn db_handler_slot(&self) -> &Mutex<Option<JoinHandle<()>>> {
            &self.db_handler
        }
        fn cancel_token(&self) -> &CancellationToken {
            &self.cancel_token
        }
        fn status_atomic(&self) -> &NamedAtomicStatus {
            &self.status
        }
    }

    /// Regression for #1033 — every task awaiting cancellation must observe shutdown,
    /// not just one. Originally written against the `Notify::notify_one` implementation
    /// (which strands N-1 waiters); now passes against `CancellationToken::cancel`,
    /// which wakes all current waiters and persists state for late subscribers.
    #[tokio::test]
    async fn wakes_every_shutdown_waiter() {
        let tmp = tempfile::tempdir().unwrap();
        let env = Arc::new(
            lmdb::Environment::new()
                // 1 MiB map is plenty: the env is opened but never written.
                .set_map_size(1 << 20)
                .open(tmp.path())
                .unwrap(),
        );
        let db = Arc::new(FakeDb {
            env,
            db_handler: Mutex::new(None),
            cancel_token: CancellationToken::new(),
            status: NamedAtomicStatus::new("test", StatusType::Ready),
        });

        const N: usize = 3;
        let woke = Arc::new(AtomicUsize::new(0));
        // N waiter tasks plus the test task rendezvous before shutdown fires.
        let barrier = Arc::new(Barrier::new(N + 1));

        let mut waiters = Vec::with_capacity(N);
        for _ in 0..N {
            let token = db.cancel_token.clone();
            let woke = Arc::clone(&woke);
            let barrier = Arc::clone(&barrier);
            waiters.push(tokio::spawn(async move {
                barrier.wait().await;
                token.cancelled().await;
                woke.fetch_add(1, Ordering::Relaxed);
            }));
        }
        // Release all waiters toward `cancelled().await`; the token's cancelled
        // state is sticky, so the exact interleaving with `cancel()` is safe.
        barrier.wait().await;

        DbLifecycle::shutdown(db.as_ref()).await.unwrap();

        for (i, w) in waiters.into_iter().enumerate() {
            timeout(Duration::from_millis(200), w)
                .await
                .unwrap_or_else(|_| panic!("waiter {i} stranded: cancel_token woke only a subset"))
                .unwrap();
        }
        assert_eq!(woke.load(Ordering::Relaxed), N);
    }
}
Loading
Loading