diff --git a/Cargo.lock b/Cargo.lock index 244056723..76e944a73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5603,6 +5603,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", + "tokio-util", "tonic", "tower 0.4.13", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 51c49d0fe..fc3a02411 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ zebra-rpc = "6.0.2" # Runtime tokio = { version = "1.38", features = ["full"] } tokio-stream = "0.1" +tokio-util = "0.7" # CLI clap = "4.0" diff --git a/integration-tests/.config/nextest.toml b/integration-tests/.config/nextest.toml index 7eeebc5b9..561eb2c77 100644 --- a/integration-tests/.config/nextest.toml +++ b/integration-tests/.config/nextest.toml @@ -155,7 +155,6 @@ default-filter = """ test(/^zcashd::get::lightd_info$/) | test(/^zcashd::get::mining_info$/) | test(/^zcashd::get::peer_info$/) | - test(/^zcashd::get::subtree_roots$/) | test(/^zcashd::validation::validate_address$/) | test(/^zcashd::zcash_indexer::get_best_blockhash$/) | test(/^zcashd::zcash_indexer::get_block_count$/) | @@ -180,9 +179,7 @@ default-filter = """ test(/^zebrad::fetch_service::connect_to_node_get_info$/) | test(/^zebrad::get::best_blockhash$/) | test(/^zebrad::get::block_count$/) | - test(/^zebrad::get::block_object$/) | test(/^zebrad::get::block_range_nullifiers$/) | - test(/^zebrad::get::difficulty$/) | test(/^zebrad::get::get_network_sol_ps$/) | test(/^zebrad::get::latest_block$/) | test(/^zebrad::get::subtree_roots$/) | diff --git a/packages/zaino-state/Cargo.toml b/packages/zaino-state/Cargo.toml index c1b20c87b..fb17ba035 100644 --- a/packages/zaino-state/Cargo.toml +++ b/packages/zaino-state/Cargo.toml @@ -57,6 +57,7 @@ chrono = { workspace = true } indexmap = { workspace = true } hex = { workspace = true, features = ["serde"] } tokio-stream = { workspace = true } +tokio-util = { workspace = true } futures = { workspace = true } tonic = { workspace = true } dashmap = { workspace = true } diff --git 
a/packages/zaino-state/src/chain_index/finalised_state/db.rs b/packages/zaino-state/src/chain_index/finalised_state/db.rs index 7a6bdd47b..0cce7cb13 100644 --- a/packages/zaino-state/src/chain_index/finalised_state/db.rs +++ b/packages/zaino-state/src/chain_index/finalised_state/db.rs @@ -71,19 +71,129 @@ use crate::{ config::BlockCacheConfig, error::FinalisedStateError, BlockHash, BlockHeaderData, CommitmentTreeData, CompactBlockStream, Height, IndexedBlock, - OrchardCompactTx, OrchardTxList, SaplingCompactTx, SaplingTxList, StatusType, - TransparentCompactTx, TransparentTxList, TxLocation, TxidList, + NamedAtomicStatus, OrchardCompactTx, OrchardTxList, SaplingCompactTx, SaplingTxList, + StatusType, TransparentCompactTx, TransparentTxList, TxLocation, TxidList, }; #[cfg(feature = "transparent_address_history_experimental")] use crate::{chain_index::finalised_state::capability::TransparentHistExt, AddrScript, Outpoint}; use async_trait::async_trait; -use std::time::Duration; -use tokio::time::{interval, MissedTickBehavior}; +use lmdb::{Database, DatabaseFlags, Environment}; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::{ + task::JoinHandle, + time::{interval, sleep, MissedTickBehavior}, +}; +use tokio_util::sync::CancellationToken; +use tracing::warn; use super::capability::Capability; +/// Lifecycle scaffolding shared by every `DbVx` finalised-state backend. +/// +/// Implementors expose the four shared struct fields via required getters; +/// provided methods cover the duplicated `status()`, `wait_until_ready()`, +/// `shutdown()`, `clean_trailing()`, and the background task's per-iteration +/// `zaino_db_handler_sleep()`. +/// +/// Note: This trait ties any DB version that uses it to Lmdb. +/// In the future we may want to support alternative DB backends. +/// When this happens, we will have to lean away from this trait to some extent. 
+#[async_trait]
+pub(super) trait LmdbLifecycle: Sync {
+    fn env(&self) -> &Arc<Environment>;
+    fn db_handler_slot(&self) -> &Mutex<Option<JoinHandle<()>>>;
+    fn cancel_token(&self) -> &CancellationToken;
+    fn status_atomic(&self) -> &NamedAtomicStatus;
+
+    fn status(&self) -> StatusType {
+        self.status_atomic().load()
+    }
+
+    async fn wait_until_ready(&self) {
+        let mut ticker = interval(Duration::from_millis(100));
+        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
+        loop {
+            ticker.tick().await;
+            if self.status_atomic().load() == StatusType::Ready {
+                break;
+            }
+        }
+    }
+
+    async fn clean_trailing(&self) -> Result<(), FinalisedStateError> {
+        let txn = self.env().begin_ro_txn()?;
+        drop(txn);
+        Ok(())
+    }
+
+    async fn zaino_db_handler_sleep(&self, maintenance: &mut tokio::time::Interval) {
+        tokio::select! {
+            _ = sleep(Duration::from_secs(5)) => {},
+            _ = maintenance.tick() => {
+                if let Err(e) = self.clean_trailing().await {
+                    warn!("clean_trailing failed: {}", e);
+                }
+            }
+            _ = self.cancel_token().cancelled() => {},
+        }
+    }
+
+    async fn shutdown(&self) -> Result<(), FinalisedStateError> {
+        self.status_atomic().store(StatusType::Closing);
+        self.cancel_token().cancel();
+
+        let taken = self
+            .db_handler_slot()
+            .lock()
+            .expect("db_handler mutex poisoned")
+            .take();
+        if let Some(mut handle) = taken {
+            let timeout = sleep(Duration::from_secs(5));
+            tokio::pin!(timeout);
+
+            tokio::select! {
+                res = &mut handle => {
+                    match res {
+                        Ok(_) => {}
+                        Err(e) if e.is_cancelled() => {}
+                        Err(e) => warn!("background task ended with error: {e:?}"),
+                    }
+                }
+                _ = &mut timeout => {
+                    warn!("background task didn't exit in time – aborting");
+                    handle.abort();
+                }
+            }
+        }
+
+        let _ = self.clean_trailing().await;
+        if let Err(e) = self.env().sync(true) {
+            warn!("LMDB fsync before close failed: {e}");
+        }
+        Ok(())
+    }
+}
+
+/// Open an LMDB database if present, otherwise create it.
+pub(super) async fn open_or_create_db(
+    env: &Environment,
+    name: &str,
+    flags: DatabaseFlags,
+) -> Result<Database, FinalisedStateError> {
+    match env.open_db(Some(name)) {
+        Ok(db) => Ok(db),
+        Err(lmdb::Error::NotFound) => env
+            .create_db(Some(name), flags)
+            .map_err(FinalisedStateError::LmdbError),
+        Err(e) => Err(FinalisedStateError::LmdbError(e)),
+    }
+}
+
 /// Version subdirectory names for versioned on-disk layouts.
 ///
 /// This list defines the supported major-version directory names under a per-network directory.
@@ -190,8 +300,8 @@ impl DbCore for DbBackend {
     /// This is a thin delegation wrapper over the concrete implementation.
     fn status(&self) -> StatusType {
         match self {
-            Self::V0(db) => db.status(),
-            Self::V1(db) => db.status(),
+            Self::V0(db) => DbCore::status(db),
+            Self::V1(db) => DbCore::status(db),
         }
     }
@@ -200,8 +310,8 @@ impl DbCore for DbBackend {
     /// This is a thin delegation wrapper over the concrete implementation.
     async fn shutdown(&self) -> Result<(), FinalisedStateError> {
         match self {
-            Self::V0(db) => db.shutdown().await,
-            Self::V1(db) => db.shutdown().await,
+            Self::V0(db) => DbCore::shutdown(db).await,
+            Self::V1(db) => DbCore::shutdown(db).await,
         }
     }
 }
@@ -626,3 +736,80 @@ impl TransparentHistExt for DbBackend {
         }
     }
 }
+
+#[cfg(test)]
+mod shutdown {
+    use super::*;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use tokio::{sync::Barrier, time::timeout};
+
+    struct FakeDb {
+        env: Arc<Environment>,
+        db_handler: Mutex<Option<JoinHandle<()>>>,
+        cancel_token: CancellationToken,
+        status: NamedAtomicStatus,
+    }
+
+    impl LmdbLifecycle for FakeDb {
+        fn env(&self) -> &Arc<Environment> {
+            &self.env
+        }
+        fn db_handler_slot(&self) -> &Mutex<Option<JoinHandle<()>>> {
+            &self.db_handler
+        }
+        fn cancel_token(&self) -> &CancellationToken {
+            &self.cancel_token
+        }
+        fn status_atomic(&self) -> &NamedAtomicStatus {
+            &self.status
+        }
+    }
+
+    /// Regression for #1033 — every task awaiting cancellation must observe shutdown,
+    /// not just one. Originally written against the `Notify::notify_one` implementation
+    /// (which strands N-1 waiters); now passes against `CancellationToken::cancel`,
+    /// which wakes all current waiters and persists state for late subscribers.
+    #[tokio::test]
+    async fn wakes_every_shutdown_waiter() {
+        let tmp = tempfile::tempdir().unwrap();
+        let env = Arc::new(
+            lmdb::Environment::new()
+                .set_map_size(1 << 20)
+                .open(tmp.path())
+                .unwrap(),
+        );
+        let db = Arc::new(FakeDb {
+            env,
+            db_handler: Mutex::new(None),
+            cancel_token: CancellationToken::new(),
+            status: NamedAtomicStatus::new("test", StatusType::Ready),
+        });
+
+        const N: usize = 3;
+        let woke = Arc::new(AtomicUsize::new(0));
+        let barrier = Arc::new(Barrier::new(N + 1));
+
+        let mut waiters = Vec::with_capacity(N);
+        for _ in 0..N {
+            let token = db.cancel_token.clone();
+            let woke = Arc::clone(&woke);
+            let barrier = Arc::clone(&barrier);
+            waiters.push(tokio::spawn(async move {
+                barrier.wait().await;
+                token.cancelled().await;
+                woke.fetch_add(1, Ordering::Relaxed);
+            }));
+        }
+        barrier.wait().await;
+
+        LmdbLifecycle::shutdown(db.as_ref()).await.unwrap();
+
+        for (i, w) in waiters.into_iter().enumerate() {
+            timeout(Duration::from_millis(200), w)
+                .await
+                .unwrap_or_else(|_| panic!("waiter {i} stranded: cancel_token woke only a subset"))
+                .unwrap();
+        }
+        assert_eq!(woke.load(Ordering::Relaxed), N);
+    }
+}
diff --git a/packages/zaino-state/src/chain_index/finalised_state/db/v0.rs b/packages/zaino-state/src/chain_index/finalised_state/db/v0.rs
index 7ee658cf0..668fda30c 100644
--- a/packages/zaino-state/src/chain_index/finalised_state/db/v0.rs
+++ b/packages/zaino-state/src/chain_index/finalised_state/db/v0.rs
@@ -64,16 +64,16 @@ use zebra_chain::{
     parameters::NetworkKind,
 };
 
+use super::LmdbLifecycle;
+
 use async_trait::async_trait;
 use lmdb::{Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction};
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use std::{fs, sync::Arc,
time::Duration}; -use tokio::{ - sync::Notify, - time::{interval, MissedTickBehavior}, -}; -use tracing::{info, warn}; +use tokio::time::interval; +use tokio_util::sync::CancellationToken; +use tracing::info; // ───────────────────────── ZainoDb v0 Capabilities ───────────────────────── @@ -176,51 +176,30 @@ impl DbWrite for DbV0 { impl DbCore for DbV0 { /// Returns the current runtime status published by this backend. fn status(&self) -> StatusType { - self.status.load() + LmdbLifecycle::status(self) } /// Requests shutdown of background tasks and syncs the LMDB environment before returning. - /// - /// Signals the background task via `shutdown_notify` so it wakes out of - /// `zaino_db_handler_sleep` immediately, then awaits its join handle with - /// a 5 s timeout (aborting only if the handle fails to exit in time). async fn shutdown(&self) -> Result<(), FinalisedStateError> { - self.status.store(StatusType::Closing); - // `notify_one` stores a permit if no waiter is currently registered, - // so the task consumes the signal on its next `notified().await` even - // if shutdown fires before the task has entered the select. - // `notify_waiters` would be lost in that window (no stored permit). - self.shutdown_notify.notify_one(); - - let taken = self - .db_handler - .lock() - .expect("db_handler mutex poisoned") - .take(); - if let Some(mut handle) = taken { - let timeout = tokio::time::sleep(Duration::from_secs(5)); - tokio::pin!(timeout); - - tokio::select! 
{ - res = &mut handle => { - match res { - Ok(_) => {} - Err(e) if e.is_cancelled() => {} - Err(e) => warn!("background task ended with error: {e:?}"), - } - } - _ = &mut timeout => { - warn!("background task didn’t exit in time – aborting"); - handle.abort(); - } - } - } + LmdbLifecycle::shutdown(self).await + } +} - let _ = self.clean_trailing().await; - if let Err(e) = self.env.sync(true) { - warn!("LMDB fsync before close failed: {e}"); - } - Ok(()) +impl LmdbLifecycle for DbV0 { + fn env(&self) -> &Arc { + &self.env + } + + fn db_handler_slot(&self) -> &std::sync::Mutex>> { + &self.db_handler + } + + fn cancel_token(&self) -> &CancellationToken { + &self.cancel_token + } + + fn status_atomic(&self) -> &NamedAtomicStatus { + &self.status } } @@ -283,10 +262,11 @@ pub struct DbV0 { /// `Option`; no `.await` happens while it's held. db_handler: std::sync::Mutex>>, - /// Wakes the background task out of `zaino_db_handler_sleep` when shutdown - /// is requested, so it observes `StatusType::Closing` without waiting for - /// the next idle-sleep or maintenance-tick boundary. - shutdown_notify: Arc, + /// Cancels the background task so it observes shutdown without waiting for + /// the next idle-sleep or maintenance-tick boundary. Cloning the token + /// shares cancellation state with every clone, so all background tasks + /// (current and future) wake on a single `cancel()` call. + cancel_token: CancellationToken, /// Backend lifecycle status. status: NamedAtomicStatus, @@ -343,9 +323,9 @@ impl DbV0 { // Open individual LMDB DBs. 
let heights_to_hashes = - Self::open_or_create_db(&env, "heights_to_hashes", DatabaseFlags::empty()).await?; + super::open_or_create_db(&env, "heights_to_hashes", DatabaseFlags::empty()).await?; let hashes_to_blocks = - Self::open_or_create_db(&env, "hashes_to_blocks", DatabaseFlags::empty()).await?; + super::open_or_create_db(&env, "hashes_to_blocks", DatabaseFlags::empty()).await?; // Create ZainoDB let mut zaino_db = Self { @@ -353,7 +333,7 @@ impl DbV0 { heights_to_hashes, hashes_to_blocks, db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::new(Notify::new()), + cancel_token: CancellationToken::new(), status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning), config: config.clone(), }; @@ -364,27 +344,6 @@ impl DbV0 { Ok(zaino_db) } - /// Returns the current backend status. - pub(crate) fn status(&self) -> StatusType { - self.status.load() - } - - /// Blocks until the backend reports `StatusType::Ready`. - /// - /// This is primarily used during startup sequencing so callers do not issue reads before the - /// backend is ready to serve queries. - pub(crate) async fn wait_until_ready(&self) { - let mut ticker = interval(Duration::from_millis(100)); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); - - loop { - ticker.tick().await; - if self.status.load() == StatusType::Ready { - break; - } - } - } - // *** Internal Control Methods *** /// Spawns the background maintenance task. @@ -403,7 +362,7 @@ impl DbV0 { heights_to_hashes: self.heights_to_hashes, hashes_to_blocks: self.hashes_to_blocks, db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; @@ -431,50 +390,6 @@ impl DbV0 { Ok(()) } - /// Helper method to wait for the next loop iteration or perform maintenance. 
- /// - /// This selects between: - /// - a short sleep (steady-state pacing), and - /// - the maintenance tick (currently reader-slot cleanup). - async fn zaino_db_handler_sleep(&self, maintenance: &mut tokio::time::Interval) { - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(5)) => {}, - _ = maintenance.tick() => { - if let Err(e) = self.clean_trailing().await { - warn!("clean_trailing failed: {}", e); - } - } - _ = self.shutdown_notify.notified() => {}, - } - } - - /// Clears stale LMDB reader slots by opening and closing a read transaction. - /// - /// LMDB only reclaims reader slots when transactions are closed; this method is a cheap and safe - /// way to encourage reclamation in long-running services. - async fn clean_trailing(&self) -> Result<(), FinalisedStateError> { - let txn = self.env.begin_ro_txn()?; - drop(txn); - Ok(()) - } - - /// Opens an LMDB database if present, otherwise creates it. - /// - /// v0 uses this helper for all tables to make environment creation idempotent across restarts. - async fn open_or_create_db( - env: &Environment, - name: &str, - flags: DatabaseFlags, - ) -> Result { - match env.open_db(Some(name)) { - Ok(db) => Ok(db), - Err(lmdb::Error::NotFound) => env - .create_db(Some(name), flags) - .map_err(FinalisedStateError::LmdbError), - Err(e) => Err(FinalisedStateError::LmdbError(e)), - } - } - // *** DB write / delete methods *** // These should only ever be used in a single DB control task. 
@@ -547,7 +462,7 @@ impl DbV0 { heights_to_hashes: self.heights_to_hashes, hashes_to_blocks: self.hashes_to_blocks, db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; @@ -656,7 +571,7 @@ impl DbV0 { heights_to_hashes: self.heights_to_hashes, hashes_to_blocks: self.hashes_to_blocks, db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; @@ -707,7 +622,7 @@ impl DbV0 { heights_to_hashes: self.heights_to_hashes, hashes_to_blocks: self.hashes_to_blocks, db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; diff --git a/packages/zaino-state/src/chain_index/finalised_state/db/v1.rs b/packages/zaino-state/src/chain_index/finalised_state/db/v1.rs index a63931bdc..b113ef686 100644 --- a/packages/zaino-state/src/chain_index/finalised_state/db/v1.rs +++ b/packages/zaino-state/src/chain_index/finalised_state/db/v1.rs @@ -54,6 +54,8 @@ use zaino_proto::proto::{compact_formats::CompactBlock, utils::PoolTypeFilter}; use zebra_chain::parameters::NetworkKind; use zebra_state::HashOrHeight; +use super::LmdbLifecycle; + use async_trait::async_trait; use corez::io::{self, Read}; use dashmap::DashSet; @@ -70,10 +72,8 @@ use std::{ }, time::Duration, }; -use tokio::{ - sync::Notify, - time::{interval, MissedTickBehavior}, -}; +use tokio::time::interval; +use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; #[cfg(feature = "transparent_address_history_experimental")] @@ -147,46 +147,29 @@ pub(crate) const DB_VERSION_V1: DbVersion = DbVersion { #[async_trait] impl DbCore for DbV1 { fn status(&self) -> StatusType { - self.status() + 
LmdbLifecycle::status(self) } async fn shutdown(&self) -> Result<(), FinalisedStateError> { - self.status.store(StatusType::Closing); - // `notify_one` stores a permit if no waiter is currently registered, - // so the task consumes the signal on its next `notified().await` even - // if shutdown fires before the task has entered the select. - // `notify_waiters` would be lost in that window (no stored permit). - self.shutdown_notify.notify_one(); - - let taken = self - .db_handler - .lock() - .expect("db_handler mutex poisoned") - .take(); - if let Some(mut handle) = taken { - let timeout = tokio::time::sleep(Duration::from_secs(5)); - tokio::pin!(timeout); - - tokio::select! { - res = &mut handle => { - match res { - Ok(_) => {} - Err(e) if e.is_cancelled() => {} - Err(e) => warn!("background task ended with error: {e:?}"), - } - } - _ = &mut timeout => { - warn!("background task didn’t exit in time – aborting"); - handle.abort(); - } - } - } + LmdbLifecycle::shutdown(self).await + } +} - let _ = self.clean_trailing().await; - if let Err(e) = self.env.sync(true) { - warn!("LMDB fsync before close failed: {e}"); - } - Ok(()) +impl LmdbLifecycle for DbV1 { + fn env(&self) -> &Arc { + &self.env + } + + fn db_handler_slot(&self) -> &std::sync::Mutex>> { + &self.db_handler + } + + fn cancel_token(&self) -> &CancellationToken { + &self.cancel_token + } + + fn status_atomic(&self) -> &NamedAtomicStatus { + &self.status } } @@ -276,10 +259,11 @@ pub(crate) struct DbV1 { /// `Option`; no `.await` happens while it's held. db_handler: std::sync::Mutex>>, - /// Wakes the background task out of `zaino_db_handler_sleep` when shutdown - /// is requested, so it observes `StatusType::Closing` without waiting for - /// the next idle-sleep or maintenance-tick boundary. - shutdown_notify: Arc, + /// Cancels the background task so it observes shutdown without waiting for + /// the next idle-sleep or maintenance-tick boundary. 
Cloning the token + /// shares cancellation state with every clone, so all background tasks + /// (current and future) wake on a single `cancel()` call. + cancel_token: CancellationToken, /// ZainoDB status. status: NamedAtomicStatus, @@ -341,20 +325,20 @@ impl DbV1 { // Open individual LMDB DBs. let headers = - Self::open_or_create_db(&env, "headers_1_0_0", DatabaseFlags::empty()).await?; - let txids = Self::open_or_create_db(&env, "txids_1_0_0", DatabaseFlags::empty()).await?; + super::open_or_create_db(&env, "headers_1_0_0", DatabaseFlags::empty()).await?; + let txids = super::open_or_create_db(&env, "txids_1_0_0", DatabaseFlags::empty()).await?; let transparent = - Self::open_or_create_db(&env, "transparent_1_0_0", DatabaseFlags::empty()).await?; + super::open_or_create_db(&env, "transparent_1_0_0", DatabaseFlags::empty()).await?; let sapling = - Self::open_or_create_db(&env, "sapling_1_0_0", DatabaseFlags::empty()).await?; + super::open_or_create_db(&env, "sapling_1_0_0", DatabaseFlags::empty()).await?; let orchard = - Self::open_or_create_db(&env, "orchard_1_0_0", DatabaseFlags::empty()).await?; + super::open_or_create_db(&env, "orchard_1_0_0", DatabaseFlags::empty()).await?; let commitment_tree_data = - Self::open_or_create_db(&env, "commitment_tree_data_1_0_0", DatabaseFlags::empty()) + super::open_or_create_db(&env, "commitment_tree_data_1_0_0", DatabaseFlags::empty()) .await?; - let hashes = Self::open_or_create_db(&env, "hashes_1_0_0", DatabaseFlags::empty()).await?; + let hashes = super::open_or_create_db(&env, "hashes_1_0_0", DatabaseFlags::empty()).await?; - let metadata = Self::open_or_create_db(&env, "metadata", DatabaseFlags::empty()).await?; + let metadata = super::open_or_create_db(&env, "metadata", DatabaseFlags::empty()).await?; // Create the DbV1 instance. We declare the variable in the outer scope and // initialise it in the two cfg arms so `zaino_db` is available afterwards. 
@@ -363,9 +347,9 @@ impl DbV1 { #[cfg(feature = "transparent_address_history_experimental")] { let spent = - Self::open_or_create_db(&env, "spent_1_0_0", DatabaseFlags::empty()).await?; + super::open_or_create_db(&env, "spent_1_0_0", DatabaseFlags::empty()).await?; - let address_history = Self::open_or_create_db( + let address_history = super::open_or_create_db( &env, "address_history_1_0_0", DatabaseFlags::DUP_SORT | DatabaseFlags::DUP_FIXED, @@ -387,7 +371,7 @@ impl DbV1 { validated_tip: Arc::new(AtomicU32::new(0)), validated_set: DashSet::new(), db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::new(Notify::new()), + cancel_token: CancellationToken::new(), status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning), config: config.clone(), }; @@ -408,7 +392,7 @@ impl DbV1 { validated_tip: Arc::new(AtomicU32::new(0)), validated_set: DashSet::new(), db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::new(Notify::new()), + cancel_token: CancellationToken::new(), status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning), config: config.clone(), }; @@ -423,28 +407,6 @@ impl DbV1 { Ok(zaino_db) } - /// Returns the status of ZainoDB. - pub(crate) fn status(&self) -> StatusType { - self.status.load() - } - - /// Waits until the DB reaches [`StatusType::Ready`]. - /// - /// NOTE: This does not currently backpressure on LMDB reader availability. - /// - /// TODO: check db for free readers and wait if busy. - pub(crate) async fn wait_until_ready(&self) { - let mut ticker = interval(Duration::from_millis(100)); - ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); - - loop { - ticker.tick().await; - if self.status.load() == StatusType::Ready { - break; - } - } - } - // *** Internal Control Methods *** /// Spawns the background validator / maintenance task. 
@@ -473,7 +435,7 @@ impl DbV1 { validated_tip: Arc::clone(&self.validated_tip), validated_set: self.validated_set.clone(), db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; @@ -573,19 +535,6 @@ impl DbV1 { Ok(()) } - /// Helper method to wait for the next loop iteration or perform maintenance. - async fn zaino_db_handler_sleep(&self, maintenance: &mut tokio::time::Interval) { - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(5)) => {}, - _ = maintenance.tick() => { - if let Err(e) = self.clean_trailing().await { - warn!("clean_trailing failed: {}", e); - } - } - _ = self.shutdown_notify.notified() => {}, - } - } - /// Validates every stored spent-outpoint entry (`Outpoint` -> `TxLocation`) by checksum. #[cfg(feature = "transparent_address_history_experimental")] async fn initial_spent_scan(&self) -> Result<(), FinalisedStateError> { @@ -662,7 +611,7 @@ impl DbV1 { validated_tip: Arc::clone(&self.validated_tip), validated_set: self.validated_set.clone(), db_handler: std::sync::Mutex::new(None), - shutdown_notify: Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; @@ -685,28 +634,6 @@ impl DbV1 { .await .map_err(|e| FinalisedStateError::Custom(format!("spawn_blocking failed: {e}")))? } - - /// Clears stale reader slots by opening and closing a read transaction. - async fn clean_trailing(&self) -> Result<(), FinalisedStateError> { - let txn = self.env.begin_ro_txn()?; - drop(txn); - Ok(()) - } - - /// Opens an lmdb database if present else creates a new one. 
- async fn open_or_create_db( - env: &Environment, - name: &str, - flags: DatabaseFlags, - ) -> Result { - match env.open_db(Some(name)) { - Ok(db) => Ok(db), - Err(lmdb::Error::NotFound) => env - .create_db(Some(name), flags) - .map_err(FinalisedStateError::LmdbError), - Err(e) => Err(FinalisedStateError::LmdbError(e)), - } - } } impl Drop for DbV1 { diff --git a/packages/zaino-state/src/chain_index/finalised_state/db/v1/compact_block.rs b/packages/zaino-state/src/chain_index/finalised_state/db/v1/compact_block.rs index 43a8581c1..75904bcd0 100644 --- a/packages/zaino-state/src/chain_index/finalised_state/db/v1/compact_block.rs +++ b/packages/zaino-state/src/chain_index/finalised_state/db/v1/compact_block.rs @@ -339,7 +339,7 @@ impl DbV1 { validated_tip: Arc::clone(&self.validated_tip), validated_set: self.validated_set.clone(), db_handler: std::sync::Mutex::new(None), - shutdown_notify: std::sync::Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; diff --git a/packages/zaino-state/src/chain_index/finalised_state/db/v1/write_core.rs b/packages/zaino-state/src/chain_index/finalised_state/db/v1/write_core.rs index 7f943825f..5363a3d0b 100644 --- a/packages/zaino-state/src/chain_index/finalised_state/db/v1/write_core.rs +++ b/packages/zaino-state/src/chain_index/finalised_state/db/v1/write_core.rs @@ -286,7 +286,7 @@ impl DbV1 { validated_tip: Arc::clone(&self.validated_tip), validated_set: self.validated_set.clone(), db_handler: std::sync::Mutex::new(None), - shutdown_notify: std::sync::Arc::clone(&self.shutdown_notify), + cancel_token: self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), }; @@ -849,7 +849,7 @@ impl DbV1 { validated_tip: Arc::clone(&self.validated_tip), validated_set: self.validated_set.clone(), db_handler: std::sync::Mutex::new(None), - shutdown_notify: std::sync::Arc::clone(&self.shutdown_notify), + cancel_token: 
self.cancel_token.clone(), status: self.status.clone(), config: self.config.clone(), };