From adef37a0b4916276ca6196c6799c086365e2d13b Mon Sep 17 00:00:00 2001 From: Grisha Sobol Date: Wed, 29 Apr 2026 11:37:26 +0200 Subject: [PATCH 1/3] feat(ethexe/observer): use alloy ChunkedEvent for batched log queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the hand-rolled 256-block chunking around `eth_getLogs` in `EthereumBlockLoader::load_many` with `alloy::contract::ChunkedEvent`, which adds an optimistic full-range query first and a per-block fallback when an individual chunk fails. Block-header batching is kept and now runs in parallel with the chunked log query. Also adds an ignored `bench::bench_load_many` test that times `load_many` against an in-process Anvil. Local numbers (1500 blocks): old custom chunked ≈131 ms avg, new alloy ChunkedEvent ≈120 ms avg. Co-Authored-By: Claude Opus 4.7 (1M context) --- ethexe/observer/Cargo.toml | 1 + ethexe/observer/src/bench.rs | 104 +++++++++++++++++++++++++++++++++++ ethexe/observer/src/lib.rs | 2 + ethexe/observer/src/utils.rs | 103 +++++++++++++++++++++------------- 4 files changed, 171 insertions(+), 39 deletions(-) create mode 100644 ethexe/observer/src/bench.rs diff --git a/ethexe/observer/Cargo.toml b/ethexe/observer/Cargo.toml index c82646fcede..57865de136f 100644 --- a/ethexe/observer/Cargo.toml +++ b/ethexe/observer/Cargo.toml @@ -17,6 +17,7 @@ gprimitives = { workspace = true, features = ["std"] } anyhow.workspace = true alloy = { workspace = true, features = [ "consensus", + "contract", "eips", "node-bindings", "provider-http", diff --git a/ethexe/observer/src/bench.rs b/ethexe/observer/src/bench.rs new file mode 100644 index 00000000000..b65e58bcd5b --- /dev/null +++ b/ethexe/observer/src/bench.rs @@ -0,0 +1,104 @@ +// This file is part of Gear. +// +// Copyright (C) 2025 Gear Technologies Inc. 
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Manual benchmark for [`BlockLoader::load_many`]. +//! +//! Run with: +//! ```sh +//! cargo nextest run -p ethexe-observer --no-capture \ +//! --run-ignored only -- bench::bench_load_many +//! ``` + +use crate::utils::{BlockLoader, EthereumBlockLoader}; +use alloy::{ + node_bindings::Anvil, + providers::{Provider, ProviderBuilder, ext::AnvilApi}, +}; +use anyhow::Result; +use ethexe_common::Address; +use ethexe_ethereum::deploy::EthereumDeployer; +use gsigner::secp256k1::Signer; +use std::time::Instant; + +const BENCH_BLOCKS: u64 = 1500; +const BENCH_REPEATS: usize = 5; + +#[tokio::test] +#[ignore = "manual benchmark, requires anvil"] +async fn bench_load_many() -> Result<()> { + gear_utils::init_default_logger(); + + let anvil = Anvil::new().try_spawn()?; + let ethereum_rpc = anvil.ws_endpoint(); + + let signer = Signer::memory(); + let sender_public_key = signer + .import("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse()?)?; + let sender_address = sender_public_key.to_address(); + let validators: Vec
= vec!["0x45D6536E3D4AdC8f4e13c5c4aA54bE968C55Abf1".parse()?]; + + let deployer = EthereumDeployer::new(ðereum_rpc, signer, sender_address) + .await + .unwrap(); + let ethereum = deployer + .with_validators(validators.try_into().unwrap()) + .deploy() + .await?; + + let provider = ProviderBuilder::default().connect(ðereum_rpc).await?; + + // Sprinkle some events along the chain so the log filter has work to do. + let wat_template = + "(module (export \"init\" (func $init)) (func $init (drop (i32.const {N}))))"; + for n in 0..16u32 { + let wat = wat_template.replace("{N}", &n.to_string()); + let wasm = wat::parse_str(&wat)?; + ethereum.router().request_code_validation(&wasm).await?; + } + + // Mine empty blocks up to the target height to grow the range. + let head = provider.get_block_number().await?; + if head < BENCH_BLOCKS { + provider.anvil_mine(Some(BENCH_BLOCKS - head), None).await?; + } + let head_number = provider.get_block_number().await?; + println!("[bench] chain head = {head_number}"); + + let loader = EthereumBlockLoader::new(provider.clone(), ethereum.router().address()); + + let mut samples = Vec::with_capacity(BENCH_REPEATS); + for i in 0..BENCH_REPEATS { + let start = Instant::now(); + let blocks = loader.load_many(0..=head_number).await?; + let elapsed = start.elapsed(); + println!( + "[bench] iter {i}: load_many(0..={head_number}) = {} blocks in {:?}", + blocks.len(), + elapsed + ); + samples.push(elapsed); + } + + let total: std::time::Duration = samples.iter().sum(); + let avg = total / samples.len() as u32; + let min = *samples.iter().min().unwrap(); + let max = *samples.iter().max().unwrap(); + println!("[bench] head={head_number} avg={avg:?} min={min:?} max={max:?} runs={BENCH_REPEATS}"); + + Ok(()) +} diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index d9836bbd7d3..24ca6508d75 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -43,6 +43,8 @@ use sync::ChainSync; mod sync; pub mod utils; 
+#[cfg(test)] +mod bench; #[cfg(test)] mod tests; diff --git a/ethexe/observer/src/utils.rs b/ethexe/observer/src/utils.rs index 6b67ffd11df..5d596a17e43 100644 --- a/ethexe/observer/src/utils.rs +++ b/ethexe/observer/src/utils.rs @@ -19,6 +19,7 @@ // TODO #4552: add tests for observer utils use alloy::{ + contract::Event, network::{Ethereum, Network}, providers::{Provider as _, RootProvider}, rpc::{ @@ -31,14 +32,18 @@ use alloy::{ }; use anyhow::{Context, Result}; use ethexe_common::{Address, BlockData, BlockHeader, SimpleBlockData, events::BlockEvent}; -use ethexe_ethereum::{mirror, router}; +use ethexe_ethereum::{abi::IRouter, mirror, router}; use futures::{TryFutureExt, future}; use gprimitives::H256; use std::{collections::HashMap, future::IntoFuture, ops::RangeInclusive}; // TODO: #4562 append also a configurable batch size parameter -/// Max number of blocks to query in alloy. -const MAX_QUERY_BLOCK_RANGE: usize = 256; +/// Max number of blocks per `eth_getBlockByNumber` JSON-RPC batch. +const MAX_BLOCK_BATCH_SIZE: usize = 256; +/// Block-window size passed to alloy's [`alloy::contract::ChunkedEvent`] when fetching logs. +const LOGS_CHUNK_SIZE: u64 = 256; +/// Maximum number of in-flight log chunk requests issued by [`alloy::contract::ChunkedEvent`]. +const LOGS_MAX_CONCURRENCY: usize = 8; #[derive(Debug, Copy, Clone, PartialEq, Eq, derive_more::From)] pub enum BlockId { @@ -137,10 +142,13 @@ impl EthereumBlockLoader { (block_hash, header) } - async fn request_block_batch(&self, range: RangeInclusive) -> Result> { + /// Fetches block headers for `range` via a single `eth_getBlockByNumber` JSON-RPC batch. + /// + /// The caller is responsible for keeping batches within the provider's allowed batch size, + /// see [`MAX_BLOCK_BATCH_SIZE`]. 
+ async fn request_block_headers(&self, range: RangeInclusive) -> Result> { let mut batch = BatchRequest::new(self.provider.client()); let headers_request = range - .clone() .map(|bn| { batch .add_call::<_, Option<::BlockResponse>>( @@ -151,37 +159,38 @@ impl EthereumBlockLoader { }) .collect::>(); batch.send().await?; - let headers_request = future::join_all(headers_request); - let filter = Self::log_filter() - .from_block(*range.start()) - .to_block(*range.end()); - let logs_request = self.provider.get_logs(&filter); - - let (blocks, logs) = future::join(headers_request, logs_request).await; - let logs = logs?; - - let mut blocks_data = Vec::new(); - for response in blocks { - let block = response?; - let Some(block) = block else { + let mut blocks = Vec::new(); + for response in future::join_all(headers_request).await { + let Some(block) = response? else { break; }; - - let (block_hash, header) = Self::block_response_to_data(block); - blocks_data.push(BlockData { - hash: block_hash, - header, - events: Vec::new(), - }); + blocks.push(block); } + Ok(blocks) + } - let mut events = self.logs_to_events(logs)?; - for block_data in blocks_data.iter_mut() { - block_data.events = events.remove(&block_data.hash).unwrap_or_default(); - } + /// Fetches all router/mirror logs for `range` using alloy's chunked-event helper. + /// + /// The helper attempts the full range first, then splits into [`LOGS_CHUNK_SIZE`]-block + /// windows queried up to [`LOGS_MAX_CONCURRENCY`] at a time, and finally falls back to + /// per-block queries for any chunk that still fails. + async fn request_logs(&self, range: RangeInclusive) -> Result> { + let filter = Self::log_filter() + .from_block(*range.start()) + .to_block(*range.end()); - Ok(blocks_data) + // The event type parameter is unused by `query_raw`, which returns undecoded logs; + // we pass `IRouter::BatchCommitted` solely to satisfy the `SolEvent` trait bound. 
+ let chunked = Event::<_, IRouter::BatchCommitted>::new(self.provider.clone(), filter) + .chunked() + .chunk_size(LOGS_CHUNK_SIZE) + .concurrent(LOGS_MAX_CONCURRENCY); + + chunked + .query_raw() + .await + .context("failed to fetch logs via alloy ChunkedEvent") } } @@ -244,16 +253,32 @@ impl BlockLoader for EthereumBlockLoader { async fn load_many(&self, range: RangeInclusive) -> Result> { log::trace!("Querying blocks batch in {range:?} range"); - let batch_futures = range.clone().step_by(MAX_QUERY_BLOCK_RANGE).map(|start| { - let end = (start + MAX_QUERY_BLOCK_RANGE as u64 - 1).min(*range.end()); - self.request_block_batch(start..=end) + let header_batches = range.clone().step_by(MAX_BLOCK_BATCH_SIZE).map(|start| { + let end = (start + MAX_BLOCK_BATCH_SIZE as u64 - 1).min(*range.end()); + self.request_block_headers(start..=end) }); - let batches = future::try_join_all(batch_futures).await?; - Ok(batches - .into_iter() - .flatten() - .map(|data| (data.hash, data)) - .collect()) + let (headers_batches, logs) = future::try_join( + future::try_join_all(header_batches), + self.request_logs(range), + ) + .await?; + + let mut events = self.logs_to_events(logs)?; + let mut blocks_data: HashMap = HashMap::new(); + for block in headers_batches.into_iter().flatten() { + let (hash, header) = Self::block_response_to_data(block); + let events = events.remove(&hash).unwrap_or_default(); + blocks_data.insert( + hash, + BlockData { + hash, + header, + events, + }, + ); + } + + Ok(blocks_data) } } From 04ae097ace1ff18ad14b44d7fa44dfc87ac9c415 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Wed, 29 Apr 2026 10:13:21 +0000 Subject: [PATCH 2/3] fix(ethexe/observer): remove bench, add empty-range guard in load_many Co-authored-by: Gregory Sobol --- ethexe/observer/src/bench.rs | 104 ----------------------------------- ethexe/observer/src/lib.rs | 2 - ethexe/observer/src/utils.rs | 3 + 3 files changed, 3 insertions(+), 106 deletions(-) 
delete mode 100644 ethexe/observer/src/bench.rs diff --git a/ethexe/observer/src/bench.rs b/ethexe/observer/src/bench.rs deleted file mode 100644 index b65e58bcd5b..00000000000 --- a/ethexe/observer/src/bench.rs +++ /dev/null @@ -1,104 +0,0 @@ -// This file is part of Gear. -// -// Copyright (C) 2025 Gear Technologies Inc. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! Manual benchmark for [`BlockLoader::load_many`]. -//! -//! Run with: -//! ```sh -//! cargo nextest run -p ethexe-observer --no-capture \ -//! --run-ignored only -- bench::bench_load_many -//! 
``` - -use crate::utils::{BlockLoader, EthereumBlockLoader}; -use alloy::{ - node_bindings::Anvil, - providers::{Provider, ProviderBuilder, ext::AnvilApi}, -}; -use anyhow::Result; -use ethexe_common::Address; -use ethexe_ethereum::deploy::EthereumDeployer; -use gsigner::secp256k1::Signer; -use std::time::Instant; - -const BENCH_BLOCKS: u64 = 1500; -const BENCH_REPEATS: usize = 5; - -#[tokio::test] -#[ignore = "manual benchmark, requires anvil"] -async fn bench_load_many() -> Result<()> { - gear_utils::init_default_logger(); - - let anvil = Anvil::new().try_spawn()?; - let ethereum_rpc = anvil.ws_endpoint(); - - let signer = Signer::memory(); - let sender_public_key = signer - .import("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse()?)?; - let sender_address = sender_public_key.to_address(); - let validators: Vec
= vec!["0x45D6536E3D4AdC8f4e13c5c4aA54bE968C55Abf1".parse()?]; - - let deployer = EthereumDeployer::new(ðereum_rpc, signer, sender_address) - .await - .unwrap(); - let ethereum = deployer - .with_validators(validators.try_into().unwrap()) - .deploy() - .await?; - - let provider = ProviderBuilder::default().connect(ðereum_rpc).await?; - - // Sprinkle some events along the chain so the log filter has work to do. - let wat_template = - "(module (export \"init\" (func $init)) (func $init (drop (i32.const {N}))))"; - for n in 0..16u32 { - let wat = wat_template.replace("{N}", &n.to_string()); - let wasm = wat::parse_str(&wat)?; - ethereum.router().request_code_validation(&wasm).await?; - } - - // Mine empty blocks up to the target height to grow the range. - let head = provider.get_block_number().await?; - if head < BENCH_BLOCKS { - provider.anvil_mine(Some(BENCH_BLOCKS - head), None).await?; - } - let head_number = provider.get_block_number().await?; - println!("[bench] chain head = {head_number}"); - - let loader = EthereumBlockLoader::new(provider.clone(), ethereum.router().address()); - - let mut samples = Vec::with_capacity(BENCH_REPEATS); - for i in 0..BENCH_REPEATS { - let start = Instant::now(); - let blocks = loader.load_many(0..=head_number).await?; - let elapsed = start.elapsed(); - println!( - "[bench] iter {i}: load_many(0..={head_number}) = {} blocks in {:?}", - blocks.len(), - elapsed - ); - samples.push(elapsed); - } - - let total: std::time::Duration = samples.iter().sum(); - let avg = total / samples.len() as u32; - let min = *samples.iter().min().unwrap(); - let max = *samples.iter().max().unwrap(); - println!("[bench] head={head_number} avg={avg:?} min={min:?} max={max:?} runs={BENCH_REPEATS}"); - - Ok(()) -} diff --git a/ethexe/observer/src/lib.rs b/ethexe/observer/src/lib.rs index 24ca6508d75..d9836bbd7d3 100644 --- a/ethexe/observer/src/lib.rs +++ b/ethexe/observer/src/lib.rs @@ -43,8 +43,6 @@ use sync::ChainSync; mod sync; pub mod utils; 
-#[cfg(test)]
-mod bench;
 #[cfg(test)]
 mod tests;
 

diff --git a/ethexe/observer/src/utils.rs b/ethexe/observer/src/utils.rs
index 5d596a17e43..991308c1139 100644
--- a/ethexe/observer/src/utils.rs
+++ b/ethexe/observer/src/utils.rs
@@ -251,6 +251,9 @@ impl BlockLoader for EthereumBlockLoader {
     }
 
     async fn load_many(&self, range: RangeInclusive<u64>) -> Result<HashMap<H256, BlockData>> {
+        if range.is_empty() {
+            return Ok(HashMap::new());
+        }
         log::trace!("Querying blocks batch in {range:?} range");
 
         let header_batches = range.clone().step_by(MAX_BLOCK_BATCH_SIZE).map(|start| {

From 5f65b17bf2932a4b964c60cd28b18e2ed40c2685 Mon Sep 17 00:00:00 2001
From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com>
Date: Thu, 30 Apr 2026 09:39:35 +0000
Subject: [PATCH 3/3] fix(ethexe/observer): make logs_chunk_size and
 logs_max_concurrency configurable

Add builder methods `with_logs_chunk_size` and `with_logs_max_concurrency`
to `EthereumBlockLoader` so callers (e.g. reth nodes) can override the
defaults for chunk size (up to 100k) and concurrency.
Co-authored-by: Gregory Sobol
---
 ethexe/observer/src/utils.rs | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/ethexe/observer/src/utils.rs b/ethexe/observer/src/utils.rs
index 991308c1139..f1a96ac997c 100644
--- a/ethexe/observer/src/utils.rs
+++ b/ethexe/observer/src/utils.rs
@@ -75,6 +75,8 @@ pub trait BlockLoader {
 pub struct EthereumBlockLoader {
     provider: RootProvider,
     router_address: Address,
+    logs_chunk_size: u64,
+    logs_max_concurrency: usize,
 }
 
 impl EthereumBlockLoader {
@@ -82,9 +84,21 @@
         Self {
             provider,
             router_address,
+            logs_chunk_size: LOGS_CHUNK_SIZE,
+            logs_max_concurrency: LOGS_MAX_CONCURRENCY,
         }
     }
 
+    pub fn with_logs_chunk_size(mut self, chunk_size: u64) -> Self {
+        self.logs_chunk_size = chunk_size;
+        self
+    }
+
+    pub fn with_logs_max_concurrency(mut self, max_concurrency: usize) -> Self {
+        self.logs_max_concurrency = max_concurrency;
+        self
+    }
+
     fn log_filter() -> Filter {
         let topic = Topic::from_iter(
             [
@@ -172,8 +186,9 @@
 
     /// Fetches all router/mirror logs for `range` using alloy's chunked-event helper.
     ///
-    /// The helper attempts the full range first, then splits into [`LOGS_CHUNK_SIZE`]-block
-    /// windows queried up to [`LOGS_MAX_CONCURRENCY`] at a time, and finally falls back to
+    /// The helper attempts the full range first, then splits into `logs_chunk_size`-block
+    /// windows (default [`LOGS_CHUNK_SIZE`]) queried up to `logs_max_concurrency`
+    /// (default [`LOGS_MAX_CONCURRENCY`]) at a time, and finally falls back to
     /// per-block queries for any chunk that still fails.
     async fn request_logs(&self, range: RangeInclusive<u64>) -> Result<Vec<Log>> {
         let filter = Self::log_filter()
             .from_block(*range.start())
             .to_block(*range.end());
@@ -184,8 +199,8 @@
         // The event type parameter is unused by `query_raw`, which returns undecoded logs;
         // we pass `IRouter::BatchCommitted` solely to satisfy the `SolEvent` trait bound.
let chunked = Event::<_, IRouter::BatchCommitted>::new(self.provider.clone(), filter) .chunked() - .chunk_size(LOGS_CHUNK_SIZE) - .concurrent(LOGS_MAX_CONCURRENCY); + .chunk_size(self.logs_chunk_size) + .concurrent(self.logs_max_concurrency); chunked .query_raw()