From bdfcd46ccf25144a036f714bd0f60f56a5deabdb Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 17:31:02 -0700 Subject: [PATCH 01/12] =?UTF-8?q?=E2=97=8F=20refactor(status):=20back=20Na?= =?UTF-8?q?medAtomicStatus=20with=20tokio::sync::watch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscribe to the chain-index sync loop's `NamedAtomicStatus` via `tokio::sync::watch` so test setup wakes on the first `Ready` transition instead of after the next 2 s polling tick. A single `sleep(sync_timings.interval)` after rendezvous preserves the load-bearing teardown alignment at ~500 ms (down from 2 s); that shim disappears when the indexer gains `CancellationToken`-based shutdown. `wait_for_indexer_tip` is rewired against the same watch with a `tokio::time::timeout` budget. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zaino-state/src/chain_index.rs | 10 +++ packages/zaino-state/src/chain_index/tests.rs | 56 ++++++++-------- .../src/chain_index/tests/mockchain_tests.rs | 66 +++++++++++-------- packages/zaino-state/src/status.rs | 31 ++++++--- 4 files changed, 97 insertions(+), 66 deletions(-) diff --git a/packages/zaino-state/src/chain_index.rs b/packages/zaino-state/src/chain_index.rs index 92597fc80..4ab7ed07c 100644 --- a/packages/zaino-state/src/chain_index.rs +++ b/packages/zaino-state/src/chain_index.rs @@ -776,6 +776,16 @@ impl NodeBackedChainIndexSubscriber { combined_status } + /// Returns a [`tokio::sync::watch::Receiver`] that wakes on every + /// transition of the chain-index sync loop's status. + /// + /// Used by tests to await a specific state (e.g. first transition to + /// [`StatusType::Ready`]) without busy-polling. 
+ #[cfg(test)] + pub(crate) fn status_subscribe(&self) -> tokio::sync::watch::Receiver { + self.status.subscribe() + } + async fn get_fullblock_bytes_from_node( &self, id: HashOrHeight, diff --git a/packages/zaino-state/src/chain_index/tests.rs b/packages/zaino-state/src/chain_index/tests.rs index ec9ca5ce1..81a0afac2 100644 --- a/packages/zaino-state/src/chain_index/tests.rs +++ b/packages/zaino-state/src/chain_index/tests.rs @@ -23,7 +23,6 @@ pub(crate) fn init_tracing() { use std::path::PathBuf; use tempfile::TempDir; -use tokio::time::Duration; use zaino_common::{network::ActivationHeights, DatabaseConfig, Network, StorageConfig}; use crate::{ @@ -45,21 +44,7 @@ async fn load_test_vectors_and_sync_chain_index( NodeBackedChainIndexSubscriber, MockchainSource, ) { - // The 2 s poll interval here is load-bearing for other tests: most - // callers (mockchain_tests, mempool, poll, proptest_blockgen) drop the - // indexer without calling `shutdown()`, relying on the background sync - // loop being in its post-success `interval` sleep at teardown to avoid - // racing with runtime shutdown. Shorter polling lets the test body - // return before that settle point and exposes the latent race. Tests - // that need faster setup should use - // `load_test_vectors_and_sync_chain_index_with_timings` and handle - // their own teardown. 
- load_with_settings( - active_mockchain_source, - SyncTimings::default(), - Duration::from_secs(2), - ) - .await + load_with_settings(active_mockchain_source, SyncTimings::default()).await } async fn load_test_vectors_and_sync_chain_index_with_timings( @@ -71,18 +56,12 @@ async fn load_test_vectors_and_sync_chain_index_with_timings( NodeBackedChainIndexSubscriber, MockchainSource, ) { - load_with_settings( - active_mockchain_source, - sync_timings, - Duration::from_millis(25), - ) - .await + load_with_settings(active_mockchain_source, sync_timings).await } async fn load_with_settings( active_mockchain_source: bool, sync_timings: SyncTimings, - setup_poll_interval: Duration, ) -> ( Vec, NodeBackedChainIndex, @@ -119,18 +98,35 @@ async fn load_with_settings( .unwrap(); let index_reader = indexer.subscriber(); + let check_height = crate::Height(match active_mockchain_source { + true => source.active_height() - 100, + false => 100, + }); + let mut status = index_reader.status_subscribe(); loop { - let check_height: u32 = match active_mockchain_source { - true => source.active_height() - 100, - false => 100, - }; - if index_reader.finalized_state.db_height().await.unwrap() - == Some(crate::Height(check_height)) + if *status.borrow_and_update() == crate::StatusType::Ready + && index_reader.finalized_state.db_height().await.unwrap() == Some(check_height) { break; } - tokio::time::sleep(setup_poll_interval).await; + status + .changed() + .await + .expect("ChainIndex status sender dropped before reaching Ready"); } + // Most callers (mockchain_tests, mempool, poll, proptest_blockgen) drop + // the indexer without `shutdown()`, relying on the sync loop being + // parked in its `tokio::time::sleep(sync_timings.interval)` (see + // chain_index.rs sync loop) at teardown so runtime shutdown doesn't + // race a mid-iteration DB write. 
The watch-based rendezvous above + // wakes one statement *before* that sleep is polled, so under the + // multi-thread runtime there is a sub-millisecond window where the + // sync loop has not yet yielded. One full `interval` here lets the + // sync loop reach the sleep statement and stay there. Removing this + // requires explicit indexer shutdown (`CancellationToken`-based, + // tracked separately). + tokio::time::sleep(sync_timings.interval).await; + (blocks, indexer, index_reader, source) } diff --git a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs index 88fe92376..2160ed859 100644 --- a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs +++ b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs @@ -1,43 +1,53 @@ use super::load_test_vectors_and_sync_chain_index; -use tokio::time::{sleep, Duration}; +use tokio::time::{sleep, timeout, Duration}; use tokio_stream::StreamExt as _; use zebra_chain::serialization::ZcashDeserializeInto; -use crate::chain_index::{ - source::test::MockchainSource, - tests::{poll::poll_until, vectors::TestVectorBlockData}, - types::{BestChainLocation, TransactionHash}, - ChainIndex, NodeBackedChainIndexSubscriber, +use crate::{ + chain_index::{ + source::test::MockchainSource, + tests::vectors::TestVectorBlockData, + types::{BestChainLocation, TransactionHash}, + ChainIndex, NodeBackedChainIndexSubscriber, + }, + StatusType, }; -/// Polls the indexer's nonfinalized-state snapshot until its best-tip height -/// equals `expected`, or panics after a 10 s budget. +/// Waits until the indexer's nonfinalized-state best-tip height equals +/// `expected`, or panics after a 10 s budget. 
/// -/// Use this wherever a test previously relied on a fixed `sleep` to hope the -/// indexer's sync task had caught up with the mockchain tip: the indexer -/// publishes new tips asynchronously via its background loop, and under -/// full-suite parallel load those updates can lag well past 2 s. +/// Wakes on every transition of the chain-index sync loop's status (via +/// [`NodeBackedChainIndexSubscriber::status_subscribe`]) and re-checks the +/// snapshot. The sync loop transitions to [`StatusType::Ready`] only after a +/// successful iteration has updated the tip, so each Ready transition is the +/// earliest moment the test can observe a new tip — no fixed-cadence polling +/// required. async fn wait_for_indexer_tip( index_reader: &NodeBackedChainIndexSubscriber, expected: u32, ) { - poll_until( - "indexer tip to match expected height", - Duration::from_secs(10), - Duration::from_millis(25), - || async { - let tip = index_reader - .snapshot_nonfinalized_state() + let mut status = index_reader.status_subscribe(); + let work = async { + loop { + if *status.borrow_and_update() == StatusType::Ready { + let tip = index_reader + .snapshot_nonfinalized_state() + .await + .ok() + .and_then(|s| s.get_nfs_snapshot().map(|n| n.best_tip.height.0)); + if tip == Some(expected) { + return; + } + } + status + .changed() .await - .ok()? - .get_nfs_snapshot()? - .best_tip - .height - .0; - (tip == expected).then_some(()) - }, - ) - .await; + .expect("ChainIndex status sender dropped before reaching expected tip"); + } + }; + timeout(Duration::from_secs(10), work) + .await + .unwrap_or_else(|_| panic!("indexer tip never reached {expected} within 10 s")); } #[tokio::test(flavor = "multi_thread")] diff --git a/packages/zaino-state/src/status.rs b/packages/zaino-state/src/status.rs index 1afe177e9..ec902f62d 100644 --- a/packages/zaino-state/src/status.rs +++ b/packages/zaino-state/src/status.rs @@ -1,13 +1,15 @@ //! Thread-safe status wrapper. //! //! 
This module provides [`AtomicStatus`], a thread-safe wrapper for [`StatusType`], -//! and [`NamedAtomicStatus`], a variant that logs status transitions. +//! and [`NamedAtomicStatus`], a variant that logs status transitions and supports +//! awaiting transitions via [`NamedAtomicStatus::subscribe`]. use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; +use tokio::sync::watch; use tracing::debug; pub use zaino_common::status::{Status, StatusType}; @@ -45,21 +47,23 @@ impl Status for AtomicStatus { /// Thread-safe status wrapper with component name for observability. /// -/// Unlike [`AtomicStatus`], this variant logs all status transitions, -/// making it easier to trace component lifecycle during debugging. +/// Backed by a [`tokio::sync::watch`] channel: every transition is logged and +/// every subscriber wakes via [`watch::Receiver::changed`]. Identical-value +/// stores are no-ops — neither logged nor broadcast. #[derive(Debug, Clone)] pub struct NamedAtomicStatus { name: &'static str, - inner: Arc, + inner: Arc>, } impl NamedAtomicStatus { /// Creates a new NamedAtomicStatus with the given component name and initial status. pub fn new(name: &'static str, status: StatusType) -> Self { debug!(component = name, status = %status, "[STATUS] initial"); + let (tx, _rx) = watch::channel(status); Self { name, - inner: Arc::new(AtomicUsize::new(status.into())), + inner: Arc::new(tx), } } @@ -70,10 +74,11 @@ impl NamedAtomicStatus { /// Loads the value held in the NamedAtomicStatus. pub fn load(&self) -> StatusType { - StatusType::from(self.inner.load(Ordering::SeqCst)) + *self.inner.borrow() } - /// Sets the value held in the NamedAtomicStatus, logging the transition. + /// Sets the value held in the NamedAtomicStatus, logging and broadcasting + /// the transition. Storing the current value is a no-op. 
pub fn store(&self, status: StatusType) { let old = self.load(); if old != status { @@ -83,8 +88,18 @@ impl NamedAtomicStatus { to = %status, "[STATUS] transition" ); + self.inner.send_replace(status); } - self.inner.store(status.into(), Ordering::SeqCst); + } + + /// Returns a [`watch::Receiver`] that observes every status transition. + /// + /// Use this to wait for a specific state with + /// [`watch::Receiver::changed`] / [`watch::Receiver::borrow_and_update`] + /// instead of busy-polling [`Self::load`]. + #[cfg(test)] + pub(crate) fn subscribe(&self) -> watch::Receiver { + self.inner.subscribe() } } From 8ac07ec8377f739166171b903bec3927e4ace374 Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 18:12:48 -0700 Subject: [PATCH 02/12] =?UTF-8?q?=E2=97=8F=20refactor(zaino-state):=20wake?= =?UTF-8?q?=20chain-index=20sync=20loop=20on=20source=20state=20changes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `BlockchainSource::wait_for_change` (default returns a never-resolving future), have the success-path interval sleep `select!` between the timer and that future. MockchainSource overrides `wait_for_change` with a `Notify` pinged from `mine_blocks`, so the chain-index sync loop wakes immediately on a mocked block instead of after the next 500 ms tick. Per-block test latency drops from ~510 ms (one full `interval`) to ~10 ms (sync work only). `sync_blocks_after_startup` ~12 s → ~1–2 s, hitting #1039's "under 2 s" target. Real backends keep the default and pace on the timer unchanged. `NonFinalizedState::initialize` now takes `source.clone()` so the outer `source` survives the inner async block's `.await` and is reachable in the new `select!` arm. Known regression: `get_mempool_stream_for_stale_snapshot` flips from a probabilistic flake (#1037) to a deterministic failure. 
The chain-index sync loop now wakes immediately on `mine_blocks`, but the mempool serve loop still polls on its 100 ms cadence, so the chain-index/mempool tip-skew window is now always observable. The principled fix (per-subscriber notify + select! in the mempool loop) is the work tracked under #1037 and follows in a TDD'd PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zaino-state/src/chain_index.rs | 11 +++++- .../zaino-state/src/chain_index/source.rs | 37 ++++++++++++++++--- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/packages/zaino-state/src/chain_index.rs b/packages/zaino-state/src/chain_index.rs index 4ab7ed07c..edb2b9910 100644 --- a/packages/zaino-state/src/chain_index.rs +++ b/packages/zaino-state/src/chain_index.rs @@ -689,7 +689,7 @@ impl NodeBackedChainIndex { None => { nfs.store(Some(Arc::new( NonFinalizedState::initialize( - source, + source.clone(), network, fs.to_reader() .get_chain_block_by_height(finalised_height) @@ -716,7 +716,14 @@ impl NodeBackedChainIndex { consecutive_failures = 0; current_backoff = timings.initial_backoff; status.store(StatusType::Ready); - tokio::time::sleep(timings.interval).await; + // Wait `interval`, but wake immediately if the source + // signals new state (push-capable sources only — the + // default `wait_for_change` never fires, so poll-only + // sources still pace themselves on the timer). + tokio::select! 
{ + _ = tokio::time::sleep(timings.interval) => {} + _ = source.wait_for_change() => {} + } } Err(e) => { consecutive_failures += 1; diff --git a/packages/zaino-state/src/chain_index/source.rs b/packages/zaino-state/src/chain_index/source.rs index 4b470f8b2..5a5236286 100644 --- a/packages/zaino-state/src/chain_index/source.rs +++ b/packages/zaino-state/src/chain_index/source.rs @@ -100,6 +100,18 @@ pub trait BlockchainSource: Clone + Send + Sync + 'static { start_index: u16, max_entries: Option, ) -> BlockchainSourceResult>; + + /// Future that resolves when the source's observable state may have + /// changed (e.g. a new block was added). Sync loops can `select!` between + /// this and a fixed-cadence timer to drop per-iteration latency on + /// push-capable sources without losing the timer for poll-only sources. + /// + /// The default never resolves — sync loops fall through to their timer. + /// Push-capable sources (test mockchains) override this to wake the sync + /// loop on relevant state changes. + async fn wait_for_change(&self) { + std::future::pending::<()>().await + } } /// An error originating from a blockchain source. @@ -825,6 +837,7 @@ pub(crate) mod test { atomic::{AtomicU32, Ordering}, Arc, }; + use tokio::sync::Notify; use zebra_chain::{block::Block, orchard::tree as orchard, sapling::tree as sapling}; use zebra_state::HashOrHeight; @@ -874,6 +887,10 @@ pub(crate) mod test { >, active_chain_height: Arc, force_requests_against_source_to_fail: Arc, + /// Pings consumers (chain-index sync loop) when [`Self::mine_blocks`] + /// advances the active height, so they can wake from their interval + /// timer immediately instead of polling. 
+ change_notify: Arc, } impl MockchainSource { @@ -906,6 +923,7 @@ pub(crate) mod test { force_requests_against_source_to_fail: Arc::new( std::sync::atomic::AtomicBool::new(false), ), + change_notify: Arc::new(Notify::new()), } } @@ -950,6 +968,7 @@ pub(crate) mod test { force_requests_against_source_to_fail: Arc::new( std::sync::atomic::AtomicBool::new(false), ), + change_notify: Arc::new(Notify::new()), } } @@ -963,18 +982,20 @@ pub(crate) mod test { pub(crate) fn mine_blocks(&self, blocks: u32) { // len() returns one-indexed length, height is zero-indexed. let max_height = self.max_chain_height(); - let _ = self.active_chain_height.fetch_update( - Ordering::SeqCst, - Ordering::SeqCst, - |current| { + let advanced = self + .active_chain_height + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { let target = current.saturating_add(blocks).min(max_height); if target == current { None } else { Some(target) } - }, - ); + }) + .is_ok(); + if advanced { + self.change_notify.notify_one(); + } } pub(crate) fn max_chain_height(&self) -> u32 { @@ -1185,5 +1206,9 @@ pub(crate) mod test { ) -> BlockchainSourceResult> { todo!() } + + async fn wait_for_change(&self) { + self.change_notify.notified().await + } } } From 8b35adad0c13544a942f31db52d8c478a920859f Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 18:15:33 -0700 Subject: [PATCH 03/12] =?UTF-8?q?=20=20test(chain=5Findex):=20document=20m?= =?UTF-8?q?empool=E2=86=94indexer=20skew=20test=20as=20#1037=20direction?= =?UTF-8?q?=20#1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename `get_mempool_stream_for_stale_snapshot` to `skew_chain_index_ahead_returns_stale_stream` and add a doc-comment identifying it as direction #1 of the #1037 tip-skew suite. The companion test for the mempool-ahead direction follows. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/chain_index/tests/mockchain_tests.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs index 2160ed859..1e2f44897 100644 --- a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs +++ b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs @@ -509,8 +509,18 @@ async fn get_mempool_stream_correct_expected_chain_tip_snapshot() { ); } +/// Tip-skew direction #1 (#1037): chain-index sync loop ahead of mempool. +/// +/// After `mine_blocks(1)` + `wait_for_indexer_tip`, the chain-index has +/// reached the new tip but the mempool's serve loop may not have. Calling +/// `get_mempool_stream` with a snapshot taken before mining must reject +/// the snapshot as stale even when the mempool's own tip view still +/// matches it — i.e. staleness must be defined against the *latest* +/// observed tip, not whichever subsystem the consumer happens to read +/// first. Companion direction (mempool ahead of chain-index) lives in a +/// sibling test. #[tokio::test(flavor = "multi_thread", worker_threads = 8)] -async fn get_mempool_stream_for_stale_snapshot() { +async fn skew_chain_index_ahead_returns_stale_stream() { let (_blocks, _indexer, index_reader, mockchain) = load_test_vectors_and_sync_chain_index(true).await; wait_for_indexer_tip(&index_reader, mockchain.active_height()).await; From a5ff98526a62d7df092ddf6302c2ea9a4879569c Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 18:34:11 -0700 Subject: [PATCH 04/12] Move the renamed direction-#1 test out of `mockchain_tests` and into a new `mempool_skew` mod (1:1 with `tests/mempool_skew.rs`); add the companion direction-#2 test alongside. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `mempool_skew::chain_index_ahead_returns_stale_stream` — pre-existing direction-#1 spec, deterministically failing after the Option-2 change (the source `wait_for_change` notify from PATCH 02/12 that wakes the chain-index sync loop on `mine_blocks`) for the same reason it was a flake before (#1037). - `mempool_skew::mempool_ahead_rejects_fresh_snapshot` — new direction-#2 spec. Drives the source via `mine_blocks_silent` (no chain-index notify) so the mempool's poll observes the new tip first, exposing the symmetric false-negative case where the API rejects its own freshly-issued snapshot. Test-only infrastructure to enable the new test: - `MockchainSource::mine_blocks_silent` advances the active height without firing the change-notify — the only way to put the chain-index *behind* the mempool now that `mine_blocks` wakes the sync loop. - `MempoolSubscriber::mempool_tip` (cfg(test)) clones the `mempool_chain_tip` watch receiver; forwarder on `NodeBackedChainIndexSubscriber::mempool_tip`. - `wait_for_mempool_tip_change` helper in `mempool_skew.rs` waits on that receiver, gating direction #2 to act exactly in the skew window. - `mockchain_tests::wait_for_indexer_tip` widened to `pub(super)` so the sibling mod can reuse it. Direction #2 is expected to fail until the principled fix lands. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zaino-state/src/chain_index.rs | 8 ++ .../zaino-state/src/chain_index/mempool.rs | 8 ++ .../zaino-state/src/chain_index/source.rs | 20 ++++ packages/zaino-state/src/chain_index/tests.rs | 1 + .../src/chain_index/tests/mempool_skew.rs | 111 ++++++++++++++++++ .../src/chain_index/tests/mockchain_tests.rs | 27 +---- 6 files changed, 149 insertions(+), 26 deletions(-) create mode 100644 packages/zaino-state/src/chain_index/tests/mempool_skew.rs diff --git a/packages/zaino-state/src/chain_index.rs b/packages/zaino-state/src/chain_index.rs index edb2b9910..be310bc45 100644 --- a/packages/zaino-state/src/chain_index.rs +++ b/packages/zaino-state/src/chain_index.rs @@ -793,6 +793,14 @@ impl NodeBackedChainIndexSubscriber { self.status.subscribe() } + /// Returns a watch receiver tracking the mempool serve loop's most + /// recently observed chain tip. Forwarder for + /// [`mempool::MempoolSubscriber::mempool_tip`]. + #[cfg(test)] + pub(crate) fn mempool_tip(&self) -> tokio::sync::watch::Receiver { + self.mempool.mempool_tip() + } + async fn get_fullblock_bytes_from_node( &self, id: HashOrHeight, diff --git a/packages/zaino-state/src/chain_index/mempool.rs b/packages/zaino-state/src/chain_index/mempool.rs index 235706ce9..9a750a014 100644 --- a/packages/zaino-state/src/chain_index/mempool.rs +++ b/packages/zaino-state/src/chain_index/mempool.rs @@ -365,6 +365,14 @@ pub struct MempoolSubscriber { } impl MempoolSubscriber { + /// Returns a clone of the watch receiver tracking the mempool's most + /// recently observed chain tip. Used by tests to detect the mempool↔ + /// chain-index tip skew documented in #1037 without busy-polling. + #[cfg(test)] + pub(crate) fn mempool_tip(&self) -> tokio::sync::watch::Receiver { + self.mempool_chain_tip.clone() + } + /// Returns all tx currently in the mempool. 
pub async fn get_mempool(&self) -> Vec<(MempoolKey, MempoolValue)> { self.subscriber.get_filtered_state(&HashSet::new()) diff --git a/packages/zaino-state/src/chain_index/source.rs b/packages/zaino-state/src/chain_index/source.rs index 5a5236286..6f9d198b9 100644 --- a/packages/zaino-state/src/chain_index/source.rs +++ b/packages/zaino-state/src/chain_index/source.rs @@ -998,6 +998,26 @@ pub(crate) mod test { } } + /// Like [`Self::mine_blocks`] but does *not* fire the source's + /// change-notify. Lets the chain-index sync loop fall through to + /// its timer instead of waking immediately — the only way to put + /// the chain-index *behind* the mempool in tests, since the + /// mempool's serve loop polls `get_best_block_hash` directly and + /// always notices, notify or not. + pub(crate) fn mine_blocks_silent(&self, blocks: u32) { + let max_height = self.max_chain_height(); + let _ = self + .active_chain_height + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { + let target = current.saturating_add(blocks).min(max_height); + if target == current { + None + } else { + Some(target) + } + }); + } + pub(crate) fn max_chain_height(&self) -> u32 { // len() returns one-indexed length, height is zero-indexed. self.blocks.len().saturating_sub(1) as u32 diff --git a/packages/zaino-state/src/chain_index/tests.rs b/packages/zaino-state/src/chain_index/tests.rs index 81a0afac2..9f08a416b 100644 --- a/packages/zaino-state/src/chain_index/tests.rs +++ b/packages/zaino-state/src/chain_index/tests.rs @@ -2,6 +2,7 @@ pub(crate) mod finalised_state; pub(crate) mod mempool; +mod mempool_skew; mod mockchain_tests; mod poll; mod proptest_blockgen; diff --git a/packages/zaino-state/src/chain_index/tests/mempool_skew.rs b/packages/zaino-state/src/chain_index/tests/mempool_skew.rs new file mode 100644 index 000000000..d699854b6 --- /dev/null +++ b/packages/zaino-state/src/chain_index/tests/mempool_skew.rs @@ -0,0 +1,111 @@ +//! 
Regression suite for #1037 — mempool↔chain-index tip skew. +//! +//! The chain-index has two independent sync tasks observing the same +//! `BlockchainSource` on independent cadences: the chain-index sync loop +//! and the mempool serve loop. Their tip views can diverge for a window +//! after every block. This module pins each direction of the skew with a +//! deterministic test so the principled fix can be TDD'd against both. + +use tokio::time::{timeout, Duration}; + +use super::{load_test_vectors_and_sync_chain_index, mockchain_tests::wait_for_indexer_tip}; +use crate::chain_index::{ + source::test::MockchainSource, ChainIndex, NodeBackedChainIndexSubscriber, +}; + +/// Waits for the next change of the mempool serve loop's `mempool_chain_tip` +/// watch — i.e. the next time the mempool observes a new best-block hash and +/// resets. Used by direction-#2 to act *exactly* in the window where the +/// mempool has advanced but the chain-index has not. +async fn wait_for_mempool_tip_change( + index_reader: &NodeBackedChainIndexSubscriber, +) { + let mut tip = index_reader.mempool_tip(); + tip.borrow_and_update(); + timeout(Duration::from_secs(10), tip.changed()) + .await + .unwrap_or_else(|_| panic!("mempool tip did not change within 10 s")) + .expect("mempool_chain_tip sender dropped"); +} + +/// Tip-skew direction #1 (#1037): chain-index sync loop ahead of mempool. +/// +/// After `mine_blocks(1)` + `wait_for_indexer_tip`, the chain-index has +/// reached the new tip but the mempool's serve loop may not have. Calling +/// `get_mempool_stream` with a snapshot taken before mining must reject +/// the snapshot as stale even when the mempool's own tip view still +/// matches it — i.e. staleness must be defined against the *latest* +/// observed tip, not whichever subsystem the consumer happens to read +/// first. Companion direction (mempool ahead of chain-index) lives in +/// the sibling test. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn chain_index_ahead_returns_stale_stream() { + let (_blocks, _indexer, index_reader, mockchain) = + load_test_vectors_and_sync_chain_index(true).await; + wait_for_indexer_tip(&index_reader, mockchain.active_height()).await; + + let stale_nonfinalized_snapshot = index_reader.snapshot_nonfinalized_state().await.unwrap(); + + mockchain.mine_blocks(1); + wait_for_indexer_tip(&index_reader, mockchain.active_height()).await; + + let mempool_stream = index_reader.get_mempool_stream(Some(&stale_nonfinalized_snapshot)); + + assert!(mempool_stream.is_none()); +} + +/// Tip-skew direction #2 (#1037): mempool serve loop ahead of chain-index. +/// +/// Constructed by advancing the mockchain via `mine_blocks_silent`, which +/// suppresses the chain-index sync loop's `wait_for_change` wake. The +/// mempool's serve loop polls `get_best_block_hash` on its own cadence, +/// so it observes the new tip first. Once the mempool has advanced +/// (`wait_for_mempool_tip_change`), the test takes a snapshot — which +/// reflects the chain-index's *current* (lagging) view — and asks for a +/// mempool stream against that snapshot. +/// +/// Calling `get_mempool_stream(snapshot)` immediately after a +/// `snapshot_nonfinalized_state()` from the same +/// `NodeBackedChainIndexSubscriber` should not reject the caller's +/// snapshot: the API just handed it out, it is the freshest chain-index +/// view available, and refusing the stream for it is an internal +/// contradiction (left hand says fresh, right hand says stale). Today +/// the staleness check compares `snapshot.best_tip.hash` against +/// `mempool_chain_tip` only, so when the mempool has moved past the +/// chain-index, this comparison fails and the stream is dropped. +/// +/// Expected to fail until #1037 is fixed. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn mempool_ahead_rejects_fresh_snapshot() { + let (_blocks, _indexer, index_reader, mockchain) = + load_test_vectors_and_sync_chain_index(true).await; + wait_for_indexer_tip(&index_reader, mockchain.active_height()).await; + + // Advance the source without notifying the chain-index sync loop, + // letting the mempool's poll observe the new tip first. + mockchain.mine_blocks_silent(1); + wait_for_mempool_tip_change(&index_reader).await; + + // The snapshot the API just handed out should yield a valid stream. + let fresh_snapshot = index_reader.snapshot_nonfinalized_state().await.unwrap(); + + // Sanity check: chain-index is still at the old tip in this snapshot. + let chain_index_tip = fresh_snapshot + .get_nfs_snapshot() + .unwrap() + .best_tip + .height + .0; + assert!( + chain_index_tip < mockchain.active_height(), + "test setup: chain-index should be behind the mockchain (got chain_index={chain_index_tip}, mockchain={})", + mockchain.active_height(), + ); + + let mempool_stream = index_reader.get_mempool_stream(Some(&fresh_snapshot)); + + assert!( + mempool_stream.is_some(), + "API rejected its own freshly-issued snapshot — mempool↔chain-index tip skew (#1037 direction #2)" + ); +} diff --git a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs index 1e2f44897..2fd6e6478 100644 --- a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs +++ b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs @@ -22,7 +22,7 @@ use crate::{ /// successful iteration has updated the tip, so each Ready transition is the /// earliest moment the test can observe a new tip — no fixed-cadence polling /// required. 
-async fn wait_for_indexer_tip( +pub(super) async fn wait_for_indexer_tip( index_reader: &NodeBackedChainIndexSubscriber, expected: u32, ) { @@ -509,31 +509,6 @@ async fn get_mempool_stream_correct_expected_chain_tip_snapshot() { ); } -/// Tip-skew direction #1 (#1037): chain-index sync loop ahead of mempool. -/// -/// After `mine_blocks(1)` + `wait_for_indexer_tip`, the chain-index has -/// reached the new tip but the mempool's serve loop may not have. Calling -/// `get_mempool_stream` with a snapshot taken before mining must reject -/// the snapshot as stale even when the mempool's own tip view still -/// matches it — i.e. staleness must be defined against the *latest* -/// observed tip, not whichever subsystem the consumer happens to read -/// first. Companion direction (mempool ahead of chain-index) lives in a -/// sibling test. -#[tokio::test(flavor = "multi_thread", worker_threads = 8)] -async fn skew_chain_index_ahead_returns_stale_stream() { - let (_blocks, _indexer, index_reader, mockchain) = - load_test_vectors_and_sync_chain_index(true).await; - wait_for_indexer_tip(&index_reader, mockchain.active_height()).await; - - let stale_nonfinalized_snapshot = index_reader.snapshot_nonfinalized_state().await.unwrap(); - - mockchain.mine_blocks(1); - wait_for_indexer_tip(&index_reader, mockchain.active_height()).await; - - let mempool_stream = index_reader.get_mempool_stream(Some(&stale_nonfinalized_snapshot)); - - assert!(mempool_stream.is_none()); -} #[tokio::test(flavor = "multi_thread")] async fn get_block_height() { From 934ce01324a996c817d7892ee9d2a5e00f0f5f97 Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 18:42:13 -0700 Subject: [PATCH 05/12] =?UTF-8?q?=E2=97=8F=20test(mempool=5Fskew):=20add?= =?UTF-8?q?=20convergence-bound=20success=20criterion=20for=20#1037?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `tips_converge_within_bounded_time_after_mining` — a 20-sample property test that mines one 
block per iteration, awaits the chain-index tip via the existing event-driven helper, then samples the mempool's tracked tip immediately and counts iterations where the mempool still has the pre-mining hash. Asserts `lag_count == 0`. Today the chain-index sync loop wakes on `mine_blocks` while the mempool serve loop polls on its own 100 ms cadence, so the mempool lags in the vast majority of iterations and the test fails deterministically. Once the principled fix wakes both subsystems on `mine_blocks`, both converge within a single iteration and `lag_count` is zero. Test-only support: - `MockchainSource::active_block_hash` returns the block hash at the current active height — needed to compare the mempool's tracked tip against the expected post-mining hash. - `wait_for_mempool_tip` helper waits for a specific tip hash on the mempool's watch (companion to the existing `wait_for_mempool_tip_change`); used to resync between iterations so the test isn't chasing stacked updates. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../zaino-state/src/chain_index/source.rs | 7 ++ .../src/chain_index/tests/mempool_skew.rs | 76 ++++++++++++++++++- 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/packages/zaino-state/src/chain_index/source.rs b/packages/zaino-state/src/chain_index/source.rs index 6f9d198b9..cfb73a00c 100644 --- a/packages/zaino-state/src/chain_index/source.rs +++ b/packages/zaino-state/src/chain_index/source.rs @@ -1027,6 +1027,13 @@ pub(crate) mod test { self.active_chain_height.load(Ordering::SeqCst) } + /// Returns the [`BlockHash`] at [`Self::active_height`]. Used by + /// tip-skew tests to compare the mempool's tracked tip against the + /// expected post-mining hash. 
+ pub(crate) fn active_block_hash(&self) -> BlockHash { + self.hashes[self.active_height() as usize] + } + fn valid_height(&self, height: u32) -> Option { let active_chain_height = self.active_height() as usize; let valid_height = height as usize; diff --git a/packages/zaino-state/src/chain_index/tests/mempool_skew.rs b/packages/zaino-state/src/chain_index/tests/mempool_skew.rs index d699854b6..2a8464db0 100644 --- a/packages/zaino-state/src/chain_index/tests/mempool_skew.rs +++ b/packages/zaino-state/src/chain_index/tests/mempool_skew.rs @@ -9,8 +9,9 @@ use tokio::time::{timeout, Duration}; use super::{load_test_vectors_and_sync_chain_index, mockchain_tests::wait_for_indexer_tip}; -use crate::chain_index::{ - source::test::MockchainSource, ChainIndex, NodeBackedChainIndexSubscriber, +use crate::{ + chain_index::{source::test::MockchainSource, ChainIndex, NodeBackedChainIndexSubscriber}, + BlockHash, }; /// Waits for the next change of the mempool serve loop's `mempool_chain_tip` @@ -28,6 +29,29 @@ async fn wait_for_mempool_tip_change( .expect("mempool_chain_tip sender dropped"); } +/// Waits until the mempool serve loop's `mempool_chain_tip` equals +/// `expected`, or panics after 10 s. Used to re-synchronise between +/// iterations of property-style skew tests. +async fn wait_for_mempool_tip( + index_reader: &NodeBackedChainIndexSubscriber, + expected: BlockHash, +) { + let mut tip = index_reader.mempool_tip(); + let work = async { + loop { + if *tip.borrow_and_update() == expected { + return; + } + tip.changed() + .await + .expect("mempool_chain_tip sender dropped"); + } + }; + timeout(Duration::from_secs(10), work) + .await + .unwrap_or_else(|_| panic!("mempool tip never reached expected hash within 10 s")); +} + /// Tip-skew direction #1 (#1037): chain-index sync loop ahead of mempool. 
/// /// After `mine_blocks(1)` + `wait_for_indexer_tip`, the chain-index has @@ -109,3 +133,51 @@ async fn mempool_ahead_rejects_fresh_snapshot() { "API rejected its own freshly-issued snapshot — mempool↔chain-index tip skew (#1037 direction #2)" ); } + +/// Convergence-bound (#1037 success criterion): each `mine_blocks` event +/// should bring both subsystems to the new tip within a single iteration. +/// +/// At each iteration the test mines one block, awaits the chain-index +/// tip via the existing event-driven helper, then samples the mempool's +/// tracked tip *immediately*. If the mempool also converged in the same +/// iteration, the sample reflects the new tip; otherwise the sample +/// still equals the pre-mining hash and the iteration is counted as a +/// lag. +/// +/// Today the chain-index sync loop wakes on `mine_blocks` (Option-2 +/// notify) but the mempool serve loop polls on its own 100 ms cadence, +/// so the chain-index nearly always wins the race and the mempool lags +/// in the vast majority of iterations. Repeating the sample 20× makes +/// the failure deterministic in practice — the probability of zero lags +/// across 20 events under a ~10 % per-iteration win rate is negligible. +/// +/// Once the principled fix wakes both subsystems on `mine_blocks`, the +/// expected outcome is `lag_count == 0` deterministically. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn tips_converge_within_bounded_time_after_mining() { + let (_blocks, _indexer, index_reader, mockchain) = + load_test_vectors_and_sync_chain_index(true).await; + wait_for_indexer_tip(&index_reader, mockchain.active_height()).await; + + let mut lag_count = 0; + for _ in 0..20 { + let pre_mempool_tip = *index_reader.mempool_tip().borrow(); + mockchain.mine_blocks(1); + let new_height = mockchain.active_height(); + let new_hash = mockchain.active_block_hash(); + + wait_for_indexer_tip(&index_reader, new_height).await; + if *index_reader.mempool_tip().borrow() == pre_mempool_tip { + lag_count += 1; + } + + // Resync mempool before the next iteration so we're not + // chasing multiple stacked updates. + wait_for_mempool_tip(&index_reader, new_hash).await; + } + + assert_eq!( + lag_count, 0, + "mempool lagged chain-index in {lag_count}/20 mine_blocks events (#1037)" + ); +} From 25a8606cb10af3ca57c9d9cb5accc959a49993de Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 18:56:10 -0700 Subject: [PATCH 06/12] =?UTF-8?q?=E2=97=8F=20fix(chain=5Findex):=20make=20?= =?UTF-8?q?chain-index=20tip=20authoritative=20for=20stream=20freshness=20?= =?UTF-8?q?(#1037=20part=201/2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `NodeBackedChainIndexSubscriber::get_mempool_stream` previously delegated the snapshot-freshness check to `Mempool`, which compared the snapshot's tip against the mempool serve loop's `mempool_chain_tip`. Because the chain-index sync loop and the mempool serve loop poll the source on independent cadences, the two views diverge for a window after every block — and the API exposed the divergence as caller-visible internal contradictions. 
Move the freshness check into the chain-index subscriber and compare against the chain-index's *current* non-finalized tip (synchronous `arc-swap` load on `non_finalized_state`, same source of truth as `snapshot_nonfinalized_state`). Pass `None` to `MempoolSubscriber::get_mempool_stream` so the mempool no longer gates on tip — staleness is the chain-index's responsibility. This flips the two skew-direction regression tests added under #1037: - `mempool_skew::chain_index_ahead_returns_stale_stream` (direction #1): FAIL → PASS. After `mine_blocks` + `wait_for_indexer_tip`, the chain-index has the new tip while the mempool still has the old one. The pre-fix check matched the stale snapshot against the mempool's lagging view and accepted it; the new check matches against the chain-index's advanced view and correctly returns `None`. - `mempool_skew::mempool_ahead_rejects_fresh_snapshot` (direction #2): FAIL → PASS. After `mine_blocks_silent` + `wait_for_mempool_tip_change`, the mempool has the new tip while the chain-index still has the old one. The pre-fix check rejected the freshly-issued snapshot because the mempool's tip had moved past it; the new check matches against the chain-index's authoritative (lagging) view and correctly returns `Some`. `mempool_skew::tips_converge_within_bounded_time_after_mining` still fails — the mempool serve loop still polls on its own cadence, so it lags the chain-index in raw timing. That's the next work and lands in part 2/2 (per-subscriber wake on source change via `tokio::sync::broadcast`). The dead `Some(Err(MempoolError::IncorrectChainTip { .. }))` arm in `get_mempool_stream`'s outer match is left in place as defensive code — the mempool can no longer return that error from this call site, but the arm doesn't cost anything and keeps the match exhaustive against future Mempool surface changes. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zaino-state/src/chain_index.rs | 28 ++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/packages/zaino-state/src/chain_index.rs b/packages/zaino-state/src/chain_index.rs index be310bc45..004ccbba0 100644 --- a/packages/zaino-state/src/chain_index.rs +++ b/packages/zaino-state/src/chain_index.rs @@ -1786,13 +1786,31 @@ impl ChainIndex for NodeBackedChainIndexSubscriber None, }; - let expected_chain_tip = non_finalized_snapshot.map(|snapshot| snapshot.best_tip.hash); + + // Stale-snapshot check: compare the caller's snapshot against the + // chain-index's *current* non-finalized tip (the same source of + // truth as `snapshot_nonfinalized_state`), not against the mempool + // serve loop's `mempool_chain_tip`. The mempool polls on its own + // cadence, so its tip view can diverge from the chain-index's by + // up to one poll cycle (#1037); using it here means the API can + // accept a stale snapshot whenever the mempool happens to be + // lagging, and reject a freshly-issued snapshot whenever the + // mempool happens to be ahead — both are caller-visible internal + // contradictions. The chain-index is authoritative. 
+ if let Some(expected) = non_finalized_snapshot { + match self.non_finalized_state.load().as_ref() { + Some(current) => { + if expected.best_tip.hash != current.get_snapshot().best_tip.hash { + return None; + } + } + None => return None, + } + } + let mut subscriber = self.mempool.clone(); - match subscriber - .get_mempool_stream(expected_chain_tip) - .now_or_never() - { + match subscriber.get_mempool_stream(None).now_or_never() { Some(Ok((in_rx, _handle))) => { let (out_tx, out_rx) = tokio::sync::mpsc::channel::, ChainIndexError>>(32); From 901afc1eb6eefb874a8fdcbd77c374b454bd316c Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 19:27:09 -0700 Subject: [PATCH 07/12] =?UTF-8?q?=E2=97=8F=20fix(chain=5Findex):=20wake=20?= =?UTF-8?q?mempool=20serve=20loop=20on=20source=20change=20(#1037=20part?= =?UTF-8?q?=202/2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both the chain-index sync loop and the mempool serve loop pace themselves on a fixed-cadence timer; only the chain-index used to wake early on source state change (Option-2 `Notify`). After a `mine_blocks`, the chain-index would converge in the same iteration but the mempool would not pick up the new tip until its 100 ms timer expired. That asymmetric lag was the failure mode behind `mempool_skew::tips_converge_within_bounded_time_after_mining`. Replace `BlockchainSource::wait_for_change` (a single shared never-resolves future) with `change_subscribe` returning `Option>`. Each subscriber gets its own buffered receiver, so a `mine_blocks` that fires while one subsystem is mid-iteration is preserved on its receiver and consumed on the next `recv().await` — no missed-wake gap. Push-capable sources (the mockchain) override to return `Some`; poll-only sources (real validators) keep the `None` default and fall through to the timer. 
Fix the actual convergence bug: the post-`send_replace` cool-down at the top of the mempool serve loop's tip-change branch was a bare `tokio::time::sleep(100ms)` not wired to any wake source. After iter-0's mine event the mempool would land in this sleep and stay deaf to the broadcast for the next iter-1 mine — so the test saw 19 / 20 events lag (iter 0 the only exception, since the mempool started in the broadcast-aware bottom `select!` after init). Both the cool-down and the inter-iteration sleep now go through the shared helper. DRY the now-three call sites of the `match change_rx { Some => select!(sleep, recv), None => sleep }` pattern into `source::wait_or_source_change(change_rx.as_mut(), duration)`. The helper is `pub(super)` — only the chain-index sync loop and the mempool serve loop need it. `mempool_skew::tips_converge_within_bounded_time_after_mining`: FAIL (19/20 lags) → PASS (0 lags). The two direction tests (`chain_index_ahead_returns_stale_stream`, `mempool_ahead_rejects_fresh_snapshot`) keep passing — `mine_blocks_silent` still suppresses the broadcast, and the mempool's 100 ms timer alone is enough to put it ahead of the chain-index for direction #2. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zaino-state/src/chain_index.rs | 10 +-- .../zaino-state/src/chain_index/mempool.rs | 21 +++++- .../zaino-state/src/chain_index/source.rs | 75 +++++++++++++------ .../src/chain_index/tests/mempool_skew.rs | 9 +-- .../src/chain_index/tests/mockchain_tests.rs | 1 - 5 files changed, 74 insertions(+), 42 deletions(-) diff --git a/packages/zaino-state/src/chain_index.rs b/packages/zaino-state/src/chain_index.rs index 004ccbba0..54648abcb 100644 --- a/packages/zaino-state/src/chain_index.rs +++ b/packages/zaino-state/src/chain_index.rs @@ -650,6 +650,7 @@ impl NodeBackedChainIndex { tokio::task::spawn(async move { let status = status.clone(); let source = source.clone(); + let mut change_rx = source.change_subscribe(); let mut consecutive_failures: u32 = 0; let mut current_backoff = timings.initial_backoff; @@ -716,14 +717,7 @@ impl NodeBackedChainIndex { consecutive_failures = 0; current_backoff = timings.initial_backoff; status.store(StatusType::Ready); - // Wait `interval`, but wake immediately if the source - // signals new state (push-capable sources only — the - // default `wait_for_change` never fires, so poll-only - // sources still pace themselves on the timer). - tokio::select! 
{ - _ = tokio::time::sleep(timings.interval) => {} - _ = source.wait_for_change() => {} - } + source::wait_or_source_change(change_rx.as_mut(), timings.interval).await; } Err(e) => { consecutive_failures += 1; diff --git a/packages/zaino-state/src/chain_index/mempool.rs b/packages/zaino-state/src/chain_index/mempool.rs index 9a750a014..7282c64ce 100644 --- a/packages/zaino-state/src/chain_index/mempool.rs +++ b/packages/zaino-state/src/chain_index/mempool.rs @@ -5,7 +5,7 @@ use std::{collections::HashSet, sync::Arc}; use crate::{ broadcast::{Broadcast, BroadcastSubscriber}, chain_index::{ - source::{BlockchainSource, BlockchainSourceError}, + source::{wait_or_source_change, BlockchainSource, BlockchainSourceError}, types::db::metadata::MempoolInfo, }, error::{MempoolError, StatusError}, @@ -142,6 +142,7 @@ impl Mempool { status.store(StatusType::Spawning); let sync_handle = tokio::spawn(async move { + let mut change_rx = mempool.fetcher.change_subscribe(); let mut best_block_hash: Hash; let mut check_block_hash: Hash; @@ -207,7 +208,16 @@ impl Mempool { .send_replace(check_block_hash.into()); best_block_hash = check_block_hash; - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Cool-down before we re-fetch mempool state at the new + // tip — gives the validator a moment to settle. Stays + // interruptible: a follow-on `mine_blocks` must wake us + // immediately, otherwise the next iteration of the test + // sees a stale tip until the timer expires (#1037). + wait_or_source_change( + change_rx.as_mut(), + std::time::Duration::from_millis(100), + ) + .await; continue; } @@ -230,7 +240,12 @@ impl Mempool { return; } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Wait the cadence interval, but wake immediately if the + // source signals new state — keeps mempool↔chain-index + // tip propagation aligned within a single iteration on + // push-capable sources (#1037 part 2/2). 
+ wait_or_source_change(change_rx.as_mut(), std::time::Duration::from_millis(100)) + .await; } }); diff --git a/packages/zaino-state/src/chain_index/source.rs b/packages/zaino-state/src/chain_index/source.rs index cfb73a00c..24623a3ad 100644 --- a/packages/zaino-state/src/chain_index/source.rs +++ b/packages/zaino-state/src/chain_index/source.rs @@ -101,16 +101,39 @@ pub trait BlockchainSource: Clone + Send + Sync + 'static { max_entries: Option, ) -> BlockchainSourceResult>; - /// Future that resolves when the source's observable state may have - /// changed (e.g. a new block was added). Sync loops can `select!` between - /// this and a fixed-cadence timer to drop per-iteration latency on - /// push-capable sources without losing the timer for poll-only sources. + /// Subscribe to source state-change notifications. /// - /// The default never resolves — sync loops fall through to their timer. - /// Push-capable sources (test mockchains) override this to wake the sync - /// loop on relevant state changes. - async fn wait_for_change(&self) { - std::future::pending::<()>().await + /// Each subscriber receives every change-event on its own buffered + /// receiver, so wakes between iterations are preserved (no missed-wake + /// gap). Sync loops typically call `change_subscribe` once at startup + /// and `select!` `recv()` against their fixed-cadence timer, falling + /// through to the timer when no push notification arrives. + /// + /// The default returns `None` — poll-only sources (real validators) + /// pace themselves on the timer alone. Push-capable sources (test + /// mockchains) override to provide a live receiver. + fn change_subscribe(&self) -> Option> { + None + } +} + +/// Sleep up to `duration`, but return early if `change_rx` resolves first. +/// +/// Both sync loops in this module (chain-index sync, mempool serve) pace +/// themselves on a fixed-cadence timer and want to wake immediately when the +/// source signals new state. 
The two-arm `tokio::select!` is identical at +/// every call site; this helper is the single home for the pattern. Pass +/// `None` for poll-only sources — the helper degrades to a plain sleep. +pub(super) async fn wait_or_source_change( + change_rx: Option<&mut tokio::sync::broadcast::Receiver<()>>, + duration: std::time::Duration, +) { + match change_rx { + Some(rx) => tokio::select! { + _ = tokio::time::sleep(duration) => {} + _ = rx.recv() => {} + }, + None => tokio::time::sleep(duration).await, } } @@ -837,7 +860,7 @@ pub(crate) mod test { atomic::{AtomicU32, Ordering}, Arc, }; - use tokio::sync::Notify; + use tokio::sync::broadcast; use zebra_chain::{block::Block, orchard::tree as orchard, sapling::tree as sapling}; use zebra_state::HashOrHeight; @@ -887,10 +910,14 @@ pub(crate) mod test { >, active_chain_height: Arc, force_requests_against_source_to_fail: Arc, - /// Pings consumers (chain-index sync loop) when [`Self::mine_blocks`] - /// advances the active height, so they can wake from their interval - /// timer immediately instead of polling. - change_notify: Arc, + /// Pings every subscriber registered via + /// [`BlockchainSource::change_subscribe`] when [`Self::mine_blocks`] + /// advances the active height, so each can wake from its interval + /// timer immediately. `broadcast` (over `Notify`) gives every + /// subscriber its own buffered receiver, so a `mine_blocks` that + /// fires while one subsystem is mid-iteration is preserved on its + /// receiver and consumed on the next `recv().await`. 
+ change_broadcast: broadcast::Sender<()>, } impl MockchainSource { @@ -923,7 +950,7 @@ pub(crate) mod test { force_requests_against_source_to_fail: Arc::new( std::sync::atomic::AtomicBool::new(false), ), - change_notify: Arc::new(Notify::new()), + change_broadcast: broadcast::channel(16).0, } } @@ -968,7 +995,7 @@ pub(crate) mod test { force_requests_against_source_to_fail: Arc::new( std::sync::atomic::AtomicBool::new(false), ), - change_notify: Arc::new(Notify::new()), + change_broadcast: broadcast::channel(16).0, } } @@ -994,7 +1021,7 @@ pub(crate) mod test { }) .is_ok(); if advanced { - self.change_notify.notify_one(); + let _ = self.change_broadcast.send(()); } } @@ -1006,16 +1033,18 @@ pub(crate) mod test { /// always notices, notify or not. pub(crate) fn mine_blocks_silent(&self, blocks: u32) { let max_height = self.max_chain_height(); - let _ = self - .active_chain_height - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { + let _ = self.active_chain_height.fetch_update( + Ordering::SeqCst, + Ordering::SeqCst, + |current| { let target = current.saturating_add(blocks).min(max_height); if target == current { None } else { Some(target) } - }); + }, + ); } pub(crate) fn max_chain_height(&self) -> u32 { @@ -1234,8 +1263,8 @@ pub(crate) mod test { todo!() } - async fn wait_for_change(&self) { - self.change_notify.notified().await + fn change_subscribe(&self) -> Option> { + Some(self.change_broadcast.subscribe()) } } } diff --git a/packages/zaino-state/src/chain_index/tests/mempool_skew.rs b/packages/zaino-state/src/chain_index/tests/mempool_skew.rs index 2a8464db0..f4f436256 100644 --- a/packages/zaino-state/src/chain_index/tests/mempool_skew.rs +++ b/packages/zaino-state/src/chain_index/tests/mempool_skew.rs @@ -81,7 +81,7 @@ async fn chain_index_ahead_returns_stale_stream() { /// Tip-skew direction #2 (#1037): mempool serve loop ahead of chain-index. 
/// /// Constructed by advancing the mockchain via `mine_blocks_silent`, which -/// suppresses the chain-index sync loop's `wait_for_change` wake. The +/// suppresses the source's `change_subscribe` broadcast. The /// mempool's serve loop polls `get_best_block_hash` on its own cadence, /// so it observes the new tip first. Once the mempool has advanced /// (`wait_for_mempool_tip_change`), the test takes a snapshot — which @@ -114,12 +114,7 @@ async fn mempool_ahead_rejects_fresh_snapshot() { let fresh_snapshot = index_reader.snapshot_nonfinalized_state().await.unwrap(); // Sanity check: chain-index is still at the old tip in this snapshot. - let chain_index_tip = fresh_snapshot - .get_nfs_snapshot() - .unwrap() - .best_tip - .height - .0; + let chain_index_tip = fresh_snapshot.get_nfs_snapshot().unwrap().best_tip.height.0; assert!( chain_index_tip < mockchain.active_height(), "test setup: chain-index should be behind the mockchain (got chain_index={chain_index_tip}, mockchain={})", diff --git a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs index 2fd6e6478..d839ebb6a 100644 --- a/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs +++ b/packages/zaino-state/src/chain_index/tests/mockchain_tests.rs @@ -509,7 +509,6 @@ async fn get_mempool_stream_correct_expected_chain_tip_snapshot() { ); } - #[tokio::test(flavor = "multi_thread")] async fn get_block_height() { let (blocks, _indexer, index_reader, _mockchain) = From 2d9fb2b31d48bf653f71228e97b4e36b186d58c5 Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 19:39:14 -0700 Subject: [PATCH 08/12] =?UTF-8?q?=E2=97=8F=20test(sync=5Floop):=20migrate?= =?UTF-8?q?=20escalates=5Fto=5Fcritical=20poll=20loop=20to=20poll=5Funtil?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `escalates_to_critical_after_persistent_failure` open-coded the exact shape of `super::poll::poll_until`: a `loop 
{ sleep; check; assert elapsed < deadline }` with a redundant post-loop deadline assert. Replace it with a single `poll_until` call — same probe cadence (`timings.initial_backoff`), same deadline (`timings.max_backoff_window() * 5`), with the deadline panic now handled inside the helper. Behaviour is preserved with one minor tightening: the original's post-loop `elapsed < max_time_to_critical` assert had a flake window of up to one `poll_interval` over budget (inner assert passed at iter N-1, then sleep + status==CriticalError break at iter N could push elapsed past the bound). `poll_until` collapses the two deadline checks into one panic on `Instant::now() >= deadline`, matching the test's stated intent. Brings this test in line with `tip_converges_after_burst_mine` in the same module — same helper, same shape. `Instant` and `sleep` imports stay; both are still used by `survives_transient_source_failure`, which is a soak test (verify status stays NOT-CriticalError for 2 s under a transient failure) and intentionally does not fit the `poll_until` shape. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../chain_index/tests/proptest_blockgen.rs | 13 ++++++++- .../src/chain_index/tests/sync_loop.rs | 27 +++++-------------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs b/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs index 48be02a88..9c4f42384 100644 --- a/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs +++ b/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs @@ -393,8 +393,19 @@ fn make_chain() { let indexer = NodeBackedChainIndex::new(mockchain.clone(), config) .await .unwrap(); - tokio::time::sleep(Duration::from_secs(5)).await; let index_reader = indexer.subscriber(); + let expected_block_count = segment_length * (branch_count + 1); + poll_until( + "indexer to load all branch blocks into the non-finalized state", + Duration::from_secs(30), + Duration::from_millis(50), + || async { + let snapshot = index_reader.snapshot_nonfinalized_state().await.ok()?; + let nfs = snapshot.get_nfs_snapshot()?; + (nfs.blocks.len() == expected_block_count).then_some(()) + }, + ) + .await; let snapshot = index_reader.snapshot_nonfinalized_state().await.unwrap(); let non_finalized_snapshot = snapshot.get_nfs_snapshot().expect("not synced"); let best_tip_hash = non_finalized_snapshot.best_tip.hash; diff --git a/packages/zaino-state/src/chain_index/tests/sync_loop.rs b/packages/zaino-state/src/chain_index/tests/sync_loop.rs index 257b0e8ac..53d92c350 100644 --- a/packages/zaino-state/src/chain_index/tests/sync_loop.rs +++ b/packages/zaino-state/src/chain_index/tests/sync_loop.rs @@ -52,32 +52,19 @@ async fn escalates_to_critical_after_persistent_failure() { let (_blocks, _indexer, index_reader, mockchain) = load_test_vectors_and_sync_chain_index_with_timings(true, timings).await; - let start = Instant::now(); mockchain.set_failing(true); // 5× slack over the nominal backoff sum to absorb scheduling jitter and 
// the per-iteration sync work the loop performs between sleeps. let max_time_to_critical = timings.max_backoff_window() * 5; - let poll_interval = timings.initial_backoff; - - loop { - sleep(poll_interval).await; - - if index_reader.status() == StatusType::CriticalError { - break; - } - assert!( - start.elapsed() < max_time_to_critical, - "CriticalError was not reached within {max_time_to_critical:?}" - ); - } - - let elapsed = start.elapsed(); - assert!( - elapsed < max_time_to_critical, - "CriticalError took {elapsed:?}, exceeding the maximum backoff window" - ); + super::poll::poll_until( + "sync loop to escalate to CriticalError under persistent source failure", + max_time_to_critical, + timings.initial_backoff, + || async { (index_reader.status() == StatusType::CriticalError).then_some(()) }, + ) + .await; } /// Moved here from the integration test From 2b1eaa389c2337c8d3acc185a1e8830e323b7409 Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 19:42:30 -0700 Subject: [PATCH 09/12] =?UTF-8?q?=E2=97=8F=20test(sync=5Floop):=20shrink?= =?UTF-8?q?=20SyncTimings::fast=20to=20sub-second=20backoff=20window?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop initial_backoff 25→10 ms, max_backoff 800→50 ms, and max_consecutive_failures 10→5 so escalates_to_critical_after_persistent_failure drives its backoff schedule to completion in ~120 ms (4 sleeps: 10 + 20 + 40 + 50, with the cap clamping the would-be 80 ms doubling) instead of ~4 s. The compressed schedule still exercises the same three observable behaviours as default(): counter→threshold, exponential doubling, and max_backoff clamp. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zaino-state/src/chain_index.rs | 18 ++++++++++++------ .../src/chain_index/tests/sync_loop.rs | 4 ++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/packages/zaino-state/src/chain_index.rs b/packages/zaino-state/src/chain_index.rs index 54648abcb..d55af0409 100644 --- a/packages/zaino-state/src/chain_index.rs +++ b/packages/zaino-state/src/chain_index.rs @@ -512,8 +512,9 @@ pub struct NodeBackedChainIndex { /// [`SyncTimings::default()`] produces production values (500 ms inter-iteration /// sleep, 250 ms initial backoff doubling up to 8 s, 10 consecutive failures /// before escalating to [`StatusType::CriticalError`] — ~40 s total window). -/// [`SyncTimings::fast()`] shrinks each duration by 10× so backoff-dependent -/// unit tests finish in ~4 s instead of ~40 s. +/// [`SyncTimings::fast()`] is the test-only shrink: ~120 ms total backoff +/// window with the same three observable behaviours (counter→threshold, +/// exponential doubling, cap clamp) — see its doc for the chosen values. #[derive(Clone, Copy, Debug)] pub(crate) struct SyncTimings { pub(crate) interval: Duration, @@ -535,13 +536,18 @@ impl Default for SyncTimings { #[cfg(test)] impl SyncTimings { - /// 10× faster than [`Self::default`] — test-only. + /// Test-only shrink: drives the backoff schedule to completion in + /// ~120 ms (4 sleeps: 10 + 20 + 40 + 50, with the 50 ms cap clamping + /// the would-be 80 ms doubling). Five `max_consecutive_failures` is + /// the smallest count that still exercises both exponential doubling + /// and `max_backoff` clamping — i.e. the same three observable + /// behaviours `default()` does, just compressed. 
pub(crate) const fn fast() -> Self { Self { interval: Duration::from_millis(50), - initial_backoff: Duration::from_millis(25), - max_backoff: Duration::from_millis(800), - max_consecutive_failures: 10, + initial_backoff: Duration::from_millis(10), + max_backoff: Duration::from_millis(50), + max_consecutive_failures: 5, } } diff --git a/packages/zaino-state/src/chain_index/tests/sync_loop.rs b/packages/zaino-state/src/chain_index/tests/sync_loop.rs index 53d92c350..2b686d720 100644 --- a/packages/zaino-state/src/chain_index/tests/sync_loop.rs +++ b/packages/zaino-state/src/chain_index/tests/sync_loop.rs @@ -44,8 +44,8 @@ async fn survives_transient_source_failure() { /// After `max_consecutive_failures` with exponential backoff, the sync loop /// should escalate to [`StatusType::CriticalError`]. /// -/// Uses [`SyncTimings::fast`] (10× shrunk) so the full backoff schedule fits -/// in a few seconds instead of ~40 s. +/// Uses [`SyncTimings::fast`] so the full backoff schedule fits in +/// ~120 ms instead of ~40 s. #[tokio::test(flavor = "multi_thread")] async fn escalates_to_critical_after_persistent_failure() { let timings = SyncTimings::fast(); From 270bbe9bb2cb71ca2b018c9a2b35faaf4ae19876 Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 20:04:31 -0700 Subject: [PATCH 10/12] =?UTF-8?q?=20=20test(proptest=5Fblockgen):=20shrink?= =?UTF-8?q?=20passthrough=5Ftest=20source=20delay=20100=20=E2=86=92=2010?= =?UTF-8?q?=20ms?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 100 ms ProptestMockchain delay keeps fs.sync_to_height from initializing non_finalized_state before the passthrough_test body runs — an invariant the :109 assertion on StillSyncingFinalizedState depends on. 100 ms was a conservative gut number: at that rate, sync_to_height(139) takes ~28 s before initializing nfs, orders of magnitude more than any test body needs. Drop to 10 ms. 
sync_to_height(139) now takes ~2.8 s before nfs initializes, still far longer than any test body (longest is passthrough_get_block_range at ~2 s of body work). The invariant is preserved with comfortable headroom. Direct savings: - passthrough_get_block_range: its 10-block serial stream goes from 10 × 100 ms = 1 s per task → 10 × 10 ms = 100 ms. ~900 ms off wall. - All seven passthrough_* tests: Mempool::spawn's three serial source probes (get_mempool_txids, get_best_block_hash, get_mempool_transactions) drop from 3 × 100 ms to 3 × 10 ms. ~270 ms off each. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs b/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs index 9c4f42384..9d1885526 100644 --- a/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs +++ b/packages/zaino-state/src/chain_index/tests/proptest_blockgen.rs @@ -61,7 +61,7 @@ fn passthrough_test( // This number can be played with. 
We want to slow down // sync enough to trigger passthrough without // slowing down passthrough more than we need to - delay: Some(Duration::from_millis(100)), + delay: Some(Duration::from_millis(10)), best_branch_cache: Arc::new(std::sync::OnceLock::new()), tx_index: Arc::new(std::sync::OnceLock::new()), }; From 979c1a4d37241cd544d701a68db7fe1f888e88c4 Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 20:15:34 -0700 Subject: [PATCH 11/12] =?UTF-8?q?=E2=97=8F=20refactor(mempool):=20parallel?= =?UTF-8?q?ize=20independent=20startup=20probes=20in=20Mempool::spawn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mempool::spawn had three serial startup probes: (1) retry-loop on get_mempool_txids waiting for validator mempool online, (2) one-shot get_best_block_hash (fatal on error), (3) retry-loop on get_mempool_transactions for initial contents. (1) and (2) share no data — (2) doesn't depend on mempool liveness, and (1) discards its result. Serial ordering added one full source round-trip to every NodeBackedChainIndex::new for no semantic reason. Run (1) and (2) concurrently via tokio::try_join!. If (2) hard-errors, try_join! cancels (1)'s retry-loop and returns the Critical error without waiting for (1). Note that this differs from the serial ordering in one case: previously (2) only ran after (1) had succeeded, so a source whose calls were still failing at startup was retried every 3 s, whereas now an early failure of (2) is fatal immediately. The 3 s retry backoff on (1) is preserved; (2)'s fatal-on-error semantics are preserved. Savings per indexer init: - tests (mock source at 10 ms): ~10 ms - prod LAN: a few ms - prod WAN: 50-200 ms Every passthrough_* test, make_chain, and any other test or code path that spins up a NodeBackedChainIndex benefits. (3) still runs serially after the parallel gather — extracting it for concurrent execution is a separate refactor because its helper is wired through the partially-built Mempool struct. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../zaino-state/src/chain_index/mempool.rs | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/packages/zaino-state/src/chain_index/mempool.rs b/packages/zaino-state/src/chain_index/mempool.rs index 7282c64ce..b4bd431c9 100644 --- a/packages/zaino-state/src/chain_index/mempool.rs +++ b/packages/zaino-state/src/chain_index/mempool.rs @@ -60,34 +60,33 @@ impl Mempool { fetcher: T, capacity_and_shard_amount: Option<(usize, usize)>, ) -> Result { - // Wait for mempool in validator to come online. - loop { - match fetcher.get_mempool_txids().await { - Ok(_) => { - break; - } - Err(_) => { - info!("Waiting for Validator mempool to come online"); - tokio::time::sleep(std::time::Duration::from_secs(3)).await; + // Run the two independent startup probes concurrently: + // (1) wait for the validator mempool to come online (retry loop), + // (2) fetch the initial best block hash (fatal if unavailable). + // They share nothing; serial ordering added one source round-trip + // to every indexer init for no semantic reason. `try_join!` + // cancels the retry loop if (2) returns a hard error. 
+ let wait_for_mempool_online = async { + loop { + match fetcher.get_mempool_txids().await { + Ok(_) => return Ok::<(), MempoolError>(()), + Err(_) => { + info!("Waiting for Validator mempool to come online"); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } } } - } - - let best_block_hash: BlockHash = match fetcher.get_best_block_hash().await { - Ok(block_hash_opt) => match block_hash_opt { - Some(hash) => hash.into(), - None => { - return Err(MempoolError::Critical( - "Error in mempool: Error connecting with validator".to_string(), - )) - } - }, - Err(_e) => { - return Err(MempoolError::Critical( + }; + let fetch_best_block_hash = async { + match fetcher.get_best_block_hash().await { + Ok(Some(hash)) => Ok(BlockHash::from(hash)), + Ok(None) | Err(_) => Err(MempoolError::Critical( "Error in mempool: Error connecting with validator".to_string(), - )) + )), } }; + let (_, best_block_hash) = + tokio::try_join!(wait_for_mempool_online, fetch_best_block_hash)?; let (chain_tip_sender, _chain_tip_reciever) = tokio::sync::watch::channel(best_block_hash); From 3fbd47794201bccda7cb71f3afcb48fbfbb09213 Mon Sep 17 00:00:00 2001 From: zancas Date: Fri, 24 Apr 2026 20:24:34 -0700 Subject: [PATCH 12/12] =?UTF-8?q?=E2=97=8F=20refactor(mempool):=20fold=20i?= =?UTF-8?q?nitial-snapshot=20fetch=20into=20parallel=20Mempool::spawn=20ga?= =?UTF-8?q?ther?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the previous Mempool::spawn refactor, which parallelized the wait-for-online and best-block-hash probes but left the initial mempool-snapshot fetch running serially after them. Fold it in so all three source-dependent startup calls run concurrently. To make it poolable with the other two, extracted Mempool::get_mempool_transactions (private method on Mempool) into a free fn fetch_mempool_snapshot(fetcher: &T) — the method only ever touched self.fetcher, so there was no real reason it had to be a method. 
The serve() call site now uses the free fn too. Two minor behavioural cleanups enabled by this: - Initial Mempool status is now constructed as Ready instead of Spawning (the snapshot has already been fetched by the time we build the struct), matching what the old code did on first-iteration success. - Dropped the `state.notify(...)` call that was inside the snapshot retry loop — during spawn there are no subscribers attached yet, so the notify was a no-op. Savings per indexer init: - tests (mock source at 10 ms): 30 ms serial → 10 ms parallel, ~20 ms. - prod LAN: a few ms. - prod WAN: 100–400 ms. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../zaino-state/src/chain_index/mempool.rs | 132 +++++++++--------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/packages/zaino-state/src/chain_index/mempool.rs b/packages/zaino-state/src/chain_index/mempool.rs index b4bd431c9..9ad512c0d 100644 --- a/packages/zaino-state/src/chain_index/mempool.rs +++ b/packages/zaino-state/src/chain_index/mempool.rs @@ -53,6 +53,44 @@ pub struct Mempool { status: NamedAtomicStatus, } +/// Fetches the current mempool snapshot (all txids paired with their +/// serialized transactions) from a blockchain source. +async fn fetch_mempool_snapshot( + fetcher: &T, +) -> Result, MempoolError> { + let txids = fetcher.get_mempool_txids().await?.ok_or_else(|| { + MempoolError::BlockchainSourceError(BlockchainSourceError::Unrecoverable( + "could not fetch mempool data: mempool txid list was None".to_string(), + )) + })?; + + let mut transactions = Vec::with_capacity(txids.len()); + for txid in txids { + let (transaction, _location) = + fetcher + .get_transaction(txid.0.into()) + .await? 
+ .ok_or_else(|| { + MempoolError::BlockchainSourceError(BlockchainSourceError::Unrecoverable( + format!( + "could not fetch mempool data: transaction not found for txid {txid}" + ), + )) + })?; + + transactions.push(( + MempoolKey { + txid: txid.to_string(), + }, + MempoolValue { + serialized_tx: Arc::new(transaction.into()), + }, + )); + } + + Ok(transactions) +} + impl Mempool { /// Spawns a new [`Mempool`]. #[instrument(name = "Mempool::spawn", skip(fetcher, capacity_and_shard_amount))] @@ -60,12 +98,15 @@ impl Mempool { fetcher: T, capacity_and_shard_amount: Option<(usize, usize)>, ) -> Result { - // Run the two independent startup probes concurrently: + // Run the three independent startup probes concurrently: // (1) wait for the validator mempool to come online (retry loop), - // (2) fetch the initial best block hash (fatal if unavailable). - // They share nothing; serial ordering added one source round-trip - // to every indexer init for no semantic reason. `try_join!` - // cancels the retry loop if (2) returns a hard error. + // (2) fetch the initial best block hash (fatal if unavailable), + // (3) fetch the initial mempool snapshot (retry on transient + // fetch errors). + // None of the three depend on each other; serial ordering added + // two source round-trips to every indexer init for no semantic + // reason. `try_join!` cancels the retry loops if (2) returns a + // hard error. 
let wait_for_mempool_online = async { loop { match fetcher.get_mempool_txids().await { @@ -85,8 +126,22 @@ impl Mempool { )), } }; - let (_, best_block_hash) = - tokio::try_join!(wait_for_mempool_online, fetch_best_block_hash)?; + let fetch_initial_snapshot = async { + loop { + match fetch_mempool_snapshot(&fetcher).await { + Ok(txs) => return Ok::<_, MempoolError>(txs), + Err(e) => { + warn!("{e}"); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + } + } + }; + let (_, best_block_hash, initial_transactions) = tokio::try_join!( + wait_for_mempool_online, + fetch_best_block_hash, + fetch_initial_snapshot, + )?; let (chain_tip_sender, _chain_tip_reciever) = tokio::sync::watch::channel(best_block_hash); @@ -101,26 +156,11 @@ impl Mempool { }, mempool_chain_tip: chain_tip_sender, sync_task_handle: None, - status: NamedAtomicStatus::new("Mempool", StatusType::Spawning), + status: NamedAtomicStatus::new("Mempool", StatusType::Ready), }; - - loop { - match mempool.get_mempool_transactions().await { - Ok(mempool_transactions) => { - mempool.status.store(StatusType::Ready); - mempool - .state - .insert_filtered_set(mempool_transactions, mempool.status.load()); - break; - } - Err(e) => { - mempool.state.notify(mempool.status.load()); - warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - continue; - } - }; - } + mempool + .state + .insert_filtered_set(initial_transactions, mempool.status.load()); mempool.sync_task_handle = Some(std::sync::Mutex::new(mempool.serve().await?)); @@ -220,7 +260,7 @@ impl Mempool { continue; } - match mempool.get_mempool_transactions().await { + match fetch_mempool_snapshot(&mempool.fetcher).await { Ok(mempool_transactions) => { status.store(StatusType::Ready); state.insert_filtered_set(mempool_transactions, status.load()); @@ -251,44 +291,6 @@ impl Mempool { Ok(sync_handle) } - /// Returns all transactions in the mempool. 
- async fn get_mempool_transactions( - &self, - ) -> Result, MempoolError> { - let mut transactions = Vec::new(); - - let txids = self.fetcher.get_mempool_txids().await?.ok_or_else(|| { - MempoolError::BlockchainSourceError(BlockchainSourceError::Unrecoverable( - "could not fetch mempool data: mempool txid list was None".to_string(), - )) - })?; - - for txid in txids { - let (transaction, _location) = self - .fetcher - .get_transaction(txid.0.into()) - .await? - .ok_or_else(|| { - MempoolError::BlockchainSourceError( - crate::chain_index::source::BlockchainSourceError::Unrecoverable(format!( - "could not fetch mempool data: transaction not found for txid {txid}" - )), - ) - })?; - - transactions.push(( - MempoolKey { - txid: txid.to_string(), - }, - MempoolValue { - serialized_tx: Arc::new(transaction.into()), - }, - )); - } - - Ok(transactions) - } - /// Returns a [`MempoolSubscriber`]. pub fn subscriber(&self) -> MempoolSubscriber { MempoolSubscriber {