Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 56 additions & 13 deletions packages/zaino-state/src/chain_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,9 @@ pub struct NodeBackedChainIndex<Source: BlockchainSource = ValidatorConnector> {
/// [`SyncTimings::default()`] produces production values (500 ms inter-iteration
/// sleep, 250 ms initial backoff doubling up to 8 s, 10 consecutive failures
/// before escalating to [`StatusType::CriticalError`] — ~40 s total window).
/// [`SyncTimings::fast()`] shrinks each duration by 10× so backoff-dependent
/// unit tests finish in ~4 s instead of ~40 s.
/// [`SyncTimings::fast()`] is the test-only shrink: ~120 ms total backoff
/// window with the same three observable behaviours (counter→threshold,
/// exponential doubling, cap clamp) — see its doc for the chosen values.
#[derive(Clone, Copy, Debug)]
pub(crate) struct SyncTimings {
pub(crate) interval: Duration,
Expand All @@ -535,13 +536,18 @@ impl Default for SyncTimings {

#[cfg(test)]
impl SyncTimings {
/// 10× faster than [`Self::default`] — test-only.
/// Test-only shrink: drives the backoff schedule to completion in
/// ~120 ms (4 sleeps: 10 + 20 + 40 + 50, with the 50 ms cap clamping
/// the would-be 80 ms doubling). Five `max_consecutive_failures` is
/// the smallest count that still exercises both exponential doubling
/// and `max_backoff` clamping — i.e. the same three observable
/// behaviours `default()` does, just compressed.
pub(crate) const fn fast() -> Self {
Self {
interval: Duration::from_millis(50),
initial_backoff: Duration::from_millis(25),
max_backoff: Duration::from_millis(800),
max_consecutive_failures: 10,
initial_backoff: Duration::from_millis(10),
max_backoff: Duration::from_millis(50),
max_consecutive_failures: 5,
}
}

Expand Down Expand Up @@ -650,6 +656,7 @@ impl<Source: BlockchainSource> NodeBackedChainIndex<Source> {
tokio::task::spawn(async move {
let status = status.clone();
let source = source.clone();
let mut change_rx = source.change_subscribe();
let mut consecutive_failures: u32 = 0;
let mut current_backoff = timings.initial_backoff;

Expand Down Expand Up @@ -689,7 +696,7 @@ impl<Source: BlockchainSource> NodeBackedChainIndex<Source> {
None => {
nfs.store(Some(Arc::new(
NonFinalizedState::initialize(
source,
source.clone(),
network,
fs.to_reader()
.get_chain_block_by_height(finalised_height)
Expand All @@ -716,7 +723,7 @@ impl<Source: BlockchainSource> NodeBackedChainIndex<Source> {
consecutive_failures = 0;
current_backoff = timings.initial_backoff;
status.store(StatusType::Ready);
tokio::time::sleep(timings.interval).await;
source::wait_or_source_change(change_rx.as_mut(), timings.interval).await;
}
Err(e) => {
consecutive_failures += 1;
Expand Down Expand Up @@ -776,6 +783,24 @@ impl<Source: BlockchainSource> NodeBackedChainIndexSubscriber<Source> {
combined_status
}

/// Returns a [`tokio::sync::watch::Receiver`] that wakes on every
/// transition of the chain-index sync loop's status.
///
/// Used by tests to await a specific state (e.g. first transition to
/// [`StatusType::Ready`]) without busy-polling.
///
/// This is a thin forwarder: the receiver is whatever
/// `self.status.subscribe()` hands back, with no extra logic layered on
/// top. Compiled only in test builds (`#[cfg(test)]`).
#[cfg(test)]
pub(crate) fn status_subscribe(&self) -> tokio::sync::watch::Receiver<StatusType> {
self.status.subscribe()
}

/// Returns a watch receiver tracking the mempool serve loop's most
/// recently observed chain tip. Forwarder for
/// [`mempool::MempoolSubscriber::mempool_tip`].
///
/// Adds no logic of its own: the result is exactly what
/// `self.mempool.mempool_tip()` returns. Compiled only in test builds
/// (`#[cfg(test)]`).
#[cfg(test)]
pub(crate) fn mempool_tip(&self) -> tokio::sync::watch::Receiver<crate::BlockHash> {
self.mempool.mempool_tip()
}

async fn get_fullblock_bytes_from_node(
&self,
id: HashOrHeight,
Expand Down Expand Up @@ -1761,13 +1786,31 @@ impl<Source: BlockchainSource> ChainIndex for NodeBackedChainIndexSubscriber<Sou
},
None => None,
};
let expected_chain_tip = non_finalized_snapshot.map(|snapshot| snapshot.best_tip.hash);

// Stale-snapshot check: compare the caller's snapshot against the
// chain-index's *current* non-finalized tip (the same source of
// truth as `snapshot_nonfinalized_state`), not against the mempool
// serve loop's `mempool_chain_tip`. The mempool polls on its own
// cadence, so its tip view can diverge from the chain-index's by
// up to one poll cycle (#1037); using it here means the API can
// accept a stale snapshot whenever the mempool happens to be
// lagging, and reject a freshly-issued snapshot whenever the
// mempool happens to be ahead — both are caller-visible internal
// contradictions. The chain-index is authoritative.
if let Some(expected) = non_finalized_snapshot {
match self.non_finalized_state.load().as_ref() {
Some(current) => {
if expected.best_tip.hash != current.get_snapshot().best_tip.hash {
return None;
}
}
None => return None,
}
}

let mut subscriber = self.mempool.clone();

match subscriber
.get_mempool_stream(expected_chain_tip)
.now_or_never()
{
match subscriber.get_mempool_stream(None).now_or_never() {
Some(Ok((in_rx, _handle))) => {
let (out_tx, out_rx) =
tokio::sync::mpsc::channel::<Result<Vec<u8>, ChainIndexError>>(32);
Expand Down
192 changes: 108 additions & 84 deletions packages/zaino-state/src/chain_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashSet, sync::Arc};
use crate::{
broadcast::{Broadcast, BroadcastSubscriber},
chain_index::{
source::{BlockchainSource, BlockchainSourceError},
source::{wait_or_source_change, BlockchainSource, BlockchainSourceError},
types::db::metadata::MempoolInfo,
},
error::{MempoolError, StatusError},
Expand Down Expand Up @@ -53,41 +53,95 @@ pub struct Mempool<T: BlockchainSource> {
status: NamedAtomicStatus,
}

/// Builds a point-in-time copy of the mempool as reported by `fetcher`.
///
/// The txid list is requested first; every listed txid is then resolved to
/// its transaction, one request at a time, and the `(key, value)` pairs are
/// returned in the order the txids were listed.
///
/// # Errors
///
/// * Any error returned by `get_mempool_txids` or `get_transaction` is
///   propagated (converted into [`MempoolError`] by `?`).
/// * [`BlockchainSourceError::Unrecoverable`] (wrapped in
///   [`MempoolError::BlockchainSourceError`]) when the txid list is `None`,
///   or when a listed txid has no matching transaction.
async fn fetch_mempool_snapshot<T: BlockchainSource>(
    fetcher: &T,
) -> Result<Vec<(MempoolKey, MempoolValue)>, MempoolError> {
    // Guard: a source that answers but has no txid list is treated as fatal.
    let Some(txids) = fetcher.get_mempool_txids().await? else {
        return Err(MempoolError::BlockchainSourceError(
            BlockchainSourceError::Unrecoverable(
                "could not fetch mempool data: mempool txid list was None".to_string(),
            ),
        ));
    };

    // One entry per txid, so the final length is known before the loop.
    let mut snapshot = Vec::with_capacity(txids.len());

    for txid in txids {
        // A txid that was just listed but cannot be fetched aborts the whole
        // snapshot rather than yielding a partial one.
        let Some((raw_tx, _location)) = fetcher.get_transaction(txid.0.into()).await? else {
            return Err(MempoolError::BlockchainSourceError(
                BlockchainSourceError::Unrecoverable(format!(
                    "could not fetch mempool data: transaction not found for txid {txid}"
                )),
            ));
        };

        let key = MempoolKey {
            txid: txid.to_string(),
        };
        let value = MempoolValue {
            serialized_tx: Arc::new(raw_tx.into()),
        };
        snapshot.push((key, value));
    }

    Ok(snapshot)
}

impl<T: BlockchainSource> Mempool<T> {
/// Spawns a new [`Mempool`].
#[instrument(name = "Mempool::spawn", skip(fetcher, capacity_and_shard_amount))]
pub async fn spawn(
fetcher: T,
capacity_and_shard_amount: Option<(usize, usize)>,
) -> Result<Self, MempoolError> {
// Wait for mempool in validator to come online.
loop {
match fetcher.get_mempool_txids().await {
Ok(_) => {
break;
}
Err(_) => {
info!("Waiting for Validator mempool to come online");
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
// Run the three independent startup probes concurrently:
// (1) wait for the validator mempool to come online (retry loop),
// (2) fetch the initial best block hash (fatal if unavailable),
// (3) fetch the initial mempool snapshot (retry on transient
// fetch errors).
// None of the three depend on each other; serial ordering added
// two source round-trips to every indexer init for no semantic
// reason. `try_join!` cancels the retry loops if (2) returns a
// hard error.
let wait_for_mempool_online = async {
loop {
match fetcher.get_mempool_txids().await {
Ok(_) => return Ok::<(), MempoolError>(()),
Err(_) => {
info!("Waiting for Validator mempool to come online");
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
}
}
}
}

let best_block_hash: BlockHash = match fetcher.get_best_block_hash().await {
Ok(block_hash_opt) => match block_hash_opt {
Some(hash) => hash.into(),
None => {
return Err(MempoolError::Critical(
"Error in mempool: Error connecting with validator".to_string(),
))
}
},
Err(_e) => {
return Err(MempoolError::Critical(
};
let fetch_best_block_hash = async {
match fetcher.get_best_block_hash().await {
Ok(Some(hash)) => Ok(BlockHash::from(hash)),
Ok(None) | Err(_) => Err(MempoolError::Critical(
"Error in mempool: Error connecting with validator".to_string(),
))
)),
}
};
let fetch_initial_snapshot = async {
loop {
match fetch_mempool_snapshot(&fetcher).await {
Ok(txs) => return Ok::<_, MempoolError>(txs),
Err(e) => {
warn!("{e}");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
};
let (_, best_block_hash, initial_transactions) = tokio::try_join!(
wait_for_mempool_online,
fetch_best_block_hash,
fetch_initial_snapshot,
)?;

let (chain_tip_sender, _chain_tip_reciever) = tokio::sync::watch::channel(best_block_hash);

Expand All @@ -102,26 +156,11 @@ impl<T: BlockchainSource> Mempool<T> {
},
mempool_chain_tip: chain_tip_sender,
sync_task_handle: None,
status: NamedAtomicStatus::new("Mempool", StatusType::Spawning),
status: NamedAtomicStatus::new("Mempool", StatusType::Ready),
};

loop {
match mempool.get_mempool_transactions().await {
Ok(mempool_transactions) => {
mempool.status.store(StatusType::Ready);
mempool
.state
.insert_filtered_set(mempool_transactions, mempool.status.load());
break;
}
Err(e) => {
mempool.state.notify(mempool.status.load());
warn!("{e}");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
};
}
mempool
.state
.insert_filtered_set(initial_transactions, mempool.status.load());

mempool.sync_task_handle = Some(std::sync::Mutex::new(mempool.serve().await?));

Expand All @@ -142,6 +181,7 @@ impl<T: BlockchainSource> Mempool<T> {
status.store(StatusType::Spawning);

let sync_handle = tokio::spawn(async move {
let mut change_rx = mempool.fetcher.change_subscribe();
let mut best_block_hash: Hash;
let mut check_block_hash: Hash;

Expand Down Expand Up @@ -207,11 +247,20 @@ impl<T: BlockchainSource> Mempool<T> {
.send_replace(check_block_hash.into());
best_block_hash = check_block_hash;

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Cool-down before we re-fetch mempool state at the new
// tip — gives the validator a moment to settle. Stays
// interruptible: a follow-on `mine_blocks` must wake us
// immediately, otherwise the next iteration of the test
// sees a stale tip until the timer expires (#1037).
wait_or_source_change(
change_rx.as_mut(),
std::time::Duration::from_millis(100),
)
.await;
continue;
}

match mempool.get_mempool_transactions().await {
match fetch_mempool_snapshot(&mempool.fetcher).await {
Ok(mempool_transactions) => {
status.store(StatusType::Ready);
state.insert_filtered_set(mempool_transactions, status.load());
Expand All @@ -230,51 +279,18 @@ impl<T: BlockchainSource> Mempool<T> {
return;
}

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Wait the cadence interval, but wake immediately if the
// source signals new state — keeps mempool↔chain-index
// tip propagation aligned within a single iteration on
// push-capable sources (#1037 part 2/2).
wait_or_source_change(change_rx.as_mut(), std::time::Duration::from_millis(100))
.await;
}
});

Ok(sync_handle)
}

/// Returns all transactions in the mempool, as reported by `self.fetcher`.
///
/// The txid list is fetched first, then each transaction is requested in
/// turn and paired with its txid; the output preserves the txid list order.
///
/// # Errors
///
/// * Any error from `get_mempool_txids` or `get_transaction` is propagated
///   (converted into [`MempoolError`] by `?`).
/// * [`BlockchainSourceError::Unrecoverable`] (wrapped in
///   [`MempoolError::BlockchainSourceError`]) when the txid list is `None`,
///   or when a listed txid has no matching transaction.
async fn get_mempool_transactions(
    &self,
) -> Result<Vec<(MempoolKey, MempoolValue)>, MempoolError> {
    let txids = self.fetcher.get_mempool_txids().await?.ok_or_else(|| {
        MempoolError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
            "could not fetch mempool data: mempool txid list was None".to_string(),
        ))
    })?;

    // The number of entries is known before the loop, so allocate once
    // instead of growing the vector while pushing.
    let mut transactions = Vec::with_capacity(txids.len());

    for txid in txids {
        let (transaction, _location) = self
            .fetcher
            .get_transaction(txid.0.into())
            .await?
            .ok_or_else(|| {
                // Same imported error type as the `None` txid-list case above.
                MempoolError::BlockchainSourceError(BlockchainSourceError::Unrecoverable(
                    format!(
                        "could not fetch mempool data: transaction not found for txid {txid}"
                    ),
                ))
            })?;

        transactions.push((
            MempoolKey {
                txid: txid.to_string(),
            },
            MempoolValue {
                serialized_tx: Arc::new(transaction.into()),
            },
        ));
    }

    Ok(transactions)
}

/// Returns a [`MempoolSubscriber`].
pub fn subscriber(&self) -> MempoolSubscriber {
MempoolSubscriber {
Expand Down Expand Up @@ -365,6 +381,14 @@ pub struct MempoolSubscriber {
}

impl MempoolSubscriber {
/// Returns a clone of the watch receiver tracking the mempool's most
/// recently observed chain tip. Used by tests to detect the mempool↔
/// chain-index tip skew documented in #1037 without busy-polling.
///
/// Takes `&self` and clones `self.mempool_chain_tip`, so the subscriber's
/// own receiver is left in place and the caller gets a separate handle.
/// Compiled only in test builds (`#[cfg(test)]`).
#[cfg(test)]
pub(crate) fn mempool_tip(&self) -> tokio::sync::watch::Receiver<BlockHash> {
self.mempool_chain_tip.clone()
}

/// Returns all tx currently in the mempool.
pub async fn get_mempool(&self) -> Vec<(MempoolKey, MempoolValue)> {
self.subscriber.get_filtered_state(&HashSet::new())
Expand Down
Loading
Loading