diff --git a/ethexe/network/src/db_sync/mod.rs b/ethexe/network/src/db_sync/mod.rs index 04ba93af755..a3771a4d82b 100644 --- a/ethexe/network/src/db_sync/mod.rs +++ b/ethexe/network/src/db_sync/mod.rs @@ -79,21 +79,8 @@ struct Metrics { incoming_dropped_requests: metrics::Counter, } -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum NewRequestRoundReason { - /// Request was queued for the first time or re-queued because of there are no available peers - FromQueue, - /// We have only part of the data - PartialData, - /// Peer failed to respond or response validation failed - PeerFailed, -} - #[derive(Debug, Clone, Copy, Eq, PartialEq, derive_more::Display)] pub enum RequestFailure { - /// Request exceeded its round limit - #[display("Request exceeded its round limit")] - OutOfRounds, /// Request had been processing for too long #[display("Request had been processing for too long")] Timeout, @@ -101,17 +88,8 @@ pub enum RequestFailure { #[derive(Debug, Eq, PartialEq)] pub enum Event { - /// Request is processing new round - NewRequestRound { - /// The ID of request - request_id: RequestId, - /// Peer we're currently requesting to - peer_id: PeerId, - /// Reason for new request round - reason: NewRequestRoundReason, - }, - /// Request is in pending state because of lack of available peers - PendingStateRequest { + /// Request is in a pending state because there are no peers + NoPeers { /// The ID of request request_id: RequestId, }, @@ -159,7 +137,6 @@ pub enum Event { #[derive(Debug, Clone)] pub(crate) struct Config { - pub max_rounds_per_request: u32, pub request_timeout: Duration, pub max_simultaneous_responses: u32, pub max_chain_len_for_announces_response: NonZeroU32, @@ -168,7 +145,6 @@ pub(crate) struct Config { impl Default for Config { fn default() -> Self { Self { - max_rounds_per_request: 10, request_timeout: Duration::from_secs(100), max_simultaneous_responses: 10, max_chain_len_for_announces_response: DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE, @@ -178,11 +154,6 @@ impl Default for Config { #[cfg(test)] // used only in tests yet impl Config { - pub(crate) fn with_max_rounds_per_request(mut self, max_rounds_per_request: u32) -> Self { - self.max_rounds_per_request = max_rounds_per_request; - self - } - pub(crate) fn with_request_timeout(mut self, request_timeout: Duration) -> Self { self.request_timeout = request_timeout; self @@ -394,7 +365,7 @@ pub(crate) struct InnerProgramIdsResponse(BTreeSet); pub(crate) struct InnerAnnouncesResponse(Vec); /// Network-only type to be encoded-decoded and sent over the network -#[derive(Debug, Eq, PartialEq, derive_more::From, Encode, Decode)] +#[derive(Debug, Eq, PartialEq, derive_more::From, derive_more::Unwrap, Encode, Decode)] pub(crate) enum InnerResponse { Hashes(InnerHashesResponse), ProgramIds(InnerProgramIdsResponse), @@ -723,7 +694,6 @@ pub(crate) mod tests { let (mut alice, _alice_db, _data_provider) = new_swarm().await; let alice_handle = alice.behaviour().handle(); let (mut bob, bob_db, _data_provider) = new_swarm().await; - let bob_peer_id = *bob.local_peer_id(); let hello_hash = bob_db.cas().write(b"hello"); let world_hash = bob_db.cas().write(b"world"); @@ -761,16 +731,6 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([hello_hash, world_hash])); let request_id = request.request_id(); - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: bob_peer_id, - reason: NewRequestRoundReason::FromQueue, - } - ); - let event = alice.next_behaviour_event().await; assert_eq!(event, Event::RequestSucceed { request_id }); @@ -787,63 +747,6 @@ pub(crate) mod tests { ) } - #[tokio::test] - async fn out_of_rounds() { - init_logger(); - - let alice_config = Config::default().with_max_rounds_per_request(1); - let (mut alice, _alice_db, _data_provider) = new_swarm_with_config(alice_config).await; - let alice_handle = alice.behaviour().handle(); - - let mut bob = Swarm::new_ephemeral_tokio(move |_keypair| { - InnerBehaviour::new( - [(STREAM_PROTOCOL, ProtocolSupport::Full)], - request_response::Config::default(), - ) - }); - bob.connect(&mut alice).await; - - let request = alice_handle.request(Request::hashes([])); - let request_id = request.request_id(); - - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: *bob.local_peer_id(), - reason: NewRequestRoundReason::FromQueue, - } - ); - - tokio::spawn(async move { - while let Some(event) = bob.next().await { - if let Ok(request_response::Event::Message { - message: - Message::Request { - channel, request, .. - }, - .. - }) = event.try_into_behaviour_event() - { - assert_eq!(request, InnerRequest::Hashes(HashesRequest::default())); - drop(channel); - } - } - }); - - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::RequestFailed { - request_id, - error: RequestFailure::OutOfRounds, - } - ); - - request.await.unwrap_err(); - } - #[tokio::test(start_paused = true)] async fn timeout() { init_logger(); @@ -863,16 +766,6 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([])); let request_id = request.request_id(); - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: *bob.local_peer_id(), - reason: NewRequestRoundReason::FromQueue, - } - ); - tokio::spawn(async move { while let Some(event) = bob.next().await { if let Ok(request_response::Event::Message { @@ -927,16 +820,6 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([data_0, data_1])); let request_id = request.request_id(); - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: *bob.local_peer_id(), - reason: NewRequestRoundReason::FromQueue, - } - ); - tokio::spawn(async move { while let Some(event) = bob.next().await { if let Ok(request_response::Event::Message { @@ -979,11 +862,42 @@ pub(crate) mod tests { ); } + #[tokio::test] + async fn truncated_hashes_response_completed_from_same_peer() { + const DATA_LEN: usize = 6 * 1024 * 1024; + + init_logger(); + + let (mut alice, _alice_db, _data_provider) = new_swarm().await; + let alice_handle = alice.behaviour().handle(); + let (mut bob, bob_db, _data_provider) = new_swarm().await; + + let data_0 = vec![0; DATA_LEN]; + let data_1 = vec![1; DATA_LEN]; + let hash_0 = bob_db.cas().write(&data_0); + let hash_1 = bob_db.cas().write(&data_1); + + alice.connect(&mut bob).await; + tokio::spawn(bob.loop_on_next()); + + let request = alice_handle.request(Request::hashes([hash_0, hash_1])); + let request_id = request.request_id(); + + let event = alice.next_behaviour_event().await; + assert_eq!(event, Event::RequestSucceed { request_id }); + + let response = request.await.unwrap(); + assert_eq!( + response, + Response::Hashes([(hash_0, data_0), (hash_1, data_1)].into()) + ); + } + #[tokio::test] async fn request_response_type_mismatch() { init_logger(); - let alice_config = Config::default().with_max_rounds_per_request(1); + let alice_config = Config::default().with_request_timeout(Duration::ZERO); let (mut alice, _alice_db, _data_provider) = new_swarm_with_config(alice_config).await; let alice_handle = alice.behaviour().handle(); @@ -998,16 +912,6 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([])); let request_id = request.request_id(); - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: *bob.local_peer_id(), - reason: NewRequestRoundReason::FromQueue, - } - ); - tokio::spawn(async move { while let Some(event) = bob.next().await { if let Ok(request_response::Event::Message { @@ -1031,7 +935,7 @@ pub(crate) mod tests { event, Event::RequestFailed { request_id, - error: RequestFailure::OutOfRounds, + error: RequestFailure::Timeout, } ); @@ -1039,7 +943,7 @@ pub(crate) mod tests { } #[tokio::test] - async fn request_completed_by_3_rounds() { + async fn request_completed_with_3_peers() { init_logger(); let (mut alice, _alice_db, _data_provider) = new_swarm().await; @@ -1062,25 +966,6 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([hello_hash, world_hash, mark_hash])); let request_id = request.request_id(); - // first round - let event = alice.next_behaviour_event().await; - assert_matches!( - event, - Event::NewRequestRound { request_id: rid, reason: NewRequestRoundReason::FromQueue, .. } if rid == request_id - ); - // second round - let event = alice.next_behaviour_event().await; - assert_matches!( - event, - Event::NewRequestRound { request_id: rid, reason: NewRequestRoundReason::PartialData, .. } if rid == request_id - ); - // third round - let event = alice.next_behaviour_event().await; - assert_matches!( - event, - Event::NewRequestRound { request_id: rid, reason: NewRequestRoundReason::PartialData, .. } if rid == request_id - ); - let event = alice.next_behaviour_event().await; assert_eq!(event, Event::RequestSucceed { request_id }); @@ -1105,9 +990,7 @@ pub(crate) mod tests { let (mut alice, _alice_db, _data_provider) = new_swarm().await; let alice_handle = alice.behaviour().handle(); let (mut bob, bob_db, _data_provider) = new_swarm().await; - let bob_peer_id = *bob.local_peer_id(); let (charlie, charlie_db, _data_provider) = new_swarm().await; - let charlie_peer_id = *charlie.local_peer_id(); let charlie_addr = charlie.external_addresses().next().cloned().unwrap(); alice.connect(&mut bob).await; @@ -1119,34 +1002,14 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([hello_hash, world_hash])); let request_id = request.request_id(); - // first round - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: bob_peer_id, - reason: NewRequestRoundReason::FromQueue - } - ); - - let event = alice.next_behaviour_event().await; - assert_eq!(event, Event::PendingStateRequest { request_id }); + // first attempt + let event = alice.next_behaviour_event().now_or_never(); + assert_eq!(event, None); tokio::spawn(charlie.loop_on_next()); alice.dial_and_wait(charlie_addr).await; - // second round - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: charlie_peer_id, - reason: NewRequestRoundReason::FromQueue, - } - ); - + // second attempt let event = alice.next_behaviour_event().await; assert_eq!(event, Event::RequestSucceed { request_id }); @@ -1163,11 +1026,13 @@ pub(crate) mod tests { ); } - #[tokio::test] + #[tokio::test(start_paused = true)] async fn unsupported_protocol_handled() { + const REQUEST_TIMEOUT: Duration = Duration::from_secs(2); + init_logger(); - let alice_config = Config::default().with_request_timeout(Duration::from_secs(2)); + let alice_config = Config::default().with_request_timeout(REQUEST_TIMEOUT); let (mut alice, _alice_db, _data_provider) = new_swarm_with_config(alice_config).await; let alice_handle = alice.behaviour().handle(); @@ -1185,18 +1050,11 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([])); let request_id = request.request_id(); - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: bob_peer_id, - reason: NewRequestRoundReason::FromQueue - } - ); + // activate timer + let event = alice.next_behaviour_event().now_or_never(); + assert_eq!(event, None); - let event = alice.next_behaviour_event().await; - assert_eq!(event, Event::PendingStateRequest { request_id }); + time::advance(REQUEST_TIMEOUT).await; let event = alice.next_behaviour_event().await; assert_eq!( @@ -1236,8 +1094,7 @@ pub(crate) mod tests { async fn retry() { init_logger(); - let alice_config = Config::default().with_max_rounds_per_request(1); - let (mut alice, _alice_db, _data_provider) = new_swarm_with_config(alice_config).await; + let (mut alice, _alice_db, _data_provider) = new_swarm().await; let alice_handle = alice.behaviour().handle(); let mut bob = Swarm::new_ephemeral_tokio(move |_keypair| { InnerBehaviour::new( @@ -1251,12 +1108,6 @@ pub(crate) mod tests { let request = alice_handle.request(Request::hashes([request_key])); let request_id = request.request_id(); - // first round - let event = alice.next_behaviour_event().await; - assert!( - matches!(event, Event::NewRequestRound { request_id: rid, reason: NewRequestRoundReason::FromQueue, .. } if rid == request_id) - ); - let bob_handle = tokio::spawn(async move { while let Some(event) = bob.next().await { if let Ok(request_response::Event::Message { @@ -1279,6 +1130,7 @@ pub(crate) mod tests { time::advance(Config::default().request_timeout).await; + // first attempt let event = alice.next_behaviour_event().await; assert_eq!( event, @@ -1303,12 +1155,7 @@ pub(crate) mod tests { let request = alice_handle.retry(retriable_request); let request_id = request.request_id(); - // retry round - let event = alice.next_behaviour_event().await; - assert!( - matches!(event, Event::NewRequestRound { request_id: rid, reason: NewRequestRoundReason::FromQueue, .. } if rid == request_id) - ); - + // retry attempt let event = alice.next_behaviour_event().await; assert_eq!(event, Event::RequestSucceed { request_id }); @@ -1325,11 +1172,9 @@ pub(crate) mod tests { let (mut alice, _alice_db, alice_data_provider) = new_swarm().await; let alice_handle = alice.behaviour().handle(); - let (mut bob, _bob_db, _data_provider) = new_swarm().await; - let (mut charlie, charlie_db, _data_provider) = new_swarm().await; - let bob_peer_id = *bob.local_peer_id(); + let (mut bob, bob_db, _data_provider) = new_swarm().await; - let expected_response = fill_data_provider(alice_data_provider, charlie_db).await; + let expected_response = fill_data_provider(alice_data_provider, bob_db).await; alice.connect(&mut bob).await; tokio::spawn(bob.loop_on_next()); @@ -1337,24 +1182,6 @@ pub(crate) mod tests { let request = alice_handle.request(Request::program_ids(H256::zero(), 2)); let request_id = request.request_id(); - let event = alice.next_behaviour_event().await; - assert_eq!( - event, - Event::NewRequestRound { - request_id, - peer_id: bob_peer_id, - reason: NewRequestRoundReason::FromQueue, - } - ); - - let event = alice.next_behaviour_event().await; - assert_eq!(event, Event::PendingStateRequest { request_id }); - - alice.connect(&mut charlie).await; - tokio::spawn(charlie.loop_on_next()); - - // `Event::NewRequestRound` skipped by `connect()` above - let event = alice.next_behaviour_event().await; assert_eq!(event, Event::RequestSucceed { request_id }); diff --git a/ethexe/network/src/db_sync/requests.rs b/ethexe/network/src/db_sync/requests.rs index ccd2a1a9ae7..f393569fc1d 100644 --- a/ethexe/network/src/db_sync/requests.rs +++ b/ethexe/network/src/db_sync/requests.rs @@ -20,8 +20,8 @@ use crate::{ db_sync::{ AnnouncesRequest, Config, Event, ExternalDataProvider, HandleResult, HashesRequest, InnerAnnouncesResponse, InnerBehaviour, InnerHashesResponse, InnerProgramIdsRequest, - InnerProgramIdsResponse, InnerRequest, InnerResponse, Metrics, NewRequestRoundReason, - PeerId, ProgramIdsRequest, Request, RequestFailure, RequestId, Response, ValidCodesRequest, + InnerProgramIdsResponse, InnerRequest, InnerResponse, Metrics, PeerId, ProgramIdsRequest, + Request, RequestFailure, RequestId, Response, ValidCodesRequest, }, peer_score::Handle, utils::{ConnectionMap, NoLimits}, @@ -64,7 +64,6 @@ pub(crate) struct OngoingRequests { external_data_provider: Box, // config request_timeout: Duration, - max_rounds_per_request: u32, } impl OngoingRequests { @@ -83,7 +82,6 @@ impl OngoingRequests { peer_score_handle, external_data_provider, request_timeout: config.request_timeout, - max_rounds_per_request: config.max_rounds_per_request, } } @@ -114,7 +112,6 @@ impl OngoingRequests { self.peer_score_handle.clone(), self.external_data_provider.clone_boxed(), self.request_timeout, - self.max_rounds_per_request, ) .boxed(), Some(channel), @@ -214,20 +211,16 @@ impl OngoingRequests { } if let Some(state) = state { - let event = match state { - OngoingRequestState::PendingState => Event::PendingStateRequest { request_id }, - OngoingRequestState::SendRequest(peer, request, reason) => { + match state { + OngoingRequestState::NoPeers => { + + self.pending_events.push_back(Event::NoPeers { request_id }); + }, + OngoingRequestState::SendRequest(peer, request, ) => { let outbound_request_id = behaviour.send_request(&peer, request); self.active_requests.insert(outbound_request_id, request_id); - - Event::NewRequestRound { - request_id, - peer_id: peer, - reason, - } } }; - self.pending_events.push_back(event); } else if let Poll::Ready(res) = poll { let (event, res) = match res { Ok(response) => { @@ -686,41 +679,40 @@ impl OngoingRequest { } } - async fn choose_next_peer(&mut self) -> (PeerId, Option) { + async fn choose_next_peer(&mut self) -> PeerId { let mut event_sent = None; - - let peer = CONTEXT + CONTEXT .poll_fn(|_task_cx, ctx| { - let peer = ctx - .peers - .difference(&self.tried_peers) - .choose_stable(&mut rand::thread_rng()) - .copied(); - self.tried_peers.extend(peer); - - if let Some(peer) = peer { - Poll::Ready(peer) - } else { + if ctx.peers.is_empty() { event_sent.get_or_insert_with(|| { ctx.state - .set(OngoingRequestState::PendingState) + .set(OngoingRequestState::NoPeers) .expect("set only once"); }); + return Poll::Pending; + } - Poll::Pending + loop { + let peer = ctx + .peers + .difference(&self.tried_peers) + .choose_stable(&mut rand::thread_rng()) + .copied(); + + if let Some(peer) = peer { + self.tried_peers.insert(peer); + break Poll::Ready(peer); + } else { + // just retry all peers again + self.tried_peers.clear(); + continue; + } } }) - .await; - - let reason = event_sent.map(|()| NewRequestRoundReason::FromQueue); - (peer, reason) + .await } - async fn send_request( - &mut self, - peer: PeerId, - reason: NewRequestRoundReason, - ) -> Result { + async fn send_request(&mut self, peer: PeerId) -> Result { CONTEXT.with_mut(|ctx| { ctx.state .set(OngoingRequestState::SendRequest( @@ -729,7 +721,6 @@ impl OngoingRequest { .as_ref() .expect("always Some") .inner_request(), - reason, )) .expect("set only once"); }); @@ -747,17 +738,12 @@ impl OngoingRequest { async fn next_round( &mut self, - mut reason: NewRequestRoundReason, peer_score_handle: &Handle, external_data_provider: Box, - ) -> Result { - let (peer, new_reason) = self.choose_next_peer().await; - reason = new_reason.unwrap_or(reason); + ) -> Result { + let peer = self.choose_next_peer().await; - let response = self - .send_request(peer, reason) - .await - .map_err(|()| NewRequestRoundReason::PeerFailed)?; + let response = self.send_request(peer).await?; match self .response_handler @@ -772,13 +758,13 @@ impl OngoingRequest { "response is incomplete from peer {peer}: we are going for a new round" ); self.response_handler = Some(handler); - Err(NewRequestRoundReason::PartialData) + Err(()) } ResponseHandlerResult::Err(handler, err) => { log::warn!("response processing failed for request from {peer}: {err:?}"); peer_score_handle.invalid_data(peer); self.response_handler = Some(handler); - Err(NewRequestRoundReason::PartialData) + Err(()) } } } @@ -788,30 +774,15 @@ impl OngoingRequest { peer_score_handle: Handle, external_data_provider: Box, request_timeout: Duration, - max_rounds_per_request: u32, ) -> Result { let request_loop = async { - let mut rounds = 0; - let mut reason = NewRequestRoundReason::FromQueue; - loop { - if rounds >= max_rounds_per_request { - return Err(RequestFailure::OutOfRounds); - } - rounds += 1; - match self - .next_round( - reason, - &peer_score_handle, - external_data_provider.clone_boxed(), - ) + .next_round(&peer_score_handle, external_data_provider.clone_boxed()) .await { - Ok(response) => return Ok(response), - Err(new_reason) => { - reason = new_reason; - } + Ok(response) => break Ok(response), + Err(()) => continue, }; } }; @@ -826,8 +797,8 @@ impl OngoingRequest { #[derive(Debug)] enum OngoingRequestState { - PendingState, - SendRequest(PeerId, InnerRequest, NewRequestRoundReason), + NoPeers, + SendRequest(PeerId, InnerRequest), } struct OngoingRequestContext { diff --git a/ethexe/network/src/db_sync/responses.rs b/ethexe/network/src/db_sync/responses.rs index 73ab1ba6ef5..b79d44bbb61 100644 --- a/ethexe/network/src/db_sync/responses.rs +++ b/ethexe/network/src/db_sync/responses.rs @@ -22,6 +22,7 @@ use crate::{ InnerProgramIdsResponse, InnerRequest, InnerResponse, ResponseId, }, export::PeerId, + utils::ParityScaleCodec, }; use ethexe_common::{ Announce, HashOf, @@ -29,8 +30,9 @@ use ethexe_common::{ network::{AnnouncesRequest, AnnouncesRequestUntil}, }; use libp2p::request_response; +use parity_scale_codec::{Compact, Encode}; use std::{ - collections::VecDeque, + collections::{BTreeMap, VecDeque}, num::NonZeroU32, task::{Context, Poll}, }; @@ -74,15 +76,35 @@ impl OngoingResponses { db: Box, max_chain_len_for_announces_response: NonZeroU32, ) -> InnerResponse { + const MAX_RESPONSE_SIZE: u64 = ParityScaleCodec::<(), ()>::MAX_RESPONSE_SIZE; + match request { - InnerRequest::Hashes(request) => InnerHashesResponse( - request - .0 - .into_iter() - .filter_map(|hash| Some((hash, db.read_by_hash(hash)?))) - .collect(), - ) - .into(), + InnerRequest::Hashes(request) => { + let mut response = BTreeMap::new(); + let mut entries_size = 0; + + for hash in request.0 { + let Some(data) = db.read_by_hash(hash) else { + continue; + }; + + let entry_size = hash.encoded_size() + data.encoded_size(); + let next_response_size = 1 // InnerResponse discriminant size + + Compact((response.len() + 1) as u64).encoded_size() + + entries_size + + entry_size; + + if next_response_size > MAX_RESPONSE_SIZE as usize { + // don't try to put other hashes data to prevent abusive database reads + break; + } + + entries_size += entry_size; + response.insert(hash, data); + } + + InnerHashesResponse(response).into() + } InnerRequest::ProgramIds(request) => InnerProgramIdsResponse( db.block_announces(request.at) .into_iter() @@ -244,7 +266,10 @@ enum ProcessAnnounceError { #[cfg(test)] mod tests { use super::*; - use crate::{DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE, db_sync::requests::ResponseHandler}; + use crate::{ + DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE, + db_sync::{HashesRequest, requests::ResponseHandler}, + }; use ethexe_common::{ Announce, HashOf, ProtocolTimelines, db::{AnnounceStorageRW, DBConfig, GlobalsStorageRW, SetConfig}, @@ -276,6 +301,48 @@ mod tests { db.globals_mutate(|globals| globals.start_announce_hash = start); } + #[test] + fn response_from_db_truncates_hashes_response_at_encoded_limit() { + const ENTRIES_BEFORE_COMPACT_BOUNDARY: u64 = 0b0011_1111; + const MAX_RESPONSE_SIZE: usize = ParityScaleCodec::<(), ()>::MAX_RESPONSE_SIZE as usize; + + let db = Database::memory(); + + let entries = (0..ENTRIES_BEFORE_COMPACT_BOUNDARY as u8) + .map(|i| vec![i]) + .collect::>(); + let entries_size = entries + .iter() + .map(|data| H256::zero().encoded_size() + data.encoded_size()) + .sum::(); + for data in &entries { + db.cas().write(data); + } + + let last_entry_size = MAX_RESPONSE_SIZE + - 1 // `InnerResponse` discriminant + - Compact(ENTRIES_BEFORE_COMPACT_BOUNDARY + 1).encoded_size() + - entries_size + - H256::zero().encoded_size(); + let last_entry = vec![42; last_entry_size]; + let last_entry_hash = db.cas().write(&last_entry); + + let request = entries + .iter() + .map(|data| ethexe_db::hash(data)) + .chain(Some(last_entry_hash)) + .collect(); + let response = OngoingResponses::response_from_db( + HashesRequest(request).into(), + Box::new(db), + DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE, + ); + + let response = response.unwrap_hashes(); + assert_eq!(response.0.len(), ENTRIES_BEFORE_COMPACT_BOUNDARY as usize); + assert!(InnerResponse::Hashes(response).encoded_size() <= MAX_RESPONSE_SIZE); + } + #[test] fn fails_chain_len_exceeding_max() { let db = Database::memory(); diff --git a/ethexe/network/src/utils.rs b/ethexe/network/src/utils.rs index ab9571a6476..1087bfc75a3 100644 --- a/ethexe/network/src/utils.rs +++ b/ethexe/network/src/utils.rs @@ -45,8 +45,8 @@ use tokio::{time, time::Instant}; pub struct ParityScaleCodec(PhantomData<(Req, Resp)>); impl ParityScaleCodec { - const MAX_REQUEST_SIZE: u64 = 1024 * 1024; - const MAX_RESPONSE_SIZE: u64 = 10 * 1024 * 1024; + pub const MAX_REQUEST_SIZE: u64 = 1024 * 1024; + pub const MAX_RESPONSE_SIZE: u64 = 10 * 1024 * 1024; } #[async_trait]