Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ethexe/observer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ gprimitives = { workspace = true, features = ["std"] }
anyhow.workspace = true
alloy = { workspace = true, features = [
"consensus",
"contract",
"eips",
"node-bindings",
"provider-http",
Expand Down
106 changes: 67 additions & 39 deletions ethexe/observer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// TODO #4552: add tests for observer utils

use alloy::{
contract::Event,
network::{Ethereum, Network},
providers::{Provider as _, RootProvider},
rpc::{
Expand All @@ -31,14 +32,18 @@ use alloy::{
};
use anyhow::{Context, Result};
use ethexe_common::{Address, BlockData, BlockHeader, SimpleBlockData, events::BlockEvent};
use ethexe_ethereum::{mirror, router};
use ethexe_ethereum::{abi::IRouter, mirror, router};
use futures::{TryFutureExt, future};
use gprimitives::H256;
use std::{collections::HashMap, future::IntoFuture, ops::RangeInclusive};

// TODO: #4562 append also a configurable batch size parameter
/// Max number of blocks to query in alloy.
const MAX_QUERY_BLOCK_RANGE: usize = 256;
/// Max number of blocks per `eth_getBlockByNumber` JSON-RPC batch.
const MAX_BLOCK_BATCH_SIZE: usize = 256;
/// Block-window size passed to alloy's [`alloy::contract::ChunkedEvent`] when fetching logs.
const LOGS_CHUNK_SIZE: u64 = 256;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be up to 100k; maybe make this configurable.

/// Maximum number of in-flight log chunk requests issued by [`alloy::contract::ChunkedEvent`].
const LOGS_MAX_CONCURRENCY: usize = 8;

#[derive(Debug, Copy, Clone, PartialEq, Eq, derive_more::From)]
pub enum BlockId {
Expand Down Expand Up @@ -137,10 +142,13 @@ impl EthereumBlockLoader {
(block_hash, header)
}

async fn request_block_batch(&self, range: RangeInclusive<u64>) -> Result<Vec<BlockData>> {
/// Fetches block headers for `range` via a single `eth_getBlockByNumber` JSON-RPC batch.
///
/// The caller is responsible for keeping batches within the provider's allowed batch size,
/// see [`MAX_BLOCK_BATCH_SIZE`].
async fn request_block_headers(&self, range: RangeInclusive<u64>) -> Result<Vec<Block>> {
let mut batch = BatchRequest::new(self.provider.client());
let headers_request = range
.clone()
.map(|bn| {
batch
.add_call::<_, Option<<Ethereum as Network>::BlockResponse>>(
Expand All @@ -151,37 +159,38 @@ impl EthereumBlockLoader {
})
.collect::<Vec<_>>();
batch.send().await?;
let headers_request = future::join_all(headers_request);

let filter = Self::log_filter()
.from_block(*range.start())
.to_block(*range.end());
let logs_request = self.provider.get_logs(&filter);

let (blocks, logs) = future::join(headers_request, logs_request).await;
let logs = logs?;

let mut blocks_data = Vec::new();
for response in blocks {
let block = response?;
let Some(block) = block else {
let mut blocks = Vec::new();
for response in future::join_all(headers_request).await {
let Some(block) = response? else {
break;
};

let (block_hash, header) = Self::block_response_to_data(block);
blocks_data.push(BlockData {
hash: block_hash,
header,
events: Vec::new(),
});
blocks.push(block);
}
Ok(blocks)
}

let mut events = self.logs_to_events(logs)?;
for block_data in blocks_data.iter_mut() {
block_data.events = events.remove(&block_data.hash).unwrap_or_default();
}
/// Fetches all router/mirror logs for `range` using alloy's chunked-event helper.
///
/// The helper attempts the full range first, then splits into [`LOGS_CHUNK_SIZE`]-block
/// windows queried up to [`LOGS_MAX_CONCURRENCY`] at a time, and finally falls back to
/// per-block queries for any chunk that still fails.
async fn request_logs(&self, range: RangeInclusive<u64>) -> Result<Vec<Log>> {
let filter = Self::log_filter()
.from_block(*range.start())
.to_block(*range.end());

Ok(blocks_data)
// The event type parameter is unused by `query_raw`, which returns undecoded logs;
// we pass `IRouter::BatchCommitted` solely to satisfy the `SolEvent` trait bound.
let chunked = Event::<_, IRouter::BatchCommitted>::new(self.provider.clone(), filter)
.chunked()
.chunk_size(LOGS_CHUNK_SIZE)
.concurrent(LOGS_MAX_CONCURRENCY);

chunked
.query_raw()
.await
.context("failed to fetch logs via alloy ChunkedEvent")
}
}

Expand Down Expand Up @@ -242,18 +251,37 @@ impl BlockLoader for EthereumBlockLoader {
}

async fn load_many(&self, range: RangeInclusive<u64>) -> Result<HashMap<H256, BlockData>> {
if range.is_empty() {
return Ok(HashMap::new());
}
log::trace!("Querying blocks batch in {range:?} range");

let batch_futures = range.clone().step_by(MAX_QUERY_BLOCK_RANGE).map(|start| {
let end = (start + MAX_QUERY_BLOCK_RANGE as u64 - 1).min(*range.end());
self.request_block_batch(start..=end)
let header_batches = range.clone().step_by(MAX_BLOCK_BATCH_SIZE).map(|start| {
let end = (start + MAX_BLOCK_BATCH_SIZE as u64 - 1).min(*range.end());
self.request_block_headers(start..=end)
});

let batches = future::try_join_all(batch_futures).await?;
Ok(batches
.into_iter()
.flatten()
.map(|data| (data.hash, data))
.collect())
let (headers_batches, logs) = future::try_join(
future::try_join_all(header_batches),
self.request_logs(range),
)
.await?;

let mut events = self.logs_to_events(logs)?;
let mut blocks_data: HashMap<H256, BlockData> = HashMap::new();
for block in headers_batches.into_iter().flatten() {
let (hash, header) = Self::block_response_to_data(block);
let events = events.remove(&hash).unwrap_or_default();
blocks_data.insert(
hash,
BlockData {
hash,
header,
events,
},
);
}

Ok(blocks_data)
}
}
Loading