Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ jobs:

node-router:
executor: rust-docker
resource_class: << pipeline.parameters.large >>
resource_class: << pipeline.parameters.xlarge >>
steps:
- run_serial:
workspace_member: node/router
Expand Down
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 26 additions & 9 deletions node/bft/events/src/block_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

use super::*;

use snarkvm::utilities::io_error;

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct BlockRequest {
/// The starting block height (inclusive).
Expand All @@ -24,9 +26,15 @@ pub struct BlockRequest {
}

impl BlockRequest {
/// Initializes a new block request event.
pub fn new(start_height: u32, end_height: u32) -> Self {
Self { start_height, end_height }
/// Creates a new block request and return an error if the range is not well-formed.
pub fn new(start_height: u32, end_height: u32) -> IoResult<Self> {
if start_height == 0 {
Err(io_error("BlockRequest cannot include the genesis block"))
} else if start_height >= end_height {
Err(io_error(format!("BlockRequest must contain a valid range, but was {start_height}..{end_height}")))
} else {
Ok(Self { start_height, end_height })
}
}
}

Expand Down Expand Up @@ -56,21 +64,30 @@ impl FromBytes for BlockRequest {
let start_height = u32::read_le(&mut reader)?;
let end_height = u32::read_le(&mut reader)?;

Ok(Self::new(start_height, end_height))
Self::new(start_height, end_height)
}
}

#[cfg(test)]
pub mod prop_tests {
use crate::BlockRequest;
use crate::{BlockRequest, DataBlocks};

use snarkvm::{
console::network::MainnetV0,
utilities::{FromBytes, ToBytes},
};

use bytes::{Buf, BufMut, BytesMut};
use proptest::prelude::{BoxedStrategy, Strategy, any};
use snarkvm::utilities::{FromBytes, ToBytes};
use proptest::prelude::prop_compose;
use test_strategy::proptest;

pub fn any_block_request() -> BoxedStrategy<BlockRequest> {
any::<(u32, u32)>().prop_map(|(start_height, end_height)| BlockRequest::new(start_height, end_height)).boxed()
const MAX_RANGE: u32 = DataBlocks::<MainnetV0>::MAXIMUM_NUMBER_OF_BLOCKS as u32;

prop_compose! {
/// Creates a block request with a start point and a valid range.
pub fn any_block_request()(start_height in 1..u32::MAX-MAX_RANGE, num_blocks in 1..MAX_RANGE) -> BlockRequest {
BlockRequest { start_height, end_height: start_height + num_blocks }
}
}

#[proptest]
Expand Down
129 changes: 100 additions & 29 deletions node/bft/events/src/block_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,24 @@

use super::*;

#[derive(Clone, PartialEq, Eq)]
use snarkvm::{
console::network::ConsensusVersion,
ledger::narwhal::Data,
prelude::{FromBytes, ToBytes},
utilities::io_error,
};

use std::borrow::Cow;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockResponse<N: Network> {
/// The original block request.
pub request: BlockRequest,
/// The blocks.
pub blocks: Data<DataBlocks<N>>,
/// The consensus version at the height of the *last* block in this response.
/// This enables detecting if the current node, or the peer, missed an upgrade. Its value is `None` for messages with version < 2.
pub latest_consensus_version: Option<ConsensusVersion>,
}

impl<N: Network> EventTrait for BlockResponse<N> {
Expand All @@ -37,25 +49,59 @@ impl<N: Network> EventTrait for BlockResponse<N> {
}
}

impl<N: Network> BlockResponse<N> {
// Constructs a new block response.
pub fn new(request: BlockRequest, blocks: DataBlocks<N>, latest_consensus_version: ConsensusVersion) -> Self {
Self { request, blocks: Data::Object(blocks), latest_consensus_version: Some(latest_consensus_version) }
}
}

impl<N: Network> ToBytes for BlockResponse<N> {
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
self.request.write_le(&mut writer)?;
self.blocks.write_le(&mut writer)
fn write_le<W: io::Write>(&self, mut writer: W) -> io::Result<()> {
// Block responses without a consesnsus version have message version `1`, other have to `2` (or greater in the future).
let Some(latest_consensus_version) = self.latest_consensus_version else {
return Err(io_error("Can only serialize block responses of version 2 or greater"));
};

// Send the consensus version starting with V12.
if latest_consensus_version > ConsensusVersion::V11 {
// Currently, we simply write four zero bytes as the version number,
// because we know a valid request start height is always non-zero.
// In the future we can encode the real version here.
0u32.write_le(&mut writer)?;
self.request.write_le(&mut writer)?;
self.blocks.write_le(&mut writer)?;
latest_consensus_version.write_le(&mut writer)
} else {
self.request.write_le(&mut writer)?;
self.blocks.write_le(&mut writer)
}
}
}

impl<N: Network> FromBytes for BlockResponse<N> {
fn read_le<R: Read>(mut reader: R) -> IoResult<Self> {
let request = BlockRequest::read_le(&mut reader)?;
fn read_le<R: io::Read>(mut reader: R) -> io::Result<Self> {
let start_height = u32::read_le(&mut reader)?;

// An invalid start height as the first four bytes indicates that this message
// contains the consensus version of the last block.
let contains_consensus_version = start_height == 0;

// If this message type does not contain the consensus version, use the first four bytes as the start height.
// Otherwise, read the full request.
let request = if contains_consensus_version {
BlockRequest::read_le(&mut reader)?
} else {
let end_height = u32::read_le(&mut reader)?;
BlockRequest::new(start_height, end_height)?
};

let blocks = Data::read_le(&mut reader)?;

Ok(Self { request, blocks })
}
}
let latest_consensus_version =
if contains_consensus_version { Some(FromBytes::read_le(&mut reader)?) } else { None };

impl<N: Network> std::fmt::Debug for BlockResponse<N> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.name())
Ok(Self { request, blocks, latest_consensus_version })
}
}

Expand Down Expand Up @@ -136,32 +182,30 @@ impl<N: Network> FromBytes for DataBlocks<N> {

#[cfg(test)]
pub mod prop_tests {
use crate::{BlockResponse, DataBlocks, block_request::prop_tests::any_block_request};
use crate::{BlockRequest, BlockResponse, DataBlocks, block_request::prop_tests::any_block_request};

use snarkvm::{
ledger::test_helpers::sample_genesis_block,
prelude::{FromBytes, TestRng, ToBytes, block::Block, narwhal::Data},
console::network::ConsensusVersion,
ledger::{narwhal::Data, test_helpers::sample_genesis_block},
prelude::{FromBytes, TestRng, ToBytes},
};

use bytes::{Buf, BufMut, BytesMut};
use proptest::{
collection::vec,
prelude::{BoxedStrategy, Strategy, any},
};
use proptest::prelude::{BoxedStrategy, Strategy, any};
use test_strategy::proptest;

type CurrentNetwork = snarkvm::prelude::MainnetV0;

pub fn any_block() -> BoxedStrategy<Block<CurrentNetwork>> {
any::<u64>().prop_map(|seed| sample_genesis_block(&mut TestRng::fixed(seed))).boxed()
}

pub fn any_data_blocks() -> BoxedStrategy<DataBlocks<CurrentNetwork>> {
vec(any_block(), 0..=1).prop_map(DataBlocks).boxed()
}

pub fn any_block_response() -> BoxedStrategy<BlockResponse<CurrentNetwork>> {
(any_block_request(), any_data_blocks())
.prop_map(|(request, data_blocks)| BlockResponse { request, blocks: Data::Object(data_blocks) })
(any_block_request(), any::<u64>())
.prop_map(|(request, seed)| {
// Generate blocks that match the requests range.
let mut rng = TestRng::from_seed(seed);
let blocks: Vec<_> =
(request.start_height..request.end_height).map(|_| sample_genesis_block(&mut rng)).collect();

BlockResponse::new(request, DataBlocks(blocks), ConsensusVersion::V11)
})
.boxed()
}

Expand All @@ -170,10 +214,37 @@ pub mod prop_tests {
let mut bytes = BytesMut::default().writer();
block_response.write_le(&mut bytes).unwrap();
let decoded = BlockResponse::<CurrentNetwork>::read_le(&mut bytes.into_inner().reader()).unwrap();

assert_eq!(block_response.request, decoded.request);
assert_eq!(block_response.latest_consensus_version, decoded.latest_consensus_version);
assert_eq!(
block_response.blocks.deserialize_blocking().unwrap(),
decoded.blocks.deserialize_blocking().unwrap(),
);
}

/// Generates a block response encoded in the old format, and ensures it is still deserializable.
#[proptest]
fn deserialize_version1(
#[strategy(any_block_request())] request: BlockRequest,
#[strategy(any::<u64>())] seed: u64,
) {
let mut rng = TestRng::from_seed(seed);

let blocks = DataBlocks(
(request.start_height..request.end_height).map(|_| sample_genesis_block(&mut rng)).collect::<Vec<_>>(),
);

// Write the response without message or consesnsus version.
let mut data = Vec::new();
request.write_le(&mut data).unwrap();
Data::Object(blocks.clone()).write_le(&mut data).unwrap();

// Deserialize it.
let response = BlockResponse::read_le(data.reader()).unwrap();

assert_eq!(response.request, request);
assert_eq!(response.latest_consensus_version, None);
assert_eq!(response.blocks.deserialize_blocking().unwrap(), blocks);
}
}
17 changes: 10 additions & 7 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,11 +581,14 @@ impl<N: Network> Gateway<N> {
bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
}

// End height is exclusive.
let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;

let self_ = self.clone();
let blocks = match task::spawn_blocking(move || {
// Retrieve the blocks within the requested range.
match self_.ledger.get_blocks(start_height..end_height) {
Ok(blocks) => Ok(Data::Object(DataBlocks(blocks))),
Ok(blocks) => Ok(DataBlocks(blocks)),
Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
}
})
Expand All @@ -599,17 +602,15 @@ impl<N: Network> Gateway<N> {
let self_ = self.clone();
tokio::spawn(async move {
// Send the `BlockResponse` message to the peer.
let event = Event::BlockResponse(BlockResponse { request: block_request, blocks });
let event =
Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
Transport::send(&self_, peer_ip, event).await;
});
Ok(true)
}
Event::BlockResponse(block_response) => {
Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
// Process the block response. Except for some tests, there is always a sync sender.
if let Some(sync_sender) = self.sync_sender.get() {
// Retrieve the block response.
let BlockResponse { request, blocks } = block_response;

// Check the response corresponds to a request.
if !self.cache.remove_outbound_block_request(peer_ip, &request) {
bail!("Unsolicited block response from '{peer_ip}'")
Expand All @@ -632,7 +633,9 @@ impl<N: Network> Gateway<N> {
// Ensure the block response is well-formed.
blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
// Send the blocks to the sync module.
if let Err(err) = sync_sender.insert_block_response(peer_ip, blocks.0).await {
if let Err(err) =
sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await
{
warn!("Unable to process block response from '{peer_ip}' - {err}");
}
}
Expand Down
Loading