Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/message_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use self::{
config::*,
errors::*,
mpool_locker::MpoolLocker,
msgpool::{msg_pool::MessagePool, *},
msgpool::{msg_pool::MessagePool, msg_pool::MpoolUpdate, *},
nonce_tracker::NonceTracker,
};

Expand Down
21 changes: 18 additions & 3 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use cid::Cid;
use fvm_ipld_encoding::to_vec;
use parking_lot::RwLock as SyncRwLock;
use tokio::sync::broadcast;
use tracing::error;
use utils::{get_base_fee_lower_bound, recover_sig};

use super::errors::Error;
use crate::message_pool::msgpool::msg_pool::MpoolUpdate;
use crate::message_pool::{
msg_chain::{Chains, create_message_chains},
msg_pool::{
Expand Down Expand Up @@ -225,6 +227,7 @@ pub async fn head_change<T>(
cur_tipset: &SyncRwLock<Tipset>,
key_cache: &SizeTrackingLruCache<Address, Address>,
state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
change_publisher: &broadcast::Sender<MpoolUpdate>,
revert: Vec<Tipset>,
apply: Vec<Tipset>,
) -> Result<(), Error>
Expand Down Expand Up @@ -276,13 +279,23 @@ where
};

for msg in smsgs {
mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?;
mpool_ctx.remove_from_selected_msgs(
&msg.from(),
msg.sequence(),
&mut rmsgs,
change_publisher,
)?;
if !repub && republished.write().insert(msg.cid()) {
repub = true;
}
}
for msg in msgs {
mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?;
mpool_ctx.remove_from_selected_msgs(
&msg.from,
msg.sequence,
&mut rmsgs,
change_publisher,
)?;
if !repub && republished.write().insert(msg.cid()) {
repub = true;
}
Expand Down Expand Up @@ -316,6 +329,7 @@ where
sequence,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
change_publisher,
) {
error!("Failed to read message from reorg to mpool: {}", e);
}
Expand All @@ -339,6 +353,7 @@ impl<T: Provider> MpoolCtx<'_, T> {
from: &Address,
sequence: u64,
rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
change_publisher: &broadcast::Sender<MpoolUpdate>,
) -> Result<(), Error> {
if rmsgs
.get_mut(from)
Expand All @@ -347,7 +362,7 @@ impl<T: Provider> MpoolCtx<'_, T> {
&& let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts)
.inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
{
remove(&resolved, self.pending, sequence, true)?;
remove(&resolved, self.pending, sequence, true, change_publisher)?;
}
Ok(())
}
Expand Down
72 changes: 58 additions & 14 deletions src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use get_size2::GetSize;
use itertools::Itertools;
use nonzero_ext::nonzero;
use parking_lot::RwLock as SyncRwLock;
use tokio::sync::broadcast;
use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
use tracing::warn;

Expand Down Expand Up @@ -231,21 +232,23 @@ impl MsgSet {

/// Remove the message at `sequence` and adjust `next_sequence`.
///
/// Returns the removed message, or `None` if `sequence` was not present.
///
/// - **Applied** (included on-chain): advance `next_sequence` to
/// `sequence + 1` if needed. For messages not in our pool, also run
/// the gap-filling loop to advance past consecutive known messages.
/// - **Pruned** (evicted): rewind `next_sequence` to `sequence` if the
/// removal creates a gap.
pub fn rm(&mut self, sequence: u64, applied: bool) {
if self.msgs.remove(&sequence).is_none() {
pub fn rm(&mut self, sequence: u64, applied: bool) -> Option<SignedMessage> {
let Some(removed) = self.msgs.remove(&sequence) else {
if applied && sequence >= self.next_sequence {
self.next_sequence = sequence + 1;
while self.msgs.contains_key(&self.next_sequence) {
self.next_sequence += 1;
}
}
return;
}
return None;
};
metrics::MPOOL_MESSAGE_TOTAL.dec();

// adjust next sequence
Expand All @@ -255,16 +258,28 @@ impl MsgSet {
if sequence >= self.next_sequence {
self.next_sequence = sequence + 1;
}
return;
}
// we removed a message because it was pruned
// we have to adjust the sequence if it creates a gap or rewinds state
if sequence < self.next_sequence {
} else if sequence < self.next_sequence {
// we removed a message because it was pruned
// we have to adjust the sequence if it creates a gap or rewinds state
self.next_sequence = sequence;
}
Some(removed)
}
}

/// Capacity of the mpool changes broadcast channel.
///
/// Sized to absorb reorg-replay bursts (many `Add` events fired in rapid
/// succession from `head_change`) while a single subscriber drains. Subscribers
/// that fall further behind receive `Lagged` and drop events.
const MPOOL_CHANGES_CHANNEL_CAPACITY: usize = 256;

/// A change observed in the message pool, published on the pool's `changes`
/// broadcast channel (obtain a receiver via `MessagePool::subscribe_to_updates`).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum MpoolUpdate {
    /// A signed message was inserted into the pool's pending set.
    Add(SignedMessage),
    /// A signed message was removed from the pool's pending set,
    /// either because it was applied on-chain or because it was pruned.
    Remove(SignedMessage),
}

/// This contains all necessary information needed for the message pool.
/// Keeps track of messages to apply, as well as context needed for verifying
/// transactions.
Expand Down Expand Up @@ -297,6 +312,8 @@ pub struct MessagePool<T> {
pub config: MpoolConfig,
/// Chain configuration
pub chain_config: Arc<ChainConfig>,
/// Publishes the changes in the message pool
pub(crate) changes: broadcast::Sender<MpoolUpdate>,
}

/// Resolve an address to its key form, checking the cache first.
Expand Down Expand Up @@ -513,7 +530,7 @@ where
} else {
StrictnessPolicy::Strict
};
self.add_helper(msg, trust_policy, strictness)?;
self.add_helper(msg, trust_policy, strictness, &self.changes)?;
Ok(publish)
}

Expand All @@ -526,6 +543,7 @@ where
msg: SignedMessage,
trust_policy: TrustPolicy,
strictness: StrictnessPolicy,
change_publisher: &broadcast::Sender<MpoolUpdate>,
) -> Result<(), Error> {
let from = msg.from();
let cur_ts = self.current_tipset();
Expand All @@ -539,6 +557,7 @@ where
self.get_state_sequence(&from, &cur_ts)?,
trust_policy,
strictness,
change_publisher,
)
}

Expand Down Expand Up @@ -697,6 +716,7 @@ where
self.cur_tipset.as_ref(),
self.key_cache.as_ref(),
self.state_nonce_cache.as_ref(),
&self.changes,
revert,
apply,
)
Expand Down Expand Up @@ -743,6 +763,7 @@ where
let block_delay = chain_config.block_delay_secs;

let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4);
let (change_publisher, _) = broadcast::channel(MPOOL_CHANGES_CHANNEL_CAPACITY);
let mut mp = MessagePool {
local_addrs,
pending,
Expand All @@ -758,6 +779,7 @@ where
network_sender,
repub_trigger,
chain_config: Arc::clone(&chain_config),
changes: change_publisher.clone(),
};

mp.load_local()?;
Expand Down Expand Up @@ -788,6 +810,7 @@ where
&current_ts,
key_cache.as_ref(),
state_nonce_cache.as_ref(),
&change_publisher,
reverts,
applies,
)
Expand Down Expand Up @@ -841,6 +864,10 @@ where
});
Ok(mp)
}

/// Returns a new receiver subscribed to this pool's `MpoolUpdate` broadcast
/// channel, delivering an event for every message added to or removed from
/// the pool. A receiver that falls behind the channel capacity gets a
/// `Lagged` error and misses the dropped events.
pub fn subscribe_to_updates(&self) -> broadcast::Receiver<MpoolUpdate> {
    self.changes.subscribe()
}
}

// Helpers for MessagePool
Expand All @@ -860,6 +887,7 @@ pub(in crate::message_pool) fn add_helper<T>(
sequence: u64,
trust_policy: TrustPolicy,
strictness: StrictnessPolicy,
change_publisher: &broadcast::Sender<MpoolUpdate>,
) -> Result<(), Error>
where
T: Provider,
Expand All @@ -876,11 +904,17 @@ where
let mset = pending
.entry(resolved_from)
.or_insert_with(|| MsgSet::new(sequence));

let event_msg = crate::utils::broadcast::has_subscribers(change_publisher).then(|| msg.clone());
match trust_policy {
TrustPolicy::Trusted => mset.add_trusted(api, msg, strictness)?,
TrustPolicy::Untrusted => mset.add_untrusted(api, msg, strictness)?,
}

if let Some(msg) = event_msg {
let _ = change_publisher.send(MpoolUpdate::Add(msg));
}

Ok(())
}

Expand Down Expand Up @@ -924,15 +958,18 @@ pub fn remove(
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
sequence: u64,
applied: bool,
change_publisher: &broadcast::Sender<MpoolUpdate>,
) -> Result<(), Error> {
let mut pending = pending.write();
let mset = if let Some(mset) = pending.get_mut(from) {
mset
} else {
let Some(mset) = pending.get_mut(from) else {
return Ok(());
};

mset.rm(sequence, applied);
if let Some(removed) = mset.rm(sequence, applied)
&& crate::utils::broadcast::has_subscribers(change_publisher)
{
let _ = change_publisher.send(MpoolUpdate::Remove(removed));
}

if mset.msgs.is_empty() {
pending.remove(from);
Expand Down Expand Up @@ -981,6 +1018,7 @@ mod tests {
};
let msg = SignedMessage::mock_bls_signed_message(message);
let sequence = msg.message().sequence;
let (change_publisher, _) = broadcast::channel(1);
let res = add_helper(
&api,
&bls_sig_cache,
Expand All @@ -991,6 +1029,7 @@ mod tests {
sequence,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
&change_publisher,
);
assert!(res.is_ok());
}
Expand Down Expand Up @@ -1089,6 +1128,7 @@ mod tests {
};
let msg = SignedMessage::mock_bls_signed_message(message);

let (change_publisher, _) = broadcast::channel(1);
add_helper(
&api,
&bls_sig_cache,
Expand All @@ -1099,6 +1139,7 @@ mod tests {
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
&change_publisher,
)
.unwrap();

Expand Down Expand Up @@ -1126,6 +1167,8 @@ mod tests {
api.set_key_address_mapping(&id_addr, &key_addr);
api.set_state_sequence(&key_addr, 0);

let (change_publisher, _) = broadcast::channel(1);

// Add two messages from the ID address
for seq in 0..2 {
let message = ShimMessage {
Expand All @@ -1145,6 +1188,7 @@ mod tests {
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
&change_publisher,
)
.unwrap();
}
Expand Down
20 changes: 17 additions & 3 deletions src/message_pool/msgpool/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use ahash::{HashMap, HashMapExt};
use anyhow::{Context, bail, ensure};
use parking_lot::RwLock;
use rand::prelude::SliceRandom;
use tokio::sync::broadcast;
use tracing::{debug, error, warn};

use crate::shim::crypto::Signature;
Expand All @@ -25,7 +26,7 @@ use crate::utils::get_size::CidWrapper;

use super::{MpoolCtx, msg_pool::MessagePool, provider::Provider, utils::recover_sig};
use crate::message_pool::{
Error, add_to_selected_msgs,
Error, MpoolUpdate, add_to_selected_msgs,
msg_chain::{Chains, NodeKey, create_message_chains},
msg_pool::MsgSet,
msgpool::MIN_GAS,
Expand Down Expand Up @@ -671,6 +672,7 @@ where
cur_ts.clone(),
ts.clone(),
&mut result,
&self.changes,
)?;

Ok(result)
Expand Down Expand Up @@ -816,6 +818,7 @@ fn merge_and_trim(
/// It simulates a head change call.
// This logic should probably be implemented in the ChainStore. It handles
// reorgs.
#[allow(clippy::too_many_arguments)]
pub(in crate::message_pool) fn run_head_change<T>(
api: &T,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
Expand All @@ -824,6 +827,7 @@ pub(in crate::message_pool) fn run_head_change<T>(
from: Tipset,
to: Tipset,
rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
change_publisher: &broadcast::Sender<MpoolUpdate>,
) -> Result<(), Error>
where
T: Provider,
Expand Down Expand Up @@ -873,10 +877,20 @@ where
let (msgs, smsgs) = api.messages_for_block(b)?;

for msg in smsgs {
mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), rmsgs)?;
mpool_ctx.remove_from_selected_msgs(
&msg.from(),
msg.sequence(),
rmsgs,
change_publisher,
)?;
}
for msg in msgs {
mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, rmsgs)?;
mpool_ctx.remove_from_selected_msgs(
&msg.from,
msg.sequence,
rmsgs,
change_publisher,
)?;
}
}
}
Expand Down
Loading
Loading