diff --git a/.circleci/config.yml b/.circleci/config.yml index f728ef6aa9..3d4183785e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -297,7 +297,7 @@ jobs: node-router: executor: rust-docker - resource_class: << pipeline.parameters.large >> + resource_class: << pipeline.parameters.xlarge >> steps: - run_serial: workspace_member: node/router diff --git a/Cargo.lock b/Cargo.lock index e6d25bda9d..008878e955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.41" +version = "1.2.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac9fe6cdbb24b6ade63616c0a0688e45bb56732262c158df3c0c4bea4ca47cb7" +checksum = "81bbf3b3619004ad9bd139f62a9ab5cfe467f307455a0d307b0cf58bf070feaa" dependencies = [ "find-msvc-tools", "jobserver", @@ -1229,9 +1229,9 @@ dependencies = [ [[package]] name = "document-features" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d" +checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" dependencies = [ "litrs", ] @@ -1429,9 +1429,9 @@ checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" [[package]] name = "flate2" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc5a4e564e38c699f2880d3fda590bedc2e69f3f84cd48b457bd892ce61d0aa9" +checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" dependencies = [ "crc32fast", "miniz_oxide", @@ -1834,11 +1834,11 @@ dependencies = [ [[package]] name = "home" -version = "0.5.11" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2469,9 +2469,9 @@ checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" [[package]] name = "litrs" -version = "0.4.2" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5e54036fe321fd421e10d732f155734c4e4afd610dd556d9a82833ab3ee0bed" +checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" [[package]] name = "lock_api" @@ -3088,9 +3088,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e0f6df8eaa422d97d72edcd152e1451618fed47fabbdbd5a8864167b1d4aff7" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" dependencies = [ "unicode-ident", ] diff --git a/node/bft/events/src/block_request.rs b/node/bft/events/src/block_request.rs index a2eaf96cfc..ca4e3e13e4 100644 --- a/node/bft/events/src/block_request.rs +++ b/node/bft/events/src/block_request.rs @@ -15,6 +15,8 @@ use super::*; +use snarkvm::utilities::io_error; + #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub struct BlockRequest { /// The starting block height (inclusive). @@ -24,9 +26,15 @@ pub struct BlockRequest { } impl BlockRequest { - /// Initializes a new block request event. - pub fn new(start_height: u32, end_height: u32) -> Self { - Self { start_height, end_height } + /// Creates a new block request and return an error if the range is not well-formed. + pub fn new(start_height: u32, end_height: u32) -> IoResult { + if start_height == 0 { + Err(io_error("BlockRequest cannot include the genesis block")) + } else if start_height >= end_height { + Err(io_error(format!("BlockRequest must contain a valid range, but was {start_height}..{end_height}"))) + } else { + Ok(Self { start_height, end_height }) + } } } @@ -56,21 +64,30 @@ impl FromBytes for BlockRequest { let start_height = u32::read_le(&mut reader)?; let end_height = u32::read_le(&mut reader)?; - Ok(Self::new(start_height, end_height)) + Self::new(start_height, end_height) } } #[cfg(test)] pub mod prop_tests { - use crate::BlockRequest; + use crate::{BlockRequest, DataBlocks}; + + use snarkvm::{ + console::network::MainnetV0, + utilities::{FromBytes, ToBytes}, + }; use bytes::{Buf, BufMut, BytesMut}; - use proptest::prelude::{BoxedStrategy, Strategy, any}; - use snarkvm::utilities::{FromBytes, ToBytes}; + use proptest::prelude::prop_compose; use test_strategy::proptest; - pub fn any_block_request() -> BoxedStrategy { - any::<(u32, u32)>().prop_map(|(start_height, end_height)| BlockRequest::new(start_height, end_height)).boxed() + const MAX_RANGE: u32 = DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as u32; + + prop_compose! { + /// Creates a block request with a start point and a valid range. + pub fn any_block_request()(start_height in 1..u32::MAX-MAX_RANGE, num_blocks in 1..MAX_RANGE) -> BlockRequest { + BlockRequest { start_height, end_height: start_height + num_blocks } + } } #[proptest] diff --git a/node/bft/events/src/block_response.rs b/node/bft/events/src/block_response.rs index 2e85184cb5..c2b5e1e993 100644 --- a/node/bft/events/src/block_response.rs +++ b/node/bft/events/src/block_response.rs @@ -15,12 +15,24 @@ use super::*; -#[derive(Clone, PartialEq, Eq)] +use snarkvm::{ + console::network::ConsensusVersion, + ledger::narwhal::Data, + prelude::{FromBytes, ToBytes}, + utilities::io_error, +}; + +use std::borrow::Cow; + +#[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockResponse { /// The original block request. pub request: BlockRequest, /// The blocks. pub blocks: Data>, + /// The consensus version at the height of the *last* block in this response. + /// This enables detecting if the current node, or the peer, missed an upgrade. Its value is `None` for messages with version < 2. + pub latest_consensus_version: Option, } impl EventTrait for BlockResponse { @@ -37,25 +49,59 @@ impl EventTrait for BlockResponse { } } +impl BlockResponse { + // Constructs a new block response. + pub fn new(request: BlockRequest, blocks: DataBlocks, latest_consensus_version: ConsensusVersion) -> Self { + Self { request, blocks: Data::Object(blocks), latest_consensus_version: Some(latest_consensus_version) } + } +} + impl ToBytes for BlockResponse { - fn write_le(&self, mut writer: W) -> IoResult<()> { - self.request.write_le(&mut writer)?; - self.blocks.write_le(&mut writer) + fn write_le(&self, mut writer: W) -> io::Result<()> { + // Block responses without a consesnsus version have message version `1`, other have to `2` (or greater in the future). + let Some(latest_consensus_version) = self.latest_consensus_version else { + return Err(io_error("Can only serialize block responses of version 2 or greater")); + }; + + // Send the consensus version starting with V12. + if latest_consensus_version > ConsensusVersion::V11 { + // Currently, we simply write four zero bytes as the version number, + // because we know a valid request start height is always non-zero. + // In the future we can encode the real version here. + 0u32.write_le(&mut writer)?; + self.request.write_le(&mut writer)?; + self.blocks.write_le(&mut writer)?; + latest_consensus_version.write_le(&mut writer) + } else { + self.request.write_le(&mut writer)?; + self.blocks.write_le(&mut writer) + } } } impl FromBytes for BlockResponse { - fn read_le(mut reader: R) -> IoResult { - let request = BlockRequest::read_le(&mut reader)?; + fn read_le(mut reader: R) -> io::Result { + let start_height = u32::read_le(&mut reader)?; + + // An invalid start height as the first four bytes indicates that this message + // contains the consensus version of the last block. + let contains_consensus_version = start_height == 0; + + // If this message type does not contain the consensus version, use the first four bytes as the start height. + // Otherwise, read the full request. + let request = if contains_consensus_version { + BlockRequest::read_le(&mut reader)? + } else { + let end_height = u32::read_le(&mut reader)?; + BlockRequest::new(start_height, end_height)? + }; + let blocks = Data::read_le(&mut reader)?; - Ok(Self { request, blocks }) - } -} + let latest_consensus_version = + if contains_consensus_version { Some(FromBytes::read_le(&mut reader)?) } else { None }; -impl std::fmt::Debug for BlockResponse { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.name()) + Ok(Self { request, blocks, latest_consensus_version }) } } @@ -136,32 +182,30 @@ impl FromBytes for DataBlocks { #[cfg(test)] pub mod prop_tests { - use crate::{BlockResponse, DataBlocks, block_request::prop_tests::any_block_request}; + use crate::{BlockRequest, BlockResponse, DataBlocks, block_request::prop_tests::any_block_request}; + use snarkvm::{ - ledger::test_helpers::sample_genesis_block, - prelude::{FromBytes, TestRng, ToBytes, block::Block, narwhal::Data}, + console::network::ConsensusVersion, + ledger::{narwhal::Data, test_helpers::sample_genesis_block}, + prelude::{FromBytes, TestRng, ToBytes}, }; use bytes::{Buf, BufMut, BytesMut}; - use proptest::{ - collection::vec, - prelude::{BoxedStrategy, Strategy, any}, - }; + use proptest::prelude::{BoxedStrategy, Strategy, any}; use test_strategy::proptest; type CurrentNetwork = snarkvm::prelude::MainnetV0; - pub fn any_block() -> BoxedStrategy> { - any::().prop_map(|seed| sample_genesis_block(&mut TestRng::fixed(seed))).boxed() - } - - pub fn any_data_blocks() -> BoxedStrategy> { - vec(any_block(), 0..=1).prop_map(DataBlocks).boxed() - } - pub fn any_block_response() -> BoxedStrategy> { - (any_block_request(), any_data_blocks()) - .prop_map(|(request, data_blocks)| BlockResponse { request, blocks: Data::Object(data_blocks) }) + (any_block_request(), any::()) + .prop_map(|(request, seed)| { + // Generate blocks that match the requests range. + let mut rng = TestRng::from_seed(seed); + let blocks: Vec<_> = + (request.start_height..request.end_height).map(|_| sample_genesis_block(&mut rng)).collect(); + + BlockResponse::new(request, DataBlocks(blocks), ConsensusVersion::V11) + }) .boxed() } @@ -170,10 +214,37 @@ pub mod prop_tests { let mut bytes = BytesMut::default().writer(); block_response.write_le(&mut bytes).unwrap(); let decoded = BlockResponse::::read_le(&mut bytes.into_inner().reader()).unwrap(); + assert_eq!(block_response.request, decoded.request); + assert_eq!(block_response.latest_consensus_version, decoded.latest_consensus_version); assert_eq!( block_response.blocks.deserialize_blocking().unwrap(), decoded.blocks.deserialize_blocking().unwrap(), ); } + + /// Generates a block response encoded in the old format, and ensures it is still deserializable. + #[proptest] + fn deserialize_version1( + #[strategy(any_block_request())] request: BlockRequest, + #[strategy(any::())] seed: u64, + ) { + let mut rng = TestRng::from_seed(seed); + + let blocks = DataBlocks( + (request.start_height..request.end_height).map(|_| sample_genesis_block(&mut rng)).collect::>(), + ); + + // Write the response without message or consesnsus version. + let mut data = Vec::new(); + request.write_le(&mut data).unwrap(); + Data::Object(blocks.clone()).write_le(&mut data).unwrap(); + + // Deserialize it. + let response = BlockResponse::read_le(data.reader()).unwrap(); + + assert_eq!(response.request, request); + assert_eq!(response.latest_consensus_version, None); + assert_eq!(response.blocks.deserialize_blocking().unwrap(), blocks); + } } diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 7404bb699c..2897522b44 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -581,11 +581,14 @@ impl Gateway { bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})") } + // End height is exclusive. + let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?; + let self_ = self.clone(); let blocks = match task::spawn_blocking(move || { // Retrieve the blocks within the requested range. match self_.ledger.get_blocks(start_height..end_height) { - Ok(blocks) => Ok(Data::Object(DataBlocks(blocks))), + Ok(blocks) => Ok(DataBlocks(blocks)), Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"), } }) @@ -599,17 +602,15 @@ impl Gateway { let self_ = self.clone(); tokio::spawn(async move { // Send the `BlockResponse` message to the peer. - let event = Event::BlockResponse(BlockResponse { request: block_request, blocks }); + let event = + Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version)); Transport::send(&self_, peer_ip, event).await; }); Ok(true) } - Event::BlockResponse(block_response) => { + Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => { // Process the block response. Except for some tests, there is always a sync sender. if let Some(sync_sender) = self.sync_sender.get() { - // Retrieve the block response. - let BlockResponse { request, blocks } = block_response; - // Check the response corresponds to a request. if !self.cache.remove_outbound_block_request(peer_ip, &request) { bail!("Unsolicited block response from '{peer_ip}'") @@ -632,7 +633,9 @@ impl Gateway { // Ensure the block response is well-formed. blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?; // Send the blocks to the sync module. - if let Err(err) = sync_sender.insert_block_response(peer_ip, blocks.0).await { + if let Err(err) = + sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await + { warn!("Unable to process block response from '{peer_ip}' - {err}"); } } diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 19bac87a56..5cca4ddddf 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -231,7 +231,8 @@ pub fn init_worker_channels() -> (WorkerSender, WorkerReceiver #[derive(Debug)] pub struct SyncSender { - pub tx_block_sync_insert_block_response: mpsc::Sender<(SocketAddr, Vec>, oneshot::Sender>)>, + pub tx_block_sync_insert_block_response: + mpsc::Sender<(SocketAddr, Vec>, Option, oneshot::Sender>)>, pub tx_block_sync_remove_peer: mpsc::Sender, pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators, oneshot::Sender>)>, pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest)>, @@ -253,14 +254,21 @@ impl SyncSender { } /// Sends the request to insert a new block response. - pub async fn insert_block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { + pub async fn insert_block_response( + &self, + peer_ip: SocketAddr, + blocks: Vec>, + latest_consensus_version: Option, + ) -> Result<()> { // Initialize a callback sender and receiver. let (callback_sender, callback_receiver) = oneshot::channel(); // Send the request to advance with sync blocks. // This `tx_block_sync_advance_with_sync_blocks.send()` call // causes the `rx_block_sync_advance_with_sync_blocks.recv()` call // in one of the loops in [`Sync::run()`] to return. - self.tx_block_sync_insert_block_response.send((peer_ip, blocks, callback_sender)).await?; + self.tx_block_sync_insert_block_response + .send((peer_ip, blocks, latest_consensus_version, callback_sender)) + .await?; // Await the callback to continue. callback_receiver.await? } @@ -268,7 +276,8 @@ impl SyncSender { #[derive(Debug)] pub struct SyncReceiver { - pub rx_block_sync_insert_block_response: mpsc::Receiver<(SocketAddr, Vec>, oneshot::Sender>)>, + pub rx_block_sync_insert_block_response: + mpsc::Receiver<(SocketAddr, Vec>, Option, oneshot::Sender>)>, pub rx_block_sync_remove_peer: mpsc::Receiver, pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators, oneshot::Sender>)>, pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest)>, diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index ae3914a81c..aad60073a4 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -25,8 +25,12 @@ use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event}; use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_network::PeerPoolHandling; use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators}; + use snarkvm::{ - console::{network::Network, types::Field}, + console::{ + network::{ConsensusVersion, Network}, + types::Field, + }, ledger::{authority::Authority, block::Block, narwhal::BatchCertificate}, prelude::{cfg_into_iter, cfg_iter}, utilities::log_error, @@ -235,8 +239,10 @@ impl Sync { // which causes the `rx_block_sync_advance_with_sync_blocks.recv()` call below to return. let self_ = self.clone(); self.spawn(async move { - while let Some((peer_ip, blocks, callback)) = rx_block_sync_insert_block_response.recv().await { - callback.send(self_.insert_block_response(peer_ip, blocks).await).ok(); + while let Some((peer_ip, blocks, latest_consensus_version, callback)) = + rx_block_sync_insert_block_response.recv().await + { + callback.send(self_.insert_block_response(peer_ip, blocks, latest_consensus_version).await).ok(); } }); @@ -351,9 +357,14 @@ impl Sync { // Callbacks used when receiving messages from the Gateway impl Sync { /// We received a block response and can (possibly) advance synchronization. - async fn insert_block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { + async fn insert_block_response( + &self, + peer_ip: SocketAddr, + blocks: Vec>, + latest_consensus_version: Option, + ) -> Result<()> { // Verify that the response is valid and add it to block sync. - self.block_sync.insert_block_responses(peer_ip, blocks) + self.block_sync.insert_block_responses(peer_ip, blocks, latest_consensus_version) // No need to advance block sync here, as the new response will // notify the incoming task. diff --git a/node/router/messages/src/block_request.rs b/node/router/messages/src/block_request.rs index 62ab5b66fd..cfd62f6e8a 100644 --- a/node/router/messages/src/block_request.rs +++ b/node/router/messages/src/block_request.rs @@ -15,72 +15,17 @@ use super::*; -use snarkvm::prelude::{FromBytes, ToBytes}; +use snarkos_node_bft_events::EventTrait; use std::borrow::Cow; -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub struct BlockRequest { - /// The starting block height (inclusive). - pub start_height: u32, - /// The ending block height (exclusive). - pub end_height: u32, -} +/// Re-export the BlockRequest structure from BFT. +pub use snarkos_node_bft_events::BlockRequest; impl MessageTrait for BlockRequest { /// Returns the message name. #[inline] fn name(&self) -> Cow<'static, str> { - let start = self.start_height; - let end = self.end_height; - match start + 1 == end { - true => format!("BlockRequest {start}"), - false => format!("BlockRequest {start}..{end}"), - } - .into() - } -} - -impl ToBytes for BlockRequest { - fn write_le(&self, mut writer: W) -> io::Result<()> { - self.start_height.write_le(&mut writer)?; - self.end_height.write_le(&mut writer)?; - Ok(()) - } -} - -impl FromBytes for BlockRequest { - fn read_le(mut reader: R) -> io::Result { - let start_height = u32::read_le(&mut reader)?; - let end_height = u32::read_le(&mut reader)?; - Ok(Self { start_height, end_height }) - } -} - -impl Display for BlockRequest { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{}..{}", self.start_height, self.end_height) - } -} - -#[cfg(test)] -pub mod prop_tests { - use crate::BlockRequest; - use snarkvm::utilities::{FromBytes, ToBytes}; - - use bytes::{Buf, BufMut, BytesMut}; - use proptest::prelude::{BoxedStrategy, Strategy, any}; - use test_strategy::proptest; - - pub fn any_block_request() -> BoxedStrategy { - any::<(u32, u32)>().prop_map(|(start_height, end_height)| BlockRequest { start_height, end_height }).boxed() - } - - #[proptest] - fn block_request_roundtrip(#[strategy(any_block_request())] block_request: BlockRequest) { - let mut bytes = BytesMut::default().writer(); - block_request.write_le(&mut bytes).unwrap(); - let decoded = BlockRequest::read_le(&mut bytes.into_inner().reader()).unwrap(); - assert_eq![decoded, block_request]; + EventTrait::name(self) } } diff --git a/node/router/messages/src/block_response.rs b/node/router/messages/src/block_response.rs index 20b03cb9d5..1c02e79248 100644 --- a/node/router/messages/src/block_response.rs +++ b/node/router/messages/src/block_response.rs @@ -15,91 +15,17 @@ use super::*; -use snarkvm::{ - ledger::narwhal::Data, - prelude::{FromBytes, ToBytes}, -}; +use snarkos_node_bft_events::EventTrait; use std::borrow::Cow; -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct BlockResponse { - /// The original block request. - pub request: BlockRequest, - /// The blocks. - pub blocks: Data>, -} +// Re-export BlockResponse from BFT +pub use snarkos_node_bft_events::BlockResponse; impl MessageTrait for BlockResponse { - /// Returns the message name. + /// Returns the event name. #[inline] fn name(&self) -> Cow<'static, str> { - let start = self.request.start_height; - let end = self.request.end_height; - match start + 1 == end { - true => format!("BlockResponse {start}"), - false => format!("BlockResponse {start}..{end}"), - } - .into() - } -} - -impl ToBytes for BlockResponse { - fn write_le(&self, mut writer: W) -> io::Result<()> { - self.request.write_le(&mut writer)?; - self.blocks.write_le(writer) - } -} - -impl FromBytes for BlockResponse { - fn read_le(mut reader: R) -> io::Result { - let request = BlockRequest::read_le(&mut reader)?; - let blocks = Data::read_le(reader)?; - Ok(Self { request, blocks }) - } -} - -#[cfg(test)] -pub mod prop_tests { - use crate::{BlockResponse, DataBlocks, block_request::prop_tests::any_block_request}; - use snarkvm::{ - ledger::test_helpers::sample_genesis_block, - prelude::{block::Block, narwhal::Data}, - utilities::{FromBytes, TestRng, ToBytes}, - }; - - use bytes::{Buf, BufMut, BytesMut}; - use proptest::{ - collection::vec, - prelude::{BoxedStrategy, Strategy, any}, - }; - use test_strategy::proptest; - - type CurrentNetwork = snarkvm::prelude::MainnetV0; - - pub fn any_block() -> BoxedStrategy> { - any::().prop_map(|seed| sample_genesis_block(&mut TestRng::fixed(seed))).boxed() - } - - pub fn any_data_blocks() -> BoxedStrategy> { - vec(any_block(), 0..=1).prop_map(DataBlocks).boxed() - } - - pub fn any_block_response() -> BoxedStrategy> { - (any_block_request(), any_data_blocks()) - .prop_map(|(request, data_blocks)| BlockResponse { request, blocks: Data::Object(data_blocks) }) - .boxed() - } - - #[proptest] - fn block_response_roundtrip(#[strategy(any_block_response())] block_response: BlockResponse) { - let mut bytes = BytesMut::default().writer(); - block_response.write_le(&mut bytes).unwrap(); - let decoded = BlockResponse::::read_le(&mut bytes.into_inner().reader()).unwrap(); - assert_eq!(block_response.request, decoded.request); - assert_eq!( - block_response.blocks.deserialize_blocking().unwrap(), - decoded.blocks.deserialize_blocking().unwrap(), - ); + EventTrait::name(self) } } diff --git a/node/router/messages/src/lib.rs b/node/router/messages/src/lib.rs index a84f9b72c6..144be28538 100644 --- a/node/router/messages/src/lib.rs +++ b/node/router/messages/src/lib.rs @@ -75,13 +75,7 @@ use snarkvm::prelude::{ puzzle::{Solution, SolutionID}, }; -use std::{ - borrow::Cow, - fmt, - fmt::{Display, Formatter}, - io, - net::SocketAddr, -}; +use std::{borrow::Cow, io, net::SocketAddr}; pub trait MessageTrait: ToBytes + FromBytes { /// Returns the message name. diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 8efaea796c..0652d4e58f 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -30,6 +30,7 @@ use crate::{ }; use snarkos_node_tcp::protocols::Reading; use snarkvm::prelude::{ + ConsensusVersion, Network, block::{Block, Header, Transaction}, puzzle::Solution, @@ -124,13 +125,12 @@ pub trait Inbound: Reading + Outbound { false => bail!("Peer '{peer_ip}' sent an invalid block request"), } } - Message::BlockResponse(message) => { - let BlockResponse { request, blocks } = message; - + Message::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => { // Remove the block request, checking if this node previously sent a block request to this peer. if !self.router().cache.remove_outbound_block_request(peer_ip, &request) { bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)") } + // Perform the deferred non-blocking deserialization of the blocks. // The deserialization can take a long time (minutes). We should not be running // this on a blocking task, but on a rayon thread pool. @@ -150,7 +150,7 @@ pub trait Inbound: Reading + Outbound { // Process the block response. let node = self.clone(); - match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? { + match spawn_blocking(move || node.block_response(peer_ip, blocks.0, latest_consensus_version)).await? { true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid block response"), } @@ -310,7 +310,12 @@ pub trait Inbound: Reading + Outbound { fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool; /// Handles a `BlockResponse` message. - fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec>) -> bool; + fn block_response( + &self, + peer_ip: SocketAddr, + blocks: Vec>, + latest_consensus_version: Option, + ) -> bool; /// Handles a `PeerRequest` message. fn peer_request(&self, peer_ip: SocketAddr) -> bool { diff --git a/node/router/tests/common/router.rs b/node/router/tests/common/router.rs index 989de00f69..95bf6770d0 100644 --- a/node/router/tests/common/router.rs +++ b/node/router/tests/common/router.rs @@ -40,6 +40,7 @@ use snarkos_node_tcp::{ protocols::{Disconnect, Handshake, OnConnect, Reading, Writing}, }; use snarkvm::prelude::{ + ConsensusVersion, Field, Network, block::{Block, Header, Transaction}, @@ -209,7 +210,12 @@ impl Inbound for TestRouter { } /// Handles a `BlockResponse` message. - fn block_response(&self, _peer_ip: SocketAddr, _blocks: Vec>) -> bool { + fn block_response( + &self, + _peer_ip: SocketAddr, + _blocks: Vec>, + _latest_consensus_version: Option, + ) -> bool { true } diff --git a/node/src/client/router.rs b/node/src/client/router.rs index 19cfd89bfa..6824857c34 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -32,8 +32,9 @@ use snarkos_node_router::{ }; use snarkos_node_tcp::{Connection, ConnectionSide, Tcp}; use snarkvm::{ - ledger::narwhal::Data, - prelude::{Network, block::Transaction}, + console::network::{ConsensusVersion, Network}, + ledger::{block::Transaction, narwhal::Data}, + utilities::{log_error, log_warning}, }; use std::{io, net::SocketAddr}; @@ -179,24 +180,40 @@ impl> Inbound for Client { fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool { let BlockRequest { start_height, end_height } = &message; + // Get the latest consensus version, i.e., the one for the last block's height. + let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) { + Ok(version) => version, + Err(err) => { + log_error(err.context("Failed to retrieve consensus version")); + return false; + } + }; + // Retrieve the blocks within the requested range. let blocks = match self.ledger.get_blocks(*start_height..*end_height) { - Ok(blocks) => Data::Object(DataBlocks(blocks)), + Ok(blocks) => DataBlocks(blocks), Err(error) => { error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}"); return false; } }; + // Send the `BlockResponse` message to the peer. - self.router().send(peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks })); + self.router() + .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version))); true } /// Handles a `BlockResponse` message. - fn block_response(&self, peer_ip: SocketAddr, blocks: Vec>) -> bool { + fn block_response( + &self, + peer_ip: SocketAddr, + blocks: Vec>, + latest_consensus_version: Option, + ) -> bool { // We do not need to explicitly sync here because insert_block_response, will wake up the sync task. - if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks) { - warn!("Failed to insert block response: {err}"); + if let Err(err) = self.sync.insert_block_responses(peer_ip, blocks, latest_consensus_version) { + log_warning(err.context("Failed to insert block response")); false } else { true diff --git a/node/src/prover/router.rs b/node/src/prover/router.rs index c4fcf7773c..4c13198785 100644 --- a/node/src/prover/router.rs +++ b/node/src/prover/router.rs @@ -26,7 +26,7 @@ use snarkos_node_router::messages::{ UnconfirmedTransaction, }; use snarkos_node_tcp::{Connection, ConnectionSide, Tcp}; -use snarkvm::prelude::{Field, Network, Zero, block::Transaction}; +use snarkvm::prelude::{ConsensusVersion, Field, Network, Zero, block::Transaction}; use std::{io, net::SocketAddr}; @@ -167,7 +167,12 @@ impl> Inbound for Prover { } /// Handles a `BlockResponse` message. - fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec>) -> bool { + fn block_response( + &self, + peer_ip: SocketAddr, + _blocks: Vec>, + _latest_consensus_version: Option, + ) -> bool { debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation); false } diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index 0d87ae96b7..87ec5b3d72 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -28,8 +28,9 @@ use snarkos_node_router::messages::{ }; use snarkos_node_tcp::{Connection, ConnectionSide, Tcp}; use snarkvm::{ - ledger::narwhal::Data, - prelude::{Network, block::Transaction, error}, + console::network::{ConsensusVersion, Network}, + ledger::{block::Transaction, narwhal::Data}, + utilities::{error, log_error}, }; use std::{io, net::SocketAddr}; @@ -175,21 +176,36 @@ impl> Inbound for Validator { fn block_request(&self, peer_ip: SocketAddr, message: BlockRequest) -> bool { let BlockRequest { start_height, end_height } = &message; + // Get the latest consensus version, i.e., the one for the last block's height. + let latest_consensus_version = match N::CONSENSUS_VERSION(end_height.saturating_sub(1)) { + Ok(version) => version, + Err(err) => { + log_error(err.context("Failed to retrieve consensus version")); + return false; + } + }; + // Retrieve the blocks within the requested range. let blocks = match self.ledger.get_blocks(*start_height..*end_height) { - Ok(blocks) => Data::Object(DataBlocks(blocks)), + Ok(blocks) => DataBlocks(blocks), Err(error) => { error!("Failed to retrieve blocks {start_height} to {end_height} from the ledger - {error}"); return false; } }; // Send the `BlockResponse` message to the peer. - self.router().send(peer_ip, Message::BlockResponse(BlockResponse { request: message, blocks })); + self.router() + .send(peer_ip, Message::BlockResponse(BlockResponse::new(message, blocks, latest_consensus_version))); true } /// Handles a `BlockResponse` message. - fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec>) -> bool { + fn block_response( + &self, + peer_ip: SocketAddr, + _blocks: Vec>, + _latest_consensus_version: Option, + ) -> bool { warn!("Received a block response through P2P, not BFT, from {peer_ip}"); false } diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index d90f7aa207..2e1b35f359 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -22,7 +22,12 @@ use snarkos_node_network::PeerPoolHandling; use snarkos_node_router::messages::DataBlocks; use snarkos_node_sync_communication_service::CommunicationService; use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS}; -use snarkvm::prelude::{Network, block::Block}; + +use snarkvm::{ + console::network::{ConsensusVersion, Network}, + prelude::block::Block, + utilities::ensure_equals, +}; use anyhow::{Result, bail, ensure}; use indexmap::{IndexMap, IndexSet}; @@ -448,7 +453,32 @@ impl BlockSync { /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`. /// #[inline] - pub fn insert_block_responses(&self, peer_ip: SocketAddr, blocks: Vec>) -> Result<()> { + pub fn insert_block_responses( + &self, + peer_ip: SocketAddr, + blocks: Vec>, + latest_consensus_version: Option, + ) -> Result<()> { + let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else { + bail!("Empty block response"); + }; + + let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?; + + // Perform consensus version check, if possible. + // This check is only enabled after nodes have reached V12. + if expected_consensus_version > ConsensusVersion::V11 { + if let Some(latest_consensus_version) = latest_consensus_version { + ensure_equals!( + expected_consensus_version, + latest_consensus_version, + "the peer's consensus version for height {last_height} does not match ours" + ); + } else { + bail!("The peer did not send a consensus version"); + } + } + // Insert the candidate blocks into the sync pool. for block in blocks { if let Err(error) = self.insert_block_response(peer_ip, block) {