diff --git a/Cargo.lock b/Cargo.lock index 6abaf0bacd..fa0511c90c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1803,6 +1803,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "k256" version = "0.14.0-rc.8" @@ -2237,6 +2248,49 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "pest" +version = "2.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0848c601009d37dfa3430c4666e147e49cdcf1b92ecd3e63657d8a5f19da662" +dependencies = [ + "memchr", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f486f1ea21e6c10ed15d5a7c77165d0ee443402f0780849d1768e7d9d6fe77" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8040c4647b13b210a963c1ed407c1ff4fdfa01c31d6d2a098218702e6664f94f" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote 1.0.45", + "syn 2.0.117", +] + +[[package]] +name = "pest_meta" +version = "2.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89815c69d36021a140146f26659a81d6c2afa33d216d736dd4be5381a7362220" +dependencies = [ + "pest", + "sha2 0.10.9", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -3878,6 +3932,7 @@ dependencies = [ "snarkvm-ledger-narwhal-batch-certificate", "snarkvm-ledger-puzzle", "snarkvm-ledger-test-helpers", + "snarkvm-slipstream-plugin-manager", "snarkvm-synthesizer-program", "snarkvm-synthesizer-snark", "snarkvm-utilities", @@ -3942,6 +3997,30 @@ dependencies = [ "web-sys", ] +[[package]] +name = "snarkvm-slipstream-plugin-interface" +version = "4.6.0" +dependencies = [ + "anyhow", + "tracing", +] + +[[package]] +name = "snarkvm-slipstream-plugin-manager" +version = "4.6.0" +dependencies = [ + "anyhow", + "json5", + "libloading", + "locktick", + "parking_lot", + "serde_json", + "snarkvm-slipstream-plugin-interface", + "thiserror 2.0.18", + "tokio", + "tracing", +] + [[package]] name = "snarkvm-synthesizer" version = "4.6.0" @@ -4610,6 +4689,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "ucd-trie" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" + [[package]] name = "unarray" version = "0.1.4" diff --git a/Cargo.toml b/Cargo.toml index b3c5a2edba..d6ed662360 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,9 @@ members = [ "synthesizer/snark", "utilities", "utilities/derives", - "wasm" + "wasm", + "plugins/slipstream_plugin_interface", + "plugins/slipstream_plugin_manager" ] [lib] @@ -129,6 +131,7 @@ async = [ "snarkvm-ledger/async", "snarkvm-synthesizer/async" ] cuda = [ "snarkvm-algorithms/cuda" ] history = [ "snarkvm-synthesizer/history" ] history-staking-rewards = [ "snarkvm-synthesizer/history-staking-rewards" ] +slipstream-plugins = [ "snarkvm-synthesizer/slipstream-plugins" ] parameters_no_std_out = [ "snarkvm-parameters/no_std_out" ] locktick = [ "snarkvm-console?/locktick", @@ -390,6 +393,14 @@ default-features = false path = "ledger/store" version = "=4.6.0" +[workspace.dependencies.snarkvm-slipstream-plugin-interface] +path = "plugins/slipstream_plugin_interface" +version = "=4.6.0" + +[workspace.dependencies.snarkvm-slipstream-plugin-manager] +path = "plugins/slipstream_plugin_manager" +version = "=4.6.0" + [workspace.dependencies.snarkvm-ledger-test-helpers] path = "ledger/test-helpers" version = "=4.6.0" diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 911958234c..dfe415efc7 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -70,6 +70,10 @@ history-staking-rewards = [ "snarkvm-ledger-store/history-staking-rewards", "snarkvm-synthesizer/history-staking-rewards", ] +slipstream-plugins = [ + "snarkvm-ledger-store/slipstream-plugins", + "snarkvm-synthesizer/slipstream-plugins", +] locktick = [ "dep:locktick", "snarkvm-ledger-puzzle/locktick", diff --git a/ledger/store/Cargo.toml b/ledger/store/Cargo.toml index fb200f5491..22ee839630 100644 --- a/ledger/store/Cargo.toml +++ b/ledger/store/Cargo.toml @@ -20,6 +20,7 @@ edition = "2024" default = [ "indexmap/rayon" ] history = [ ] history-staking-rewards = [ ] +slipstream-plugins = [ "dep:snarkvm-slipstream-plugin-manager" ] locktick = [ "dep:locktick", "snarkvm-ledger-puzzle/locktick" ] rocks = [ "rocksdb", "smallvec" ] serial = [ @@ -42,6 +43,10 @@ wasm = [ ] test = [ ] +[dependencies.snarkvm-slipstream-plugin-manager] +workspace = true +optional = true + [dependencies.snarkvm-console] workspace = true diff --git a/ledger/store/src/program/finalize.rs b/ledger/store/src/program/finalize.rs index 347286f044..64dfbe1a5e 100644 --- a/ledger/store/src/program/finalize.rs +++ b/ledger/store/src/program/finalize.rs @@ -18,7 +18,7 @@ use crate::{ helpers::{Map, MapRead, NestedMap, NestedMapRead}, program::{CommitteeStorage, CommitteeStore}, }; -#[cfg(feature = "history-staking-rewards")] +#[cfg(any(feature = "history-staking-rewards", feature = "slipstream-plugins"))] use console::types::Address; use console::{ network::prelude::*, @@ -31,12 +31,28 @@ use aleo_std_storage::StorageMode; use anyhow::Result; use core::marker::PhantomData; use indexmap::IndexSet; -#[cfg(feature = "history")] +#[cfg(all(feature = "slipstream-plugins", feature = "locktick"))] +use locktick::parking_lot::RwLock; +#[cfg(all(feature = "slipstream-plugins", not(feature = "locktick")))] +use parking_lot::RwLock; +#[cfg(feature = "slipstream-plugins")] +use snarkvm_slipstream_plugin_manager::{BroadcastEvent, BroadcastEventKind, SlipstreamPluginManager}; +#[cfg(feature = "slipstream-plugins")] +use std::sync::{Arc, atomic::AtomicBool}; +#[cfg(any(feature = "history", feature = "history-staking-rewards", feature = "slipstream-plugins"))] use std::{ borrow::Cow, sync::atomic::{AtomicU32, Ordering}, }; +/// Serialized form of a mapping replacement, captured before storage consumes the entries. +#[cfg(feature = "slipstream-plugins")] +struct SerializedMappingEntries { + program_id: Vec, + mapping_name: Vec, + entries: Vec<(Vec, Vec)>, +} + /// TODO (howardwu): Remove this. /// Returns the mapping ID for the given `program ID` and `mapping name`. fn to_mapping_id(program_id: &ProgramID, mapping_name: &Identifier) -> Result> { @@ -654,6 +670,19 @@ pub struct FinalizeStore> { storage: P, /// PhantomData. _phantom: PhantomData, + /// Indicates that canonical finalize is currently in progress. + /// When `true`, storage writes notify registered Slipstream plugins. + #[cfg(feature = "slipstream-plugins")] + is_finalize_mode: Arc, + /// Tracks the current block height for Slipstream plugin notifications. + /// Updated by the VM at the start of each canonical finalize + #[cfg(feature = "slipstream-plugins")] + slipstream_block_height: Arc, + /// Optional plugin manager for streaming canonical mapping and staking updates. + /// Wrapped in `Arc` so that all clones of `FinalizeStore` share the same instance; + /// the `RwLock` allows installation from a shared reference after construction. + #[cfg(feature = "slipstream-plugins")] + slipstream_plugin_manager: Arc>>, } impl> FinalizeStore { @@ -665,7 +694,16 @@ impl> FinalizeStore { /// Initializes a finalize store from storage. pub fn from(storage: P) -> Result { // Return the finalize store. - Ok(Self { storage, _phantom: PhantomData }) + Ok(Self { + storage, + _phantom: PhantomData, + #[cfg(feature = "slipstream-plugins")] + is_finalize_mode: Arc::new(AtomicBool::new(false)), + #[cfg(feature = "slipstream-plugins")] + slipstream_block_height: Arc::new(AtomicU32::new(0)), + #[cfg(feature = "slipstream-plugins")] + slipstream_plugin_manager: Arc::new(RwLock::new(None)), + }) } /// Starts an atomic batch write operation. @@ -714,6 +752,76 @@ impl> FinalizeStore { self.storage.current_block_height() } + /// Returns a reference to the canonical finalize mode flag. + /// + /// When `true`, storage writes notify registered Slipstream plugins. + /// Set to `true` by the VM before canonical finalize runs and reset to `false` afterwards. + #[cfg(feature = "slipstream-plugins")] + pub fn is_finalize_mode(&self) -> &Arc { + &self.is_finalize_mode + } + + /// Returns the current block height for Slipstream plugin notifications. + #[cfg(feature = "slipstream-plugins")] + pub fn slipstream_block_height(&self) -> &AtomicU32 { + &self.slipstream_block_height + } + + /// Installs a Slipstream plugin manager to receive canonical mapping and staking updates. + /// + /// May be called from a shared reference. Logs a warning if called more than once. + #[cfg(feature = "slipstream-plugins")] + pub fn set_slipstream_plugin_manager(&self, manager: SlipstreamPluginManager) { + let mut guard = self.slipstream_plugin_manager.write(); + if guard.is_some() { + tracing::warn!("Slipstream plugin manager is already set; ignoring subsequent call."); + return; + } + *guard = Some(manager); + } + + /// Returns a handle to the Slipstream plugin manager cell. + /// + /// The returned `Arc` is a lightweight additional handle to the same shared instance; + /// acquire a read or write lock on it to inspect or replace the manager. + #[cfg(feature = "slipstream-plugins")] + pub fn slipstream_plugin_manager(&self) -> Arc>> { + Arc::clone(&self.slipstream_plugin_manager) + } + + /// Notifies all interested plugins of a staking reward, if canonical finalize is active. + /// + /// Errors from plugin calls are logged but never propagated. + #[cfg(feature = "slipstream-plugins")] + pub fn notify_staking_reward( + &self, + staker: &Address, + validator: &Address, + reward: u64, + new_stake: u64, + block_height: u32, + ) { + if !self.is_finalize_mode.load(Ordering::SeqCst) { + return; + } + + let spm_guard = self.slipstream_plugin_manager.read(); + if let Some(mgr) = spm_guard.as_ref() { + if mgr.has_subscribers(BroadcastEventKind::StakingReward) { + // Address serializes to a fixed 32-byte array; this cannot fail. + let staker_bytes = staker.to_bytes_le().expect("Address::to_bytes_le is infallible"); + let validator_bytes = validator.to_bytes_le().expect("Address::to_bytes_le is infallible"); + mgr.broadcast(BroadcastEvent::StakingReward { + staker: &staker_bytes, + validator: &validator_bytes, + reward, + new_stake, + block_height, + }); + } + } + } + /// Returns the historical value of a mapping. #[cfg(feature = "history")] pub fn get_historical_mapping_value( @@ -827,7 +935,46 @@ impl> FinalizeStoreTrait for FinalizeStore< key: Plaintext, value: Value, ) -> Result> { - self.storage.update_key_value(program_id, mapping_name, key, value) + // Serialize before moving, if a plugin notification may be needed. + #[cfg(feature = "slipstream-plugins")] + let plugin_data = if self.is_finalize_mode.load(Ordering::SeqCst) { + let spm_guard = self.slipstream_plugin_manager.read(); + if let Some(mgr) = spm_guard.as_ref() { + if mgr.has_subscribers(BroadcastEventKind::MappingUpdate) { + Some(( + program_id.to_bytes_le()?, + mapping_name.to_bytes_le()?, + key.to_bytes_le()?, + value.to_bytes_le()?, + )) + } else { + None + } + } else { + None + } + } else { + None + }; + + let result = self.storage.update_key_value(program_id, mapping_name, key, value)?; + + // Notify plugins of the update if in canonical finalize mode. + #[cfg(feature = "slipstream-plugins")] + if let Some((pid, mname, k, v)) = plugin_data { + let height = self.slipstream_block_height().load(Ordering::SeqCst); + let spm_guard = self.slipstream_plugin_manager.read(); + if let Some(mgr) = spm_guard.as_ref() { + mgr.broadcast(BroadcastEvent::MappingUpdate { + program_id: &pid, + mapping_name: &mname, + key: &k, + value: &v, + block_height: height, + }); + } + } + Ok(result) } /// Removes the key-value pair for the given `program ID`, `mapping name`, and `key` from storage. @@ -860,7 +1007,53 @@ impl> FinalizeStore { mapping_name: Identifier, entries: Vec<(Plaintext, Value)>, ) -> Result> { - self.storage.replace_mapping(program_id, mapping_name, entries) + // Serialize mapping identity and all entries before moving them into storage, + // so they are available for plugin notification after the storage call. + #[cfg(feature = "slipstream-plugins")] + let plugin_data: Option = if self.is_finalize_mode.load(Ordering::SeqCst) { + let spm_guard = self.slipstream_plugin_manager.read(); + if let Some(mgr) = spm_guard.as_ref() { + if mgr.has_subscribers(BroadcastEventKind::MappingUpdate) { + let mut entries_bytes = Vec::with_capacity(entries.len()); + for (key, value) in &entries { + entries_bytes.push((key.to_bytes_le()?, value.to_bytes_le()?)); + } + Some(SerializedMappingEntries { + program_id: program_id.to_bytes_le()?, + mapping_name: mapping_name.to_bytes_le()?, + entries: entries_bytes, + }) + } else { + None + } + } else { + None + } + } else { + None + }; + + let result = self.storage.replace_mapping(program_id, mapping_name, entries)?; + + // Notify plugins of each updated key-value pair if in canonical finalize mode. + #[cfg(feature = "slipstream-plugins")] + if let Some(data) = plugin_data { + let height = self.slipstream_block_height().load(Ordering::SeqCst); + let spm_guard = self.slipstream_plugin_manager.read(); + if let Some(mgr) = spm_guard.as_ref() { + for (k, v) in &data.entries { + mgr.broadcast(BroadcastEvent::MappingUpdate { + program_id: &data.program_id, + mapping_name: &data.mapping_name, + key: k, + value: v, + block_height: height, + }); + } + } + } + + Ok(result) } /// Removes the mapping for the given `program ID` and `mapping name` from storage, diff --git a/plugins/slipstream_plugin_interface/Cargo.toml b/plugins/slipstream_plugin_interface/Cargo.toml new file mode 100644 index 0000000000..c3f7aef0a8 --- /dev/null +++ b/plugins/slipstream_plugin_interface/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "snarkvm-slipstream-plugin-interface" +version = "4.6.0" +authors = [ "The Aleo Team " ] +description = "The SnarkVM Slipstream plugin interface." +homepage = "https://aleo.org" +repository = "https://github.com/ProvableHQ/snarkVM" +license = "Apache-2.0" +edition = "2024" + +[dependencies.anyhow] +workspace = true + +[dependencies.tracing] +workspace = true diff --git a/plugins/slipstream_plugin_interface/README.md b/plugins/slipstream_plugin_interface/README.md new file mode 100644 index 0000000000..ce098d897d --- /dev/null +++ b/plugins/slipstream_plugin_interface/README.md @@ -0,0 +1,91 @@ +# Aleo Slipstream Plugin Interface + +This crate enables a plugin to be added into a SnarkVM runtime to take actions at the time of +mapping updates at block finalization; for example, saving historical mapping state and staking +data to an external database. The plugin must implement the `SlipstreamPlugin` trait. See +`slipstream_plugin_interface.rs` for the full interface definition. + +> **Feature flag:** compile with `--features slipstream-plugins` to enable plugin support. +> Plugin callbacks fire only during **canonical finalize** — speculative and dry-run executions +> are never observed by plugins. + +# Components + +### `plugins/slipstream_plugin_interface` +Defines the `SlipstreamPlugin` trait — the interface all plugins must implement. + +| Method | Description | +|---|---| +| `on_load` / `on_unload` | Lifecycle hooks called on startup and shutdown | +| `subscribed_events` | Returns the event types a plugin subscribes to. Defaults to `&[]` — a plugin that does not override this method receives **no callbacks**. | +| `on_broadcast` | Called once per key-value update (and once per entry in a `replace_mapping` batch). Only fires for event kinds in the subscribed list. | + +### `plugins/slipstream_plugin_manager` +Manages loaded plugins and their backing `libloading::Library` handles. + +- **`LoadedSlipstreamPlugin`** — wrapper holding a boxed plugin + its name; implements `Deref`/`DerefMut` +- **`SlipstreamPluginManager`** + - `from_config_files` — takes a slice of config file paths and loads one plugin per file + - `load_plugin(path)` / `unload_plugin(name)` — load or unload a single plugin at runtime + - `unload()` — fires `on_unload()` on every plugin then drops the libraries; field declaration order guarantees all plugin code finishes executing before the backing `.so` is unmapped + - `has_subscribers()` — aggregate opt-in check; used internally to skip serialization when no plugin is interested in an event kind + - `broadcast()` — fan-out broadcast to all interested plugins + - `list_plugins()` — returns the names of all loaded plugins + +--- + +## Plugin Config File (JSON5) + +Each plugin requires a config file: +```json5 +{ + "libpath": "/path/to/libmy_plugin.so", // required; relative paths resolve from the config file's dir + "name": "my_plugin" // optional; overrides the plugin's name() return value +} +``` + +--- + +## Plugin Library Convention + +The shared library (`.so` / `.dylib` / `.dll`) must export a C function: +```rust +#[no_mangle] +pub extern "C" fn _create_plugin() -> *mut dyn SlipstreamPlugin { + Box::into_raw(Box::new(MyPlugin::new())) +} +``` + +--- + +## Broadcast Event Format + +All byte-slice fields in `BroadcastEvent` are serialized in **little-endian** format (via +`to_bytes_le()`). Plugin implementations must deserialize accordingly. + +--- + +## Startup + +`SlipstreamPluginManager::from_config_files()` takes a slice of config file paths and returns a +manager object. Install it into the `FinalizeStore` before the node begins processing blocks: + +```rust +let manager = SlipstreamPluginManager::from_config_files(&[ + PathBuf::from("/etc/aleo/plugins/my_plugin.json5"), +])?; +finalize_store.set_slipstream_plugin_manager(manager); +``` + +## Shutdown + +Call `manager.unload()` during graceful shutdown before aborting tasks. This fires `on_unload()` +on every plugin — the right place for flushing buffers, closing connections, etc.: + +```rust +if let Some(manager) = finalize_store.slipstream_plugin_manager().write().as_mut() { + manager.unload(); +} +``` + +> Errors from plugin callbacks (`on_broadcast`) are logged as warnings and never propagated — a misbehaving plugin will not crash the node. diff --git a/plugins/slipstream_plugin_interface/src/lib.rs b/plugins/slipstream_plugin_interface/src/lib.rs new file mode 100644 index 0000000000..14443eb9bb --- /dev/null +++ b/plugins/slipstream_plugin_interface/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkVM library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod slipstream_plugin_interface; + +pub use slipstream_plugin_interface::{BroadcastEvent, BroadcastEventKind}; diff --git a/plugins/slipstream_plugin_interface/src/slipstream_plugin_interface.rs b/plugins/slipstream_plugin_interface/src/slipstream_plugin_interface.rs new file mode 100644 index 0000000000..1ac50d40c7 --- /dev/null +++ b/plugins/slipstream_plugin_interface/src/slipstream_plugin_interface.rs @@ -0,0 +1,90 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkVM library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Result; +use std::any::Any; + +/// Discriminant-only companion to [`BroadcastEvent`], used by +/// [`SlipstreamPlugin::subscribed_events`] to declare which event types a plugin +/// wishes to receive without carrying data payloads. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum BroadcastEventKind { + /// Mapping key-value update during canonical finalize. + MappingUpdate, + /// Staking reward distribution during canonical finalize. + StakingReward, +} + +/// A single event dispatched to plugins via [`SlipstreamPlugin::on_broadcast`]. +/// +/// All `&[u8]` fields carry little-endian byte representations of the +/// corresponding snarkVM console types (serialized via `ToBytes`). +/// +/// Derives `Copy` — every field is `Copy` (`&[u8]`, `u32`, `u64`) — so the +/// same value can be passed to multiple plugins in a dispatch loop without cloning. +#[derive(Copy, Clone, Debug)] +pub enum BroadcastEvent<'a> { + /// A mapping key-value pair was inserted or updated during canonical finalize. + MappingUpdate { program_id: &'a [u8], mapping_name: &'a [u8], key: &'a [u8], value: &'a [u8], block_height: u32 }, + /// A staking reward was distributed to a staker during canonical finalize. + StakingReward { staker: &'a [u8], validator: &'a [u8], reward: u64, new_stake: u64, block_height: u32 }, +} + +impl BroadcastEvent<'_> { + /// Returns the discriminant of this event. + pub fn kind(&self) -> BroadcastEventKind { + match self { + BroadcastEvent::MappingUpdate { .. } => BroadcastEventKind::MappingUpdate, + BroadcastEvent::StakingReward { .. } => BroadcastEventKind::StakingReward, + } + } +} + +/// The interface for Aleo Slipstream plugins. A plugin must implement +/// the `SlipstreamPlugin` trait to work with the runtime. In addition, +/// the dynamic library must export a `C` function `_create_plugin` that +/// creates the implementation of the plugin. +pub trait SlipstreamPlugin: Any + Send + Sync + std::fmt::Debug { + /// Returns the name of the plugin. + fn name(&self) -> &'static str; + + /// The callback called when a plugin is loaded by the system, used for + /// doing whatever initialization is required by the plugin. The + /// `_config_file` contains the name of the config file (JSON format) with + /// a `libpath` field indicating the full path of the shared library. + fn on_load(&mut self, _config_file: &str, _is_reload: bool) -> Result<()> { + Ok(()) + } + + /// The callback called right before a plugin is unloaded by the system. + /// Used for doing cleanup before unload. + fn on_unload(&mut self) {} + + /// Returns the event kinds this plugin subscribes to. + /// + /// The manager checks this before serializing and dispatching each event, + /// so plugins that return an empty slice pay no serialization cost. Defaults + /// to no subscriptions. + fn subscribed_events(&self) -> &[BroadcastEventKind] { + &[] + } + + /// Receives a single broadcast event from the plugin manager. + /// + /// Only invoked when the event's kind appears in [`subscribed_events`]. + fn on_broadcast(&self, _event: BroadcastEvent<'_>) -> Result<()> { + Ok(()) + } +} diff --git a/plugins/slipstream_plugin_manager/Cargo.toml b/plugins/slipstream_plugin_manager/Cargo.toml new file mode 100644 index 0000000000..fc65ac7c3f --- /dev/null +++ b/plugins/slipstream_plugin_manager/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "snarkvm-slipstream-plugin-manager" +version = "4.6.0" +authors = [ "The Aleo Team " ] +description = "The SnarkVM Slipstream plugin manager." +homepage = "https://aleo.org" +repository = "https://github.com/ProvableHQ/snarkVM" +license = "Apache-2.0" +edition = "2024" + +[dependencies.snarkvm-slipstream-plugin-interface] +workspace = true + +[dependencies.anyhow] +workspace = true + +[dependencies.thiserror] +workspace = true + +[dependencies.tracing] +workspace = true + +[dependencies.serde_json] +workspace = true + +[dependencies.libloading] +version = "0.8" + +[dependencies.json5] +version = "0.4" + +[features] +locktick = ["dep:locktick"] + +[dependencies.locktick] +workspace = true +optional = true + +[dependencies.parking_lot] +workspace = true + +[dependencies.tokio] +version = "1" +features = [ "sync" ] diff --git a/plugins/slipstream_plugin_manager/src/lib.rs b/plugins/slipstream_plugin_manager/src/lib.rs new file mode 100644 index 0000000000..00365b6f08 --- /dev/null +++ b/plugins/slipstream_plugin_manager/src/lib.rs @@ -0,0 +1,27 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkVM library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(target_arch = "wasm32")] +compile_error!( + "snarkvm-slipstream-plugin-manager uses libloading for dynamic plugin \ + loading, which is not supported on wasm32 targets. Do not enable the \ + `history`, `history-staking-rewards`, or `slipstream-plugins` features \ + when targeting wasm32." +); + +pub mod slipstream_manager; + +pub use slipstream_manager::{LoadedSlipstreamPlugin, SlipstreamPluginManager}; +pub use snarkvm_slipstream_plugin_interface::{BroadcastEvent, BroadcastEventKind}; diff --git a/plugins/slipstream_plugin_manager/src/slipstream_manager.rs b/plugins/slipstream_plugin_manager/src/slipstream_manager.rs new file mode 100644 index 0000000000..e6d4261fa6 --- /dev/null +++ b/plugins/slipstream_plugin_manager/src/slipstream_manager.rs @@ -0,0 +1,521 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkVM library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snarkvm_slipstream_plugin_interface::slipstream_plugin_interface::{ + BroadcastEvent, + BroadcastEventKind, + SlipstreamPlugin, +}; + +use libloading::Library; +use std::{ + ops::{Deref, DerefMut}, + path::{Path, PathBuf}, +}; +use tracing::{info, warn}; + +/// A type alias for the result of plugin manager operations. +type JsonRpcResult = Result; + +#[derive(Debug)] +pub struct LoadedSlipstreamPlugin { + name: String, + plugin: Box, +} + +impl LoadedSlipstreamPlugin { + pub fn new(plugin: Box, name: Option) -> Self { + Self { name: name.unwrap_or_else(|| plugin.name().to_owned()), plugin } + } + + pub fn name(&self) -> &str { + &self.name + } +} + +impl Deref for LoadedSlipstreamPlugin { + type Target = Box; + + fn deref(&self) -> &Self::Target { + &self.plugin + } +} + +impl DerefMut for LoadedSlipstreamPlugin { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.plugin + } +} + +/// A fully-loaded plugin entry: the plugin instance, its backing shared library, and the +/// resolved path used for duplicate detection. Fields are declared in drop order — `plugin` +/// is dropped before `lib` — which guarantees all plugin code finishes executing before the +/// shared library is unloaded. +#[derive(Debug)] +struct LoadedPlugin { + plugin: LoadedSlipstreamPlugin, + _lib: Library, + /// Resolved, absolute path to the `.so` file. + /// Used to detect duplicate loads before calling `dlopen`, preventing unsafe double-loading. + libpath: PathBuf, +} + +impl Drop for LoadedPlugin { + fn drop(&mut self) { + info!("Unloading plugin '{}'", self.plugin.name()); + self.plugin.on_unload(); + // `plugin` then drops before `lib` (declaration order), ensuring all plugin code + // finishes executing before the shared library is unloaded. + } +} + +// The Plugin Manager itself +#[derive(Default, Debug)] +pub struct SlipstreamPluginManager { + plugins: Vec, +} + +impl SlipstreamPluginManager { + pub fn new() -> Self { + SlipstreamPluginManager { plugins: Vec::default() } + } + + /// Initializes a manager by loading one plugin per config file. + /// + /// Each config file must be a JSON5 file with a `libpath` field pointing to the + /// shared library that implements `SlipstreamPlugin`. + pub fn from_config_files(config_files: &[std::path::PathBuf]) -> Result { + let mut manager = Self::new(); + for path in config_files { + manager.load_plugin(path)?; + } + Ok(manager) + } + + /// Unload all plugins and loaded plugin libraries, making sure to fire + /// their `on_unload()` methods so they can do any necessary cleanup. + pub fn unload(&mut self) { + self.plugins.clear(); // Drop impl fires on_unload and enforces plugin-before-lib drop order. + } + + /// Returns `true` if any loaded plugin subscribes to the given event kind. + /// + /// Used as a pre-serialization guard: callers skip expensive byte serialization + /// when no plugin would receive the resulting event. + pub fn has_subscribers(&self, kind: BroadcastEventKind) -> bool { + self.plugins.iter().any(|p| p.plugin.subscribed_events().contains(&kind)) + } + + /// Dispatches an event to every plugin subscribed to its kind. + /// Errors are logged as warnings but never propagated. + pub fn broadcast(&self, event: BroadcastEvent<'_>) { + let kind = event.kind(); + for entry in &self.plugins { + if entry.plugin.subscribed_events().contains(&kind) + && let Err(e) = entry.plugin.on_broadcast(event) + { + warn!("Slipstream plugin '{}' on_broadcast error: {e}", entry.plugin.name()); + } + } + } + + /// Returns the names of all loaded plugins. + pub fn list_plugins(&self) -> JsonRpcResult> { + Ok(self.plugins.iter().map(|p| p.plugin.name().to_owned()).collect()) + } + + /// Loads a plugin from the given config file. + /// + /// # Safety + /// + /// This function loads the dynamically linked library specified in the config. The library + /// must do necessary initializations. + pub fn load_plugin(&mut self, slipstream_plugin_config_file: impl AsRef) -> JsonRpcResult { + // Resolve the library path from the config before calling dlopen. + // This lets us detect duplicates without loading the library a second time, which is + // unsafe: a second dlopen on an already-loaded .so can trigger re-execution of Rust + // .init_array startup code, corrupting global state in the running plugin instance. + let resolved_libpath = resolve_libpath_from_config(slipstream_plugin_config_file.as_ref())?; + + // Check for duplicate library path first (catches same .so before dlopen). + if let Some(entry) = self.plugins.iter().find(|p| p.libpath == resolved_libpath) { + return Err(SlipstreamPluginManagerError::PluginAlreadyLoaded(entry.plugin.name().to_string())); + } + + let (new_lib, mut new_plugin, new_config_file) = + load_plugin_from_config(slipstream_plugin_config_file.as_ref())?; + + // Also guard against a different .so that happens to expose the same plugin name. + if self.plugins.iter().any(|entry| entry.plugin.name().eq(new_plugin.name())) { + return Err(SlipstreamPluginManagerError::PluginAlreadyLoaded(new_plugin.name().to_string())); + } + + // Call on_load and push plugin. + new_plugin + .on_load(new_config_file, false) + .map_err(|e| SlipstreamPluginManagerError::PluginStartError(e.to_string()))?; + let name = new_plugin.name().to_string(); + + self.plugins.push(LoadedPlugin { plugin: new_plugin, _lib: new_lib, libpath: resolved_libpath }); + + info!("Loaded plugin: {}", name); + + Ok(name) + } + + /// Unloads the plugin with the given name. + pub fn unload_plugin(&mut self, name: &str) -> JsonRpcResult<()> { + let Some(idx) = self.plugins.iter().position(|entry| entry.plugin.name().eq(name)) else { + return Err(SlipstreamPluginManagerError::PluginNotLoaded(name.to_string())); + }; + + self._drop_plugin(idx); + Ok(()) + } + + /// Reloads the plugin with the given name from the given config file. + /// + /// # Note + /// + /// This function is not currently exposed. It was disabled due to SIGSEGV issues + /// and is a good next step to implement safely. Use `unload_plugin` + `load_plugin` + /// as a workaround in the meantime OR just stop the snarkos service and restart it with + /// the updated plugin config(s) + pub fn reload_plugin(&mut self, _name: &str, _config_file: &str) -> JsonRpcResult<()> { + Err(SlipstreamPluginManagerError::PluginLoadError("Plugin reload is not currently implemented.".to_string())) + } + + fn _drop_plugin(&mut self, idx: usize) { + self.plugins.remove(idx); // Drop impl fires on_unload and enforces plugin-before-lib drop order. + } +} + +#[derive(thiserror::Error, Debug)] +pub enum SlipstreamPluginManagerError { + #[error("Cannot open the plugin config file: {0}")] + CannotOpenConfigFile(String), + + #[error("Cannot read the plugin config file: {0}")] + CannotReadConfigFile(String), + + #[error("The config file is not in a valid JSON/JSON5 format: {0}")] + InvalidConfigFileFormat(String), + + #[error("Plugin library path is not specified in the config file")] + LibPathNotSet, + + #[error("Invalid plugin path")] + InvalidPluginPath, + + #[error("Cannot load plugin shared library (error: {0})")] + PluginLoadError(String), + + #[error("The slipstream plugin '{0}' is already loaded")] + PluginAlreadyLoaded(String), + + #[error("The plugin '{0}' is not loaded")] + PluginNotLoaded(String), + + #[error("The SlipstreamPlugin on_load method failed (error: {0})")] + PluginStartError(String), +} + +/// Parses a plugin config file and returns the resolved, absolute path to the `.so`. +/// +/// Does NOT open or load the library — safe to call for duplicate detection before `dlopen`. +#[cfg(not(test))] +pub(crate) fn resolve_libpath_from_config( + slipstream_plugin_config_file: &Path, +) -> Result { + use std::{fs::File, io::Read}; + + let mut file = File::open(slipstream_plugin_config_file).map_err(|e| { + SlipstreamPluginManagerError::CannotOpenConfigFile(format!( + "Failed to open the plugin config file {slipstream_plugin_config_file:?}, error: {e:?}" + )) + })?; + + let mut contents = String::new(); + file.read_to_string(&mut contents).map_err(|e| { + SlipstreamPluginManagerError::CannotReadConfigFile(format!( + "Failed to read the plugin config file {slipstream_plugin_config_file:?}, error: {e:?}" + )) + })?; + + let result: serde_json::Value = json5::from_str(&contents).map_err(|e| { + SlipstreamPluginManagerError::InvalidConfigFileFormat(format!( + "The config file {slipstream_plugin_config_file:?} is not in a valid Json5 format, error: {e:?}" + )) + })?; + + let libpath_str = result["libpath"].as_str().ok_or(SlipstreamPluginManagerError::LibPathNotSet)?; + let mut libpath = PathBuf::from(libpath_str); + if libpath.is_relative() { + let config_dir = slipstream_plugin_config_file.parent().ok_or_else(|| { + SlipstreamPluginManagerError::CannotOpenConfigFile(format!( + "Failed to resolve parent of {slipstream_plugin_config_file:?}", + )) + })?; + libpath = config_dir.join(libpath); + } + + Ok(libpath) +} + +/// # Safety +/// +/// This function loads the dynamically linked library specified in the path. The library +/// must do necessary initializations. +/// +/// Returns the slipstream plugin, the dynamic library, and the parsed config file as a `&str`. +/// (The slipstream plugin interface requires a `&str` for the `on_load` method.) +#[cfg(not(test))] +pub(crate) fn load_plugin_from_config( + slipstream_plugin_config_file: &Path, +) -> Result<(Library, LoadedSlipstreamPlugin, &str), SlipstreamPluginManagerError> { + use std::{fs::File, io::Read, path::PathBuf}; + // Trait objects have no C equivalent; the suppression is intentional — the plugin ABI + // uses raw pointers and the caller takes ownership immediately via Box::from_raw. + #[allow(improper_ctypes_definitions)] + type PluginConstructor = unsafe extern "C" fn() -> *mut dyn SlipstreamPlugin; + use libloading::Symbol; + + let mut file = match File::open(slipstream_plugin_config_file) { + Ok(file) => file, + Err(err) => { + return Err(SlipstreamPluginManagerError::CannotOpenConfigFile(format!( + "Failed to open the plugin config file {slipstream_plugin_config_file:?}, error: {err:?}" + ))); + } + }; + + let mut contents = String::new(); + if let Err(err) = file.read_to_string(&mut contents) { + return Err(SlipstreamPluginManagerError::CannotReadConfigFile(format!( + "Failed to read the plugin config file {slipstream_plugin_config_file:?}, error: {err:?}" + ))); + } + + let result: serde_json::Value = match json5::from_str(&contents) { + Ok(value) => value, + Err(err) => { + return Err(SlipstreamPluginManagerError::InvalidConfigFileFormat(format!( + "The config file {slipstream_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}" + ))); + } + }; + + let libpath = result["libpath"].as_str().ok_or(SlipstreamPluginManagerError::LibPathNotSet)?; + let mut libpath = PathBuf::from(libpath); + if libpath.is_relative() { + let config_dir = slipstream_plugin_config_file.parent().ok_or_else(|| { + SlipstreamPluginManagerError::CannotOpenConfigFile(format!( + "Failed to resolve parent of {slipstream_plugin_config_file:?}", + )) + })?; + libpath = config_dir.join(libpath); + } + + let plugin_name = result["name"].as_str().map(|s| s.to_owned()); + + let config_file = + slipstream_plugin_config_file.as_os_str().to_str().ok_or(SlipstreamPluginManagerError::InvalidPluginPath)?; + + let (plugin, lib) = unsafe { + let lib = Library::new(libpath).map_err(|e| SlipstreamPluginManagerError::PluginLoadError(e.to_string()))?; + let constructor: Symbol = + lib.get(b"_create_plugin").map_err(|e| SlipstreamPluginManagerError::PluginLoadError(e.to_string()))?; + let plugin_raw = constructor(); + if plugin_raw.is_null() { + return Err(SlipstreamPluginManagerError::PluginLoadError( + "plugin constructor returned a null pointer".to_string(), + )); + } + (Box::from_raw(plugin_raw), lib) + }; + Ok((lib, LoadedSlipstreamPlugin::new(plugin, plugin_name), config_file)) +} + +#[cfg(test)] +const TESTPLUGIN_CONFIG: &str = "TESTPLUGIN_CONFIG"; +#[cfg(test)] +const TESTPLUGIN2_CONFIG: &str = "TESTPLUGIN2_CONFIG"; + +// In tests resolve_libpath_from_config returns the config path itself as a stand-in for +// the .so path. This is sufficient for duplicate detection without real file I/O. +#[cfg(test)] +pub(crate) fn resolve_libpath_from_config( + slipstream_plugin_config_file: &Path, +) -> Result { + Ok(slipstream_plugin_config_file.to_path_buf()) +} + +// This is mocked for tests to avoid having to do IO with a dynamically linked library +// across different architectures at test time. +#[cfg(test)] +pub(crate) fn load_plugin_from_config( + slipstream_plugin_config_file: &Path, +) -> Result<(Library, LoadedSlipstreamPlugin, &str), SlipstreamPluginManagerError> { + if slipstream_plugin_config_file.ends_with(TESTPLUGIN_CONFIG) { + Ok(tests::dummy_plugin_and_library(tests::TestPlugin, TESTPLUGIN_CONFIG)) + } else if slipstream_plugin_config_file.ends_with(TESTPLUGIN2_CONFIG) { + Ok(tests::dummy_plugin_and_library(tests::TestPlugin2, TESTPLUGIN2_CONFIG)) + } else { + Err(SlipstreamPluginManagerError::CannotOpenConfigFile( + slipstream_plugin_config_file.to_str().unwrap().to_string(), + )) + } +} + +#[cfg(test)] +mod tests { + use crate::slipstream_manager::{ + LoadedPlugin, + LoadedSlipstreamPlugin, + SlipstreamPluginManager, + TESTPLUGIN_CONFIG, + TESTPLUGIN2_CONFIG, + }; + use libloading::Library; + use snarkvm_slipstream_plugin_interface::slipstream_plugin_interface::{ + BroadcastEvent, + BroadcastEventKind, + SlipstreamPlugin, + }; + use std::{ + path::PathBuf, + sync::{Arc, RwLock}, + }; + + pub(super) fn dummy_plugin_and_library( + plugin: P, + config_path: &'static str, + ) -> (Library, LoadedSlipstreamPlugin, &'static str) { + #[cfg(unix)] + let library = libloading::os::unix::Library::this(); + #[cfg(windows)] + let library = libloading::os::windows::Library::this().unwrap(); + (Library::from(library), LoadedSlipstreamPlugin::new(Box::new(plugin), None), config_path) + } + + const DUMMY_NAME: &str = "dummy"; + const ANOTHER_DUMMY_NAME: &str = "another_dummy"; + + #[derive(Clone, Copy, Debug)] + pub(super) struct TestPlugin; + + impl SlipstreamPlugin for TestPlugin { + fn name(&self) -> &'static str { + DUMMY_NAME + } + } + + #[derive(Clone, Copy, Debug)] + pub(super) struct TestPlugin2; + + impl SlipstreamPlugin for TestPlugin2 { + fn name(&self) -> &'static str { + ANOTHER_DUMMY_NAME + } + } + + #[test] + fn test_plugin_list() { + // Initialize empty manager. + let plugin_manager = Arc::new(RwLock::new(SlipstreamPluginManager::new())); + let mut plugin_manager_lock = plugin_manager.write().unwrap(); + + // Load two plugins. + let (_lib, mut plugin, config) = dummy_plugin_and_library(TestPlugin, TESTPLUGIN_CONFIG); + plugin.on_load(config, false).unwrap(); + plugin_manager_lock.plugins.push(LoadedPlugin { plugin, _lib, libpath: PathBuf::from(config) }); + + let (_lib, mut plugin, config) = dummy_plugin_and_library(TestPlugin2, TESTPLUGIN2_CONFIG); + plugin.on_load(config, false).unwrap(); + plugin_manager_lock.plugins.push(LoadedPlugin { plugin, _lib, libpath: PathBuf::from(config) }); + + // Check that both plugins are returned in the list. + let plugins = plugin_manager_lock.list_plugins().unwrap(); + assert!(plugins.iter().any(|name| name.eq(DUMMY_NAME))); + assert!(plugins.iter().any(|name| name.eq(ANOTHER_DUMMY_NAME))); + } + + #[test] + fn test_plugin_load_unload() { + // Initialize empty manager. + let plugin_manager = Arc::new(RwLock::new(SlipstreamPluginManager::new())); + let mut plugin_manager_lock = plugin_manager.write().unwrap(); + + // Load rpc call. + let load_result = plugin_manager_lock.load_plugin(TESTPLUGIN_CONFIG); + assert!(load_result.is_ok()); + assert_eq!(plugin_manager_lock.plugins.len(), 1); + + // Unload rpc call. + let unload_result = plugin_manager_lock.unload_plugin(DUMMY_NAME); + assert!(unload_result.is_ok()); + assert_eq!(plugin_manager_lock.plugins.len(), 0); + } + + #[test] + fn test_broadcast_mapping_update() { + let mut manager = SlipstreamPluginManager::new(); + + // Install a mock plugin that tracks calls. + #[derive(Debug)] + struct TrackingPlugin { + calls: std::sync::atomic::AtomicU32, + } + impl SlipstreamPlugin for TrackingPlugin { + fn name(&self) -> &'static str { + "tracking" + } + + fn subscribed_events(&self) -> &[BroadcastEventKind] { + &[BroadcastEventKind::MappingUpdate] + } + + fn on_broadcast(&self, _event: BroadcastEvent<'_>) -> anyhow::Result<()> { + self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + } + + // Manually push the plugin (bypassing dynamic loading). + #[cfg(unix)] + let _lib = Library::from(libloading::os::unix::Library::this()); + #[cfg(windows)] + let _lib = Library::from(libloading::os::windows::Library::this().unwrap()); + + let plugin = TrackingPlugin { calls: std::sync::atomic::AtomicU32::new(0) }; + manager.plugins.push(LoadedPlugin { + plugin: LoadedSlipstreamPlugin::new(Box::new(plugin), None), + _lib, + libpath: PathBuf::new(), + }); + + // Broadcast a MappingUpdate and verify the plugin received it. + manager.broadcast(BroadcastEvent::MappingUpdate { + program_id: b"program_id", + mapping_name: b"mapping", + key: b"key", + value: b"value", + block_height: 42, + }); + + // Verify via list_plugins that the plugin is still loaded. + assert_eq!(manager.list_plugins().unwrap(), vec!["tracking"]); + } +} diff --git a/synthesizer/Cargo.toml b/synthesizer/Cargo.toml index 7aa2211245..648cbd45a8 100644 --- a/synthesizer/Cargo.toml +++ b/synthesizer/Cargo.toml @@ -35,6 +35,7 @@ async = [ "snarkvm-ledger-query/async", "snarkvm-synthesizer-process/async" ] cuda = [ "snarkvm-algorithms/cuda" ] history = [ "snarkvm-ledger-store/history" ] history-staking-rewards = [ "snarkvm-ledger-store/history-staking-rewards" ] +slipstream-plugins = [ "snarkvm-ledger-store/slipstream-plugins" ] rocks = [ "snarkvm-ledger-store/rocks" ] serial = [ "snarkvm-console/serial", diff --git a/synthesizer/src/vm/finalize.rs b/synthesizer/src/vm/finalize.rs index 9f3bab4b30..a1fc037590 100644 --- a/synthesizer/src/vm/finalize.rs +++ b/synthesizer/src/vm/finalize.rs @@ -654,8 +654,18 @@ impl> VM { .current_block_height() .store(state.block_height(), std::sync::atomic::Ordering::SeqCst); + // Signal to Slipstream plugins that canonical finalize is starting. + #[cfg(feature = "slipstream-plugins")] + { + self.store.finalize_store().is_finalize_mode().store(true, std::sync::atomic::Ordering::SeqCst); + self.store + .finalize_store() + .slipstream_block_height() + .store(state.block_height(), std::sync::atomic::Ordering::SeqCst); + } + // Perform the finalize operation on the preset finalize mode. - atomic_finalize!(self.finalize_store(), FinalizeMode::RealRun, { + let finalize_result = atomic_finalize!(self.finalize_store(), FinalizeMode::RealRun, { // Initialize an iterator for ratifications before finalize. let pre_ratifications = ratifications.iter().filter(|r| match r { Ratify::Genesis(_, _, _) => true, @@ -876,7 +886,15 @@ impl> VM { finish!(timer); // <- Note: This timer does **not** include the time to write batch to DB. Ok(ratified_finalize_operations) - }) + }); + + // Reset the canonical finalize flag regardless of whether finalize succeeded or failed. + #[cfg(feature = "slipstream-plugins")] + { + self.store.finalize_store().is_finalize_mode().store(false, std::sync::atomic::Ordering::SeqCst); + } + + finalize_result } /// Returns `Some(reason)` if the transaction is aborted. Otherwise, returns `None`. @@ -1405,12 +1423,29 @@ impl> VM { #[cfg(feature = "history-staking-rewards")] { + let height = state.block_height(); for (curr_stake, (staker, (validator, new_stake))) in current_stakers.values().map(|(_, current_stake)| current_stake).zip(&next_stakers) { let reward = new_stake - curr_stake; - let height = state.block_height(); store.staking_rewards_map().insert((*staker, height), (*validator, reward, *new_stake))?; + // Notify Slipstream plugins of the staking reward, if in canonical finalize mode. + #[cfg(feature = "slipstream-plugins")] + if IS_FINALIZE { + store.notify_staking_reward(staker, validator, reward, *new_stake, height); + } + } + } + + // When history-staking-rewards is disabled, notify Slipstream plugins directly. + #[cfg(all(feature = "slipstream-plugins", not(feature = "history-staking-rewards")))] + if IS_FINALIZE { + let height = state.block_height(); + for (curr_stake, (staker, (validator, new_stake))) in + current_stakers.values().map(|(_, current_stake)| current_stake).zip(&next_stakers) + { + let reward = new_stake - curr_stake; + store.notify_staking_reward(staker, validator, reward, *new_stake, height); } } diff --git a/utilities/src/bytes.rs b/utilities/src/bytes.rs index 14110692fe..28eac78178 100644 --- a/utilities/src/bytes.rs +++ b/utilities/src/bytes.rs @@ -525,7 +525,7 @@ pub fn bits_from_bytes_le(bytes: &[u8]) -> impl DoubleEndedIterator #[inline] pub fn bytes_from_bits_le(bits: &[bool]) -> Vec { - let desired_size = if bits.len() % 8 == 0 { bits.len() / 8 } else { bits.len() / 8 + 1 }; + let desired_size = if bits.len().is_multiple_of(8) { bits.len() / 8 } else { bits.len() / 8 + 1 }; let mut bytes = Vec::with_capacity(desired_size); for bits in bits.chunks(8) {