diff --git a/src/state_manager/actor_queries.rs b/src/state_manager/actor_queries.rs new file mode 100644 index 00000000000..5eb21db2376 --- /dev/null +++ b/src/state_manager/actor_queries.rs @@ -0,0 +1,180 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; +use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition}; +use crate::shim::actors::verifreg::ext::VerifiedRegistryStateExt as _; +use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim}; +use ahash::HashMap; +use fil_actor_verifreg_state::v12::DataCap; +use fil_actor_verifreg_state::v13::ClaimID; +use fil_actors_shared::fvm_ipld_bitfield::BitField; + +impl StateManager +where + DB: Blockstore + Send + Sync + 'static, +{ + /// Retrieves market state + pub fn market_state(&self, ts: &Tipset) -> Result { + let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?; + let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?; + Ok(market_state) + } + + /// Retrieves market balance in escrow and locked tables. + pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result { + let market_state = self.market_state(ts)?; + let new_addr = self.lookup_required_id(addr, ts)?; + let out = MarketBalance { + escrow: { + market_state + .escrow_table(self.blockstore())? + .get(&new_addr)? + }, + locked: { + market_state + .locked_table(self.blockstore())? + .get(&new_addr)? + }, + }; + + Ok(out) + } + + /// Retrieves miner info. + pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result { + let actor = self + .get_actor(addr, *ts.parent_state())? + .ok_or_else(|| Error::state("Miner actor not found"))?; + let state = miner::State::load(self.blockstore(), actor.code, actor.state)?; + + Ok(state.info(self.blockstore())?) + } + + /// Retrieves miner faults. + pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result { + self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone()) + } + + /// Retrieves miner recoveries. + pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result { + self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone()) + } + + fn all_partition_sectors( + &self, + addr: &Address, + ts: &Tipset, + get_sector: impl Fn(Partition<'_>) -> BitField, + ) -> Result { + let actor = self + .get_actor(addr, *ts.parent_state())? + .ok_or_else(|| Error::state("Miner actor not found"))?; + + let state = miner::State::load(self.blockstore(), actor.code, actor.state)?; + + let mut partitions = Vec::new(); + + state.for_each_deadline( + &self.chain_config().policy, + self.blockstore(), + |_, deadline| { + deadline.for_each(self.blockstore(), |_, partition| { + partitions.push(get_sector(partition)); + Ok(()) + }) + }, + )?; + + Ok(BitField::union(partitions.iter())) + } + + /// Retrieves miner power. + pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result { + if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? { + return Ok(MinerPower { + miner_power, + total_power, + has_min_power: true, + }); + } + + Ok(MinerPower { + has_min_power: false, + miner_power: Default::default(), + total_power: Default::default(), + }) + } + + pub fn get_verified_registry_actor_state( + &self, + ts: &Tipset, + ) -> anyhow::Result { + let act = self + .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state()) + .map_err(Error::state)? + .ok_or_else(|| Error::state("actor not found"))?; + verifreg::State::load(self.blockstore(), act.code, act.state) + } + + pub fn get_claim( + &self, + addr: &Address, + ts: &Tipset, + claim_id: ClaimID, + ) -> anyhow::Result> { + let id_address = self.lookup_required_id(addr, ts)?; + let state = self.get_verified_registry_actor_state(ts)?; + state.get_claim(self.blockstore(), id_address, claim_id) + } + + pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result> { + let state = self.get_verified_registry_actor_state(ts)?; + state.get_all_claims(self.blockstore()) + } + + pub fn get_allocation( + &self, + addr: &Address, + ts: &Tipset, + allocation_id: AllocationID, + ) -> anyhow::Result> { + let id_address = self.lookup_required_id(addr, ts)?; + let state = self.get_verified_registry_actor_state(ts)?; + state.get_allocation(self.blockstore(), id_address.id()?, allocation_id) + } + + pub fn get_all_allocations( + &self, + ts: &Tipset, + ) -> anyhow::Result> { + let state = self.get_verified_registry_actor_state(ts)?; + state.get_all_allocations(self.blockstore()) + } + + pub fn verified_client_status( + &self, + addr: &Address, + ts: &Tipset, + ) -> anyhow::Result> { + let id = self.lookup_required_id(addr, ts)?; + let network_version = self.get_network_version(ts.epoch()); + + // Pre-network v17 (actor v9), the verified client data cap lives in the + // verified registry actor; at/after v17 it lives in the datacap token actor. + // Original: https://github.com/filecoin-project/lotus/blob/5e76b05b17771da6939c7b0bf65127c3dc70ee23/node/impl/full/state.go#L1627-L1664. + if (u32::from(network_version.0)) < 17 { + let state = self.get_verified_registry_actor_state(ts)?; + return state.verified_client_data_cap(self.blockstore(), id); + } + + let act = self + .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state()) + .map_err(Error::state)? + .ok_or_else(|| Error::state("Miner actor not found"))?; + + let state = datacap::State::load(self.blockstore(), act.code, act.state)?; + + state.verified_client_data_cap(self.blockstore(), id) + } +} diff --git a/src/state_manager/address_resolution.rs b/src/state_manager/address_resolution.rs new file mode 100644 index 00000000000..d78305de0c2 --- /dev/null +++ b/src/state_manager/address_resolution.rs @@ -0,0 +1,108 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; +use crate::shim::address::{Payload, Protocol}; +use anyhow::Context as _; +use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _}; + +impl StateManager +where + DB: Blockstore + Send + Sync + 'static, +{ + /// Returns a BLS public key from provided address + pub fn get_bls_public_key( + db: &Arc, + addr: &Address, + state_cid: cid::Cid, + ) -> Result { + let state = StateTree::new_from_root(Arc::clone(db), &state_cid) + .map_err(|e| Error::Other(e.to_string()))?; + let kaddr = + resolve_to_key_addr(&state, db, addr).context("Failed to resolve key address")?; + + match kaddr.into_payload() { + Payload::BLS(key) => BlsPublicKey::from_bytes(&key) + .context("Failed to construct bls public key") + .map_err(Error::from), + _ => Err(Error::state( + "Address must be BLS address to load bls public key", + )), + } + } + + /// Looks up ID [Address] from the state at the given [Tipset]. + pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result, Error> { + let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state()) + .map_err(|e| format!("{e:?}"))?; + Ok(state_tree + .lookup_id(addr) + .map_err(|e| Error::Other(e.to_string()))? + .map(Address::new_id)) + } + + /// Looks up required ID [Address] from the state at the given [Tipset]. + pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result { + self.lookup_id(addr, ts)? + .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}"))) + } + + /// Similar to `resolve_to_key_addr` in the `forest_vm` [`crate::state_manager`] but does not + /// allow `Actor` type of addresses. Uses `ts` to generate the VM state. + pub async fn resolve_to_key_addr( + self: &Arc, + addr: &Address, + ts: &Tipset, + ) -> anyhow::Result
{ + match addr.protocol() { + Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr), + Protocol::Actor => { + return Err(Error::Other( + "cannot resolve actor address to key address".to_string(), + ) + .into()); + } + _ => {} + }; + + // First try to resolve the actor in the parent state, so we don't have to + // compute anything. + let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?; + if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) { + return Ok(addr); + } + + // If that fails, compute the tip-set and try again. + let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?; + let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?; + + resolve_to_key_addr(&state, self.blockstore(), addr) + } + + pub async fn resolve_to_deterministic_address( + self: &Arc, + address: Address, + ts: &Tipset, + ) -> anyhow::Result
{ + use crate::shim::address::Protocol::*; + match address.protocol() { + BLS | Secp256k1 | Delegated => Ok(address), + Actor => anyhow::bail!("cannot resolve actor address to key address"), + _ => { + // First try to resolve the actor in the parent state, so we don't have to compute anything. + if let Ok(state) = + StateTree::new_from_root(self.blockstore_owned(), ts.parent_state()) + && let Ok(address) = state + .resolve_to_deterministic_addr(self.chain_store().blockstore(), address) + { + return Ok(address); + } + + // If that fails, compute the tip-set and try again. + let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?; + let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?; + state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address) + } + } + } +} diff --git a/src/state_manager/execution.rs b/src/state_manager/execution.rs new file mode 100644 index 00000000000..af6cb8f7eb8 --- /dev/null +++ b/src/state_manager/execution.rs @@ -0,0 +1,242 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::state_computation::TipsetExecutor; +use super::utils::structured; +use super::*; +use crate::interpreter::{CalledAt, VMTrace}; +use crate::rpc::state::{ApiInvocResult, MessageGasCost}; +use crate::utils::ShallowClone as _; +use anyhow::{Context as _, bail}; +use num_traits::identities::Zero; +use std::ops::RangeInclusive; + +impl StateManager +where + DB: Blockstore + Send + Sync + 'static, +{ + /// Replays the given message and returns the result of executing the + /// indicated message, assuming it was executed in the indicated tipset. + pub async fn replay(self: &Arc, ts: Tipset, mcid: Cid) -> Result { + let this = Arc::clone(self); + tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await? + } + + /// Blocking version of `replay` + pub fn replay_blocking( + self: &Arc, + ts: Tipset, + mcid: Cid, + ) -> Result { + const REPLAY_HALT: &str = "replay_halt"; + + let mut api_invoc_result = None; + let callback = |ctx: MessageCallbackCtx<'_>| match ctx.at { + CalledAt::Applied | CalledAt::Reward + if api_invoc_result.is_none() && ctx.cid == mcid => + { + api_invoc_result = Some(ApiInvocResult { + msg_cid: ctx.message.cid(), + msg: ctx.message.message().clone(), + msg_rct: Some(ctx.apply_ret.msg_receipt()), + error: ctx.apply_ret.failure_info().unwrap_or_default(), + duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, + gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?, + execution_trace: structured::parse_events(ctx.apply_ret.exec_trace()) + .unwrap_or_default(), + }); + anyhow::bail!(REPLAY_HALT); + } + _ => Ok(()), + }; + let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced); + if let Err(error_message) = result + && error_message.to_string() != REPLAY_HALT + { + return Err(Error::Other(format!( + "unexpected error during execution : {error_message:}" + ))); + } + api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into())) + } + + /// Replays a tipset up to a target message, capturing the state root before + /// and after execution. + pub async fn replay_for_prestate( + self: &Arc, + ts: Tipset, + target_message_cid: Cid, + ) -> Result<(Cid, ApiInvocResult, Cid), Error> { + let this = Arc::clone(self); + tokio::task::spawn_blocking(move || { + this.replay_for_prestate_blocking(ts, target_message_cid) + }) + .await + .map_err(|e| Error::Other(format!("{e}")))? + } + + fn replay_for_prestate_blocking( + self: &Arc, + ts: Tipset, + target_msg_cid: Cid, + ) -> Result<(Cid, ApiInvocResult, Cid), Error> { + if ts.epoch() == 0 { + return Err(Error::Other( + "cannot trace messages in the genesis block".into(), + )); + } + + let genesis_timestamp = self.chain_store().genesis_block_header().timestamp; + let exec = TipsetExecutor::new( + self.chain_index().shallow_clone(), + self.chain_config().shallow_clone(), + self.beacon_schedule().shallow_clone(), + self.engine(), + ts.shallow_clone(), + ); + let mut no_cb = NO_CALLBACK; + let (parent_state, epoch, block_messages) = + exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?; + + Ok(stacker::grow(64 << 20, || { + let mut vm = + exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?; + let mut processed = ahash::HashSet::default(); + + for block in block_messages.iter() { + let mut penalty = TokenAmount::zero(); + let mut gas_reward = TokenAmount::zero(); + + for msg in block.messages.iter() { + let cid = msg.cid(); + if processed.contains(&cid) { + continue; + } + + processed.insert(cid); + + if cid == target_msg_cid { + let pre_root = vm.flush()?; + let mut traced_vm = + exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?; + let (ret, duration) = traced_vm.apply_message(msg)?; + let post_root = traced_vm.flush()?; + + return Ok(( + pre_root, + ApiInvocResult { + msg_cid: cid, + msg: msg.message().clone(), + msg_rct: Some(ret.msg_receipt()), + error: ret.failure_info().unwrap_or_default(), + duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, + gas_cost: MessageGasCost::default(), + execution_trace: structured::parse_events(ret.exec_trace()) + .unwrap_or_default(), + }, + post_root, + )); + } + + let (ret, _) = vm.apply_message(msg)?; + gas_reward += ret.miner_tip(); + penalty += ret.penalty(); + } + + if let Some(rew_msg) = + vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)? + { + let (ret, _) = vm.apply_implicit_message(&rew_msg)?; + if let Some(err) = ret.failure_info() { + bail!( + "failed to apply reward message for miner {}: {err}", + block.miner + ); + } + + if !ret.msg_receipt().exit_code().is_success() { + bail!( + "reward application message failed (exit: {:?})", + ret.msg_receipt().exit_code() + ); + } + } + } + + bail!("message {target_msg_cid} not found in tipset") + })?) + } + + /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset. + #[tracing::instrument(skip(self))] + pub fn validate_range(&self, epochs: RangeInclusive) -> anyhow::Result<()> { + let heaviest = self.heaviest_tipset(); + let heaviest_epoch = heaviest.epoch(); + let end = self + .chain_index() + .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder) + .with_context(|| { + format!( + "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}", + *epochs.end(), + ) + })?; + + let tipsets = end + .chain(self.blockstore()) + .take_while(|ts| ts.epoch() >= *epochs.start()); + + self.validate_tipsets(tipsets) + } + + pub fn validate_tipsets(&self, tipsets: T) -> anyhow::Result<()> + where + T: Iterator + Send, + { + let genesis_timestamp = self.chain_store().genesis_block_header().timestamp; + validate_tipsets( + genesis_timestamp, + self.chain_index(), + self.chain_config(), + self.beacon_schedule(), + self.engine(), + tipsets, + ) + } + + pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec)> { + let mut invoc_trace = vec![]; + + let genesis_timestamp = self.chain_store().genesis_block_header().timestamp; + + let callback = |ctx: MessageCallbackCtx<'_>| match ctx.at { + CalledAt::Applied | CalledAt::Reward => { + invoc_trace.push(ApiInvocResult { + msg_cid: ctx.message.cid(), + msg: ctx.message.message().clone(), + msg_rct: Some(ctx.apply_ret.msg_receipt()), + error: ctx.apply_ret.failure_info().unwrap_or_default(), + duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, + gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?, + execution_trace: structured::parse_events(ctx.apply_ret.exec_trace()) + .unwrap_or_default(), + }); + Ok(()) + } + _ => Ok(()), + }; + + let ExecutedTipset { state_root, .. } = apply_block_messages( + genesis_timestamp, + self.chain_index().shallow_clone(), + self.chain_config().shallow_clone(), + self.beacon_schedule().shallow_clone(), + self.engine(), + tipset.shallow_clone(), + Some(callback), + VMTrace::Traced, + )?; + + Ok((state_root, invoc_trace)) + } +} diff --git a/src/state_manager/message_search.rs b/src/state_manager/message_search.rs new file mode 100644 index 00000000000..9a55c501e64 --- /dev/null +++ b/src/state_manager/message_search.rs @@ -0,0 +1,310 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; +use crate::blocks::TipsetKey; +use crate::message::MessageRead as _; +use crate::utils::ShallowClone as _; +use ahash::{HashMap, HashMapExt as _}; +use anyhow::Context as _; +use futures::{FutureExt, channel::oneshot, select}; +use tokio::sync::{RwLock, broadcast::error::RecvError}; +use tracing::warn; + +impl StateManager +where + DB: Blockstore + Send + Sync + 'static, +{ + /// Check if tipset had executed the message, by loading the receipt based + /// on the index of the message in the block. + fn tipset_executed_message( + &self, + tipset: &Tipset, + message: &ChainMessage, + allow_replaced: bool, + ) -> Result, Error> { + if tipset.epoch() == 0 { + return Ok(None); + } + let message_from_address = message.from(); + let message_sequence = message.sequence(); + let pts = self + .chain_index() + .load_required_tipset(tipset.parents()) + .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?; + let messages = self + .chain_store() + .messages_for_tipset(&pts) + .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?; + messages + .iter() + .enumerate() + .rev() + .filter(|(_, s)| { + s.sequence() == message_sequence + && s.from() == message_from_address + && s.equal_call(message) + }) + .map(|(index, m)| { + if !allow_replaced && message.cid() != m.cid(){ + Err(Error::Other(format!( + "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}", + message.cid(), + m.cid(), + message.sequence(), + message.from(), + ))) + } else { + let block_header = tipset.block_headers().first(); + crate::chain::get_parent_receipt( + self.blockstore(), + block_header, + index, + ) + .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts))) + } + }) + .next() + .unwrap_or(Ok(None)) + } + + fn check_search( + &self, + mut current: Tipset, + message: &ChainMessage, + lookback_max_epoch: ChainEpoch, + allow_replaced: bool, + ) -> Result, Error> { + let message_from_address = message.from(); + let message_sequence = message.sequence(); + let mut current_actor_state = self + .get_required_actor(&message_from_address, *current.parent_state()) + .map_err(Error::state)?; + let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?; + + while current.epoch() >= lookback_max_epoch { + let parent_tipset = self + .chain_index() + .load_required_tipset(current.parents()) + .map_err(|err| { + Error::Other(format!( + "failed to load tipset during msg wait searchback: {err:}" + )) + })?; + + let parent_actor_state = self + .get_actor(&message_from_id, *parent_tipset.parent_state()) + .map_err(|e| Error::State(e.to_string()))?; + + if parent_actor_state.is_none() + || (current_actor_state.sequence > message_sequence + && parent_actor_state.as_ref().unwrap().sequence <= message_sequence) + { + let receipt = self + .tipset_executed_message(¤t, message, allow_replaced)? + .context("Failed to get receipt with tipset_executed_message")?; + return Ok(Some((current, receipt))); + } + + if let Some(parent_actor_state) = parent_actor_state { + current = parent_tipset; + current_actor_state = parent_actor_state; + } else { + break; + } + } + + Ok(None) + } + + /// Searches backwards through the chain for a message receipt. + fn search_back_for_message( + &self, + current: Tipset, + message: &ChainMessage, + look_back_limit: Option, + allow_replaced: Option, + ) -> Result, Error> { + let current_epoch = current.epoch(); + let allow_replaced = allow_replaced.unwrap_or(true); + + let lookback_max_epoch = match look_back_limit { + Some(0) => return Ok(None), + Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0), + _ => 0, + }; + + self.check_search(current, message, lookback_max_epoch, allow_replaced) + } + + /// Returns a message receipt from a given tipset and message CID. + pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result { + let m = crate::chain::get_chain_message(self.blockstore(), &msg) + .map_err(|e| Error::Other(e.to_string()))?; + let message_receipt = self.tipset_executed_message(&tipset, &m, true)?; + if let Some(receipt) = message_receipt { + return Ok(receipt); + } + + let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?; + let message_receipt = maybe_tuple + .ok_or_else(|| { + Error::Other("Could not get receipt from search back message".to_string()) + })? + .1; + Ok(message_receipt) + } + + /// `WaitForMessage` blocks until a message appears on chain. It looks + /// backwards in the chain to see if this has already happened. It + /// guarantees that the message has been on chain for at least + /// confidence epochs without being reverted before returning. + pub async fn wait_for_message( + self: &Arc, + msg_cid: Cid, + confidence: i64, + look_back_limit: Option, + allow_replaced: Option, + ) -> Result<(Option, Option), Error> { + let mut head_changes_rx = self.chain_store().subscribe_head_changes(); + let (sender, mut receiver) = oneshot::channel::<()>(); + let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) + .map_err(|err| Error::Other(format!("failed to load message {err:}")))?; + let current_tipset = self.heaviest_tipset(); + let maybe_message_receipt = + self.tipset_executed_message(¤t_tipset, &message, true)?; + if let Some(r) = maybe_message_receipt { + return Ok((Some(current_tipset.shallow_clone()), Some(r))); + } + + let mut candidate_tipset: Option = None; + let mut candidate_receipt: Option = None; + + let sm_cloned = self.shallow_clone(); + + let message_for_task = message.clone(); + let height_of_head = current_tipset.epoch(); + let task = tokio::task::spawn(async move { + let back_tuple = sm_cloned.search_back_for_message( + current_tipset, + &message_for_task, + look_back_limit, + allow_replaced, + )?; + sender + .send(()) + .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?; + Ok::<_, Error>(back_tuple) + }); + + let reverts: Arc>> = Arc::new(RwLock::new(HashMap::new())); + let block_revert = reverts.clone(); + let sm_cloned = Arc::clone(self); + + let mut subscriber_poll = tokio::task::spawn(async move { + loop { + match head_changes_rx.recv().await { + Ok(head_changes) => { + for tipset in head_changes.reverts { + if candidate_tipset + .as_ref() + .is_some_and(|candidate| candidate.key() == tipset.key()) + { + candidate_tipset = None; + candidate_receipt = None; + } + } + for tipset in head_changes.applies { + if candidate_tipset + .as_ref() + .map(|s| tipset.epoch() >= s.epoch() + confidence) + .unwrap_or_default() + { + return Ok((candidate_tipset, candidate_receipt)); + } + let poll_receiver = receiver.try_recv(); + if let Ok(Some(_)) = poll_receiver { + block_revert + .write() + .await + .insert(tipset.key().to_owned(), true); + } + + let maybe_receipt = + sm_cloned.tipset_executed_message(&tipset, &message, true)?; + if let Some(receipt) = maybe_receipt { + if confidence == 0 { + return Ok((Some(tipset), Some(receipt))); + } + candidate_tipset = Some(tipset); + candidate_receipt = Some(receipt) + } + } + } + Err(RecvError::Lagged(i)) => { + warn!( + "wait for message head change subscriber lagged, skipped {} events", + i + ); + } + Err(RecvError::Closed) => break, + } + } + Ok((None, None)) + }) + .fuse(); + + let mut search_back_poll = tokio::task::spawn(async move { + let back_tuple = task.await.map_err(|e| { + Error::Other(format!("Could not search backwards for message {e}")) + })??; + if let Some((back_tipset, back_receipt)) = back_tuple { + let should_revert = *reverts + .read() + .await + .get(back_tipset.key()) + .unwrap_or(&false); + let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence; + if !should_revert && larger_height_of_head { + return Ok::<_, Error>((Some(back_tipset), Some(back_receipt))); + } + return Ok((None, None)); + } + Ok((None, None)) + }) + .fuse(); + + loop { + select! { + res = subscriber_poll => { + return res? + } + res = search_back_poll => { + if let Ok((Some(ts), Some(rct))) = res? { + return Ok((Some(ts), Some(rct))); + } + } + } + } + } + + pub async fn search_for_message( + &self, + from: Option, + msg_cid: Cid, + look_back_limit: Option, + allow_replaced: Option, + ) -> Result, Error> { + let from = from.unwrap_or_else(|| self.heaviest_tipset()); + let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) + .map_err(|err| Error::Other(format!("failed to load message {err}")))?; + let current_tipset = self.heaviest_tipset(); + let maybe_message_receipt = + self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?; + if let Some(r) = maybe_message_receipt { + Ok(Some((from, r))) + } else { + self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced) + } + } +} diff --git a/src/state_manager/message_simulation.rs b/src/state_manager/message_simulation.rs new file mode 100644 index 00000000000..b0bcb6740fc --- /dev/null +++ b/src/state_manager/message_simulation.rs @@ -0,0 +1,232 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::circulating_supply::GenesisInfo; +use super::utils::structured; +use super::*; +use crate::interpreter::{ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, VMTrace}; +use crate::message::{MessageRead as _, MessageReadWrite as _, SignedMessage}; +use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost}; +use crate::shim::address::Protocol; +use crate::shim::crypto::{Signature, SignatureType}; +use crate::shim::executor::ApplyRet; +use crate::shim::message::Message; +use crate::utils::ShallowClone as _; +use fvm_shared4::crypto::signature::SECP_SIG_LEN; +use std::time::Duration; +use tracing::instrument; + +impl StateManager +where + DB: Blockstore + Send + Sync + 'static, +{ + #[instrument(skip(self, rand))] + fn call_raw( + &self, + state_cid: Option, + msg: &Message, + rand: ChainRand, + tipset: &Tipset, + ) -> Result { + let mut msg = msg.clone(); + + let state_cid = state_cid.unwrap_or(*tipset.parent_state()); + + let tipset_messages = self + .chain_store() + .messages_for_tipset(tipset) + .map_err(|err| Error::Other(err.to_string()))?; + + let prior_messsages = tipset_messages + .iter() + .filter(|ts_msg| ts_msg.message().from() == msg.from()); + + let height = tipset.epoch(); + let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone()); + let mut vm = VM::new( + ExecutionContext { + heaviest_tipset: tipset.shallow_clone(), + state_tree_root: state_cid, + epoch: height, + rand: Box::new(rand), + base_fee: tipset.block_headers().first().parent_base_fee.clone(), + circ_supply: genesis_info.get_vm_circulating_supply( + height, + self.blockstore(), + &state_cid, + )?, + chain_config: self.chain_config().shallow_clone(), + chain_index: self.chain_index().shallow_clone(), + timestamp: tipset.min_timestamp(), + }, + self.engine(), + VMTrace::Traced, + )?; + + for m in prior_messsages { + vm.apply_message(m)?; + } + + // We flush to get the VM's view of the state tree after applying the above messages + // This is needed to get the correct nonce from the actor state to match the VM + let state_cid = vm.flush()?; + + let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?; + + let from_actor = state + .get_actor(&msg.from())? + .ok_or_else(|| anyhow::anyhow!("actor not found"))?; + msg.set_sequence(from_actor.sequence); + + // Implicit messages need to set a special gas limit + let mut msg = msg.clone(); + msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64; + + let (apply_ret, duration) = vm.apply_implicit_message(&msg)?; + + Ok(ApiInvocResult { + msg: msg.clone(), + msg_rct: Some(apply_ret.msg_receipt()), + msg_cid: msg.cid(), + error: apply_ret.failure_info().unwrap_or_default(), + duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, + gas_cost: MessageGasCost::default(), + execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(), + }) + } + + /// runs the given message and returns its result without any persisted + /// changes. + pub fn call(&self, message: &Message, tipset: Option) -> Result { + let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); + let chain_rand = self.chain_rand(ts.shallow_clone()); + self.call_raw(None, message, chain_rand, &ts) + } + + /// Same as [`StateManager::call`] but runs the message on the given state and not + /// on the parent state of the tipset. + pub fn call_on_state( + &self, + state_cid: Cid, + message: &Message, + tipset: Option, + ) -> Result { + let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); + let chain_rand = self.chain_rand(ts.shallow_clone()); + self.call_raw(Some(state_cid), message, chain_rand, &ts) + } + + pub async fn apply_on_state_with_gas( + self: &Arc, + tipset: Option, + msg: Message, + vm_flush: VMFlush, + ) -> anyhow::Result<(ApiInvocResult, Option)> { + let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); + + let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?; + + // Pretend that the message is signed. This has an influence on the gas + // cost. We obviously can't generate a valid signature. Instead, we just + // fill the signature with zeros. The validity is not checked. + let mut chain_msg = match from_a.protocol() { + Protocol::Secp256k1 => SignedMessage::new_unchecked( + msg.clone(), + Signature::new_secp256k1(vec![0; SECP_SIG_LEN]), + ) + .into(), + Protocol::Delegated => SignedMessage::new_unchecked( + msg.clone(), + Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]), + ) + .into(), + _ => msg.clone().into(), + }; + + let (_invoc_res, apply_ret, duration, state_root) = self + .call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush) + .await?; + + Ok(( + ApiInvocResult { + msg_cid: msg.cid(), + msg, + msg_rct: Some(apply_ret.msg_receipt()), + error: apply_ret.failure_info().unwrap_or_default(), + duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, + gas_cost: MessageGasCost::default(), + execution_trace: structured::parse_events(apply_ret.exec_trace()) + .unwrap_or_default(), + }, + state_root, + )) + } + + /// Computes message on the given [Tipset] state, after applying other + /// messages and returns the values computed in the VM. + pub async fn call_with_gas( + self: &Arc, + message: &mut ChainMessage, + prior_messages: &[ChainMessage], + tipset: Option, + vm_flush: VMFlush, + ) -> Result<(InvocResult, ApplyRet, Duration, Option), Error> { + let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); + let TipsetState { state_root, .. } = self + .load_tipset_state(&ts) + .await + .map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?; + let chain_rand = self.chain_rand(ts.clone()); + + // Since we're simulating a future message, pretend we're applying it in the + // "next" tipset + let epoch = ts.epoch() + 1; + let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone()); + // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from + // FVM, but that introduces some constraints, and possible deadlocks. + let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> { + let mut vm = VM::new( + ExecutionContext { + heaviest_tipset: ts.clone(), + state_tree_root: state_root, + epoch, + rand: Box::new(chain_rand), + base_fee: ts.block_headers().first().parent_base_fee.clone(), + circ_supply: genesis_info.get_vm_circulating_supply( + epoch, + self.blockstore(), + &state_root, + )?, + chain_config: self.chain_config().shallow_clone(), + chain_index: self.chain_index().shallow_clone(), + timestamp: ts.min_timestamp(), + }, + self.engine(), + VMTrace::NotTraced, + )?; + + for msg in prior_messages { + vm.apply_message(msg)?; + } + let from_actor = vm + .get_actor(&message.from()) + .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))? + .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?; + + message.set_sequence(from_actor.sequence); + let (ret, duration) = vm.apply_message(message)?; + let state_root = match vm_flush { + VMFlush::Flush => Some(vm.flush()?), + VMFlush::Skip => None, + }; + Ok((ret, duration, state_root)) + })?; + + Ok(( + InvocResult::new(message.message().clone(), &ret), + ret, + duration, + state_cid, + )) + } +} diff --git a/src/state_manager/mining.rs b/src/state_manager/mining.rs new file mode 100644 index 00000000000..a1dabcebcd2 --- /dev/null +++ b/src/state_manager/mining.rs @@ -0,0 +1,167 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::chain_rand::draw_randomness; +use super::*; +use crate::beacon::BeaconEntry; +use crate::rpc::types::MiningBaseInfo; +use crate::shim::randomness::Randomness; +use crate::shim::runtime::Policy; +use anyhow::Context as _; +use fil_actors_shared::v12::runtime::DomainSeparationTag; +use fvm_ipld_encoding::to_vec; +use num::BigInt; +use num_traits::identities::Zero; + +impl StateManager +where + DB: Blockstore + Send + Sync + 'static, +{ + /// Checks the eligibility of the miner. This is used in the validation that + /// a block's miner has the requirements to mine a block. + pub fn eligible_to_mine( + &self, + address: &Address, + base_tipset: &Tipset, + lookback_tipset: &Tipset, + ) -> anyhow::Result { + let hmp = + self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?; + let version = self.get_network_version(base_tipset.epoch()); + + if version <= NetworkVersion::V3 { + return Ok(hmp); + } + + if !hmp { + return Ok(false); + } + + let actor = self + .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())? + .ok_or_else(|| Error::state("Power actor address could not be resolved"))?; + + let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?; + + let actor = self + .get_actor(address, *base_tipset.parent_state())? + .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?; + + let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?; + + let claim = power_state + .miner_power(self.blockstore(), address)? + .ok_or_else(|| Error::Other("Could not get claim".to_string()))?; + if claim.quality_adj_power <= BigInt::zero() { + return Ok(false); + } + + if !miner_state.fee_debt().is_zero() { + return Ok(false); + } + + let info = miner_state.info(self.blockstore())?; + if base_tipset.epoch() <= info.consensus_fault_elapsed { + return Ok(false); + } + + Ok(true) + } + + pub async fn miner_get_base_info( + self: &Arc, + beacon_schedule: &BeaconSchedule, + tipset: Tipset, + addr: Address, + epoch: ChainEpoch, + ) -> anyhow::Result> { + let prev_beacon = self.chain_index().latest_beacon_entry(tipset.clone())?; + + let entries: Vec = beacon_schedule + .beacon_entries_for_block( + self.chain_config().network_version(epoch), + epoch, + tipset.epoch(), + &prev_beacon, + ) + .await?; + + let base = entries.last().unwrap_or(&prev_beacon); + + let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round( + self.chain_index(), + self.chain_config(), + &tipset, + epoch, + )?; + + // If the miner actor doesn't exist in the current tipset, it is a + // user-error and we must return an error message. If the miner exists + // in the current tipset but not in the lookback tipset, we may not + // error and should instead return None. + let actor = self.get_required_actor(&addr, *tipset.parent_state())?; + if self.get_actor(&addr, lb_state_root)?.is_none() { + return Ok(None); + } + + let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?; + + let addr_buf = to_vec(&addr)?; + let rand = draw_randomness( + base.signature(), + DomainSeparationTag::WinningPoStChallengeSeed as i64, + epoch, + &addr_buf, + )?; + + let network_version = self.chain_config().network_version(tipset.epoch()); + let sectors = self.get_sectors_for_winning_post( + &lb_state_root, + network_version, + &addr, + Randomness::new(rand.to_vec()), + )?; + + if sectors.is_empty() { + return Ok(None); + } + + let (miner_power, total_power) = self + .get_power(&lb_state_root, Some(&addr))? + .context("failed to get power")?; + + let info = miner_state.info(self.blockstore())?; + + let worker_key = self + .resolve_to_deterministic_address(info.worker, &tipset) + .await?; + let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?; + + Ok(Some(MiningBaseInfo { + miner_power: miner_power.quality_adj_power, + network_power: total_power.quality_adj_power, + sectors, + worker_key, + sector_size: info.sector_size, + prev_beacon_entry: prev_beacon, + beacon_entries: entries, + eligible_for_mining: eligible, + })) + } + + /// Checks power actor state for if miner meets consensus minimum + /// requirements. + pub fn miner_has_min_power( + &self, + policy: &Policy, + addr: &Address, + ts: &Tipset, + ) -> anyhow::Result { + let actor = self + .get_actor(&Address::POWER_ACTOR, *ts.parent_state())? + .ok_or_else(|| Error::state("Power actor address could not be resolved"))?; + let ps = power::State::load(self.blockstore(), actor.code, actor.state)?; + + ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr) + } +} diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 9586618b019..fb95fbdf617 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -4,88 +4,59 @@ #[cfg(test)] mod tests; +mod actor_queries; +mod address_resolution; mod cache; pub mod chain_rand; pub mod circulating_supply; mod errors; +mod execution; +mod message_search; +mod message_simulation; +mod mining; +mod state_computation; pub mod utils; pub use self::errors::*; -use self::utils::structured; +pub use self::state_computation::{apply_block_messages, validate_tipsets}; -use crate::beacon::{BeaconEntry, BeaconSchedule}; -use crate::blocks::{Tipset, TipsetKey}; +use crate::beacon::BeaconSchedule; +use crate::blocks::Tipset; use crate::chain::{ ChainStore, index::{ChainIndex, ResolveNullTipset}, }; -use crate::interpreter::{ - BlockMessages, CalledAt, ExecutionContext, IMPLICIT_MESSAGE_GAS_LIMIT, VM, resolve_to_key_addr, -}; -use crate::interpreter::{MessageCallbackCtx, VMTrace}; +use crate::interpreter::{MessageCallbackCtx, resolve_to_key_addr}; use crate::lotus_json::{LotusJson, lotus_json_with_self}; -use crate::message::{ChainMessage, MessageRead as _, MessageReadWrite as _, SignedMessage}; +use crate::message::ChainMessage; use crate::networks::ChainConfig; -use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost}; -use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo}; +use crate::rpc::types::SectorOnChainInfo; use crate::shim::actors::init::{self, State}; -use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition}; -use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim}; +use crate::shim::actors::miner::ext::MinerStateExt as _; use crate::shim::actors::*; -use crate::shim::crypto::{Signature, SignatureType}; -use crate::shim::{ - actors::{ - LoadActorStateFromBlockstore, miner::ext::MinerStateExt as _, - verifreg::ext::VerifiedRegistryStateExt as _, - }, - executor::{ApplyRet, Receipt, StampedEvent}, -}; +use crate::shim::executor::{Receipt, StampedEvent}; use crate::shim::{ - address::{Address, Payload, Protocol}, + address::Address, clock::ChainEpoch, econ::TokenAmount, machine::{GLOBAL_MULTI_ENGINE, MultiEngine}, - message::Message, - randomness::Randomness, - runtime::Policy, state_tree::{ActorState, StateTree}, version::NetworkVersion, }; use crate::state_manager::cache::TipsetStateCache; -use crate::state_manager::chain_rand::draw_randomness; -use crate::state_migration::run_state_migrations; use crate::utils::ShallowClone as _; use crate::utils::get_size::{GetSize, vec_heap_size_helper}; -use ahash::{HashMap, HashMapExt}; -use anyhow::{Context as _, bail, ensure}; -use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _}; +use anyhow::Context as _; use chain_rand::ChainRand; use cid::Cid; -pub use circulating_supply::GenesisInfo; -use fil_actor_verifreg_state::v12::DataCap; -use fil_actor_verifreg_state::v13::ClaimID; -use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0}; -use fil_actors_shared::fvm_ipld_bitfield::BitField; -use fil_actors_shared::v12::runtime::DomainSeparationTag; -use futures::{FutureExt, channel::oneshot, select}; use fvm_ipld_blockstore::Blockstore; -use fvm_ipld_encoding::to_vec; -use fvm_shared4::crypto::signature::SECP_SIG_LEN; -use itertools::Itertools as _; use nonzero_ext::nonzero; -use num::BigInt; -use num_traits::identities::Zero; -use rayon::prelude::ParallelBridge; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::ops::RangeInclusive; -use std::time::Duration; use std::{num::NonZeroUsize, sync::Arc}; -use tokio::sync::{RwLock, broadcast::error::RecvError}; -use tracing::{error, info, instrument, warn}; const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize); -pub const EVENTS_AMT_BITWIDTH: u32 = 5; +pub(crate) const EVENTS_AMT_BITWIDTH: u32 = 5; /// Result of executing an individual chain message in a tipset. /// @@ -187,17 +158,23 @@ lotus_json_with_self!(MarketBalance); /// the chain. The state manager not only allows interfacing with state, but /// also is used when performing state transitions. pub struct StateManager { - /// Chain store cs: Arc>, - /// This is a cache which indexes tipsets to their calculated state output (state root, receipt root). cache: TipsetStateCache, - beacon: Arc, + beacon: Arc, engine: Arc, } #[allow(clippy::type_complexity)] pub const NO_CALLBACK: Option) -> anyhow::Result<()>> = None; +/// Controls whether the VM should flush its state after execution +#[derive(Debug, Copy, Clone, Default)] +pub enum VMFlush { + Flush, + #[default] + Skip, +} + impl StateManager where DB: Blockstore, @@ -215,7 +192,7 @@ where Ok(Self { cs, - cache: TipsetStateCache::new("executed_tipset"), // For StateOutput + cache: TipsetStateCache::new("executed_tipset"), beacon, engine, }) @@ -279,6 +256,14 @@ where &self.beacon } + pub(in crate::state_manager) fn engine(&self) -> &Arc { + &self.engine + } + + pub(in crate::state_manager) fn tipset_state_cache(&self) -> &TipsetStateCache { + &self.cache + } + /// Returns network version for the given epoch. pub fn get_network_version(&self, epoch: ChainEpoch) -> NetworkVersion { self.chain_config().network_version(epoch) @@ -351,7 +336,7 @@ where self.chain_config().shallow_clone(), tipset, self.chain_index().shallow_clone(), - self.beacon.shallow_clone(), + self.beacon_schedule().shallow_clone(), ) } @@ -437,1800 +422,3 @@ where state.load_sectors_ext(self.blockstore(), None) } } - -impl StateManager -where - DB: Blockstore + Send + Sync + 'static, -{ - /// Load the state of a tipset, including state root, message receipts - pub async fn load_tipset_state(self: &Arc, ts: &Tipset) -> anyhow::Result { - if let Some(state) = self.cache.get_map(ts.key(), |et| et.into()) { - Ok(state) - } else if let Ok(receipt_ts) = self.chain_store().load_child_tipset(ts) { - Ok(TipsetState { - state_root: *receipt_ts.parent_state(), - receipt_root: *receipt_ts.parent_message_receipts(), - }) - } else { - Ok(self.load_executed_tipset(ts).await?.into()) - } - } - - /// Load an executed tipset, including state root, message receipts and events with caching. - pub async fn load_executed_tipset( - self: &Arc, - ts: &Tipset, - ) -> anyhow::Result { - self.cache - .get_or_else(ts.key(), || async move { - let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); - self.load_executed_tipset_inner(ts, receipt_ts.as_ref()) - .await - }) - .await - } - - async fn load_executed_tipset_inner( - self: &Arc, - msg_ts: &Tipset, - // when `msg_ts` is the current head, `receipt_ts` is `None` - receipt_ts: Option<&Tipset>, - ) -> anyhow::Result { - if let Some(receipt_ts) = receipt_ts { - anyhow::ensure!( - msg_ts.key() == receipt_ts.parents(), - "message tipset should be the parent of message receipt tipset" - ); - } - let mut recomputed = false; - let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| { - let receipt_root = *ts.parent_message_receipts(); - Receipt::get_receipts(self.cs.blockstore(), receipt_root) - .ok() - .map(|r| (*ts.parent_state(), receipt_root, r)) - }) { - Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts), - None => { - let state_output = self - .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced) - .await?; - recomputed = true; - ( - state_output.state_root, - state_output.receipt_root, - Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?, - ) - } - }; - - let messages = self.chain_store().messages_for_tipset(msg_ts)?; - anyhow::ensure!( - messages.len() == receipts.len(), - "mismatching message and receipt counts ({} messages, {} receipts)", - messages.len(), - receipts.len() - ); - let mut executed_messages = Vec::with_capacity(messages.len()); - for (message, receipt) in messages.iter().cloned().zip(receipts.into_iter()) { - let events = if let Some(events_root) = receipt.events_root() { - Some( - match StampedEvent::get_events(self.cs.blockstore(), &events_root) { - Ok(events) => events, - Err(e) if recomputed => return Err(e), - Err(_) => { - self.compute_tipset_state( - msg_ts.shallow_clone(), - NO_CALLBACK, - VMTrace::NotTraced, - ) - .await?; - recomputed = true; - StampedEvent::get_events(self.cs.blockstore(), &events_root)? - } - }, - ) - } else { - None - }; - executed_messages.push(ExecutedMessage { - message, - receipt, - events, - }); - } - Ok(ExecutedTipset { - state_root, - receipt_root, - executed_messages: Arc::new(executed_messages), - }) - } - - #[instrument(skip(self, rand))] - fn call_raw( - &self, - state_cid: Option, - msg: &Message, - rand: ChainRand, - tipset: &Tipset, - ) -> Result { - let mut msg = msg.clone(); - - let state_cid = state_cid.unwrap_or(*tipset.parent_state()); - - let tipset_messages = self - .chain_store() - .messages_for_tipset(tipset) - .map_err(|err| Error::Other(err.to_string()))?; - - let prior_messsages = tipset_messages - .iter() - .filter(|ts_msg| ts_msg.message().from() == msg.from()); - - // Handle state forks - - let height = tipset.epoch(); - let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone()); - let mut vm = VM::new( - ExecutionContext { - heaviest_tipset: tipset.shallow_clone(), - state_tree_root: state_cid, - epoch: height, - rand: Box::new(rand), - base_fee: tipset.block_headers().first().parent_base_fee.clone(), - circ_supply: genesis_info.get_vm_circulating_supply( - height, - self.blockstore(), - &state_cid, - )?, - chain_config: self.chain_config().shallow_clone(), - chain_index: self.chain_index().shallow_clone(), - timestamp: tipset.min_timestamp(), - }, - &self.engine, - VMTrace::Traced, - )?; - - for m in prior_messsages { - vm.apply_message(m)?; - } - - // We flush to get the VM's view of the state tree after applying the above messages - // This is needed to get the correct nonce from the actor state to match the VM - let state_cid = vm.flush()?; - - let state = StateTree::new_from_root(self.blockstore_owned(), &state_cid)?; - - let from_actor = state - .get_actor(&msg.from())? - .ok_or_else(|| anyhow::anyhow!("actor not found"))?; - msg.set_sequence(from_actor.sequence); - - // Implicit messages need to set a special gas limit - let mut msg = msg.clone(); - msg.gas_limit = IMPLICIT_MESSAGE_GAS_LIMIT as u64; - - let (apply_ret, duration) = vm.apply_implicit_message(&msg)?; - - Ok(ApiInvocResult { - msg: msg.clone(), - msg_rct: Some(apply_ret.msg_receipt()), - msg_cid: msg.cid(), - error: apply_ret.failure_info().unwrap_or_default(), - duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, - gas_cost: MessageGasCost::default(), - execution_trace: structured::parse_events(apply_ret.exec_trace()).unwrap_or_default(), - }) - } - - /// runs the given message and returns its result without any persisted - /// changes. - pub fn call(&self, message: &Message, tipset: Option) -> Result { - let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); - let chain_rand = self.chain_rand(ts.shallow_clone()); - self.call_raw(None, message, chain_rand, &ts) - } - - /// Same as [`StateManager::call`] but runs the message on the given state and not - /// on the parent state of the tipset. - pub fn call_on_state( - &self, - state_cid: Cid, - message: &Message, - tipset: Option, - ) -> Result { - let ts = tipset.unwrap_or_else(|| self.cs.heaviest_tipset()); - let chain_rand = self.chain_rand(ts.shallow_clone()); - self.call_raw(Some(state_cid), message, chain_rand, &ts) - } - - pub async fn apply_on_state_with_gas( - self: &Arc, - tipset: Option, - msg: Message, - vm_flush: VMFlush, - ) -> anyhow::Result<(ApiInvocResult, Option)> { - let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); - - let from_a = self.resolve_to_key_addr(&msg.from, &ts).await?; - - // Pretend that the message is signed. This has an influence on the gas - // cost. We obviously can't generate a valid signature. Instead, we just - // fill the signature with zeros. The validity is not checked. - let mut chain_msg = match from_a.protocol() { - Protocol::Secp256k1 => SignedMessage::new_unchecked( - msg.clone(), - Signature::new_secp256k1(vec![0; SECP_SIG_LEN]), - ) - .into(), - Protocol::Delegated => SignedMessage::new_unchecked( - msg.clone(), - // In Lotus, delegated signatures have the same length as SECP256k1. - // This may or may not change in the future. - Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]), - ) - .into(), - _ => msg.clone().into(), - }; - - let (_invoc_res, apply_ret, duration, state_root) = self - .call_with_gas(&mut chain_msg, &[], Some(ts), vm_flush) - .await?; - - Ok(( - ApiInvocResult { - msg_cid: msg.cid(), - msg, - msg_rct: Some(apply_ret.msg_receipt()), - error: apply_ret.failure_info().unwrap_or_default(), - duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, - gas_cost: MessageGasCost::default(), - execution_trace: structured::parse_events(apply_ret.exec_trace()) - .unwrap_or_default(), - }, - state_root, - )) - } - - /// Computes message on the given [Tipset] state, after applying other - /// messages and returns the values computed in the VM. - pub async fn call_with_gas( - self: &Arc, - message: &mut ChainMessage, - prior_messages: &[ChainMessage], - tipset: Option, - vm_flush: VMFlush, - ) -> Result<(InvocResult, ApplyRet, Duration, Option), Error> { - let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); - let TipsetState { state_root, .. } = self - .load_tipset_state(&ts) - .await - .map_err(|e| Error::Other(format!("Could not load tipset state: {e:#}")))?; - let chain_rand = self.chain_rand(ts.clone()); - - // Since we're simulating a future message, pretend we're applying it in the - // "next" tipset - let epoch = ts.epoch() + 1; - let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone()); - // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from - // FVM, but that introduces some constraints, and possible deadlocks. - let (ret, duration, state_cid) = stacker::grow(64 << 20, || -> anyhow::Result<_> { - let mut vm = VM::new( - ExecutionContext { - heaviest_tipset: ts.clone(), - state_tree_root: state_root, - epoch, - rand: Box::new(chain_rand), - base_fee: ts.block_headers().first().parent_base_fee.clone(), - circ_supply: genesis_info.get_vm_circulating_supply( - epoch, - self.blockstore(), - &state_root, - )?, - chain_config: self.chain_config().shallow_clone(), - chain_index: self.chain_index().shallow_clone(), - timestamp: ts.min_timestamp(), - }, - &self.engine, - VMTrace::NotTraced, - )?; - - for msg in prior_messages { - vm.apply_message(msg)?; - } - let from_actor = vm - .get_actor(&message.from()) - .map_err(|e| Error::Other(format!("Could not get actor from state: {e:#}")))? - .ok_or_else(|| Error::Other("cant find actor in state tree".to_string()))?; - - message.set_sequence(from_actor.sequence); - let (ret, duration) = vm.apply_message(message)?; - let state_root = match vm_flush { - VMFlush::Flush => Some(vm.flush()?), - VMFlush::Skip => None, - }; - Ok((ret, duration, state_root)) - })?; - - Ok(( - InvocResult::new(message.message().clone(), &ret), - ret, - duration, - state_cid, - )) - } - - /// Replays the given message and returns the result of executing the - /// indicated message, assuming it was executed in the indicated tipset. - pub async fn replay(self: &Arc, ts: Tipset, mcid: Cid) -> Result { - let this = Arc::clone(self); - tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)).await? - } - - /// Blocking version of `replay` - pub fn replay_blocking( - self: &Arc, - ts: Tipset, - mcid: Cid, - ) -> Result { - const REPLAY_HALT: &str = "replay_halt"; - - let mut api_invoc_result = None; - let callback = |ctx: MessageCallbackCtx<'_>| { - match ctx.at { - CalledAt::Applied | CalledAt::Reward - if api_invoc_result.is_none() && ctx.cid == mcid => - { - api_invoc_result = Some(ApiInvocResult { - msg_cid: ctx.message.cid(), - msg: ctx.message.message().clone(), - msg_rct: Some(ctx.apply_ret.msg_receipt()), - error: ctx.apply_ret.failure_info().unwrap_or_default(), - duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, - gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?, - execution_trace: structured::parse_events(ctx.apply_ret.exec_trace()) - .unwrap_or_default(), - }); - anyhow::bail!(REPLAY_HALT); - } - _ => Ok(()), // ignored - } - }; - let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced); - if let Err(error_message) = result - && error_message.to_string() != REPLAY_HALT - { - return Err(Error::Other(format!( - "unexpected error during execution : {error_message:}" - ))); - } - api_invoc_result.ok_or_else(|| Error::Other("failed to replay".into())) - } - - /// Replays a tipset up to a target message, capturing the state root before - /// and after execution. - pub async fn replay_for_prestate( - self: &Arc, - ts: Tipset, - target_message_cid: Cid, - ) -> Result<(Cid, ApiInvocResult, Cid), Error> { - let this = Arc::clone(self); - tokio::task::spawn_blocking(move || { - this.replay_for_prestate_blocking(ts, target_message_cid) - }) - .await - .map_err(|e| Error::Other(format!("{e}")))? - } - - fn replay_for_prestate_blocking( - self: &Arc, - ts: Tipset, - target_msg_cid: Cid, - ) -> Result<(Cid, ApiInvocResult, Cid), Error> { - if ts.epoch() == 0 { - return Err(Error::Other( - "cannot trace messages in the genesis block".into(), - )); - } - - let genesis_timestamp = self.chain_store().genesis_block_header().timestamp; - let exec = TipsetExecutor::new( - self.chain_index().shallow_clone(), - self.chain_config().shallow_clone(), - self.beacon_schedule().shallow_clone(), - &self.engine, - ts.shallow_clone(), - ); - let mut no_cb = NO_CALLBACK; - let (parent_state, epoch, block_messages) = - exec.prepare_parent_state(genesis_timestamp, VMTrace::NotTraced, &mut no_cb)?; - - Ok(stacker::grow(64 << 20, || { - let mut vm = - exec.create_vm(parent_state, epoch, ts.min_timestamp(), VMTrace::NotTraced)?; - let mut processed = ahash::HashSet::default(); - - for block in block_messages.iter() { - let mut penalty = TokenAmount::zero(); - let mut gas_reward = TokenAmount::zero(); - - for msg in block.messages.iter() { - let cid = msg.cid(); - if processed.contains(&cid) { - continue; - } - - processed.insert(cid); - - if cid == target_msg_cid { - let pre_root = vm.flush()?; - let mut traced_vm = - exec.create_vm(pre_root, epoch, ts.min_timestamp(), VMTrace::Traced)?; - let (ret, duration) = traced_vm.apply_message(msg)?; - let post_root = traced_vm.flush()?; - - return Ok(( - pre_root, - ApiInvocResult { - msg_cid: cid, - msg: msg.message().clone(), - msg_rct: Some(ret.msg_receipt()), - error: ret.failure_info().unwrap_or_default(), - duration: duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, - gas_cost: MessageGasCost::default(), - execution_trace: structured::parse_events(ret.exec_trace()) - .unwrap_or_default(), - }, - post_root, - )); - } - - let (ret, _) = vm.apply_message(msg)?; - gas_reward += ret.miner_tip(); - penalty += ret.penalty(); - } - - if let Some(rew_msg) = - vm.reward_message(epoch, block.miner, block.win_count, penalty, gas_reward)? - { - let (ret, _) = vm.apply_implicit_message(&rew_msg)?; - if let Some(err) = ret.failure_info() { - bail!( - "failed to apply reward message for miner {}: {err}", - block.miner - ); - } - - // This is more of a sanity check, this should not be able to be hit. - if !ret.msg_receipt().exit_code().is_success() { - bail!( - "reward application message failed (exit: {:?})", - ret.msg_receipt().exit_code() - ); - } - } - } - - bail!("message {target_msg_cid} not found in tipset") - })?) - } - - /// Checks the eligibility of the miner. This is used in the validation that - /// a block's miner has the requirements to mine a block. - pub fn eligible_to_mine( - &self, - address: &Address, - base_tipset: &Tipset, - lookback_tipset: &Tipset, - ) -> anyhow::Result { - let hmp = - self.miner_has_min_power(&self.chain_config().policy, address, lookback_tipset)?; - let version = self.get_network_version(base_tipset.epoch()); - - if version <= NetworkVersion::V3 { - return Ok(hmp); - } - - if !hmp { - return Ok(false); - } - - let actor = self - .get_actor(&Address::POWER_ACTOR, *base_tipset.parent_state())? - .ok_or_else(|| Error::state("Power actor address could not be resolved"))?; - - let power_state = power::State::load(self.blockstore(), actor.code, actor.state)?; - - let actor = self - .get_actor(address, *base_tipset.parent_state())? - .ok_or_else(|| Error::state("Miner actor address could not be resolved"))?; - - let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?; - - // Non-empty power claim. - let claim = power_state - .miner_power(self.blockstore(), address)? - .ok_or_else(|| Error::Other("Could not get claim".to_string()))?; - if claim.quality_adj_power <= BigInt::zero() { - return Ok(false); - } - - // No fee debt. - if !miner_state.fee_debt().is_zero() { - return Ok(false); - } - - // No active consensus faults. - let info = miner_state.info(self.blockstore())?; - if base_tipset.epoch() <= info.consensus_fault_elapsed { - return Ok(false); - } - - Ok(true) - } - - /// Conceptually, a [`Tipset`] consists of _blocks_ which share an _epoch_. - /// Each _block_ contains _messages_, which are executed by the _Filecoin Virtual Machine_. - /// - /// VM message execution essentially looks like this: - /// ```text - /// state[N-900..N] * message = state[N+1] - /// ``` - /// - /// The `state`s above are stored in the `IPLD Blockstore`, and can be referred to by - /// a [`Cid`] - the _state root_. - /// The previous 900 states (configurable, see - /// ) can be - /// queried when executing a message, so a store needs at least that many. - /// (a snapshot typically contains 2000, for example). - /// - /// Each message costs FIL to execute - this is _gas_. - /// After execution, the message has a _receipt_, showing how much gas was spent. - /// This is similarly a [`Cid`] into the block store. - /// - /// For details, see the documentation for [`apply_block_messages`]. - /// - pub async fn compute_tipset_state( - self: &Arc, - tipset: Tipset, - callback: Option) -> anyhow::Result<()> + Send + 'static>, - enable_tracing: VMTrace, - ) -> Result { - let this = Arc::clone(self); - tokio::task::spawn_blocking(move || { - this.compute_tipset_state_blocking(tipset, callback, enable_tracing) - }) - .await? - } - - /// Blocking version of `compute_tipset_state` - pub fn compute_tipset_state_blocking( - &self, - tipset: Tipset, - callback: Option) -> anyhow::Result<()>>, - enable_tracing: VMTrace, - ) -> Result { - let epoch = tipset.epoch(); - let has_callback = callback.is_some(); - info!( - "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}", - tipset.len(), - tipset.key(), - ); - Ok(apply_block_messages( - self.chain_store().genesis_block_header().timestamp, - self.chain_index().shallow_clone(), - self.chain_config().shallow_clone(), - self.beacon_schedule().shallow_clone(), - &self.engine, - tipset, - callback, - enable_tracing, - ) - .map_err(|e| { - if has_callback { - e - } else { - e.context(format!("Failed to compute tipset state@{epoch}")) - } - })?) - } - - #[instrument(skip_all)] - pub async fn compute_state( - self: &Arc, - height: ChainEpoch, - messages: Vec, - tipset: Tipset, - callback: Option) -> anyhow::Result<()> + Send + 'static>, - enable_tracing: VMTrace, - ) -> Result { - let this = Arc::clone(self); - tokio::task::spawn_blocking(move || { - this.compute_state_blocking(height, messages, tipset, callback, enable_tracing) - }) - .await? - } - - /// Blocking version of `compute_state` - #[tracing::instrument(skip_all)] - pub fn compute_state_blocking( - &self, - height: ChainEpoch, - messages: Vec, - tipset: Tipset, - callback: Option) -> anyhow::Result<()>>, - enable_tracing: VMTrace, - ) -> Result { - Ok(compute_state( - height, - messages, - tipset, - self.chain_store().genesis_block_header().timestamp, - self.chain_index().shallow_clone(), - self.chain_config().shallow_clone(), - self.beacon_schedule().shallow_clone(), - &self.engine, - callback, - enable_tracing, - )?) - } - - /// Check if tipset had executed the message, by loading the receipt based - /// on the index of the message in the block. - fn tipset_executed_message( - &self, - tipset: &Tipset, - message: &ChainMessage, - allow_replaced: bool, - ) -> Result, Error> { - if tipset.epoch() == 0 { - return Ok(None); - } - let message_from_address = message.from(); - let message_sequence = message.sequence(); - // Load parent state. - let pts = self - .chain_index() - .load_required_tipset(tipset.parents()) - .map_err(|err| Error::Other(format!("Failed to load tipset: {err}")))?; - let messages = self - .cs - .messages_for_tipset(&pts) - .map_err(|err| Error::Other(format!("Failed to load messages for tipset: {err}")))?; - messages - .iter() - .enumerate() - // iterate in reverse because we going backwards through the chain - .rev() - .filter(|(_, s)| { - s.sequence() == message_sequence - && s.from() == message_from_address - && s.equal_call(message) - }) - .map(|(index, m)| { - // A replacing message is a message with a different CID, - // any of Gas values, and different signature, but with all - // other parameters matching (source/destination, nonce, params, etc.) - if !allow_replaced && message.cid() != m.cid(){ - Err(Error::Other(format!( - "found message with equal nonce and call params but different CID. wanted {}, found: {}, nonce: {}, from: {}", - message.cid(), - m.cid(), - message.sequence(), - message.from(), - ))) - } else { - let block_header = tipset.block_headers().first(); - crate::chain::get_parent_receipt( - self.blockstore(), - block_header, - index, - ) - .map_err(|err| Error::Other(format!("Failed to get parent receipt (message_receipts={}, index={index}, error={err})", block_header.message_receipts))) - } - }) - .next() - .unwrap_or(Ok(None)) - } - - fn check_search( - &self, - mut current: Tipset, - message: &ChainMessage, - lookback_max_epoch: ChainEpoch, - allow_replaced: bool, - ) -> Result, Error> { - let message_from_address = message.from(); - let message_sequence = message.sequence(); - let mut current_actor_state = self - .get_required_actor(&message_from_address, *current.parent_state()) - .map_err(Error::state)?; - let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?; - - while current.epoch() >= lookback_max_epoch { - let parent_tipset = self - .chain_index() - .load_required_tipset(current.parents()) - .map_err(|err| { - Error::Other(format!( - "failed to load tipset during msg wait searchback: {err:}" - )) - })?; - - let parent_actor_state = self - .get_actor(&message_from_id, *parent_tipset.parent_state()) - .map_err(|e| Error::State(e.to_string()))?; - - if parent_actor_state.is_none() - || (current_actor_state.sequence > message_sequence - && parent_actor_state.as_ref().unwrap().sequence <= message_sequence) - { - let receipt = self - .tipset_executed_message(¤t, message, allow_replaced)? - .context("Failed to get receipt with tipset_executed_message")?; - return Ok(Some((current, receipt))); - } - - if let Some(parent_actor_state) = parent_actor_state { - current = parent_tipset; - current_actor_state = parent_actor_state; - } else { - break; - } - } - - Ok(None) - } - - /// Searches backwards through the chain for a message receipt. - fn search_back_for_message( - &self, - current: Tipset, - message: &ChainMessage, - look_back_limit: Option, - allow_replaced: Option, - ) -> Result, Error> { - let current_epoch = current.epoch(); - let allow_replaced = allow_replaced.unwrap_or(true); - - // Calculate the max lookback epoch (inclusive lower bound) for the search. - let lookback_max_epoch = match look_back_limit { - // No search: limit = 0 means search 0 epochs - Some(0) => return Ok(None), - // Limited search: calculate the inclusive lower bound, clamped to genesis - // Example: limit=5 at epoch=1000 → min_epoch=996, searches [996,1000] = 5 epochs - // Example: limit=2000 at epoch=1000 → min_epoch=0, searches [0,1000] = 1001 epochs (all available) - Some(limit) if limit > 0 => (current_epoch - limit + 1).max(0), - // Search all the way to genesis (epoch 0) - _ => 0, - }; - - self.check_search(current, message, lookback_max_epoch, allow_replaced) - } - - /// Returns a message receipt from a given tipset and message CID. - pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result { - let m = crate::chain::get_chain_message(self.blockstore(), &msg) - .map_err(|e| Error::Other(e.to_string()))?; - let message_receipt = self.tipset_executed_message(&tipset, &m, true)?; - if let Some(receipt) = message_receipt { - return Ok(receipt); - } - - let maybe_tuple = self.search_back_for_message(tipset, &m, None, None)?; - let message_receipt = maybe_tuple - .ok_or_else(|| { - Error::Other("Could not get receipt from search back message".to_string()) - })? - .1; - Ok(message_receipt) - } - - /// `WaitForMessage` blocks until a message appears on chain. It looks - /// backwards in the chain to see if this has already happened. It - /// guarantees that the message has been on chain for at least - /// confidence epochs without being reverted before returning. - pub async fn wait_for_message( - self: &Arc, - msg_cid: Cid, - confidence: i64, - look_back_limit: Option, - allow_replaced: Option, - ) -> Result<(Option, Option), Error> { - let mut head_changes_rx = self.cs.subscribe_head_changes(); - let (sender, mut receiver) = oneshot::channel::<()>(); - let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) - .map_err(|err| Error::Other(format!("failed to load message {err:}")))?; - let current_tipset = self.heaviest_tipset(); - let maybe_message_receipt = - self.tipset_executed_message(¤t_tipset, &message, true)?; - if let Some(r) = maybe_message_receipt { - return Ok((Some(current_tipset.shallow_clone()), Some(r))); - } - - let mut candidate_tipset: Option = None; - let mut candidate_receipt: Option = None; - - let sm_cloned = self.shallow_clone(); - - let message_for_task = message.clone(); - let height_of_head = current_tipset.epoch(); - let task = tokio::task::spawn(async move { - let back_tuple = sm_cloned.search_back_for_message( - current_tipset, - &message_for_task, - look_back_limit, - allow_replaced, - )?; - sender - .send(()) - .map_err(|e| Error::Other(format!("Could not send to channel {e:?}")))?; - Ok::<_, Error>(back_tuple) - }); - - let reverts: Arc>> = Arc::new(RwLock::new(HashMap::new())); - let block_revert = reverts.clone(); - let sm_cloned = Arc::clone(self); - - // Wait for message to be included in head change. - let mut subscriber_poll = tokio::task::spawn(async move { - loop { - match head_changes_rx.recv().await { - Ok(head_changes) => { - for tipset in head_changes.reverts { - if candidate_tipset - .as_ref() - .is_some_and(|candidate| candidate.key() == tipset.key()) - { - candidate_tipset = None; - candidate_receipt = None; - } - } - for tipset in head_changes.applies { - if candidate_tipset - .as_ref() - .map(|s| tipset.epoch() >= s.epoch() + confidence) - .unwrap_or_default() - { - return Ok((candidate_tipset, candidate_receipt)); - } - let poll_receiver = receiver.try_recv(); - if let Ok(Some(_)) = poll_receiver { - block_revert - .write() - .await - .insert(tipset.key().to_owned(), true); - } - - let maybe_receipt = - sm_cloned.tipset_executed_message(&tipset, &message, true)?; - if let Some(receipt) = maybe_receipt { - if confidence == 0 { - return Ok((Some(tipset), Some(receipt))); - } - candidate_tipset = Some(tipset); - candidate_receipt = Some(receipt) - } - } - } - Err(RecvError::Lagged(i)) => { - warn!( - "wait for message head change subscriber lagged, skipped {} events", - i - ); - } - Err(RecvError::Closed) => break, - } - } - Ok((None, None)) - }) - .fuse(); - - // Search backwards for message. - let mut search_back_poll = tokio::task::spawn(async move { - let back_tuple = task.await.map_err(|e| { - Error::Other(format!("Could not search backwards for message {e}")) - })??; - if let Some((back_tipset, back_receipt)) = back_tuple { - let should_revert = *reverts - .read() - .await - .get(back_tipset.key()) - .unwrap_or(&false); - let larger_height_of_head = height_of_head >= back_tipset.epoch() + confidence; - if !should_revert && larger_height_of_head { - return Ok::<_, Error>((Some(back_tipset), Some(back_receipt))); - } - return Ok((None, None)); - } - Ok((None, None)) - }) - .fuse(); - - // Await on first future to finish. - loop { - select! { - res = subscriber_poll => { - return res? - } - res = search_back_poll => { - if let Ok((Some(ts), Some(rct))) = res? { - return Ok((Some(ts), Some(rct))); - } - } - } - } - } - - pub async fn search_for_message( - &self, - from: Option, - msg_cid: Cid, - look_back_limit: Option, - allow_replaced: Option, - ) -> Result, Error> { - let from = from.unwrap_or_else(|| self.heaviest_tipset()); - let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) - .map_err(|err| Error::Other(format!("failed to load message {err}")))?; - let current_tipset = self.heaviest_tipset(); - let maybe_message_receipt = - self.tipset_executed_message(&from, &message, allow_replaced.unwrap_or(true))?; - if let Some(r) = maybe_message_receipt { - Ok(Some((from, r))) - } else { - self.search_back_for_message(current_tipset, &message, look_back_limit, allow_replaced) - } - } - - /// Returns a BLS public key from provided address - pub fn get_bls_public_key( - db: &Arc, - addr: &Address, - state_cid: Cid, - ) -> Result { - let state = StateTree::new_from_root(Arc::clone(db), &state_cid) - .map_err(|e| Error::Other(e.to_string()))?; - let kaddr = - resolve_to_key_addr(&state, db, addr).context("Failed to resolve key address")?; - - match kaddr.into_payload() { - Payload::BLS(key) => BlsPublicKey::from_bytes(&key) - .context("Failed to construct bls public key") - .map_err(Error::from), - _ => Err(Error::state( - "Address must be BLS address to load bls public key", - )), - } - } - - /// Looks up ID [Address] from the state at the given [Tipset]. - pub fn lookup_id(&self, addr: &Address, ts: &Tipset) -> Result, Error> { - let state_tree = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state()) - .map_err(|e| format!("{e:?}"))?; - Ok(state_tree - .lookup_id(addr) - .map_err(|e| Error::Other(e.to_string()))? - .map(Address::new_id)) - } - - /// Looks up required ID [Address] from the state at the given [Tipset]. - pub fn lookup_required_id(&self, addr: &Address, ts: &Tipset) -> Result { - self.lookup_id(addr, ts)? - .ok_or_else(|| Error::Other(format!("Failed to lookup the id address {addr}"))) - } - - /// Retrieves market state - pub fn market_state(&self, ts: &Tipset) -> Result { - let actor = self.get_required_actor(&Address::MARKET_ACTOR, *ts.parent_state())?; - let market_state = market::State::load(self.blockstore(), actor.code, actor.state)?; - Ok(market_state) - } - - /// Retrieves market balance in escrow and locked tables. - pub fn market_balance(&self, addr: &Address, ts: &Tipset) -> Result { - let market_state = self.market_state(ts)?; - let new_addr = self.lookup_required_id(addr, ts)?; - let out = MarketBalance { - escrow: { - market_state - .escrow_table(self.blockstore())? - .get(&new_addr)? - }, - locked: { - market_state - .locked_table(self.blockstore())? - .get(&new_addr)? - }, - }; - - Ok(out) - } - - /// Retrieves miner info. - pub fn miner_info(&self, addr: &Address, ts: &Tipset) -> Result { - let actor = self - .get_actor(addr, *ts.parent_state())? - .ok_or_else(|| Error::state("Miner actor not found"))?; - let state = miner::State::load(self.blockstore(), actor.code, actor.state)?; - - Ok(state.info(self.blockstore())?) - } - - /// Retrieves miner faults. - pub fn miner_faults(&self, addr: &Address, ts: &Tipset) -> Result { - self.all_partition_sectors(addr, ts, |partition| partition.faulty_sectors().clone()) - } - - /// Retrieves miner recoveries. - pub fn miner_recoveries(&self, addr: &Address, ts: &Tipset) -> Result { - self.all_partition_sectors(addr, ts, |partition| partition.recovering_sectors().clone()) - } - - fn all_partition_sectors( - &self, - addr: &Address, - ts: &Tipset, - get_sector: impl Fn(Partition<'_>) -> BitField, - ) -> Result { - let actor = self - .get_actor(addr, *ts.parent_state())? - .ok_or_else(|| Error::state("Miner actor not found"))?; - - let state = miner::State::load(self.blockstore(), actor.code, actor.state)?; - - let mut partitions = Vec::new(); - - state.for_each_deadline( - &self.chain_config().policy, - self.blockstore(), - |_, deadline| { - deadline.for_each(self.blockstore(), |_, partition| { - partitions.push(get_sector(partition)); - Ok(()) - }) - }, - )?; - - Ok(BitField::union(partitions.iter())) - } - - /// Retrieves miner power. - pub fn miner_power(&self, addr: &Address, ts: &Tipset) -> Result { - if let Some((miner_power, total_power)) = self.get_power(ts.parent_state(), Some(addr))? { - return Ok(MinerPower { - miner_power, - total_power, - has_min_power: true, - }); - } - - Ok(MinerPower { - has_min_power: false, - miner_power: Default::default(), - total_power: Default::default(), - }) - } - - /// Similar to `resolve_to_key_addr` in the `forest_vm` [`crate::state_manager`] but does not - /// allow `Actor` type of addresses. Uses `ts` to generate the VM state. - pub async fn resolve_to_key_addr( - self: &Arc, - addr: &Address, - ts: &Tipset, - ) -> anyhow::Result
{ - match addr.protocol() { - Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr), - Protocol::Actor => { - return Err(Error::Other( - "cannot resolve actor address to key address".to_string(), - ) - .into()); - } - _ => {} - }; - - // First try to resolve the actor in the parent state, so we don't have to - // compute anything. - let state = StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())?; - if let Ok(addr) = resolve_to_key_addr(&state, self.blockstore(), addr) { - return Ok(addr); - } - - // If that fails, compute the tip-set and try again. - let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?; - let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?; - - resolve_to_key_addr(&state, self.blockstore(), addr) - } - - pub async fn miner_get_base_info( - self: &Arc, - beacon_schedule: &BeaconSchedule, - tipset: Tipset, - addr: Address, - epoch: ChainEpoch, - ) -> anyhow::Result> { - let prev_beacon = self - .chain_store() - .chain_index() - .latest_beacon_entry(tipset.clone())?; - - let entries: Vec = beacon_schedule - .beacon_entries_for_block( - self.chain_config().network_version(epoch), - epoch, - tipset.epoch(), - &prev_beacon, - ) - .await?; - - let base = entries.last().unwrap_or(&prev_beacon); - - let (lb_tipset, lb_state_root) = ChainStore::get_lookback_tipset_for_round( - self.chain_index(), - self.chain_config(), - &tipset, - epoch, - )?; - - // If the miner actor doesn't exist in the current tipset, it is a - // user-error and we must return an error message. If the miner exists - // in the current tipset but not in the lookback tipset, we may not - // error and should instead return None. - let actor = self.get_required_actor(&addr, *tipset.parent_state())?; - if self.get_actor(&addr, lb_state_root)?.is_none() { - return Ok(None); - } - - let miner_state = miner::State::load(self.blockstore(), actor.code, actor.state)?; - - let addr_buf = to_vec(&addr)?; - let rand = draw_randomness( - base.signature(), - DomainSeparationTag::WinningPoStChallengeSeed as i64, - epoch, - &addr_buf, - )?; - - let network_version = self.chain_config().network_version(tipset.epoch()); - let sectors = self.get_sectors_for_winning_post( - &lb_state_root, - network_version, - &addr, - Randomness::new(rand.to_vec()), - )?; - - if sectors.is_empty() { - return Ok(None); - } - - let (miner_power, total_power) = self - .get_power(&lb_state_root, Some(&addr))? - .context("failed to get power")?; - - let info = miner_state.info(self.blockstore())?; - - let worker_key = self - .resolve_to_deterministic_address(info.worker, &tipset) - .await?; - let eligible = self.eligible_to_mine(&addr, &tipset, &lb_tipset)?; - - Ok(Some(MiningBaseInfo { - miner_power: miner_power.quality_adj_power, - network_power: total_power.quality_adj_power, - sectors, - worker_key, - sector_size: info.sector_size, - prev_beacon_entry: prev_beacon, - beacon_entries: entries, - eligible_for_mining: eligible, - })) - } - - /// Checks power actor state for if miner meets consensus minimum - /// requirements. - pub fn miner_has_min_power( - &self, - policy: &Policy, - addr: &Address, - ts: &Tipset, - ) -> anyhow::Result { - let actor = self - .get_actor(&Address::POWER_ACTOR, *ts.parent_state())? - .ok_or_else(|| Error::state("Power actor address could not be resolved"))?; - let ps = power::State::load(self.blockstore(), actor.code, actor.state)?; - - ps.miner_nominal_power_meets_consensus_minimum(policy, self.blockstore(), addr) - } - - /// Validates all tipsets at epoch `start..=end` behind the heaviest tipset. - /// - /// This spawns [`rayon::current_num_threads`] threads to do the compute-heavy work - /// of tipset validation. - /// - /// # What is validation? - /// Every state transition returns a new _state root_, which is typically retained in, e.g., snapshots. - /// For "full" snapshots, all state roots are retained. - /// For standard snapshots, the last 2000 or so state roots are retained. - /// - /// _receipts_ meanwhile, are typically ephemeral, but each tipset knows the _receipt root_ - /// (hash) of the previous tipset. - /// - /// This function takes advantage of that fact to validate tipsets: - /// - `tipset[N]` claims that `receipt_root[N-1]` should be `0xDEADBEEF` - /// - find `tipset[N-1]`, and perform its state transition to get the actual `receipt_root` - /// - assert that they match - /// - /// See [`Self::compute_tipset_state_blocking`] for an explanation of state transitions. - /// - /// # Known issues - /// This function is blocking, but we do observe threads waiting and synchronizing. - /// This is suspected to be due something in the VM or its `WASM` runtime. - #[tracing::instrument(skip(self))] - pub fn validate_range(&self, epochs: RangeInclusive) -> anyhow::Result<()> { - let heaviest = self.heaviest_tipset(); - let heaviest_epoch = heaviest.epoch(); - let end = self - .chain_index() - .tipset_by_height(*epochs.end(), heaviest, ResolveNullTipset::TakeOlder) - .with_context(|| { - format!( - "couldn't get a tipset at height {} behind heaviest tipset at height {heaviest_epoch}", - *epochs.end(), - ) - })?; - - // lookup tipset parents as we go along, iterating DOWN from `end` - let tipsets = end - .chain(self.blockstore()) - .take_while(|ts| ts.epoch() >= *epochs.start()); - - self.validate_tipsets(tipsets) - } - - pub fn validate_tipsets(&self, tipsets: T) -> anyhow::Result<()> - where - T: Iterator + Send, - { - let genesis_timestamp = self.chain_store().genesis_block_header().timestamp; - validate_tipsets( - genesis_timestamp, - self.chain_index(), - self.chain_config(), - self.beacon_schedule(), - &self.engine, - tipsets, - ) - } - - pub fn get_verified_registry_actor_state( - &self, - ts: &Tipset, - ) -> anyhow::Result { - let act = self - .get_actor(&Address::VERIFIED_REGISTRY_ACTOR, *ts.parent_state()) - .map_err(Error::state)? - .ok_or_else(|| Error::state("actor not found"))?; - verifreg::State::load(self.blockstore(), act.code, act.state) - } - pub fn get_claim( - &self, - addr: &Address, - ts: &Tipset, - claim_id: ClaimID, - ) -> anyhow::Result> { - let id_address = self.lookup_required_id(addr, ts)?; - let state = self.get_verified_registry_actor_state(ts)?; - state.get_claim(self.blockstore(), id_address, claim_id) - } - - pub fn get_all_claims(&self, ts: &Tipset) -> anyhow::Result> { - let state = self.get_verified_registry_actor_state(ts)?; - state.get_all_claims(self.blockstore()) - } - - pub fn get_allocation( - &self, - addr: &Address, - ts: &Tipset, - allocation_id: AllocationID, - ) -> anyhow::Result> { - let id_address = self.lookup_required_id(addr, ts)?; - let state = self.get_verified_registry_actor_state(ts)?; - state.get_allocation(self.blockstore(), id_address.id()?, allocation_id) - } - - pub fn get_all_allocations( - &self, - ts: &Tipset, - ) -> anyhow::Result> { - let state = self.get_verified_registry_actor_state(ts)?; - state.get_all_allocations(self.blockstore()) - } - - pub fn verified_client_status( - &self, - addr: &Address, - ts: &Tipset, - ) -> anyhow::Result> { - let id = self.lookup_required_id(addr, ts)?; - let network_version = self.get_network_version(ts.epoch()); - - // This is a copy of Lotus code, we need to treat all the actors below version 9 - // differently. Which maps to network below version 17. - // Original: https://github.com/filecoin-project/lotus/blob/5e76b05b17771da6939c7b0bf65127c3dc70ee23/node/impl/full/state.go#L1627-L1664. - if (u32::from(network_version.0)) < 17 { - let state = self.get_verified_registry_actor_state(ts)?; - return state.verified_client_data_cap(self.blockstore(), id); - } - - let act = self - .get_actor(&Address::DATACAP_TOKEN_ACTOR, *ts.parent_state()) - .map_err(Error::state)? - .ok_or_else(|| Error::state("Miner actor not found"))?; - - let state = datacap::State::load(self.blockstore(), act.code, act.state)?; - - state.verified_client_data_cap(self.blockstore(), id) - } - - pub async fn resolve_to_deterministic_address( - self: &Arc, - address: Address, - ts: &Tipset, - ) -> anyhow::Result
{ - use crate::shim::address::Protocol::*; - match address.protocol() { - BLS | Secp256k1 | Delegated => Ok(address), - Actor => anyhow::bail!("cannot resolve actor address to key address"), - _ => { - // First try to resolve the actor in the parent state, so we don't have to compute anything. - if let Ok(state) = - StateTree::new_from_root(self.blockstore_owned(), ts.parent_state()) - && let Ok(address) = state - .resolve_to_deterministic_addr(self.chain_store().blockstore(), address) - { - return Ok(address); - } - - // If that fails, compute the tip-set and try again. - let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?; - let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?; - state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address) - } - } - } - - pub fn execution_trace(&self, tipset: &Tipset) -> anyhow::Result<(Cid, Vec)> { - let mut invoc_trace = vec![]; - - let genesis_timestamp = self.chain_store().genesis_block_header().timestamp; - - let callback = |ctx: MessageCallbackCtx<'_>| { - match ctx.at { - CalledAt::Applied | CalledAt::Reward => { - invoc_trace.push(ApiInvocResult { - msg_cid: ctx.message.cid(), - msg: ctx.message.message().clone(), - msg_rct: Some(ctx.apply_ret.msg_receipt()), - error: ctx.apply_ret.failure_info().unwrap_or_default(), - duration: ctx.duration.as_nanos().clamp(0, u128::from(u64::MAX)) as u64, - gas_cost: MessageGasCost::new(ctx.message.message(), ctx.apply_ret)?, - execution_trace: structured::parse_events(ctx.apply_ret.exec_trace()) - .unwrap_or_default(), - }); - Ok(()) - } - _ => Ok(()), // ignored - } - }; - - let ExecutedTipset { state_root, .. } = apply_block_messages( - genesis_timestamp, - self.chain_index().shallow_clone(), - self.chain_config().shallow_clone(), - self.beacon_schedule().shallow_clone(), - &self.engine, - tipset.shallow_clone(), - Some(callback), - VMTrace::Traced, - )?; - - Ok((state_root, invoc_trace)) - } -} - -pub fn validate_tipsets( - genesis_timestamp: u64, - chain_index: &ChainIndex, - chain_config: &Arc, - beacon: &Arc, - engine: &MultiEngine, - tipsets: T, -) -> anyhow::Result<()> -where - DB: Blockstore + Send + Sync + 'static, - T: Iterator + Send, -{ - use rayon::iter::ParallelIterator as _; - tipsets - .tuple_windows() - .par_bridge() - .try_for_each(|(child, parent)| { - info!(height = parent.epoch(), "compute parent state"); - let ExecutedTipset { - state_root: actual_state, - receipt_root: actual_receipt, - .. - } = apply_block_messages( - genesis_timestamp, - chain_index.shallow_clone(), - chain_config.shallow_clone(), - beacon.shallow_clone(), - engine, - parent, - NO_CALLBACK, - VMTrace::NotTraced, - ) - .context("couldn't compute tipset state")?; - let expected_receipt = child.min_ticket_block().message_receipts; - let expected_state = child.parent_state(); - match (expected_state, expected_receipt) == (&actual_state, actual_receipt) { - true => Ok(()), - false => { - error!( - height = child.epoch(), - ?expected_state, - ?expected_receipt, - ?actual_state, - ?actual_receipt, - "state mismatch" - ); - bail!("state mismatch"); - } - } - }) -} - -/// Shared context for creating VMs and preparing tipset state. -/// -/// Encapsulates randomness source, genesis info, VM construction, -/// null-epoch cron handling, and state migrations. -struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> { - tipset: Tipset, - rand: ChainRand, - chain_config: Arc, - chain_index: ChainIndex, - genesis_info: GenesisInfo, - engine: &'a MultiEngine, -} - -impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> { - fn new( - chain_index: ChainIndex, - chain_config: Arc, - beacon: Arc, - engine: &'a MultiEngine, - tipset: Tipset, - ) -> Self { - let rand = ChainRand::new( - chain_config.shallow_clone(), - tipset.shallow_clone(), - chain_index.shallow_clone(), - beacon, - ); - let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone()); - Self { - tipset, - rand, - chain_config, - chain_index, - genesis_info, - engine, - } - } - - fn create_vm( - &self, - state_root: Cid, - epoch: ChainEpoch, - timestamp: u64, - trace: VMTrace, - ) -> anyhow::Result> { - let circ_supply = self.genesis_info.get_vm_circulating_supply( - epoch, - self.chain_index.db(), - &state_root, - )?; - VM::new( - ExecutionContext { - heaviest_tipset: self.tipset.shallow_clone(), - state_tree_root: state_root, - epoch, - rand: Box::new(self.rand.shallow_clone()), - base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(), - circ_supply, - chain_config: self.chain_config.shallow_clone(), - chain_index: self.chain_index.shallow_clone(), - timestamp, - }, - self.engine, - trace, - ) - } - - /// Produces the state root ready for message execution by running - /// null-epoch `crons` and any pending state migrations. - fn prepare_parent_state( - &self, - genesis_timestamp: u64, - null_epoch_trace: VMTrace, - cron_callback: &mut Option, - ) -> anyhow::Result<(Cid, ChainEpoch, Vec)> - where - F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>, - { - use crate::shim::clock::EPOCH_DURATION_SECONDS; - - let mut parent_state = *self.tipset.parent_state(); - let parent_epoch = self - .chain_index - .load_required_tipset(self.tipset.parents())? - .epoch(); - let epoch = self.tipset.epoch(); - - for epoch_i in parent_epoch..epoch { - if epoch_i > parent_epoch { - let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64); - parent_state = stacker::grow(64 << 20, || -> anyhow::Result { - let mut vm = - self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?; - if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) { - error!("Beginning of epoch cron failed to run: {e:#}"); - return Err(e); - } - vm.flush() - })?; - } - if let Some(new_state) = run_state_migrations( - epoch_i, - &self.chain_config, - self.chain_index.db(), - &parent_state, - )? { - parent_state = new_state; - } - } - - let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?; - Ok((parent_state, epoch, block_messages)) - } -} - -/// Messages are transactions that produce new states. The state (usually -/// referred to as the 'state-tree') is a mapping from actor addresses to actor -/// states. Each block contains the hash of the state-tree that should be used -/// as the starting state when executing the block messages. -/// -/// # Execution environment -/// -/// Transaction execution has the following inputs: -/// - a current state-tree (stored as IPLD in a key-value database). This -/// reference is in [`Tipset::parent_state`]. -/// - up to 900 past state-trees. See -/// . -/// - up to 900 past tipset IDs. -/// - a deterministic source of randomness. -/// - the circulating supply of FIL (see -/// ). The circulating -/// supply is determined by the epoch and the states of a few key actors. -/// - the base fee (see ). -/// This value is defined by `tipset.parent_base_fee`. -/// - the genesis timestamp (UNIX epoch time when the first block was -/// mined/created). -/// - a chain configuration (maps epoch to network version, has chain specific -/// settings). -/// -/// The result of running a set of block messages is an index to the final -/// state-tree and an index to an array of message receipts (listing gas used, -/// return codes, etc). -/// -/// # Cron and null tipsets -/// -/// Once per epoch, after all messages have run, a special 'cron' transaction -/// must be executed. The tasks of the 'cron' transaction include running batch -/// jobs and keeping the state up-to-date with the current epoch. -/// -/// It can happen that no blocks are mined in an epoch. The tipset for such an -/// epoch is called a null tipset. A null tipset has no identity and cannot be -/// directly executed. This is a problem for 'cron' which must run for every -/// epoch, even if there are no messages. The fix is to run 'cron' if there are -/// any null tipsets between the current epoch and the parent epoch. -/// -/// Imagine the blockchain looks like this with a null tipset at epoch 9: -/// -/// ```text -/// ┌────────┐ ┌────┐ ┌───────┐ ┌───────┐ -/// │Epoch 10│ │Null│ │Epoch 8├──►Epoch 7├─► -/// └───┬────┘ └────┘ └───▲───┘ └───────┘ -/// └─────────────────┘ -/// ``` -/// -/// The parent of tipset-epoch-10 is tipset-epoch-8. Before executing the -/// messages in epoch 10, we have to run cron for epoch 9. However, running -/// 'cron' requires the timestamp of the youngest block in the tipset (which -/// doesn't exist because there are no blocks in the tipset). Lotus dictates that -/// the timestamp of a null tipset is `30s * epoch` after the genesis timestamp. -/// So, in the above example, if the genesis block was mined at time `X`, the -/// null tipset for epoch 9 will have timestamp `X + 30 * 9`. -/// -/// # Migrations -/// -/// Migrations happen between network upgrades and modify the state tree. If a -/// migration is scheduled for epoch 10, it will be run _after_ the messages for -/// epoch 10. The tipset for epoch 11 will link the state-tree produced by the -/// migration. -/// -/// Example timeline with a migration at epoch 10: -/// 1. Tipset-epoch-10 executes, producing state-tree A. -/// 2. Migration consumes state-tree A and produces state-tree B. -/// 3. Tipset-epoch-11 executes, consuming state-tree B (rather than A). -/// -/// Note: The migration actually happens when tipset-epoch-11 executes. This is -/// because tipset-epoch-10 may be null and therefore not executed at all. -/// -/// # Caching -/// -/// Scanning the blockchain to find past tipsets and state-trees may be slow. -/// The `ChainStore` caches recent tipsets to make these scans faster. -#[allow(clippy::too_many_arguments)] -pub fn apply_block_messages( - genesis_timestamp: u64, - chain_index: ChainIndex, - chain_config: Arc, - beacon: Arc, - engine: &MultiEngine, - tipset: Tipset, - mut callback: Option) -> anyhow::Result<()>>, - enable_tracing: VMTrace, -) -> anyhow::Result -where - DB: Blockstore + Send + Sync + 'static, -{ - // This function will: - // 1. handle the genesis block as a special case - // 2. run 'cron' for any null-tipsets between the current tipset and our parent tipset - // 3. run migrations - // 4. execute block messages - // 5. write the state-tree to the DB and return the CID - - // step 1: special case for genesis block - if tipset.epoch() == 0 { - // NB: This is here because the process that executes blocks requires that the - // block miner reference a valid miner in the state tree. Unless we create some - // magical genesis miner, this won't work properly, so we short circuit here - // This avoids the question of 'who gets paid the genesis block reward' - let message_receipts = tipset.min_ticket_block().message_receipts; - return Ok(ExecutedTipset { - state_root: *tipset.parent_state(), - receipt_root: message_receipts, - executed_messages: vec![].into(), - }); - } - - let exec = TipsetExecutor::new( - chain_index.shallow_clone(), - chain_config, - beacon, - engine, - tipset.shallow_clone(), - ); - - // step 2: running cron for any null-tipsets - // step 3: run migrations - let (parent_state, epoch, block_messages) = - exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?; - - // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from - // FVM, but that introduces some constraints, and possible deadlocks. - stacker::grow(64 << 20, || -> anyhow::Result { - let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?; - - // step 4: apply tipset messages - let (receipts, events, events_roots) = - vm.apply_block_messages(&block_messages, epoch, callback)?; - - // step 5: construct receipt root from receipts - let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?; - - // step 6: store events AMTs in the blockstore - for (events, events_root) in events.iter().zip(events_roots.iter()) { - if let Some(events) = events { - let event_root = - events_root.context("events root should be present when events present")?; - // Store the events AMT - the root CID should match the one computed by FVM - let derived_event_root = Amt::new_from_iter_with_bit_width( - chain_index.db(), - EVENTS_AMT_BITWIDTH, - events.iter(), - ) - .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?; - - // Verify the stored root matches the FVM-computed root - ensure!( - derived_event_root == event_root, - "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}." - ); - } - } - - let state_root = vm.flush()?; - - // Update executed tipset cache - let messages: Vec = block_messages - .into_iter() - .flat_map(|bm| bm.messages) - .collect_vec(); - anyhow::ensure!( - messages.len() == receipts.len() && messages.len() == events.len(), - "length of messages, receipts, and events should match", - ); - Ok(ExecutedTipset { - state_root, - receipt_root, - executed_messages: messages - .into_iter() - .zip(receipts) - .zip(events) - .map(|((message, receipt), events)| ExecutedMessage { - message, - receipt, - events, - }) - .collect_vec() - .into(), - }) - }) -} - -#[allow(clippy::too_many_arguments)] -pub fn compute_state( - _height: ChainEpoch, - messages: Vec, - tipset: Tipset, - genesis_timestamp: u64, - chain_index: ChainIndex, - chain_config: Arc, - beacon: Arc, - engine: &MultiEngine, - callback: Option) -> anyhow::Result<()>>, - enable_tracing: VMTrace, -) -> anyhow::Result -where - DB: Blockstore + Send + Sync + 'static, -{ - if !messages.is_empty() { - anyhow::bail!("Applying messages is not yet implemented."); - } - - let output = apply_block_messages( - genesis_timestamp, - chain_index, - chain_config, - beacon, - engine, - tipset, - callback, - enable_tracing, - )?; - - Ok(output) -} - -/// Controls whether the VM should flush its state after execution -#[derive(Debug, Copy, Clone, Default)] -pub enum VMFlush { - Flush, - #[default] - Skip, -} diff --git a/src/state_manager/state_computation.rs b/src/state_manager/state_computation.rs new file mode 100644 index 00000000000..f29429aabaf --- /dev/null +++ b/src/state_manager/state_computation.rs @@ -0,0 +1,570 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::circulating_supply::GenesisInfo; +use super::*; +use crate::interpreter::{BlockMessages, ExecutionContext, VM, VMTrace}; +use crate::shim::message::Message; +use crate::state_migration::run_state_migrations; +use crate::utils::ShallowClone as _; +use anyhow::{Context as _, bail, ensure}; +use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0}; +use itertools::Itertools as _; +use rayon::prelude::ParallelBridge; +use tracing::{error, info, instrument}; + +impl StateManager +where + DB: Blockstore + Send + Sync + 'static, +{ + /// Load the state of a tipset, including state root, message receipts + pub async fn load_tipset_state(self: &Arc, ts: &Tipset) -> anyhow::Result { + if let Some(state) = self.tipset_state_cache().get_map(ts.key(), |et| et.into()) { + Ok(state) + } else if let Ok(receipt_ts) = self.chain_store().load_child_tipset(ts) { + Ok(TipsetState { + state_root: *receipt_ts.parent_state(), + receipt_root: *receipt_ts.parent_message_receipts(), + }) + } else { + Ok(self.load_executed_tipset(ts).await?.into()) + } + } + + /// Load an executed tipset, including state root, message receipts and events with caching. + pub async fn load_executed_tipset( + self: &Arc, + ts: &Tipset, + ) -> anyhow::Result { + self.tipset_state_cache() + .get_or_else(ts.key(), || async move { + let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); + self.load_executed_tipset_inner(ts, receipt_ts.as_ref()) + .await + }) + .await + } + + async fn load_executed_tipset_inner( + self: &Arc, + msg_ts: &Tipset, + // when `msg_ts` is the current head, `receipt_ts` is `None` + receipt_ts: Option<&Tipset>, + ) -> anyhow::Result { + if let Some(receipt_ts) = receipt_ts { + anyhow::ensure!( + msg_ts.key() == receipt_ts.parents(), + "message tipset should be the parent of message receipt tipset" + ); + } + let mut recomputed = false; + let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| { + let receipt_root = *ts.parent_message_receipts(); + Receipt::get_receipts(self.blockstore(), receipt_root) + .ok() + .map(|r| (*ts.parent_state(), receipt_root, r)) + }) { + Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts), + None => { + let state_output = self + .compute_tipset_state(msg_ts.shallow_clone(), NO_CALLBACK, VMTrace::NotTraced) + .await?; + recomputed = true; + ( + state_output.state_root, + state_output.receipt_root, + Receipt::get_receipts(self.blockstore(), state_output.receipt_root)?, + ) + } + }; + + let messages = self.chain_store().messages_for_tipset(msg_ts)?; + anyhow::ensure!( + messages.len() == receipts.len(), + "mismatching message and receipt counts ({} messages, {} receipts)", + messages.len(), + receipts.len() + ); + let mut executed_messages = Vec::with_capacity(messages.len()); + for (message, receipt) in messages.iter().cloned().zip(receipts.into_iter()) { + let events = if let Some(events_root) = receipt.events_root() { + Some( + match StampedEvent::get_events(self.blockstore(), &events_root) { + Ok(events) => events, + Err(e) if recomputed => return Err(e), + Err(_) => { + self.compute_tipset_state( + msg_ts.shallow_clone(), + NO_CALLBACK, + VMTrace::NotTraced, + ) + .await?; + recomputed = true; + StampedEvent::get_events(self.blockstore(), &events_root)? + } + }, + ) + } else { + None + }; + executed_messages.push(ExecutedMessage { + message, + receipt, + events, + }); + } + Ok(ExecutedTipset { + state_root, + receipt_root, + executed_messages: Arc::new(executed_messages), + }) + } + + /// Conceptually, a [`Tipset`] consists of _blocks_ which share an _epoch_. + /// Each _block_ contains _messages_, which are executed by the _Filecoin Virtual Machine_. + /// + /// For details, see the documentation for [`apply_block_messages`]. + pub async fn compute_tipset_state( + self: &Arc, + tipset: Tipset, + callback: Option) -> anyhow::Result<()> + Send + 'static>, + enable_tracing: VMTrace, + ) -> Result { + let this = Arc::clone(self); + tokio::task::spawn_blocking(move || { + this.compute_tipset_state_blocking(tipset, callback, enable_tracing) + }) + .await? + } + + /// Blocking version of `compute_tipset_state` + pub fn compute_tipset_state_blocking( + &self, + tipset: Tipset, + callback: Option) -> anyhow::Result<()>>, + enable_tracing: VMTrace, + ) -> Result { + let epoch = tipset.epoch(); + let has_callback = callback.is_some(); + info!( + "Evaluating tipset: EPOCH={epoch}, blocks={}, tsk={}", + tipset.len(), + tipset.key(), + ); + Ok(apply_block_messages( + self.chain_store().genesis_block_header().timestamp, + self.chain_index().shallow_clone(), + self.chain_config().shallow_clone(), + self.beacon_schedule().shallow_clone(), + self.engine(), + tipset, + callback, + enable_tracing, + ) + .map_err(|e| { + if has_callback { + e + } else { + e.context(format!("Failed to compute tipset state@{epoch}")) + } + })?) + } + + #[instrument(skip_all)] + pub async fn compute_state( + self: &Arc, + height: ChainEpoch, + messages: Vec, + tipset: Tipset, + callback: Option) -> anyhow::Result<()> + Send + 'static>, + enable_tracing: VMTrace, + ) -> Result { + let this = Arc::clone(self); + tokio::task::spawn_blocking(move || { + this.compute_state_blocking(height, messages, tipset, callback, enable_tracing) + }) + .await? + } + + /// Blocking version of `compute_state` + #[tracing::instrument(skip_all)] + pub fn compute_state_blocking( + &self, + height: ChainEpoch, + messages: Vec, + tipset: Tipset, + callback: Option) -> anyhow::Result<()>>, + enable_tracing: VMTrace, + ) -> Result { + Ok(compute_state( + height, + messages, + tipset, + self.chain_store().genesis_block_header().timestamp, + self.chain_index().shallow_clone(), + self.chain_config().shallow_clone(), + self.beacon_schedule().shallow_clone(), + self.engine(), + callback, + enable_tracing, + )?) + } +} + +pub fn validate_tipsets( + genesis_timestamp: u64, + chain_index: &ChainIndex, + chain_config: &Arc, + beacon: &Arc, + engine: &MultiEngine, + tipsets: T, +) -> anyhow::Result<()> +where + DB: Blockstore + Send + Sync + 'static, + T: Iterator + Send, +{ + use rayon::iter::ParallelIterator as _; + tipsets + .tuple_windows() + .par_bridge() + .try_for_each(|(child, parent)| { + info!(height = parent.epoch(), "compute parent state"); + let ExecutedTipset { + state_root: actual_state, + receipt_root: actual_receipt, + .. + } = apply_block_messages( + genesis_timestamp, + chain_index.shallow_clone(), + chain_config.shallow_clone(), + beacon.shallow_clone(), + engine, + parent, + NO_CALLBACK, + VMTrace::NotTraced, + ) + .context("couldn't compute tipset state")?; + let expected_receipt = child.min_ticket_block().message_receipts; + let expected_state = child.parent_state(); + match (expected_state, expected_receipt) == (&actual_state, actual_receipt) { + true => Ok(()), + false => { + error!( + height = child.epoch(), + ?expected_state, + ?expected_receipt, + ?actual_state, + ?actual_receipt, + "state mismatch" + ); + bail!("state mismatch"); + } + } + }) +} + +/// Shared context for creating VMs and preparing tipset state. +/// +/// Encapsulates randomness source, genesis info, VM construction, +/// null-epoch cron handling, and state migrations. +pub(in crate::state_manager) struct TipsetExecutor<'a, DB: Blockstore + Send + Sync + 'static> { + tipset: Tipset, + rand: ChainRand, + chain_config: Arc, + chain_index: ChainIndex, + genesis_info: GenesisInfo, + engine: &'a MultiEngine, +} + +impl<'a, DB: Blockstore + Send + Sync + 'static> TipsetExecutor<'a, DB> { + pub(in crate::state_manager) fn new( + chain_index: ChainIndex, + chain_config: Arc, + beacon: Arc, + engine: &'a MultiEngine, + tipset: Tipset, + ) -> Self { + let rand = ChainRand::new( + chain_config.shallow_clone(), + tipset.shallow_clone(), + chain_index.shallow_clone(), + beacon, + ); + let genesis_info = GenesisInfo::from_chain_config(chain_config.shallow_clone()); + Self { + tipset, + rand, + chain_config, + chain_index, + genesis_info, + engine, + } + } + + pub(in crate::state_manager) fn create_vm( + &self, + state_root: Cid, + epoch: ChainEpoch, + timestamp: u64, + trace: VMTrace, + ) -> anyhow::Result> { + let circ_supply = self.genesis_info.get_vm_circulating_supply( + epoch, + self.chain_index.db(), + &state_root, + )?; + VM::new( + ExecutionContext { + heaviest_tipset: self.tipset.shallow_clone(), + state_tree_root: state_root, + epoch, + rand: Box::new(self.rand.shallow_clone()), + base_fee: self.tipset.min_ticket_block().parent_base_fee.clone(), + circ_supply, + chain_config: self.chain_config.shallow_clone(), + chain_index: self.chain_index.shallow_clone(), + timestamp, + }, + self.engine, + trace, + ) + } + + /// Produces the state root ready for message execution by running + /// null-epoch `crons` and any pending state migrations. + pub(in crate::state_manager) fn prepare_parent_state( + &self, + genesis_timestamp: u64, + null_epoch_trace: VMTrace, + cron_callback: &mut Option, + ) -> anyhow::Result<(Cid, ChainEpoch, Vec)> + where + F: FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>, + { + use crate::shim::clock::EPOCH_DURATION_SECONDS; + + let mut parent_state = *self.tipset.parent_state(); + let parent_epoch = self + .chain_index + .load_required_tipset(self.tipset.parents())? + .epoch(); + let epoch = self.tipset.epoch(); + + for epoch_i in parent_epoch..epoch { + if epoch_i > parent_epoch { + let timestamp = genesis_timestamp + ((EPOCH_DURATION_SECONDS * epoch_i) as u64); + parent_state = stacker::grow(64 << 20, || -> anyhow::Result { + let mut vm = + self.create_vm(parent_state, epoch_i, timestamp, null_epoch_trace)?; + if let Err(e) = vm.run_cron(epoch_i, cron_callback.as_mut()) { + error!("Beginning of epoch cron failed to run: {e:#}"); + return Err(e); + } + vm.flush() + })?; + } + if let Some(new_state) = run_state_migrations( + epoch_i, + &self.chain_config, + self.chain_index.db(), + &parent_state, + )? { + parent_state = new_state; + } + } + + let block_messages = BlockMessages::for_tipset(self.chain_index.db(), &self.tipset)?; + Ok((parent_state, epoch, block_messages)) + } +} + +/// Messages are transactions that produce new states. The state (usually +/// referred to as the 'state-tree') is a mapping from actor addresses to actor +/// states. Each block contains the hash of the state-tree that should be used +/// as the starting state when executing the block messages. +/// +/// # Execution environment +/// +/// Transaction execution has the following inputs: +/// - a current state-tree (stored as IPLD in a key-value database). This +/// reference is in [`Tipset::parent_state`]. +/// - up to 900 past state-trees. See +/// . +/// - up to 900 past tipset IDs. +/// - a deterministic source of randomness. +/// - the circulating supply of FIL (see +/// ). The circulating +/// supply is determined by the epoch and the states of a few key actors. +/// - the base fee (see ). +/// This value is defined by `tipset.parent_base_fee`. +/// - the genesis timestamp (UNIX epoch time when the first block was +/// mined/created). +/// - a chain configuration (maps epoch to network version, has chain specific +/// settings). +/// +/// The result of running a set of block messages is an index to the final +/// state-tree and an index to an array of message receipts (listing gas used, +/// return codes, etc). +/// +/// # Cron and null tipsets +/// +/// Once per epoch, after all messages have run, a special 'cron' transaction +/// must be executed. The tasks of the 'cron' transaction include running batch +/// jobs and keeping the state up-to-date with the current epoch. +/// +/// It can happen that no blocks are mined in an epoch. The tipset for such an +/// epoch is called a null tipset. A null tipset has no identity and cannot be +/// directly executed. This is a problem for 'cron' which must run for every +/// epoch, even if there are no messages. The fix is to run 'cron' if there are +/// any null tipsets between the current epoch and the parent epoch. +/// +/// Imagine the blockchain looks like this with a null tipset at epoch 9: +/// +/// ```text +/// ┌────────┐ ┌────┐ ┌───────┐ ┌───────┐ +/// │Epoch 10│ │Null│ │Epoch 8├──►Epoch 7├─► +/// └───┬────┘ └────┘ └───▲───┘ └───────┘ +/// └─────────────────┘ +/// ``` +/// +/// The parent of tipset-epoch-10 is tipset-epoch-8. Before executing the +/// messages in epoch 10, we have to run cron for epoch 9. However, running +/// 'cron' requires the timestamp of the youngest block in the tipset (which +/// doesn't exist because there are no blocks in the tipset). Lotus dictates that +/// the timestamp of a null tipset is `30s * epoch` after the genesis timestamp. +/// So, in the above example, if the genesis block was mined at time `X`, the +/// null tipset for epoch 9 will have timestamp `X + 30 * 9`. +/// +/// # Migrations +/// +/// Migrations happen between network upgrades and modify the state tree. If a +/// migration is scheduled for epoch 10, it will be run _after_ the messages for +/// epoch 10. The tipset for epoch 11 will link the state-tree produced by the +/// migration. +#[allow(clippy::too_many_arguments)] +pub fn apply_block_messages( + genesis_timestamp: u64, + chain_index: ChainIndex, + chain_config: Arc, + beacon: Arc, + engine: &MultiEngine, + tipset: Tipset, + mut callback: Option) -> anyhow::Result<()>>, + enable_tracing: VMTrace, +) -> anyhow::Result +where + DB: Blockstore + Send + Sync + 'static, +{ + // step 1: special case for genesis block + if tipset.epoch() == 0 { + let message_receipts = tipset.min_ticket_block().message_receipts; + return Ok(ExecutedTipset { + state_root: *tipset.parent_state(), + receipt_root: message_receipts, + executed_messages: vec![].into(), + }); + } + + let exec = TipsetExecutor::new( + chain_index.shallow_clone(), + chain_config, + beacon, + engine, + tipset.shallow_clone(), + ); + + // step 2: running cron for any null-tipsets + // step 3: run migrations + let (parent_state, epoch, block_messages) = + exec.prepare_parent_state(genesis_timestamp, enable_tracing, &mut callback)?; + + // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from + // FVM, but that introduces some constraints, and possible deadlocks. + stacker::grow(64 << 20, || -> anyhow::Result { + let mut vm = exec.create_vm(parent_state, epoch, tipset.min_timestamp(), enable_tracing)?; + + // step 4: apply tipset messages + let (receipts, events, events_roots) = + vm.apply_block_messages(&block_messages, epoch, callback)?; + + // step 5: construct receipt root from receipts + let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?; + + // step 6: store events AMTs in the blockstore + for (events, events_root) in events.iter().zip(events_roots.iter()) { + if let Some(events) = events { + let event_root = + events_root.context("events root should be present when events present")?; + let derived_event_root = Amt::new_from_iter_with_bit_width( + chain_index.db(), + EVENTS_AMT_BITWIDTH, + events.iter(), + ) + .map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?; + + ensure!( + derived_event_root == event_root, + "Events AMT root mismatch: derived={derived_event_root}, actual={event_root}." + ); + } + } + + let state_root = vm.flush()?; + + let messages: Vec = block_messages + .into_iter() + .flat_map(|bm| bm.messages) + .collect_vec(); + anyhow::ensure!( + messages.len() == receipts.len() && messages.len() == events.len(), + "length of messages, receipts, and events should match", + ); + Ok(ExecutedTipset { + state_root, + receipt_root, + executed_messages: messages + .into_iter() + .zip(receipts) + .zip(events) + .map(|((message, receipt), events)| ExecutedMessage { + message, + receipt, + events, + }) + .collect_vec() + .into(), + }) + }) +} + +#[allow(clippy::too_many_arguments)] +pub(in crate::state_manager) fn compute_state( + _height: ChainEpoch, + messages: Vec, + tipset: Tipset, + genesis_timestamp: u64, + chain_index: ChainIndex, + chain_config: Arc, + beacon: Arc, + engine: &MultiEngine, + callback: Option) -> anyhow::Result<()>>, + enable_tracing: VMTrace, +) -> anyhow::Result +where + DB: Blockstore + Send + Sync + 'static, +{ + if !messages.is_empty() { + anyhow::bail!("Applying messages is not yet implemented."); + } + + apply_block_messages( + genesis_timestamp, + chain_index, + chain_config, + beacon, + engine, + tipset, + callback, + enable_tracing, + ) +}