diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index cc50772eef..f8bd5eb575 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -63,7 +63,7 @@ use snarkvm::{ narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID}, puzzle::{Solution, SolutionID}, }, - prelude::{ConsensusVersion, committee::Committee}, + prelude::{ConsensusVersion, Signature, committee::Committee}, utilities::flatten_error, }; @@ -92,8 +92,46 @@ use std::{ use tokio::sync::RwLock as TRwLock; use tokio::{sync::Notify, task::JoinHandle}; -/// A helper type to keep track of the latest proposed batch. -pub(crate) type ProposedBatch = RwLock>>; +/// The state of the primary's batch proposal. +#[derive(Debug, PartialEq, Eq)] +pub enum ProposedBatchState { + /// No batch is currently being proposed. + None, + /// A batch is being proposed and awaiting signatures. + Certifying(Box>), + /// A batch has reached quorum and is being inserted into storage. + /// Carries the batch ID so late-arriving signatures can be recognized and silently dropped. + Certified(Field), +} + +impl Default for ProposedBatchState { + fn default() -> Self { + Self::None + } +} + +impl ProposedBatchState { + /// Returns `true` if the primary has no active batch proposal. + pub fn is_none(&self) -> bool { + matches!(self, Self::None) + } + + /// Returns `true` if a batch is currently being proposed (awaiting signatures). + pub fn is_proposed(&self) -> bool { + matches!(self, Self::Certifying(_)) + } + + /// Returns a reference to the in-progress proposal, or `None` if not in the `Certifying` state. + pub fn as_proposal(&self) -> Option<&Proposal> { + match self { + Self::Certifying(p) => Some(p.as_ref()), + _ => None, + } + } +} + +/// A helper type to keep track of the state of the primary's batch proposal. +pub type ProposedBatch = RwLock>; /// This callback trait allows listening to changes in the Primary, such as round advancement. /// This is implemented by [`BFT`]. @@ -223,7 +261,10 @@ impl Primary { proposal_cache.into(); *self.latest_proposed_batch.write().await = Some((latest_certificate_round, now())); - *self.proposed_batch.write() = proposed_batch; + *self.proposed_batch.write() = match proposed_batch { + Some(p) => ProposedBatchState::Certifying(Box::new(p)), + None => ProposedBatchState::None, + }; *self.signed_proposals.write() = signed_proposals; // Update the storage with the pending certificates. @@ -459,46 +500,55 @@ impl proposal_task::BatchPropose for Primary { return Ok(false); } - // If there is a batch being proposed already, - // rebroadcast the batch header to the non-signers, and return early. - if let Some(proposal) = &*self.proposed_batch.read() { - // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this. - if round < proposal.round() - || proposal - .batch_header() - .previous_certificate_ids() - .iter() - .any(|id| !self.storage.contains_certificate(*id)) - { - warn!( - "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.", - proposal.round(), - ); - return Ok(false); - } - // Construct the event. - // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers. - let event = Event::BatchPropose(proposal.batch_header().clone().into()); - // Iterate through the non-signers. - for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) { - // Resolve the address to the peer IP. - match self.gateway.resolver().read().get_peer_ip_for_address(address) { - // Resend the batch proposal to the validator for signing. - Some(peer_ip) => { - let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round()); - tokio::spawn(async move { - debug!("Resending batch proposal for round {round} to peer '{peer_ip}'"); - // Resend the batch proposal to the peer. - if gateway.send(peer_ip, event_).await.is_none() { - warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'"); - } - }); + // If there is a batch being proposed or certified already, handle accordingly. + match &*self.proposed_batch.read() { + ProposedBatchState::Certifying(proposal) => { + // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this. + if round < proposal.round() + || proposal + .batch_header() + .previous_certificate_ids() + .iter() + .any(|id| !self.storage.contains_certificate(*id)) + { + warn!( + "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.", + proposal.round(), + ); + return Ok(false); + } + // Construct the event. + // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers. + let event = Event::BatchPropose(proposal.batch_header().clone().into()); + // Iterate through the non-signers. + for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) { + // Resolve the address to the peer IP. + match self.gateway.resolver().read().get_peer_ip_for_address(address) { + // Resend the batch proposal to the validator for signing. + Some(peer_ip) => { + let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round()); + tokio::spawn(async move { + debug!("Resending batch proposal for round {round} to peer '{peer_ip}'"); + // Resend the batch proposal to the peer. + if gateway.send(peer_ip, event_).await.is_none() { + warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'"); + } + }); + } + None => continue, } - None => continue, } + debug!("Proposed batch for round {} is still valid", proposal.round()); + return Ok(false); + } + // A batch is being certified; wait until it completes before proposing another. + ProposedBatchState::Certified(_) => { + debug!("Cannot propose a batch for round {round} - a batch is currently being certified"); + return Ok(false); + } + ProposedBatchState::None => { + // No batch in progress, so it is save to propose a new one. } - debug!("Proposed batch for round {} is still valid", proposal.round()); - return Ok(false); } #[cfg(feature = "metrics")] @@ -758,8 +808,8 @@ impl proposal_task::BatchPropose for Primary { // Broadcast the batch to all validators for signing. self.gateway.broadcast(Event::BatchPropose(batch_header.into())); - // Store the proposal. - *self.proposed_batch.write() = Some(proposal); + // Store the proposal in memory. + *self.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal)); // Record the wall-clock time at which the batch was proposed. #[cfg(feature = "metrics")] { @@ -1063,6 +1113,105 @@ impl Primary { Ok(()) } + /// Attempts to add a peer's `signature` for `batch_id` to the current proposal. + /// + /// Consumes `state` and always returns the (possibly updated) state alongside a result so that + /// the caller can restore it unconditionally, keeping `proposed_batch` consistent even on the + /// error path. + /// + /// # Returns + /// * `(Ok(Some(proposal)), Certified(id))` — quorum reached; caller should certify. + /// * `(Ok(None), )` — signature accepted or silently dropped; nothing to do. + /// * `(Err(e), )` — signature rejected; caller should propagate the error. + fn add_signature_to_batch( + &self, + state: ProposedBatchState, + peer_ip: SocketAddr, + batch_id: Field, + signature: Signature, + ) -> (Result>>, ProposedBatchState) { + match state { + ProposedBatchState::Certifying(mut proposal) if proposal.batch_id() == batch_id => { + // This signature is for our currently active proposal. + // Use an inner closure to keep `?` ergonomics while returning a tuple. + let inner: Result = (|| { + let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?; + let Some(signer) = self.gateway.resolve_to_aleo_addr(peer_ip) else { + bail!("Signature is from a disconnected validator"); + }; + let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?; + if new_signature { + info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round()); + Ok(proposal.is_quorum_threshold_reached(&committee_lookback)) + } else { + debug!( + "Received duplicated signature from '{peer_ip}' for batch \ + {batch_id} in round {round}", + round = proposal.round() + ); + Ok(false) + } + })(); + match inner { + Ok(true) => { + let certified_id = proposal.batch_id(); + (Ok(Some(*proposal)), ProposedBatchState::Certified(certified_id)) + } + Ok(false) => (Ok(None), ProposedBatchState::Certifying(proposal)), + Err(e) => (Err(e), ProposedBatchState::Certifying(proposal)), + } + } + ProposedBatchState::Certifying(proposal) => { + // Certifying a different proposal — check if batch_id is already in storage. + if self.storage.contains_batch(batch_id) { + debug!( + "Primary is safely skipping a batch signature from {peer_ip} for \ + round {} - batch is already certified", + proposal.round() + ); + (Ok(None), ProposedBatchState::Certifying(proposal)) + } else { + let expected_id = proposal.batch_id(); + let round = proposal.round(); + ( + Err(anyhow!("Unknown batch ID '{batch_id}', expected '{expected_id}' for round {round}")), + ProposedBatchState::Certifying(proposal), + ) + } + } + ProposedBatchState::Certified(id) if id == batch_id => { + // Quorum already reached; late-arriving signature is harmless. + debug!( + "Skipping batch signature from {peer_ip} for batch '{batch_id}' - \ + already received sufficient signatures" + ); + (Ok(None), ProposedBatchState::Certified(id)) + } + ProposedBatchState::Certified(id) => { + let result = if self.storage.contains_batch(batch_id) { + // This is most likely not malicious, but could indicate connectivity issues. + warn!("Received signature for an older batch {batch_id}"); + Ok(None) + } else { + Err(anyhow!("Unknown batch ID '{batch_id}'")) + }; + + (result, ProposedBatchState::Certified(id)) + } + ProposedBatchState::None => { + let result = if self.storage.contains_batch(batch_id) { + // This is most likely not malicious, but could indicate connectivity issues. + warn!("Received signature for an older batch {batch_id}"); + Ok(None) + } else { + Err(anyhow!("Unknown batch ID '{batch_id}'")) + }; + + (result, ProposedBatchState::None) + } + } + } + /// Processes a batch signature from a peer. /// /// This method performs the following steps: @@ -1100,60 +1249,11 @@ impl Primary { let Some(proposal) = spawn_blocking!({ // Acquire the write lock. let mut proposed_batch = self_.proposed_batch.write(); - // Add the signature to the batch, and determine if the batch is ready to be certified. - match proposed_batch.as_mut() { - Some(proposal) => { - // Ensure the batch ID matches the currently proposed batch ID. - if proposal.batch_id() != batch_id { - match self_.storage.contains_batch(batch_id) { - // If this batch was already certified, return early. - true => { - debug!( - "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified", - proposal.round() - ); - return Ok(None); - } - // If the batch ID is unknown, return an error. - false => bail!( - "Unknown batch ID '{batch_id}', expected '{}' for round {}", - proposal.batch_id(), - proposal.round() - ), - } - } - // Retrieve the committee lookback for the round. - let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?; - // Retrieve the address of the validator. - let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else { - bail!("Signature is from a disconnected validator"); - }; - // Add the signature to the batch. - let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?; - if new_signature { - info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round()); - // Check if the batch is ready to be certified. - if !proposal.is_quorum_threshold_reached(&committee_lookback) { - // If the batch is not ready to be certified, return early. - return Ok(None); - } - } else { - debug!( - "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}", - round = proposal.round() - ); - return Ok(None); - } - } - // There is no proposed batch, so return early. - None => return Ok(None), - }; - // Retrieve the batch proposal, clearing the proposed batch. - match proposed_batch.take() { - Some(proposal) => Ok(Some(proposal)), - None => Ok(None), - } + let (result, new_state) = + self_.add_signature_to_batch(std::mem::take(&mut *proposed_batch), peer_ip, batch_id, signature); + *proposed_batch = new_state; + result })? else { return Ok(()); @@ -1593,15 +1693,16 @@ impl Primary { /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired. fn check_proposed_batch_for_expiration(&self) -> Result<()> { // Check if the proposed batch is timed out or stale. - let is_expired = match self.proposed_batch.read().as_ref() { - Some(proposal) => proposal.round() < self.current_round(), - None => false, + // A batch being certified is not considered expired. + let is_expired = match &*self.proposed_batch.read() { + ProposedBatchState::Certifying(proposal) => proposal.round() < self.current_round(), + _ => false, }; // If the batch is expired, clear the proposed batch. if is_expired { // Reset the proposed batch. - let proposal = self.proposed_batch.write().take(); - if let Some(proposal) = proposal { + let old = std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None); + if let ProposedBatchState::Certifying(proposal) = old { debug!("Cleared expired proposal for round {}", proposal.round()); self.reinsert_transmissions_into_workers(proposal.into_transmissions())?; } @@ -1619,7 +1720,7 @@ impl Primary { // Update to the next round in storage. fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?; // Clear the proposed batch. - *self.proposed_batch.write() = None; + *self.proposed_batch.write() = ProposedBatchState::None; } } @@ -1667,10 +1768,10 @@ impl Primary { bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}") } // Check if the primary is still signing for the batch round. - if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) - && signing_round > batch_round + if let ProposedBatchState::Certifying(proposal) = &*self.proposed_batch.read() + && proposal.round() > batch_round { - bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}") + bail!("Our primary at round {} is no longer signing for round {batch_round}", proposal.round()) } Ok(()) } @@ -1737,6 +1838,10 @@ impl Primary { let (storage, certificate_) = (self.storage.clone(), certificate.clone()); spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?; debug!("Stored a batch certificate for round {}", certificate.round()); + // The batch is now in storage, so late-arriving signatures can find it via contains_batch. + // Transition from Certified back to None. + *self.proposed_batch.write() = ProposedBatchState::None; + // If a BFT sender was provided, send the certificate to the BFT. if let Some(cb) = self.primary_callback.get() { // Await the callback to continue. @@ -2064,7 +2169,11 @@ impl Primary { self.handles.lock().drain(..).for_each(|handle| handle.abort()); // Save the current proposal cache to disk. let proposal_cache = { - let proposal = self.proposed_batch.write().take(); + // Only persist a Certifying batch; a Certified batch will appear in pending_certificates. + let proposal = match std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None) { + ProposedBatchState::Certifying(p) => Some(*p), + _ => None, + }; let signed_proposals = self.signed_proposals.read().clone(); let latest_round = proposal .as_ref() @@ -2249,7 +2358,7 @@ mod tests { if account.address() == primary.gateway.account().address() { continue; } - let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id(); + let batch_id = primary.proposed_batch.read().as_proposal().unwrap().batch_id(); let signature = account.sign(&[batch_id], rng).unwrap(); signatures.push((*socket_addr, BatchSignature::new(batch_id, signature))); } @@ -2377,7 +2486,7 @@ mod tests { // Try to propose a batch again. This time, it should succeed. assert!(primary.propose_batch().await.is_ok()); - assert!(primary.proposed_batch.read().is_some()); + assert!(primary.proposed_batch.read().is_proposed()); } #[tokio::test] @@ -2390,7 +2499,7 @@ mod tests { // Try to propose a batch with no transmissions. assert!(primary.propose_batch().await.is_ok()); - assert!(primary.proposed_batch.read().is_some()); + assert!(primary.proposed_batch.read().is_proposed()); } #[tokio::test] @@ -2415,7 +2524,7 @@ mod tests { // Propose a batch again. This time, it should succeed. assert!(primary.propose_batch().await.is_ok()); - assert!(primary.proposed_batch.read().is_some()); + assert!(primary.proposed_batch.read().is_proposed()); } #[tokio::test] @@ -2482,7 +2591,7 @@ mod tests { assert!(primary.propose_batch().await.is_ok()); // Check that the proposal only contains the new transmissions that were not in previous certificates. - let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone(); + let proposed_transmissions = primary.proposed_batch.read().as_proposal().unwrap().transmissions().clone(); assert_eq!(proposed_transmissions.len(), 2); assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum))); assert!( @@ -2521,7 +2630,7 @@ mod tests { // Try to propose a batch again. This time, it should succeed. assert!(primary.propose_batch().await.is_ok()); // Expect 2/5 transactions to be included in the proposal in addition to the solution. - assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3); + assert_eq!(primary.proposed_batch.read().as_proposal().unwrap().transmissions().len(), 3); // Check the transmissions were correctly drained from the workers. assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::(), 3); } @@ -2878,7 +2987,7 @@ mod tests { // Try to propose a batch again. This time, it should succeed. assert!(primary.propose_batch().await.is_ok()); - assert!(primary.proposed_batch.read().is_some()); + assert!(primary.proposed_batch.read().is_proposed()); } #[tokio::test] @@ -2903,12 +3012,12 @@ mod tests { ); // Store the proposal on the primary. - *primary.proposed_batch.write() = Some(proposal); + *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal)); // Try to propose a batch will terminate early because the storage is behind the proposal. assert!(primary.propose_batch().await.is_ok()); - assert!(primary.proposed_batch.read().is_some()); - assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round()); + assert!(primary.proposed_batch.read().is_proposed()); + assert!(primary.proposed_batch.read().as_proposal().unwrap().round() > primary.current_round()); } #[tokio::test(flavor = "multi_thread")] @@ -2931,7 +3040,7 @@ mod tests { ); // Store the proposal on the primary. - *primary.proposed_batch.write() = Some(proposal); + *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal)); // Each committee member signs the batch. let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); @@ -2972,7 +3081,7 @@ mod tests { ); // Store the proposal on the primary. - *primary.proposed_batch.write() = Some(proposal); + *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal)); // Each committee member signs the batch. let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); @@ -3010,7 +3119,7 @@ mod tests { ); // Store the proposal on the primary. - *primary.proposed_batch.write() = Some(proposal); + *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal)); // Each committee member signs the batch. let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); @@ -3048,7 +3157,7 @@ mod tests { ); // Store the proposal on the primary. - *primary.proposed_batch.write() = Some(proposal); + *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal)); // Each committee member signs the batch. let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); @@ -3063,6 +3172,143 @@ mod tests { assert_eq!(primary.current_round(), round); } + // Tests that a late-arriving signature for a batch that is currently being certified + // (ProposedBatchState::Certified) is silently dropped without error. + // This exercises the race condition where proposed_batch.take() has been called but + // insert_certificate has not yet completed. + #[tokio::test] + async fn test_batch_signature_from_peer_batch_being_certified() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + map_account_addresses(&primary, &accounts); + + // Create a valid proposal. + let round = 1; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; + let proposal = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + round, + Default::default(), + timestamp, + 1, + &mut rng, + ); + let batch_id = proposal.batch_id(); + + // Simulate the race: the batch has been taken for certification but not yet stored. + *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id); + + // Send a late signature for the batch being certified. + let (socket_addr, account) = + accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap(); + let signature = account.sign(&[batch_id], &mut rng).unwrap(); + let batch_signature = BatchSignature::new(batch_id, signature); + + // The signature should be accepted without error (silently dropped). + assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok()); + // The batch state is unchanged (still BeingCertified — no new proposal was set). + assert!(matches!(&*primary.proposed_batch.read(), ProposedBatchState::Certified(id) if *id == batch_id)); + } + + // Tests that a signature for a completely unknown batch ID is rejected even when another + // batch is being certified. The BeingCertified state only suppresses errors for its own ID. + #[tokio::test] + async fn test_batch_signature_from_peer_unknown_id_while_certifying() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + map_account_addresses(&primary, &accounts); + + // Create two proposals so we have two distinct batch IDs. + let round = 1; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; + let proposal_a = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + round, + Default::default(), + timestamp, + 1, + &mut rng, + ); + let proposal_b = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + round, + Default::default(), + timestamp, + 1, + &mut rng, + ); + let batch_id_a = proposal_a.batch_id(); + let batch_id_b = proposal_b.batch_id(); + assert_ne!(batch_id_a, batch_id_b); + + // Simulate certifying batch A. + *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id_a); + + // Send a signature for batch B (a genuinely unknown ID). + let (socket_addr, account) = + accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap(); + let signature = account.sign(&[batch_id_b], &mut rng).unwrap(); + let batch_signature = BatchSignature::new(batch_id_b, signature); + + // The signature is for a genuinely unknown ID — should be rejected with an error. + assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_err()); + } + + // Tests the "already certified" path: a signature arrives after the batch is fully in + // storage and the primary has moved on to a new proposal. + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_signature_from_peer_already_certified() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + map_account_addresses(&primary, &accounts); + + // Create and certify a batch so it lands in storage. + let round = 1; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; + let old_proposal = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + round, + Default::default(), + timestamp, + 1, + &mut rng, + ); + let old_batch_id = old_proposal.batch_id(); + *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(old_proposal)); + let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng); + for (socket_addr, signature) in signatures { + primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap(); + } + // The batch is now in storage. + assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); + + // Simulate a new proposal being active. + let new_proposal = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + round, + Default::default(), + timestamp, + 1, + &mut rng, + ); + assert_ne!(new_proposal.batch_id(), old_batch_id); + *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(new_proposal)); + + // Send a late signature for the already-certified old batch. + let (socket_addr, account) = + accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap(); + let signature = account.sign(&[old_batch_id], &mut rng).unwrap(); + let batch_signature = BatchSignature::new(old_batch_id, signature); + + // Should be silently accepted (already certified path). + assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok()); + } + #[tokio::test] async fn test_insert_certificate_with_aborted_transmissions() { let round = 3; @@ -3144,4 +3390,209 @@ mod tests { assert!(primary.storage.get_transmission(aborted_transmission_id).is_none()); } } + + // ----------------------------------------------------------------------- + // add_signature_to_batch + // ----------------------------------------------------------------------- + + /// State is `None` and the batch is not in storage — returns an error, state stays `None`. + #[test] + fn test_add_signature_to_batch_none_state() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + + let peer_ip = accounts[1].0; + let batch_id = Field::rand(&mut rng); + let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap(); + + let (result, new_state) = + primary.add_signature_to_batch(ProposedBatchState::None, peer_ip, batch_id, signature); + + assert!(result.is_err()); + assert_eq!(new_state, ProposedBatchState::None); + } + + /// State is `Certified` with a matching batch ID — silently dropped, state restored. + #[test] + fn test_add_signature_to_batch_certified_matching_id() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + + let peer_ip = accounts[1].0; + let batch_id = Field::rand(&mut rng); + let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap(); + + let (result, new_state) = + primary.add_signature_to_batch(ProposedBatchState::Certified(batch_id), peer_ip, batch_id, signature); + + assert!(result.unwrap().is_none()); + assert_eq!(new_state, ProposedBatchState::Certified(batch_id)); + } + + /// State is `Certified` with a *different* batch ID — error returned, state becomes `None`. + #[test] + fn test_add_signature_to_batch_certified_different_id() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + + let peer_ip = accounts[1].0; + let certified_id = Field::rand(&mut rng); + let other_id = Field::rand(&mut rng); + let signature = accounts[1].1.sign(&[other_id], &mut rng).unwrap(); + + let (result, new_state) = + primary.add_signature_to_batch(ProposedBatchState::Certified(certified_id), peer_ip, other_id, signature); + + assert!(result.is_err()); + assert_eq!(new_state, ProposedBatchState::Certified(certified_id)); + } + + /// State is `Certifying` for a *different* batch ID that **is already in storage** — silently + /// dropped, state restored. + #[tokio::test(flavor = "multi_thread")] + async fn test_add_signature_to_batch_certifying_different_id_in_storage() { + let round = 1; + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + map_account_addresses(&primary, &accounts); + + // Create a proposal owned by the primary. + let proposal = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + round, + Default::default(), + now(), + 0, + &mut rng, + ); + let proposal_batch_id = proposal.batch_id(); + + // Create and store a *different* certificate so `contains_batch` returns true for it. + let (certificate, transmissions) = + create_batch_certificate(accounts[1].1.address(), &accounts, round, Default::default(), &mut rng); + let stored_batch_id = certificate.batch_id(); + primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap(); + + let peer_ip = accounts[1].0; + let signature = accounts[1].1.sign(&[stored_batch_id], &mut rng).unwrap(); + + let (result, new_state) = primary.add_signature_to_batch( + ProposedBatchState::Certifying(Box::new(proposal)), + peer_ip, + stored_batch_id, + signature, + ); + + assert!(result.unwrap().is_none()); + // State is restored with the original proposal. + assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id); + } + + /// State is `Certifying` for a *different* batch ID that is **not in storage** — error + /// returned, state restored. + #[test] + fn test_add_signature_to_batch_certifying_different_id_unknown() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + + let proposal = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + 1, + Default::default(), + now(), + 0, + &mut rng, + ); + let proposal_batch_id = proposal.batch_id(); + + let peer_ip = accounts[1].0; + let unknown_id = Field::rand(&mut rng); + let signature = accounts[1].1.sign(&[unknown_id], &mut rng).unwrap(); + + let (result, new_state) = primary.add_signature_to_batch( + ProposedBatchState::Certifying(Box::new(proposal)), + peer_ip, + unknown_id, + signature, + ); + + assert!(result.is_err()); + assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id); + } + + /// Matching batch ID, valid signature, quorum **not yet** reached — state stays `Certifying`. + #[test] + fn test_add_signature_to_batch_certifying_matching_no_quorum() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + map_account_addresses(&primary, &accounts); + + let proposal = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + 1, + Default::default(), + now(), + 0, + &mut rng, + ); + let batch_id = proposal.batch_id(); + + // Only one peer signs — not enough for quorum. + let peer_ip = accounts[1].0; + let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap(); + + let (result, new_state) = primary.add_signature_to_batch( + ProposedBatchState::Certifying(Box::new(proposal)), + peer_ip, + batch_id, + signature, + ); + + assert!(result.unwrap().is_none()); + assert_eq!(new_state.as_proposal().unwrap().batch_id(), batch_id); + } + + /// Matching batch ID, all peers sign — quorum reached, proposal extracted and state becomes + /// `Certified`. + #[test] + fn test_add_signature_to_batch_certifying_matching_quorum_reached() { + let mut rng = TestRng::default(); + let (primary, accounts) = primary_without_handlers(&mut rng); + map_account_addresses(&primary, &accounts); + + let proposal = create_test_proposal( + primary.gateway.account(), + primary.ledger.current_committee().unwrap(), + 1, + Default::default(), + now(), + 0, + &mut rng, + ); + let batch_id = proposal.batch_id(); + + // Add all peer signatures one by one until quorum is reached. + let peers: Vec<_> = + accounts.iter().filter(|(_, a)| a.address() != primary.gateway.account().address()).collect(); + let mut state = ProposedBatchState::Certifying(Box::new(proposal)); + let mut final_result = None; + + for (peer_ip, peer_account) in &peers { + let signature = peer_account.sign(&[batch_id], &mut rng).unwrap(); + let (result, new_state) = primary.add_signature_to_batch(state, *peer_ip, batch_id, signature); + state = new_state; + if result.as_ref().unwrap().is_some() { + final_result = Some(result); + break; + } + } + + // Quorum must have been reached with the committee's peers. + let proposal = final_result.expect("quorum should be reached").unwrap().unwrap(); + assert_eq!(proposal.batch_id(), batch_id); + assert_eq!(state, ProposedBatchState::Certified(batch_id)); + } } diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index 04e657f28b..ef56133b41 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -17,6 +17,7 @@ use crate::{ MAX_FETCH_TIMEOUT, MAX_WORKERS, ProposedBatch, + ProposedBatchState, Transport, events::{Event, TransmissionRequest, TransmissionResponse}, helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests}, @@ -182,7 +183,7 @@ impl Worker { let transmission_id = transmission_id.into(); // Check if the transmission ID exists in the ready queue, proposed batch, storage, or ledger. self.ready.read().contains(transmission_id) - || self.proposed_batch.read().as_ref().is_some_and(|p| p.contains_transmission(transmission_id)) + || matches!(&*self.proposed_batch.read(), ProposedBatchState::Certifying(p) if p.contains_transmission(transmission_id)) || self.storage.contains_transmission(transmission_id) || self.ledger.contains_transmission(&transmission_id).unwrap_or(false) } @@ -201,9 +202,10 @@ impl Worker { return Some(transmission); } // Check if the transmission ID exists in the proposed batch. - if let Some(transmission) = - self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id)) - { + if let Some(transmission) = match &*self.proposed_batch.read() { + ProposedBatchState::Certifying(p) => p.get_transmission(transmission_id), + _ => None, + } { return Some(transmission.clone()); } None diff --git a/node/bft/tests/common/utils.rs b/node/bft/tests/common/utils.rs index 38703cc278..e3b42ac2d7 100644 --- a/node/bft/tests/common/utils.rs +++ b/node/bft/tests/common/utils.rs @@ -232,7 +232,7 @@ pub fn sample_worker( // Sample a gateway. let gateway = sample_gateway(account, storage.clone(), ledger.clone()); // Sample a dummy proposed batch. - let proposed_batch = Arc::new(RwLock::new(None)); + let proposed_batch = Arc::new(RwLock::new(Default::default())); // Construct the worker instance. Worker::new(id, Arc::new(gateway.clone()), storage.clone(), ledger, proposed_batch).unwrap() }