From 276b4a64a57e2e27415e15359284010dba9efc0e Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Mon, 27 Oct 2025 22:54:54 +0100 Subject: [PATCH 01/10] Cache incoming txs and broadcast them to all validators a.s.a.p. --- Cargo.lock | 1 + node/bft/events/src/lib.rs | 10 +- .../bft/events/src/unconfirmed_transaction.rs | 100 ++++++++++++++++++ node/bft/src/gateway.rs | 37 ++++++- node/bft/src/helpers/channels.rs | 15 ++- node/bft/src/helpers/storage.rs | 7 ++ node/bft/src/worker.rs | 24 ++++- node/bft/storage-service/src/memory.rs | 11 ++ node/bft/storage-service/src/persistent.rs | 20 ++-- node/bft/storage-service/src/traits.rs | 5 + node/consensus/src/lib.rs | 21 ++++ node/rest/Cargo.toml | 3 + node/rest/src/routes.rs | 14 ++- node/src/validator/router.rs | 37 +++++-- 14 files changed, 274 insertions(+), 31 deletions(-) create mode 100644 node/bft/events/src/unconfirmed_transaction.rs diff --git a/Cargo.lock b/Cargo.lock index b97e470ba8..33d8b5dbfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4403,6 +4403,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "snarkos-node-bft", "snarkos-node-cdn", "snarkos-node-consensus", "snarkos-node-network", diff --git a/node/bft/events/src/lib.rs b/node/bft/events/src/lib.rs index 5a116f5b3f..28331ec0c0 100644 --- a/node/bft/events/src/lib.rs +++ b/node/bft/events/src/lib.rs @@ -57,6 +57,9 @@ pub use transmission_request::TransmissionRequest; mod transmission_response; pub use transmission_response::TransmissionResponse; +mod unconfirmed_transaction; +pub use unconfirmed_transaction::UnconfirmedTransaction; + mod validators_request; pub use validators_request::ValidatorsRequest; @@ -108,6 +111,7 @@ pub enum Event { ValidatorsRequest(ValidatorsRequest), ValidatorsResponse(ValidatorsResponse), WorkerPing(WorkerPing), + UnconfirmedTransaction(UnconfirmedTransaction), } impl From for Event { @@ -140,6 +144,7 @@ impl Event { Self::ValidatorsRequest(event) => event.name(), Self::ValidatorsResponse(event) => event.name(), Self::WorkerPing(event) => event.name(), + Self::UnconfirmedTransaction(event) => event.name(), } } @@ -163,6 +168,7 @@ impl Event { Self::ValidatorsRequest(..) => 13, Self::ValidatorsResponse(..) => 14, Self::WorkerPing(..) => 15, + Self::UnconfirmedTransaction(..) => 16, } } } @@ -188,6 +194,7 @@ impl ToBytes for Event { Self::ValidatorsRequest(event) => event.write_le(writer), Self::ValidatorsResponse(event) => event.write_le(writer), Self::WorkerPing(event) => event.write_le(writer), + Self::UnconfirmedTransaction(event) => event.write_le(writer), } } } @@ -215,7 +222,8 @@ impl FromBytes for Event { 13 => Self::ValidatorsRequest(ValidatorsRequest::read_le(&mut reader)?), 14 => Self::ValidatorsResponse(ValidatorsResponse::read_le(&mut reader)?), 15 => Self::WorkerPing(WorkerPing::read_le(&mut reader)?), - 16.. => return Err(error(format!("Unknown event ID {id}"))), + 16 => Self::UnconfirmedTransaction(UnconfirmedTransaction::read_le(&mut reader)?), + 17.. => return Err(error(format!("Unknown event ID {id}"))), }; // Ensure that there are no "dangling" bytes. diff --git a/node/bft/events/src/unconfirmed_transaction.rs b/node/bft/events/src/unconfirmed_transaction.rs new file mode 100644 index 0000000000..a817c3fd38 --- /dev/null +++ b/node/bft/events/src/unconfirmed_transaction.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2019-2025 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::*; + +use snarkvm::prelude::Transaction; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct UnconfirmedTransaction { + pub transaction: Data>, +} + +impl UnconfirmedTransaction { + /// Initializes a new transmission response event. + pub fn new(transaction: Data>) -> Self { + Self { transaction } + } +} + +impl From>> for UnconfirmedTransaction { + /// Initializes a new transmission response event. + fn from(transaction: Data>) -> Self { + Self::new(transaction) + } +} + +impl EventTrait for UnconfirmedTransaction { + /// Returns the event name. + #[inline] + fn name(&self) -> Cow<'static, str> { + "UnconfirmedTransaction".into() + } +} + +impl ToBytes for UnconfirmedTransaction { + fn write_le(&self, mut writer: W) -> IoResult<()> { + self.transaction.write_le(&mut writer)?; + Ok(()) + } +} + +impl FromBytes for UnconfirmedTransaction { + fn read_le(mut reader: R) -> IoResult { + let transaction = Data::read_le(&mut reader)?; + + Ok(Self { transaction }) + } +} + +#[cfg(test)] +pub mod prop_tests { + use crate::UnconfirmedTransaction; + use snarkvm::{ + console::prelude::{FromBytes, ToBytes}, + ledger::narwhal::Data, + prelude::Transaction, + }; + + use bytes::{Buf, BufMut, Bytes, BytesMut}; + use proptest::{ + collection, + prelude::{BoxedStrategy, Strategy, any}, + prop_oneof, + }; + use test_strategy::proptest; + + type CurrentNetwork = snarkvm::prelude::MainnetV0; + + pub fn any_transaction() -> BoxedStrategy>> { + prop_oneof![(collection::vec(any::(), 512..=512)).prop_map(|bytes| (Data::Buffer(Bytes::from(bytes)))),] + .boxed() + } + + pub fn any_unconfirmed_transaction() -> BoxedStrategy> { + any_transaction().prop_map(UnconfirmedTransaction::new).boxed() + } + + #[proptest] + fn serialize_deserialize( + #[strategy(any_unconfirmed_transaction())] original: UnconfirmedTransaction, + ) { + let mut buf = BytesMut::default().writer(); + UnconfirmedTransaction::write_le(&original, &mut buf).unwrap(); + + let deserialized = UnconfirmedTransaction::read_le(buf.into_inner().reader()).unwrap(); + assert_eq!(original, deserialized); + } +} diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 3f38f1d1fe..52434c2712 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -65,9 +65,9 @@ use snarkvm::{ console::prelude::*, ledger::{ committee::Committee, - narwhal::{BatchHeader, Data}, + narwhal::{BatchHeader, Data, Transmission, TransmissionID}, }, - prelude::{Address, Field}, + prelude::{Address, Field, Transaction}, }; use colored::Colorize; @@ -738,6 +738,31 @@ impl Gateway { } Ok(true) } + Event::UnconfirmedTransaction(event) => { + // Perform the deferred non-blocking deserialization of the transaction. + let transaction = match event.transaction.deserialize().await { + Ok(transaction) => transaction, + Err(error) => bail!("[UnconfirmedTransaction] {error}"), + }; + // Calculate the transmission checksum. + let checksum = Data::>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::()?; + // Construct the transmission ID. + let transmission_id = TransmissionID::Transaction(transaction.id(), checksum); + // Construct the transmission. + let transmission = Transmission::Transaction(Data::Object(transaction)); + + // Determine the worker ID. + let Ok(worker_id) = assign_to_worker(transmission_id, self.num_workers()) else { + warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", transmission_id); + return Ok(true); + }; + // Send the unconfirmed transmission to the worker. + if let Some(sender) = self.get_worker_sender(worker_id) { + // Send the unconfirmed transmission to the worker. + let _ = sender.tx_unconfirmed_transmission.send((peer_ip, transmission_id, transmission)).await; + } + Ok(true) + } Event::ValidatorsRequest(_) => { let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND)); connected_peers.shuffle(&mut rand::thread_rng()); @@ -1105,6 +1130,14 @@ impl Gateway { fn handle_banned_ips(&self) { self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS); } + + /// Broadcast an unconfirmed transaction so other validators can cache it. + pub fn broadcast_unconfirmed_transaction(&self, transaction: Transaction) { + let event = Event::UnconfirmedTransaction(crate::events::UnconfirmedTransaction { + transaction: Data::Object(transaction), + }); + Transport::broadcast(self, event); + } } #[async_trait] diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 5337216085..52163818cb 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -219,6 +219,7 @@ pub struct WorkerSender { pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID)>, pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest)>, pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse)>, + pub tx_unconfirmed_transmission: mpsc::Sender<(SocketAddr, TransmissionID, Transmission)>, } #[derive(Debug)] @@ -226,6 +227,7 @@ pub struct WorkerReceiver { pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID)>, pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest)>, pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse)>, + pub rx_unconfirmed_transmission: mpsc::Receiver<(SocketAddr, TransmissionID, Transmission)>, } /// Initializes the worker channels. @@ -233,9 +235,16 @@ pub fn init_worker_channels() -> (WorkerSender, WorkerReceiver let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE); - - let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response }; - let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response }; + let (tx_unconfirmed_transmission, rx_unconfirmed_transmission) = mpsc::channel(MAX_CHANNEL_SIZE); + + let sender = + WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response, tx_unconfirmed_transmission }; + let receiver = WorkerReceiver { + rx_worker_ping, + rx_transmission_request, + rx_transmission_response, + rx_unconfirmed_transmission, + }; (sender, receiver) } diff --git a/node/bft/src/helpers/storage.rs b/node/bft/src/helpers/storage.rs index 46d4beafe6..6549ea8ea3 100644 --- a/node/bft/src/helpers/storage.rs +++ b/node/bft/src/helpers/storage.rs @@ -719,6 +719,13 @@ impl Storage { ); } + /// Caches the given `transmission` in storage. + /// + /// Returns whether the transaction is already present in the cache. + pub fn cache_transmission(&self, transmission_id: TransmissionID, transmission: Transmission) -> bool { + self.transmissions.cache_transmission(transmission_id, transmission) + } + /// Inserts the given unprocessed `certificate` into storage. /// /// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`. diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index 8aca95bd5c..70ad6c0962 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -175,7 +175,7 @@ impl Worker { || self.ledger.contains_transmission(&transmission_id).unwrap_or(false) } - /// Returns the transmission if it exists in the ready queue, proposed batch, storage. + /// Returns the transmission if it exists in the ready queue, storage or proposed batch. /// /// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions /// in the ledger are not guaranteed to be invalid for the current batch. @@ -252,6 +252,13 @@ impl Worker { self.gateway.broadcast(Event::WorkerPing(transmission_ids.into())); } } + + /// Inserts the unconfirmed transmission into the storage. + /// + /// Returns whether the transaction is already present in the cache. + fn cache_transmission(&self, transmission_id: TransmissionID, transmission: Transmission) -> bool { + self.storage.cache_transmission(transmission_id, transmission) + } } impl Worker { @@ -426,7 +433,12 @@ impl Worker { impl Worker { /// Starts the worker handlers. fn start_handlers(&self, receiver: WorkerReceiver) { - let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver; + let WorkerReceiver { + mut rx_worker_ping, + mut rx_transmission_request, + mut rx_transmission_response, + mut rx_unconfirmed_transmission, + } = receiver; // Start the pending queue expiration loop. let self_ = self.clone(); @@ -472,6 +484,14 @@ impl Worker { }); } }); + + // Process the unconfirmed transmissions. + let self_ = self.clone(); + self.spawn(async move { + while let Some((_peer_ip, transmission_id, transmission)) = rx_unconfirmed_transmission.recv().await { + self_.cache_transmission(transmission_id, transmission); + } + }); } /// Sends a transmission request to the specified peer. diff --git a/node/bft/storage-service/src/memory.rs b/node/bft/storage-service/src/memory.rs index f945b986e4..830540cb6f 100644 --- a/node/bft/storage-service/src/memory.rs +++ b/node/bft/storage-service/src/memory.rs @@ -98,6 +98,17 @@ impl StorageService for BFTMemoryService { Ok(missing_transmissions) } + /// Caches the given transmission in storage. + /// + /// Returns whether the transaction is already present in the cache. + fn cache_transmission(&self, transmission_id: TransmissionID, transmission: Transmission) -> bool { + // Acquire the transmissions write lock. + let mut transmissions = self.transmissions.write(); + // Insert the transmission. + // TODO: fix the insertion or interface. + transmissions.insert(transmission_id, (transmission, indexset! {})).is_some() + } + /// Inserts the given certificate ID for each of the transmission IDs, using the missing transmissions map, into storage. fn insert_transmissions( &self, diff --git a/node/bft/storage-service/src/persistent.rs b/node/bft/storage-service/src/persistent.rs index be391745e3..0aff2d4fbe 100644 --- a/node/bft/storage-service/src/persistent.rs +++ b/node/bft/storage-service/src/persistent.rs @@ -52,10 +52,8 @@ pub struct BFTPersistentStorage { transmissions: DataMap, (Transmission, IndexSet>)>, /// The map of `aborted transmission ID` to `certificate IDs` entries. aborted_transmission_ids: DataMap, IndexSet>>, - /// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries that are part of the persistent storage. - cache_transmissions: Mutex, (Transmission, IndexSet>)>>, - /// The LRU cache for `aborted transmission ID` to `certificate IDs` entries that are part of the persistent storage. - cache_aborted_transmission_ids: Mutex, IndexSet>>>, + /// The LRU cache for `transmission ID` to `transmission` entries that are part of the persistent storage. + cache_transmissions: Mutex, Transmission>>, } impl BFTPersistentStorage { @@ -74,7 +72,6 @@ impl BFTPersistentStorage { MapID::BFT(BFTMap::AbortedTransmissionIDs), )?, cache_transmissions: Mutex::new(LruCache::new(capacity)), - cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)), }) } } @@ -102,7 +99,7 @@ impl StorageService for BFTPersistentStorage { /// If the transmission ID does not exist in storage, `None` is returned. fn get_transmission(&self, transmission_id: TransmissionID) -> Option> { // Try to get the transmission from the cache first. - if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) { + if let Some(transmission) = self.cache_transmissions.lock().get(&transmission_id) { return Some(transmission.clone()); } @@ -150,6 +147,13 @@ impl StorageService for BFTPersistentStorage { Ok(missing_transmissions) } + /// Caches the given transmission in storage. + /// + /// Returns whether the transaction is already present in the cache. + fn cache_transmission(&self, transmission_id: TransmissionID, transmission: Transmission) -> bool { + self.cache_transmissions.lock().put(transmission_id, transmission).is_some() + } + /// Inserts the given certificate ID for each of the transmission IDs, using the missing transmissions map, into storage. fn insert_transmissions( &self, @@ -195,7 +199,7 @@ impl StorageService for BFTPersistentStorage { error!("Failed to insert transmission {transmission_id} into storage - {e}"); } // Insert the transmission into the cache. - self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids)); + self.cache_transmissions.lock().put(transmission_id, transmission); } // Next, handle the aborted transmission IDs. @@ -219,8 +223,6 @@ impl StorageService for BFTPersistentStorage { if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) { error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}"); } - // Insert the certificate IDs into the cache. - self.cache_aborted_transmission_ids.lock().put(aborted_transmission_id, certificate_ids); } } diff --git a/node/bft/storage-service/src/traits.rs b/node/bft/storage-service/src/traits.rs index e542812b59..ce4b93a859 100644 --- a/node/bft/storage-service/src/traits.rs +++ b/node/bft/storage-service/src/traits.rs @@ -41,6 +41,11 @@ pub trait StorageService: Debug + Send + Sync { aborted_transmissions: HashSet>, ) -> Result, Transmission>>; + /// Caches the given transmission in storage. + /// + /// Returns whether the transaction is already present in the cache. + fn cache_transmission(&self, transmission_id: TransmissionID, transmission: Transmission) -> bool; + /// Inserts the given certificate ID for each of the transmission IDs, using the missing transmissions map, into storage. fn insert_transmissions( &self, diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index c72a112f8b..9eba97ebdd 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -372,6 +372,27 @@ impl Consensus { Ok(()) } + /// Caches the given unconfirmed transaction. + /// + /// Returns whether the transaction to be cached is new. + pub async fn cache_unconfirmed_transaction(&self, transaction: Transaction) -> Result { + // Calculate the transmission checksum. + let checksum = Data::>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::()?; + // Construct the transmission ID. + let transmission_id = TransmissionID::Transaction(transaction.id(), checksum); + // Queue the unconfirmed transaction. + let transaction_id = transaction.id(); + + // Check that the transaction is not a fee transaction. + if transaction.is_fee() { + bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed()); + } + // Construct the transmission. + let transmission = Transmission::Transaction(Data::Object(transaction)); + // Insert the transaction into cache_transmissions. + Ok(self.bft().primary().storage().cache_transmission(transmission_id, transmission)) + } + /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed /// to the BFT layer for inclusion in a batch. pub async fn add_unconfirmed_transaction(&self, transaction: Transaction) -> Result<()> { diff --git a/node/rest/Cargo.toml b/node/rest/Cargo.toml index 41c8602ff7..662c150663 100644 --- a/node/rest/Cargo.toml +++ b/node/rest/Cargo.toml @@ -77,6 +77,9 @@ version = "3" [dependencies.snarkos-node-cdn] workspace = true +[dependencies.snarkos-node-bft] +workspace = true + [dependencies.snarkos-node-consensus] workspace = true diff --git a/node/rest/src/routes.rs b/node/rest/src/routes.rs index 813e19e637..a7ce99babf 100644 --- a/node/rest/src/routes.rs +++ b/node/rest/src/routes.rs @@ -14,6 +14,7 @@ // limitations under the License. use super::*; +use snarkos_node_bft::{Transport, events::Event}; use snarkos_node_network::PeerPoolHandling; use snarkos_node_router::messages::UnconfirmedSolution; use snarkvm::{ @@ -740,15 +741,20 @@ impl, R: Routing> Rest { res?; } - // If the consensus module is enabled, add the unconfirmed transaction to the memory pool. + // If the consensus module is enabled, add the unconfirmed transaction to the memory pool and broadcast it to validators. if let Some(consensus) = rest.consensus { // Add the unconfirmed transaction to the memory pool. consensus.add_unconfirmed_transaction(tx.clone()).await?; + // Broadcast the unconfirmed transaction. + let event = Event::UnconfirmedTransaction(snarkos_node_bft::events::UnconfirmedTransaction { + transaction: Data::Object(tx.clone()), + }); + consensus.bft().primary().gateway().broadcast(event); + } else { + // Broadcast the transaction to peers. + rest.routing.propagate(message, &[]); } - // Broadcast the transaction. - rest.routing.propagate(message, &[]); - // Determine if the node is synced and if the transaction was checked. match !is_within_sync_leniency && check_transaction { // If the node is not synced and we validated the transaction, return a 203. diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index 55c8a03342..8aeb0eedd8 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -260,8 +260,8 @@ impl> Inbound for Validator { /// Propagates the unconfirmed solution to all connected validators. async fn unconfirmed_solution( &self, - peer_ip: SocketAddr, - serialized: UnconfirmedSolution, + _peer_ip: SocketAddr, + _serialized: UnconfirmedSolution, solution: Solution, ) -> bool { // Add the unconfirmed solution to the memory pool. @@ -269,27 +269,44 @@ impl> Inbound for Validator { trace!("[UnconfirmedSolution] {error}"); return true; // Maintain the connection. } - let message = Message::UnconfirmedSolution(serialized); + // let message = Message::UnconfirmedSolution(serialized); // Propagate the "UnconfirmedSolution" to the connected validators. - self.propagate_to_validators(message, &[peer_ip]); + // self.propagate_to_validators(message, &[peer_ip]); true } /// Handles an `UnconfirmedTransaction` message. async fn unconfirmed_transaction( &self, - peer_ip: SocketAddr, - serialized: UnconfirmedTransaction, + _peer_ip: SocketAddr, + _serialized: UnconfirmedTransaction, transaction: Transaction, ) -> bool { + // Cache the unconfirmed transaction. + match self.consensus.cache_unconfirmed_transaction(transaction.clone()).await { + Err(error) => { + trace!("[UnconfirmedTransaction] {error}"); + return true; // Maintain the connection. + } + Ok(transaction_previously_cached) => { + // If the transaction was previously propagated and cached, there's no need to propagate again. + if transaction_previously_cached { + return true; // Maintain the connection. + } + } + } + // Broadcast the unconfirmed transaction so other validators can cache it. + self.consensus.bft().primary().gateway().broadcast_unconfirmed_transaction(transaction.clone()); // Add the unconfirmed transaction to the memory pool. + // NOTE: broadcasting the transmission before adding it to our own + // mempool ensures that we reduce network resource usage for duplicate + // transmissions, at the expense of increased latency for new + // transmissions. if let Err(error) = self.consensus.add_unconfirmed_transaction(transaction).await { trace!("[UnconfirmedTransaction] {error}"); return true; // Maintain the connection. - } - let message = Message::UnconfirmedTransaction(serialized); - // Propagate the "UnconfirmedTransaction" to the connected validators. - self.propagate_to_validators(message, &[peer_ip]); + }; + true } } From c28429b3e275d2b5addc50b027974ea28073fbc5 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Fri, 19 Dec 2025 12:10:32 -0800 Subject: [PATCH 02/10] Introduce cache_transmissions in Memory storage to mirror Persistent storage --- node/bft/storage-service/src/memory.rs | 43 +++++++++++++++++++------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/node/bft/storage-service/src/memory.rs b/node/bft/storage-service/src/memory.rs index 830540cb6f..77e0cfaaee 100644 --- a/node/bft/storage-service/src/memory.rs +++ b/node/bft/storage-service/src/memory.rs @@ -15,16 +15,23 @@ use crate::StorageService; use snarkvm::{ - ledger::narwhal::{BatchHeader, Transmission, TransmissionID}, + ledger::{ + committee::Committee, + narwhal::{BatchHeader, Transmission, TransmissionID}, + }, prelude::{Field, Network, Result, bail}, }; use indexmap::{IndexMap, IndexSet, indexset, map::Entry}; #[cfg(feature = "locktick")] -use locktick::parking_lot::RwLock; +use locktick::parking_lot::{Mutex, RwLock}; +use lru::LruCache; #[cfg(not(feature = "locktick"))] -use parking_lot::RwLock; -use std::collections::{HashMap, HashSet}; +use parking_lot::{Mutex, RwLock}; +use std::{ + collections::{HashMap, HashSet}, + num::NonZeroUsize, +}; use tracing::error; /// A BFT in-memory storage service. @@ -34,6 +41,8 @@ pub struct BFTMemoryService { transmissions: RwLock, (Transmission, IndexSet>)>>, /// The map of `aborted transmission ID` to `certificate IDs` entries. aborted_transmission_ids: RwLock, IndexSet>>>, + /// The LRU cache for `transmission ID` to `transmission` entries that are part of the memory storage. + cache_transmissions: Mutex, Transmission>>, } impl Default for BFTMemoryService { @@ -46,7 +55,16 @@ impl Default for BFTMemoryService { impl BFTMemoryService { /// Initializes a new BFT in-memory storage service. pub fn new() -> Self { - Self { transmissions: Default::default(), aborted_transmission_ids: Default::default() } + let max_committee_size = Committee::::max_committee_size().unwrap(); + let capacity = + NonZeroUsize::new((max_committee_size as usize) * (BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH) * 2) + .unwrap(); + + Self { + transmissions: Default::default(), + aborted_transmission_ids: Default::default(), + cache_transmissions: Mutex::new(LruCache::new(capacity)), + } } } @@ -61,6 +79,11 @@ impl StorageService for BFTMemoryService { /// Returns the transmission for the given `transmission ID`. /// If the transmission does not exist in storage, `None` is returned. fn get_transmission(&self, transmission_id: TransmissionID) -> Option> { + // Try to get the transmission from the cache first. + if let Some(transmission) = self.cache_transmissions.lock().get(&transmission_id) { + return Some(transmission.clone()); + } + // Get the transmission. self.transmissions.read().get(&transmission_id).map(|(transmission, _)| transmission).cloned() } @@ -102,11 +125,7 @@ impl StorageService for BFTMemoryService { /// /// Returns whether the transaction is already present in the cache. fn cache_transmission(&self, transmission_id: TransmissionID, transmission: Transmission) -> bool { - // Acquire the transmissions write lock. - let mut transmissions = self.transmissions.write(); - // Insert the transmission. - // TODO: fix the insertion or interface. - transmissions.insert(transmission_id, (transmission, indexset! {})).is_some() + self.cache_transmissions.lock().put(transmission_id, transmission).is_some() } /// Inserts the given certificate ID for each of the transmission IDs, using the missing transmissions map, into storage. @@ -145,7 +164,9 @@ impl StorageService for BFTMemoryService { // Prepare the set of certificate IDs. let certificate_ids = indexset! { certificate_id }; // Insert the transmission and a new set with the certificate ID. - vacant_entry.insert((transmission, certificate_ids)); + vacant_entry.insert((transmission.clone(), certificate_ids)); + // Insert the transmission into the cache. + self.cache_transmissions.lock().put(transmission_id, transmission); } } } From a9988bbc4897ca58ca4f72a876bc25be9a4b2c13 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Fri, 19 Dec 2025 13:32:01 -0800 Subject: [PATCH 03/10] Improve error logging --- node/bft/src/gateway.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 52434c2712..a725f0fb1b 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -759,7 +759,11 @@ impl Gateway { // Send the unconfirmed transmission to the worker. if let Some(sender) = self.get_worker_sender(worker_id) { // Send the unconfirmed transmission to the worker. - let _ = sender.tx_unconfirmed_transmission.send((peer_ip, transmission_id, transmission)).await; + if let Err(error) = + sender.tx_unconfirmed_transmission.send((peer_ip, transmission_id, transmission)).await + { + warn!("{CONTEXT} Unable to send unconfirmed transmission to worker {worker_id} - {error}"); + } } Ok(true) } From 2cb2f9a319bfcbb4bea09b13d054f5c0832f52f3 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Fri, 19 Dec 2025 13:32:18 -0800 Subject: [PATCH 04/10] Restore solution transmission --- node/src/validator/router.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index 8aeb0eedd8..e10502a025 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -260,8 +260,8 @@ impl> Inbound for Validator { /// Propagates the unconfirmed solution to all connected validators. async fn unconfirmed_solution( &self, - _peer_ip: SocketAddr, - _serialized: UnconfirmedSolution, + peer_ip: SocketAddr, + serialized: UnconfirmedSolution, solution: Solution, ) -> bool { // Add the unconfirmed solution to the memory pool. @@ -269,9 +269,9 @@ impl> Inbound for Validator { trace!("[UnconfirmedSolution] {error}"); return true; // Maintain the connection. } - // let message = Message::UnconfirmedSolution(serialized); + let message = Message::UnconfirmedSolution(serialized); // Propagate the "UnconfirmedSolution" to the connected validators. - // self.propagate_to_validators(message, &[peer_ip]); + self.propagate_to_validators(message, &[peer_ip]); true } From 184c44a630d7319c52153760b080fd2cbcb7f936 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Fri, 19 Dec 2025 13:39:59 -0800 Subject: [PATCH 05/10] Properly process unconfirmed transactions --- node/src/validator/router.rs | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index e10502a025..9617bb2b69 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -282,31 +282,23 @@ impl> Inbound for Validator { _serialized: UnconfirmedTransaction, transaction: Transaction, ) -> bool { - // Cache the unconfirmed transaction. - match self.consensus.cache_unconfirmed_transaction(transaction.clone()).await { - Err(error) => { - trace!("[UnconfirmedTransaction] {error}"); - return true; // Maintain the connection. - } - Ok(transaction_previously_cached) => { - // If the transaction was previously propagated and cached, there's no need to propagate again. - if transaction_previously_cached { - return true; // Maintain the connection. - } - } - } - // Broadcast the unconfirmed transaction so other validators can cache it. - self.consensus.bft().primary().gateway().broadcast_unconfirmed_transaction(transaction.clone()); // Add the unconfirmed transaction to the memory pool. - // NOTE: broadcasting the transmission before adding it to our own - // mempool ensures that we reduce network resource usage for duplicate - // transmissions, at the expense of increased latency for new - // transmissions. - if let Err(error) = self.consensus.add_unconfirmed_transaction(transaction).await { + // NOTE: we must add the transaction to the mempool before caching it, + // because otherwise the mempool thinks it is already present. + if let Err(error) = self.consensus.add_unconfirmed_transaction(transaction.clone()).await { trace!("[UnconfirmedTransaction] {error}"); return true; // Maintain the connection. }; + // Cache the unconfirmed transaction. + if let Err(error) = self.consensus.cache_unconfirmed_transaction(transaction.clone()).await { + trace!("[UnconfirmedTransaction] {error}"); + return true; // Maintain the connection. + }; + + // Broadcast the unconfirmed transaction so other validators can cache it. + self.consensus.bft().primary().gateway().broadcast_unconfirmed_transaction(transaction); + true } } From 850808d7e93f66d829435d9de5daabc50614b9e9 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Sat, 20 Dec 2025 19:07:32 +0100 Subject: [PATCH 06/10] Remove bootstrap peers log spam --- node/network/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 827d48edea..9d42110772 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -44,8 +44,8 @@ pub fn bootstrap_peers(is_dev: bool) -> Vec { // Development testing contains optional bootstrap peers loaded from the environment. match std::env::var("TEST_BOOTSTRAP_PEERS") { Ok(peers) => peers.split(',').map(|peer| SocketAddr::from_str(peer).unwrap()).collect(), - Err(err) => { - warn!("Failed to load bootstrap peers from environment: {err}"); + Err(_err) => { + // error may include the innocent 'environment variable not found' vec![] } } From d47c67fdb1e5a53188f53e587a5778d0dd2e9100 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Sat, 20 Dec 2025 19:13:13 +0100 Subject: [PATCH 07/10] Only log peer SHA from ConsensusVersion::V12 --- node/bft/src/gateway.rs | 6 +++++- node/router/src/handshake.rs | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index a725f0fb1b..1e839c3f2a 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -1622,7 +1622,11 @@ impl Gateway { fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest) -> Option { // Retrieve the components of the challenge request. let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event; - log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT); + let current_block_height = self.ledger.latest_block_height(); + let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap(); + if consensus_version >= ConsensusVersion::V12 { + log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT); + } let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port); diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 74f87e49db..ecf8297a50 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -381,7 +381,11 @@ impl Router { ) -> Option { // Retrieve the components of the challenge request. let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref snarkos_sha } = message; - log_repo_sha_comparison(peer_addr, snarkos_sha, Self::OWNER); + let current_block_height = self.ledger.latest_block_height(); + let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap(); + if consensus_version >= ConsensusVersion::V12 { + log_repo_sha_comparison(peer_addr, snarkos_sha, Self::OWNER); + } // Ensure the message protocol version is not outdated. if !self.is_valid_message_version(version) { From 31b110e42f2e224c23736ac806469315a6e774af Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Sat, 20 Dec 2025 20:18:50 +0100 Subject: [PATCH 08/10] Add extra documentation --- node/bft/src/worker.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index 70ad6c0962..55acdff5e5 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -489,6 +489,9 @@ impl Worker { let self_ = self.clone(); self.spawn(async move { while let Some((_peer_ip, transmission_id, transmission)) = rx_unconfirmed_transmission.recv().await { + // NOTE: to improve the chance of a transaction landing, besides + // caching incoming transactions we can also consider adding it + // to the mempool. self_.cache_transmission(transmission_id, transmission); } }); From d29ba9118d18dc89f5df3e155c90af8fed4d6435 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Fri, 26 Dec 2025 12:44:19 +0100 Subject: [PATCH 09/10] reduce redundant (de)serialization) --- node/bft/src/gateway.rs | 10 ++++------ node/src/validator/router.rs | 6 +++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index df77cbbd0f..0e844f340f 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -739,13 +739,13 @@ impl Gateway { Ok(true) } Event::UnconfirmedTransaction(event) => { + // Calculate the transmission checksum. + let checksum = event.transaction.to_checksum::()?; // Perform the deferred non-blocking deserialization of the transaction. let transaction = match event.transaction.deserialize().await { Ok(transaction) => transaction, Err(error) => bail!("[UnconfirmedTransaction] {error}"), }; - // Calculate the transmission checksum. - let checksum = Data::>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::()?; // Construct the transmission ID. let transmission_id = TransmissionID::Transaction(transaction.id(), checksum); // Construct the transmission. @@ -1150,10 +1150,8 @@ impl Gateway { } /// Broadcast an unconfirmed transaction so other validators can cache it. - pub fn broadcast_unconfirmed_transaction(&self, transaction: Transaction) { - let event = Event::UnconfirmedTransaction(crate::events::UnconfirmedTransaction { - transaction: Data::Object(transaction), - }); + pub fn broadcast_unconfirmed_transaction(&self, transaction: Data>) { + let event = Event::UnconfirmedTransaction(crate::events::UnconfirmedTransaction { transaction }); Transport::broadcast(self, event); } } diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index 9617bb2b69..57aa25f35d 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -279,7 +279,7 @@ impl> Inbound for Validator { async fn unconfirmed_transaction( &self, _peer_ip: SocketAddr, - _serialized: UnconfirmedTransaction, + serialized: UnconfirmedTransaction, transaction: Transaction, ) -> bool { // Add the unconfirmed transaction to the memory pool. @@ -291,13 +291,13 @@ impl> Inbound for Validator { }; // Cache the unconfirmed transaction. - if let Err(error) = self.consensus.cache_unconfirmed_transaction(transaction.clone()).await { + if let Err(error) = self.consensus.cache_unconfirmed_transaction(transaction).await { trace!("[UnconfirmedTransaction] {error}"); return true; // Maintain the connection. }; // Broadcast the unconfirmed transaction so other validators can cache it. - self.consensus.bft().primary().gateway().broadcast_unconfirmed_transaction(transaction); + self.consensus.bft().primary().gateway().broadcast_unconfirmed_transaction(serialized.transaction); true } From 4b797097ec7f628b3730dd9d805e76e26d14b7d3 Mon Sep 17 00:00:00 2001 From: Kai Mast Date: Sun, 4 Jan 2026 16:31:02 -0800 Subject: [PATCH 10/10] misc(node/network): better error handling `bootstrap_peers` --- node/network/src/lib.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 9d42110772..f168f5fe2b 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -29,7 +29,7 @@ pub use resolver::*; use snarkvm::prelude::Network; -use std::{net::SocketAddr, str::FromStr}; +use std::{env::VarError, net::SocketAddr, str::FromStr}; use tracing::*; // Include the generated build information. @@ -44,8 +44,13 @@ pub fn bootstrap_peers(is_dev: bool) -> Vec { // Development testing contains optional bootstrap peers loaded from the environment. match std::env::var("TEST_BOOTSTRAP_PEERS") { Ok(peers) => peers.split(',').map(|peer| SocketAddr::from_str(peer).unwrap()).collect(), - Err(_err) => { - // error may include the innocent 'environment variable not found' + Err(VarError::NotPresent) => { + // Return an empty list if the environment variable is not present. + vec![] + } + Err(err) => { + // Log other errors, e.g., invalid encoding. + warn!("Failed to load bootstrap peers from environment: {err}"); vec![] } }