diff --git a/ethexe/observer/Cargo.toml b/ethexe/observer/Cargo.toml
index c82646fcede..57865de136f 100644
--- a/ethexe/observer/Cargo.toml
+++ b/ethexe/observer/Cargo.toml
@@ -17,6 +17,7 @@ gprimitives = { workspace = true, features = ["std"] }
 anyhow.workspace = true
 alloy = { workspace = true, features = [
     "consensus",
+    "contract",
     "eips",
     "node-bindings",
     "provider-http",
diff --git a/ethexe/observer/src/utils.rs b/ethexe/observer/src/utils.rs
index 6b67ffd11df..f1a96ac997c 100644
--- a/ethexe/observer/src/utils.rs
+++ b/ethexe/observer/src/utils.rs
@@ -19,6 +19,7 @@
 // TODO #4552: add tests for observer utils
 
 use alloy::{
+    contract::Event,
     network::{Ethereum, Network},
     providers::{Provider as _, RootProvider},
     rpc::{
@@ -31,14 +32,18 @@ use alloy::{
 };
 use anyhow::{Context, Result};
 use ethexe_common::{Address, BlockData, BlockHeader, SimpleBlockData, events::BlockEvent};
-use ethexe_ethereum::{mirror, router};
+use ethexe_ethereum::{abi::IRouter, mirror, router};
 use futures::{TryFutureExt, future};
 use gprimitives::H256;
 use std::{collections::HashMap, future::IntoFuture, ops::RangeInclusive};
 
 // TODO: #4562 append also a configurable batch size parameter
-/// Max number of blocks to query in alloy.
-const MAX_QUERY_BLOCK_RANGE: usize = 256;
+/// Max number of blocks per `eth_getBlockByNumber` JSON-RPC batch.
+const MAX_BLOCK_BATCH_SIZE: usize = 256;
+/// Block-window size passed to alloy's [`alloy::contract::ChunkedEvent`] when fetching logs.
+const LOGS_CHUNK_SIZE: u64 = 256;
+/// Maximum number of in-flight log chunk requests issued by [`alloy::contract::ChunkedEvent`].
+const LOGS_MAX_CONCURRENCY: usize = 8;
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq, derive_more::From)]
 pub enum BlockId {
@@ -70,6 +75,8 @@ pub trait BlockLoader {
 pub struct EthereumBlockLoader {
     provider: RootProvider,
     router_address: Address,
+    logs_chunk_size: u64,
+    logs_max_concurrency: usize,
 }
 
 impl EthereumBlockLoader {
@@ -77,9 +84,21 @@ impl EthereumBlockLoader {
         Self {
             provider,
             router_address,
+            logs_chunk_size: LOGS_CHUNK_SIZE,
+            logs_max_concurrency: LOGS_MAX_CONCURRENCY,
         }
     }
 
+    pub fn with_logs_chunk_size(mut self, chunk_size: u64) -> Self {
+        self.logs_chunk_size = chunk_size;
+        self
+    }
+
+    pub fn with_logs_max_concurrency(mut self, max_concurrency: usize) -> Self {
+        self.logs_max_concurrency = max_concurrency;
+        self
+    }
+
     fn log_filter() -> Filter {
         let topic = Topic::from_iter(
             [
@@ -137,10 +156,13 @@ impl EthereumBlockLoader {
         (block_hash, header)
     }
 
-    async fn request_block_batch(&self, range: RangeInclusive<u64>) -> Result<Vec<BlockData>> {
+    /// Fetches block headers for `range` via a single `eth_getBlockByNumber` JSON-RPC batch.
+    ///
+    /// The caller is responsible for keeping batches within the provider's allowed batch size,
+    /// see [`MAX_BLOCK_BATCH_SIZE`].
+    async fn request_block_headers(&self, range: RangeInclusive<u64>) -> Result<Vec<Block>> {
         let mut batch = BatchRequest::new(self.provider.client());
         let headers_request = range
-            .clone()
             .map(|bn| {
                 batch
                     .add_call::<_, Option<<Ethereum as Network>::BlockResponse>>(
@@ -151,37 +173,39 @@
             })
             .collect::<Vec<_>>();
 
         batch.send().await?;
 
-        let headers_request = future::join_all(headers_request);
-        let filter = Self::log_filter()
-            .from_block(*range.start())
-            .to_block(*range.end());
-        let logs_request = self.provider.get_logs(&filter);
-
-        let (blocks, logs) = future::join(headers_request, logs_request).await;
-        let logs = logs?;
-
-        let mut blocks_data = Vec::new();
-        for response in blocks {
-            let block = response?;
-            let Some(block) = block else {
+        let mut blocks = Vec::new();
+        for response in future::join_all(headers_request).await {
+            let Some(block) = response? else {
                 break;
             };
-
-            let (block_hash, header) = Self::block_response_to_data(block);
-            blocks_data.push(BlockData {
-                hash: block_hash,
-                header,
-                events: Vec::new(),
-            });
+            blocks.push(block);
         }
 
+        Ok(blocks)
+    }
-        let mut events = self.logs_to_events(logs)?;
-        for block_data in blocks_data.iter_mut() {
-            block_data.events = events.remove(&block_data.hash).unwrap_or_default();
-        }
 
+    /// Fetches all router/mirror logs for `range` using alloy's chunked-event helper.
+    ///
+    /// The helper attempts the full range first, then splits into `logs_chunk_size`-block
+    /// windows (default [`LOGS_CHUNK_SIZE`]) queried up to `logs_max_concurrency`
+    /// (default [`LOGS_MAX_CONCURRENCY`]) at a time, and finally falls back to
+    /// per-block queries for any chunk that still fails.
+    async fn request_logs(&self, range: RangeInclusive<u64>) -> Result<Vec<Log>> {
+        let filter = Self::log_filter()
+            .from_block(*range.start())
+            .to_block(*range.end());
-        Ok(blocks_data)
+        // The event type parameter is unused by `query_raw`, which returns undecoded logs;
+        // we pass `IRouter::BatchCommitted` solely to satisfy the `SolEvent` trait bound.
+        let chunked = Event::<_, IRouter::BatchCommitted>::new(self.provider.clone(), filter)
+            .chunked()
+            .chunk_size(self.logs_chunk_size)
+            .concurrent(self.logs_max_concurrency);
+
+        chunked
+            .query_raw()
+            .await
+            .context("failed to fetch logs via alloy ChunkedEvent")
     }
 }
@@ -242,18 +266,37 @@ impl BlockLoader for EthereumBlockLoader {
     }
 
     async fn load_many(&self, range: RangeInclusive<u64>) -> Result<HashMap<H256, BlockData>> {
+        if range.is_empty() {
+            return Ok(HashMap::new());
+        }
         log::trace!("Querying blocks batch in {range:?} range");
 
-        let batch_futures = range.clone().step_by(MAX_QUERY_BLOCK_RANGE).map(|start| {
-            let end = (start + MAX_QUERY_BLOCK_RANGE as u64 - 1).min(*range.end());
-            self.request_block_batch(start..=end)
+        let header_batches = range.clone().step_by(MAX_BLOCK_BATCH_SIZE).map(|start| {
+            let end = (start + MAX_BLOCK_BATCH_SIZE as u64 - 1).min(*range.end());
+            self.request_block_headers(start..=end)
         });
 
-        let batches = future::try_join_all(batch_futures).await?;
-        Ok(batches
-            .into_iter()
-            .flatten()
-            .map(|data| (data.hash, data))
-            .collect())
+        let (headers_batches, logs) = future::try_join(
+            future::try_join_all(header_batches),
+            self.request_logs(range),
+        )
+        .await?;
+
+        let mut events = self.logs_to_events(logs)?;
+        let mut blocks_data: HashMap<H256, BlockData> = HashMap::new();
+        for block in headers_batches.into_iter().flatten() {
+            let (hash, header) = Self::block_response_to_data(block);
+            let events = events.remove(&hash).unwrap_or_default();
+            blocks_data.insert(
+                hash,
+                BlockData {
+                    hash,
+                    header,
+                    events,
+                },
+            );
+        }
+
+        Ok(blocks_data)
     }
 }
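A quick usage sketch for reviewers, not part of the patch: from a caller's point of view the new surface is the two builder methods, while `BlockLoader::load_many` keeps its signature. The snippet assumes the existing constructor keeps its `new(provider, router_address)` shape and that `EthereumBlockLoader` plus the `BlockLoader` trait are importable from the observer crate (exact re-export paths are not shown in this diff); the function `load_window` and the concrete values are illustrative only.

```rust
use alloy::providers::RootProvider;
use anyhow::Result;
use ethexe_common::Address;
// Assumed also in scope (paths not shown in the diff):
// `EthereumBlockLoader` and the `BlockLoader` trait from the observer's utils module.

// Hypothetical caller: tune the new log-fetching knobs for a rate-limited RPC provider.
async fn load_window(provider: RootProvider, router: Address, from: u64, to: u64) -> Result<()> {
    let loader = EthereumBlockLoader::new(provider, router)
        // Smaller `eth_getLogs` windows than the 256-block default.
        .with_logs_chunk_size(64)
        // Fewer in-flight chunk requests than the default of 8.
        .with_logs_max_concurrency(4);

    // One call now drives both the batched header fetch and the chunked log fetch;
    // events arrive already attached to each `BlockData`.
    let blocks = loader.load_many(from..=to).await?;
    log::debug!("loaded {} blocks in {from}..={to}", blocks.len());

    Ok(())
}
```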
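One behavioral detail worth calling out: the header side still splits the requested range into `MAX_BLOCK_BATCH_SIZE`-block windows with the last window clamped to the range end, while the log side now covers the whole range in a single `request_logs` call. A self-contained illustration of that window arithmetic (the constant is inlined as 256 and the helper name is ours, not from the PR):

```rust
use std::ops::RangeInclusive;

/// Illustration only: mirrors the `step_by` / `min` window arithmetic in `load_many`,
/// with `MAX_BLOCK_BATCH_SIZE` inlined as 256.
fn header_batch_windows(range: RangeInclusive<u64>) -> Vec<RangeInclusive<u64>> {
    range
        .clone()
        .step_by(256)
        .map(|start| start..=(start + 255).min(*range.end()))
        .collect()
}

#[test]
fn last_window_is_clamped_to_the_range_end() {
    assert_eq!(
        header_batch_windows(1_000..=1_600),
        vec![1_000..=1_255, 1_256..=1_511, 1_512..=1_600],
    );
}
```

With the new `range.is_empty()` guard, an empty range short-circuits before any window is computed.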