diff --git a/src/message_pool/mod.rs b/src/message_pool/mod.rs index b185834eb01a..b7544c10a374 100644 --- a/src/message_pool/mod.rs +++ b/src/message_pool/mod.rs @@ -12,7 +12,7 @@ pub use self::{ config::*, errors::*, mpool_locker::MpoolLocker, - msgpool::{msg_pool::MessagePool, *}, + msgpool::{msg_pool::MessagePool, msg_pool::MpoolUpdate, *}, nonce_tracker::NonceTracker, }; diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 2dba45cf89d3..1ec4850577df 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -23,10 +23,12 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use cid::Cid; use fvm_ipld_encoding::to_vec; use parking_lot::RwLock as SyncRwLock; +use tokio::sync::broadcast; use tracing::error; use utils::{get_base_fee_lower_bound, recover_sig}; use super::errors::Error; +use crate::message_pool::msgpool::msg_pool::MpoolUpdate; use crate::message_pool::{ msg_chain::{Chains, create_message_chains}, msg_pool::{ @@ -225,6 +227,7 @@ pub async fn head_change( cur_tipset: &SyncRwLock, key_cache: &SizeTrackingLruCache, state_nonce_cache: &SizeTrackingLruCache, + change_publisher: &broadcast::Sender, revert: Vec, apply: Vec, ) -> Result<(), Error> @@ -276,13 +279,23 @@ where }; for msg in smsgs { - mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?; + mpool_ctx.remove_from_selected_msgs( + &msg.from(), + msg.sequence(), + &mut rmsgs, + change_publisher, + )?; if !repub && republished.write().insert(msg.cid()) { repub = true; } } for msg in msgs { - mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?; + mpool_ctx.remove_from_selected_msgs( + &msg.from, + msg.sequence, + &mut rmsgs, + change_publisher, + )?; if !repub && republished.write().insert(msg.cid()) { repub = true; } @@ -316,6 +329,7 @@ where sequence, TrustPolicy::Trusted, StrictnessPolicy::Relaxed, + change_publisher, ) { error!("Failed to read message from reorg to mpool: {}", e); } @@ 
-339,6 +353,7 @@ impl MpoolCtx<'_, T> { from: &Address, sequence: u64, rmsgs: &mut HashMap>, + change_publisher: &broadcast::Sender, ) -> Result<(), Error> { if rmsgs .get_mut(from) @@ -347,7 +362,7 @@ impl MpoolCtx<'_, T> { && let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts) .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}")) { - remove(&resolved, self.pending, sequence, true)?; + remove(&resolved, self.pending, sequence, true, change_publisher)?; } Ok(()) } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 04003b37554f..d040b82065f9 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -35,6 +35,7 @@ use get_size2::GetSize; use itertools::Itertools; use nonzero_ext::nonzero; use parking_lot::RwLock as SyncRwLock; +use tokio::sync::broadcast; use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval}; use tracing::warn; @@ -231,21 +232,23 @@ impl MsgSet { /// Remove the message at `sequence` and adjust `next_sequence`. /// + /// Returns the removed message, or `None` if `sequence` was not present. + /// /// - **Applied** (included on-chain): advance `next_sequence` to /// `sequence + 1` if needed. For messages not in our pool, also run /// the gap-filling loop to advance past consecutive known messages. /// - **Pruned** (evicted): rewind `next_sequence` to `sequence` if the /// removal creates a gap. 
- pub fn rm(&mut self, sequence: u64, applied: bool) { - if self.msgs.remove(&sequence).is_none() { + pub fn rm(&mut self, sequence: u64, applied: bool) -> Option { + let Some(removed) = self.msgs.remove(&sequence) else { if applied && sequence >= self.next_sequence { self.next_sequence = sequence + 1; while self.msgs.contains_key(&self.next_sequence) { self.next_sequence += 1; } } - return; - } + return None; + }; metrics::MPOOL_MESSAGE_TOTAL.dec(); // adjust next sequence @@ -255,16 +258,28 @@ impl MsgSet { if sequence >= self.next_sequence { self.next_sequence = sequence + 1; } - return; - } - // we removed a message because it was pruned - // we have to adjust the sequence if it creates a gap or rewinds state - if sequence < self.next_sequence { + } else if sequence < self.next_sequence { + // we removed a message because it was pruned + // we have to adjust the sequence if it creates a gap or rewinds state self.next_sequence = sequence; } + Some(removed) } } +/// Capacity of the mpool changes broadcast channel. +/// +/// Sized to absorb reorg-replay bursts (many `Add` events fired in rapid +/// succession from `head_change`) while a single subscriber drains. Subscribers +/// that fall further behind receive `Lagged` and drop events. +const MPOOL_CHANGES_CHANNEL_CAPACITY: usize = 256; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum MpoolUpdate { + Add(SignedMessage), + Remove(SignedMessage), +} + /// This contains all necessary information needed for the message pool. /// Keeps track of messages to apply, as well as context needed for verifying /// transactions. @@ -297,6 +312,8 @@ pub struct MessagePool { pub config: MpoolConfig, /// Chain configuration pub chain_config: Arc, + /// Publishes the changes in the message pool + pub(crate) changes: broadcast::Sender, } /// Resolve an address to its key form, checking the cache first. 
@@ -513,7 +530,7 @@ where } else { StrictnessPolicy::Strict }; - self.add_helper(msg, trust_policy, strictness)?; + self.add_helper(msg, trust_policy, strictness, &self.changes)?; Ok(publish) } @@ -526,6 +543,7 @@ where msg: SignedMessage, trust_policy: TrustPolicy, strictness: StrictnessPolicy, + change_publisher: &broadcast::Sender, ) -> Result<(), Error> { let from = msg.from(); let cur_ts = self.current_tipset(); @@ -539,6 +557,7 @@ where self.get_state_sequence(&from, &cur_ts)?, trust_policy, strictness, + change_publisher, ) } @@ -697,6 +716,7 @@ where self.cur_tipset.as_ref(), self.key_cache.as_ref(), self.state_nonce_cache.as_ref(), + &self.changes, revert, apply, ) @@ -743,6 +763,7 @@ where let block_delay = chain_config.block_delay_secs; let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4); + let (change_publisher, _) = broadcast::channel(MPOOL_CHANGES_CHANNEL_CAPACITY); let mut mp = MessagePool { local_addrs, pending, @@ -758,6 +779,7 @@ where network_sender, repub_trigger, chain_config: Arc::clone(&chain_config), + changes: change_publisher.clone(), }; mp.load_local()?; @@ -788,6 +810,7 @@ where ¤t_ts, key_cache.as_ref(), state_nonce_cache.as_ref(), + &change_publisher, reverts, applies, ) @@ -841,6 +864,10 @@ where }); Ok(mp) } + + pub fn subscribe_to_updates(&self) -> broadcast::Receiver { + self.changes.subscribe() + } } // Helpers for MessagePool @@ -860,6 +887,7 @@ pub(in crate::message_pool) fn add_helper( sequence: u64, trust_policy: TrustPolicy, strictness: StrictnessPolicy, + change_publisher: &broadcast::Sender, ) -> Result<(), Error> where T: Provider, @@ -876,11 +904,17 @@ where let mset = pending .entry(resolved_from) .or_insert_with(|| MsgSet::new(sequence)); + + let event_msg = crate::utils::broadcast::has_subscribers(change_publisher).then(|| msg.clone()); match trust_policy { TrustPolicy::Trusted => mset.add_trusted(api, msg, strictness)?, TrustPolicy::Untrusted => mset.add_untrusted(api, msg, strictness)?, } + if let 
Some(msg) = event_msg { + let _ = change_publisher.send(MpoolUpdate::Add(msg)); + } + Ok(()) } @@ -924,15 +958,18 @@ pub fn remove( pending: &SyncRwLock>, sequence: u64, applied: bool, + change_publisher: &broadcast::Sender, ) -> Result<(), Error> { let mut pending = pending.write(); - let mset = if let Some(mset) = pending.get_mut(from) { - mset - } else { + let Some(mset) = pending.get_mut(from) else { return Ok(()); }; - mset.rm(sequence, applied); + if let Some(removed) = mset.rm(sequence, applied) + && crate::utils::broadcast::has_subscribers(change_publisher) + { + let _ = change_publisher.send(MpoolUpdate::Remove(removed)); + } if mset.msgs.is_empty() { pending.remove(from); @@ -981,6 +1018,7 @@ mod tests { }; let msg = SignedMessage::mock_bls_signed_message(message); let sequence = msg.message().sequence; + let (change_publisher, _) = broadcast::channel(1); let res = add_helper( &api, &bls_sig_cache, @@ -991,6 +1029,7 @@ mod tests { sequence, TrustPolicy::Trusted, StrictnessPolicy::Relaxed, + &change_publisher, ); assert!(res.is_ok()); } @@ -1089,6 +1128,7 @@ mod tests { }; let msg = SignedMessage::mock_bls_signed_message(message); + let (change_publisher, _) = broadcast::channel(1); add_helper( &api, &bls_sig_cache, @@ -1099,6 +1139,7 @@ mod tests { 0, TrustPolicy::Trusted, StrictnessPolicy::Relaxed, + &change_publisher, ) .unwrap(); @@ -1126,6 +1167,8 @@ mod tests { api.set_key_address_mapping(&id_addr, &key_addr); api.set_state_sequence(&key_addr, 0); + let (change_publisher, _) = broadcast::channel(1); + // Add two messages from the ID address for seq in 0..2 { let message = ShimMessage { @@ -1145,6 +1188,7 @@ mod tests { 0, TrustPolicy::Trusted, StrictnessPolicy::Relaxed, + &change_publisher, ) .unwrap(); } diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index b8f4598a24ed..b7dacc4ce32a 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -17,6 +17,7 @@ use 
ahash::{HashMap, HashMapExt}; use anyhow::{Context, bail, ensure}; use parking_lot::RwLock; use rand::prelude::SliceRandom; +use tokio::sync::broadcast; use tracing::{debug, error, warn}; use crate::shim::crypto::Signature; @@ -25,7 +26,7 @@ use crate::utils::get_size::CidWrapper; use super::{MpoolCtx, msg_pool::MessagePool, provider::Provider, utils::recover_sig}; use crate::message_pool::{ - Error, add_to_selected_msgs, + Error, MpoolUpdate, add_to_selected_msgs, msg_chain::{Chains, NodeKey, create_message_chains}, msg_pool::MsgSet, msgpool::MIN_GAS, @@ -671,6 +672,7 @@ where cur_ts.clone(), ts.clone(), &mut result, + &self.changes, )?; Ok(result) @@ -816,6 +818,7 @@ fn merge_and_trim( /// It simulates a head change call. // This logic should probably be implemented in the ChainStore. It handles // reorgs. +#[allow(clippy::too_many_arguments)] pub(in crate::message_pool) fn run_head_change( api: &T, bls_sig_cache: &SizeTrackingLruCache, @@ -824,6 +827,7 @@ pub(in crate::message_pool) fn run_head_change( from: Tipset, to: Tipset, rmsgs: &mut HashMap>, + change_publisher: &broadcast::Sender, ) -> Result<(), Error> where T: Provider, @@ -873,10 +877,20 @@ where let (msgs, smsgs) = api.messages_for_block(b)?; for msg in smsgs { - mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), rmsgs)?; + mpool_ctx.remove_from_selected_msgs( + &msg.from(), + msg.sequence(), + rmsgs, + change_publisher, + )?; } for msg in msgs { - mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, rmsgs)?; + mpool_ctx.remove_from_selected_msgs( + &msg.from, + msg.sequence, + rmsgs, + change_publisher, + )?; } } } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index c5d52a3d1c91..6bdb6cd4e4b2 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -17,9 +17,10 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use 
crate::message::{ChainMessage, SignedMessage}; -use crate::rpc::eth::Block as EthBlock; +use crate::message_pool::MpoolUpdate; use crate::rpc::eth::{ - EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec, + Block as EthBlock, EthLog, TxInfo, eth_logs_with_filter, eth_tx_hash_from_signed_message, + types::ApiHeaders, types::EthFilterSpec, types::EthHash, }; use crate::rpc::f3::F3ExportLatestSnapshot; use crate::rpc::types::*; @@ -29,12 +30,14 @@ use crate::shim::error::ExitCode; use crate::shim::executor::Receipt; use crate::shim::message::Message; use crate::utils::ShallowClone; +use crate::utils::broadcast::subscription_stream; use crate::utils::db::CborStoreExt as _; use crate::utils::io::VoidAsyncWriter; use crate::utils::misc::env::is_env_truthy; use anyhow::{Context as _, Result}; use cid::Cid; use enumflags2::{BitFlags, make_bitflags}; +use futures::StreamExt as _; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::{CborStore, RawBytes}; use hex::ToHex; @@ -86,24 +89,19 @@ pub(crate) fn new_heads( data: Ctx, ) -> (Subscriber, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - - let mut head_changes_rx = data.chain_store().subscribe_head_changes(); + let mut stream = subscription_stream(data.chain_store().subscribe_head_changes()); let handle = tokio::spawn(async move { - while let Ok(changes) = head_changes_rx.recv().await { + while let Some(changes) = stream.next().await { for ts in changes.applies { - // Convert the tipset to an Ethereum block with full transaction info - // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block + // In Filecoin's Eth RPC, a tipset maps to a single Ethereum block. 
                 match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
                     Ok(block) => {
-                        if let Err(e) = sender.send(ApiHeaders(block)) {
-                            tracing::error!("Failed to send headers: {}", e);
+                        if sender.send(ApiHeaders(block)).is_err() {
                             return;
                         }
                     }
-                    Err(e) => {
-                        tracing::error!("Failed to convert tipset to eth block: {}", e);
-                    }
+                    Err(e) => tracing::error!("Failed to convert tipset to eth block: {}", e),
                 }
             }
         }
@@ -112,6 +110,31 @@ pub(crate) fn new_heads(
     (receiver, handle)
 }
 
+/// Subscribes to mpool changes and broadcasts the Ethereum hash of each newly added pending transaction (only `MpoolUpdate::Add` events are forwarded; removals are ignored).
+pub(crate) fn new_pending_transaction(
+    ctx: &Ctx,
+) -> (Subscriber, JoinHandle<()>) {
+    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
+    let mut stream = subscription_stream(ctx.mpool.subscribe_to_updates());
+    let eth_chain_id = ctx.chain_config().eth_chain_id;
+
+    let handle = tokio::spawn(async move {
+        while let Some(update) = stream.next().await {
+            let MpoolUpdate::Add(msg) = update else {
+                continue;
+            };
+            let Ok(hash) = eth_tx_hash_from_signed_message(&msg, eth_chain_id) else {
+                continue;
+            };
+            if sender.send(hash).is_err() {
+                return;
+            }
+        }
+    });
+
+    (receiver, handle)
+}
+
 /// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs.
/// /// # Notes @@ -123,21 +146,16 @@ pub(crate) fn logs( filter: Option, ) -> (Subscriber>, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); - - let mut head_changes_rx = ctx.chain_store().subscribe_head_changes(); - + let mut stream = subscription_stream(ctx.chain_store().subscribe_head_changes()); let ctx = ctx.clone(); let handle = tokio::spawn(async move { - while let Ok(changes) = head_changes_rx.recv().await { + while let Some(changes) = stream.next().await { for ts in changes.applies { - match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { + match eth_logs_with_filter(&ctx, &ts, filter.as_ref(), None).await { Ok(logs) => { - if !logs.is_empty() - && let Err(e) = sender.send(logs) - { - tracing::error!("Failed to send logs for tipset {}: {}", ts.key(), e); - break; + if !logs.is_empty() && sender.send(logs).is_err() { + return; } } Err(e) => { diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index b895184646da..985ab3237064 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1310,24 +1310,18 @@ pub async fn eth_logs_for_block_and_transaction( ctx: &Ctx, ts: &Tipset, - spec: Option, + spec: Option<&EthFilterSpec>, tx_hash: Option<&EthHash>, ) -> anyhow::Result> { let mut events = vec![]; - EthEventHandler::collect_events( - ctx, - ts, - spec.as_ref(), - SkipEvent::OnUnresolvedAddress, - &mut events, - ) - .await?; + EthEventHandler::collect_events(ctx, ts, spec, SkipEvent::OnUnresolvedAddress, &mut events) + .await?; let logs = eth_filter_logs_from_events(ctx, &events)?; Ok(match tx_hash { @@ -2535,15 +2529,8 @@ impl RpcMethod<1> for EthGetTransactionHashByCid { if let Ok(smsgs) = smsgs_result && let Some(smsg) = smsgs.first() { - let hash = if smsg.is_delegated() { - let chain_id = ctx.chain_config().eth_chain_id; - let (_, tx) = eth_tx_from_signed_eth_message(smsg, chain_id)?; - tx.eth_hash()?.into() - } else if smsg.is_secp256k1() { - smsg.cid().into() - } else { - 
smsg.message().cid().into() - }; + let hash = eth_tx_hash_from_signed_message(smsg, ctx.chain_config().eth_chain_id) + .context("failed to get eth tx hash from signed message")?; return Ok(Some(hash)); } @@ -3027,7 +3014,7 @@ fn eth_log_from_event(entries: &[EventEntry]) -> Option<(EthBytes, Vec) Some((data, topics)) } -fn eth_tx_hash_from_signed_message( +pub fn eth_tx_hash_from_signed_message( message: &SignedMessage, eth_chain_id: EthChainIdType, ) -> anyhow::Result { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index d69cb082e006..dbcb263189b9 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -65,7 +65,7 @@ use crate::rpc::eth::pubsub_trait::{ use crate::rpc::{RPCState, chain}; use fvm_ipld_blockstore::Blockstore; use jsonrpsee::PendingSubscriptionSink; -use jsonrpsee::core::{SubscriptionError, SubscriptionResult}; +use jsonrpsee::core::SubscriptionResult; use std::sync::Arc; use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; @@ -91,13 +91,7 @@ where match kind { SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, SubscriptionKind::PendingTransactions => { - return Err(SubscriptionError::from( - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, - "pendingTransactions subscription not yet implemented", - None::<()>, - ), - )); + self.handle_pending_transaction(sink, ctx).await } SubscriptionKind::Logs => { let filter = params.and_then(|p| p.filter); @@ -136,6 +130,17 @@ where handle_subscription(logs, accepted_sink, handle).await; }); } + + async fn handle_pending_transaction( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc>, + ) { + let (pending_rx, handle) = chain::new_pending_transaction(&ctx); + tokio::spawn(async move { + handle_subscription(pending_rx, accepted_sink, handle).await; + }); + } } async fn handle_subscription( diff --git a/src/utils/broadcast/mod.rs 
b/src/utils/broadcast/mod.rs index ee34a6de496a..4e67e69fce2f 100644 --- a/src/utils/broadcast/mod.rs +++ b/src/utils/broadcast/mod.rs @@ -4,9 +4,25 @@ #[cfg(test)] mod tests; -use futures::FutureExt as _; +use std::pin::Pin; + +use futures::{FutureExt as _, Stream, StreamExt as _}; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio_stream::wrappers::BroadcastStream; /// Returns `true` if there are any active subscribers to the given broadcast channel. -pub fn has_subscribers(tx: &tokio::sync::broadcast::Sender) -> bool { +pub fn has_subscribers(tx: &Sender) -> bool { tx.closed().now_or_never().is_none() } + +/// Wraps a broadcast [`Receiver`] as a pinned [`Stream`] that skips `Lagged` +/// events and terminates on `Closed`. +/// +/// Use this in place of a manual `rx.recv()` loop so the lagged/closed handling +/// stays DRY while each caller retains ownership of its own state across +/// iterations (no per-event `Arc::clone`s). +pub fn subscription_stream( + rx: Receiver, +) -> Pin + Send>> { + Box::pin(BroadcastStream::new(rx).filter_map(|r| async move { r.ok() })) +}