Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4ec2f56
first pass at plumbing for slipstream plugins
dgrkotsonis Mar 17, 2026
ad0c250
rework imports, fix types
dgrkotsonis Mar 18, 2026
2facb89
formatting
dgrkotsonis Mar 18, 2026
d909f2a
fix feature flag propagation
dgrkotsonis Mar 19, 2026
41c9e5e
rework flags, add some diagnostic logs
dgrkotsonis Mar 20, 2026
5422f69
default features false for slipstream plugin
dgrkotsonis Mar 20, 2026
9bd8bbc
update snarkVM rev, added some better logging
dgrkotsonis Mar 23, 2026
6e27cfb
update snarkvm rev
dgrkotsonis Mar 24, 2026
29b11b9
update snarkvm rev for testing
dgrkotsonis Mar 25, 2026
c23d26b
snarkvm rev with small fix
dgrkotsonis Mar 25, 2026
477c019
more logs
dgrkotsonis Mar 25, 2026
4cbbfc4
Updated snarkvm with arc around pluginmanager
dgrkotsonis Mar 25, 2026
d22be74
Update snarkvm w fewer logs
dgrkotsonis Mar 25, 2026
43a5c82
updated snarkvm to hopefully avoid dynlib loading issues
dgrkotsonis Mar 27, 2026
53f97ed
rebased changes, updated snarkVM, testing RPC and filtering
dgrkotsonis Mar 31, 2026
75762c4
Not sure how snarkvm import got overwritten, fixing
dgrkotsonis Mar 31, 2026
060b718
update with less noisy logs
dgrkotsonis Mar 31, 2026
825e733
new snarkvm version with dynamic loading logs
dgrkotsonis Apr 1, 2026
0ee3e70
update docs, remove reload
dgrkotsonis Apr 1, 2026
989f619
Update snarkVM rev
dgrkotsonis Apr 1, 2026
4923716
updated snarkvm
dgrkotsonis Apr 3, 2026
05a1793
changes to account for new parking_lot rwlocks
dgrkotsonis Apr 7, 2026
7ffc1aa
updated snarkvm
dgrkotsonis Apr 7, 2026
2b93e75
Update with new snarkvm changes, remove slipstream service
dgrkotsonis Apr 20, 2026
ec83f3f
Rebased/updated, see if we can get past sync error
dgrkotsonis Apr 22, 2026
3ab5120
clean up a bit
dgrkotsonis Apr 22, 2026
9aaa8f4
formatting
dgrkotsonis Apr 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 176 additions & 156 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ version = "1.0.1"
default-features = false

[workspace.dependencies.snarkvm]
#path = "../snarkVM"
git = "https://github.com/ProvableHQ/snarkVM.git"
rev = "8c5ef1f5849cf12e2e00b2b4ca5cea152db48402"
#version = "=4.6.0"
# Latest stream_plugin_testing commit
rev = "d83dc74732b8b273d222ad412426338e51da823b"
default-features = false

[workspace.dependencies.snarkvm-slipstream-plugin-manager]
git = "https://github.com/ProvableHQ/snarkVM.git"
# Latest stream_plugin_testing commit
rev = "d83dc74732b8b273d222ad412426338e51da823b"
default-features = false

[workspace.dependencies.anyhow]
Expand Down Expand Up @@ -268,6 +273,7 @@ path = "snarkos/main.rs"
default = [ "snarkos-cli/metrics", "snarkos-node-metrics", "snarkos-node/metrics", "snarkos-node-cdn/metrics" ]
history = [ "snarkos-node/history" ]
history-staking-rewards = [ "snarkos-node/history-staking-rewards" ]
slipstream-plugins = [ "snarkos-node/slipstream-plugins", "snarkos-cli/slipstream-plugins" ]
telemetry = [ "snarkos-node/telemetry" ]
cuda = [
"snarkos-account/cuda",
Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ edition = "2024"
[features]
default = [ ]
async = [ ]
slipstream-plugins = [ "snarkos-node/slipstream-plugins" ]
locktick = [
"dep:locktick",
"snarkos-display/locktick",
Expand Down
16 changes: 14 additions & 2 deletions cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ pub struct Start {
/// If the flag is set, the node will attempt to automatically migrate the node data to the new format.
#[clap(long)]
pub auto_migrate_node_data: bool,

/// Paths to Slipstream plugin config files (JSON5). May be repeated for multiple plugins.
/// Requires the node to be compiled with --features slipstream-plugins.
#[cfg(feature = "slipstream-plugins")]
#[clap(long = "slipstream-config", value_name = "PATH", verbatim_doc_comment)]
pub slipstream_configs: Vec<PathBuf>,
}

impl Start {
Expand Down Expand Up @@ -844,11 +850,17 @@ impl Start {
// Register the signal handler.
let signal_handler = SignalHandler::new();

// Collect slipstream plugin config paths (empty slice when feature is disabled).
#[cfg(feature = "slipstream-plugins")]
let slipstream_configs: &[PathBuf] = &self.slipstream_configs;
#[cfg(not(feature = "slipstream-plugins"))]
let slipstream_configs: &[PathBuf] = &[];

// Initialize the node.
let node = match node_type {
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, signal_handler.clone()).await,
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, slipstream_configs, signal_handler.clone()).await,
NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, node_data_dir, self.trusted_peers_only, self.dev, signal_handler.clone()).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), self.dev, signal_handler.clone()).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), self.dev, slipstream_configs, signal_handler.clone()).await,
NodeType::BootstrapClient => Node::new_bootstrap_client(node_ip, account, *genesis.header(), self.dev).await,
}?;

Expand Down
181 changes: 181 additions & 0 deletions docs/slipstream_plugins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Slipstream Plugins
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better place to put this file rather than creating this new docs dir? (Unlike snarkVM there isn't a specific plugins crate to put this in)


Slipstream is a plugin system that lets operators stream canonical mapping updates and staking
rewards from snarkOS nodes to external services (databases, metrics pipelines, etc.) in
real time, without modifying node code.

---

## Overview

Slipstream plugins are dynamically loaded shared libraries (`.so` / `.dylib` / `.dll`) that
implement the `SlipstreamPlugin` trait from `snarkvm-slipstream-plugin-interface`. The plugin
manager inside `snarkVM`'s `FinalizeStore` calls plugin hooks every time canonical finalize runs.

Plugins can subscribe to:

- **Mapping updates** — every key/value write that occurs during canonical finalize.
- **Staking rewards** — per-staker reward notifications

Only **Validator** and **Client** nodes finalize blocks and therefore support plugins.
Prover nodes do not.

---

## Building a Plugin

Use `snarkvm-slipstream-plugin-interface` as a dependency and implement the `SlipstreamPlugin`
trait. Compile your crate as a `cdylib`:

```toml
# Cargo.toml
[lib]
crate-type = ["cdylib"]

[dependencies]
snarkvm-slipstream-plugin-interface = { git = "https://github.com/ProvableHQ/snarkVM.git", branch = "stream_plugin_testing" }
```

Export the constructor with the exact symbol name `_create_plugin`:

```rust
#[no_mangle]
pub extern "C" fn _create_plugin() -> *mut dyn SlipstreamPlugin {
Box::into_raw(Box::new(MyPlugin::new()))
}
```

See `slipstream-plugin-postgres` in the snarkVM repository for a complete reference
implementation.

---

## Plugin Config File (JSON5)

Each plugin is configured via a JSON5 file. The required field is `libpath`, which can be
absolute or relative to the config file's directory.

```json5
{
// Required: path to the compiled .so / .dylib
libpath: "./libslipstream_postgres_example.so",

// Optional: override the plugin name reported by name()
name: "postgres",

// Plugin-specific fields (passed verbatim to on_load)
connection_string: "postgres://user:pass@localhost/aleo",
batch_size: 100,
}
```

---

## Starting a Node with Plugins

Compile snarkOS with the `slipstream-plugins` feature

```bash
cargo build --features slipstream-plugins
```

Pass one or more `--slipstream-config` flags at startup:

```bash
# Single plugin
snarkos start --client \
--slipstream-config ~/.aleo/plugins/postgres/plugin.json5

# Multiple plugins
snarkos start --validator \
--slipstream-config ~/.aleo/plugins/postgres/plugin.json5 \
--slipstream-config ~/.aleo/plugins/metrics/plugin.json5
```

Plugins are loaded synchronously before the REST server starts. If any plugin fails to load,
the node exits with an error.

---

## Runtime Management via REST API

All endpoints require a valid JWT token (`Authorization: Bearer <token>`).

### List loaded plugins

```
GET /{network}/slipstream/plugins
```

Response (200):
```json
["postgres", "metrics"]
```

### Load a plugin at runtime

```
POST /{network}/slipstream/plugins
Content-Type: application/json

{ "config_file": "/path/to/plugin.json5" }
```

Response (200):
```json
{ "loaded": "postgres" }
```

Returns **422 Unprocessable Entity** if a plugin with that name is already loaded.

### Unload a plugin

```
DELETE /{network}/slipstream/plugins/{name}
```

Response (200):
```json
{ "unloaded": true }
```

Returns **404 Not Found** if no plugin with that name is loaded.

### Reload a plugin (not yet implemented)

`PUT /{network}/slipstream/plugins/{name}` is not currently available. To update a plugin's
config during runtime, unload it with DELETE and reload it with POST. Otherwise, stop the snarkos service, update the config, and restart it, pointing at the new config.

---

## Example: curl Commands

```bash
JWT="<your-jwt-token>"
BASE="http://localhost:3030/mainnet"

# List
curl -H "Authorization: Bearer $JWT" "$BASE/slipstream/plugins"

# Load
curl -X POST -H "Authorization: Bearer $JWT" \
-H "Content-Type: application/json" \
-d '{"config_file":"/path/to/plugin.json5"}' \
"$BASE/slipstream/plugins"

# Unload
curl -X DELETE -H "Authorization: Bearer $JWT" \
"$BASE/slipstream/plugins/postgres"
```

---

## Notes

- Plugins are loaded in startup order and unloaded in reverse order on shutdown.
- The `on_unload` method is called on every plugin during graceful shutdown.
- Plugin errors during `notify_mapping_update` / `notify_staking_reward` are logged as warnings
and never propagated to the node — a misbehaving plugin cannot crash the node.
- The plugin manager uses a `std::sync::RwLock`; `notify_*` calls acquire a read lock, while
load/unload/reload operations acquire the write lock. Avoid long-running operations inside
plugin callbacks.
9 changes: 7 additions & 2 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ metrics = [
"snarkos-node-router/metrics",
"snarkos-node-tcp/metrics"
]
history = [ "snarkos-node-rest/history" ]
history-staking-rewards = [ "snarkos-node-rest/history-staking-rewards" ]
history = [ "snarkos-node-rest/history", "snarkvm/history" ]
history-staking-rewards = [ "snarkos-node-rest/history-staking-rewards", "snarkvm/history-staking-rewards" ]
slipstream-plugins = [ "dep:snarkvm-slipstream-plugin-manager", "snarkos-node-rest/slipstream-plugins", "snarkvm/slipstream-plugins" ]
telemetry = [ "snarkos-node-bft/telemetry", "snarkos-node-consensus/telemetry", "snarkos-node-rest/telemetry" ]
cuda = [
"snarkvm/cuda",
Expand Down Expand Up @@ -137,6 +138,10 @@ workspace = true
[dependencies.snarkvm]
workspace = true

[dependencies.snarkvm-slipstream-plugin-manager]
workspace = true
optional = true

[dependencies.time]
workspace = true

Expand Down
9 changes: 7 additions & 2 deletions node/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ edition = "2024"

[features]
default = [ ]
history = [ "snarkvm/history" ]
history-staking-rewards = [ "snarkvm/history-staking-rewards" ]
history = [ "dep:snarkvm-slipstream-plugin-manager", "snarkvm/history" ]
history-staking-rewards = [ "dep:snarkvm-slipstream-plugin-manager", "snarkvm/history-staking-rewards" ]
slipstream-plugins = [ "dep:snarkvm-slipstream-plugin-manager", "snarkvm/slipstream-plugins" ]
telemetry = [ "snarkos-node-consensus/telemetry" ]
cuda = [ "snarkvm/cuda", "snarkos-node-consensus/cuda", "snarkos-node-router/cuda" ]
locktick = [
Expand Down Expand Up @@ -96,6 +97,10 @@ workspace = true
[dependencies.snarkvm]
workspace = true

[dependencies.snarkvm-slipstream-plugin-manager]
workspace = true
optional = true

[dependencies.rand]
workspace = true

Expand Down
13 changes: 12 additions & 1 deletion node/rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
fn build_routes(&self, rest_rps: u32) -> axum::Router {
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
.allow_headers([CONTENT_TYPE]);

// Prepare the rate limiting setup.
Expand Down Expand Up @@ -259,6 +259,17 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
#[cfg(feature = "history")]
let routes = routes.route("/program/{id}/mapping/{name}/{key}/history/{height}", get(Self::get_history));

// If the `slipstream-plugins` feature is enabled,
// enable the Slipstream plugin management endpoints (no auth required).
#[cfg(feature = "slipstream-plugins")]
let routes = routes
.route("/slipstream/plugins", get(Self::slipstream_list_plugins).post(Self::slipstream_load_plugin))
.route(
"/slipstream/plugins/{name}",
// TODO: PUT (reload) is not yet implemented.
axum::routing::delete(Self::slipstream_unload_plugin),
);

// If the `history-staking-rewards` feature is enabled, enable the additional endpoint.
#[cfg(feature = "history-staking-rewards")]
let routes = routes.route("/staking/rewards/{address}/{height}", get(Self::get_staking_reward));
Expand Down
Loading