
Rafal/node backend execution 2 execution scheduler pr to main#963

Draft
leszko wants to merge 26 commits into main from
rafal/node-backend-execution-2-execution-scheduler-pr-to-main

Conversation


@leszko (Collaborator) commented Apr 20, 2026

No description provided.

leszko and others added 26 commits April 14, 2026 07:47
Introduce the fine-grained backend node framework shared by the
execution-scheduler and ACEStep branches:

- src/scope/core/nodes/: BaseNode ABC, NodeDefinition / NodePort /
  NodeParam Pydantic schemas, and NodeRegistry keyed by node_type_id.
- plugins: new `register_nodes` pluggy hookspec + PluginManager
  `register_plugin_nodes` hook caller; bootstrap both built-in and
  plugin nodes from pipelines registry initialization.
- graph_schema: allow `type="node"` GraphNode with `node_type_id`
  and `params`, plus a `get_backend_node_ids()` helper and a
  validation error when node_type_id is missing.
- app: add `GET /api/v1/nodes/definitions` for frontend discovery
  of registered node types.

BaseNode intentionally only requires `get_definition()`; execution
contracts (push vs. pull) are layered by the specialized branches.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Surface backend node types on the frontend via the new
GET /api/v1/nodes/definitions endpoint:

- api.ts: extend GraphNode with `node_type_id`/`params` and add
  NodeDefinitionDto + fetchNodeDefinitions().
- graphUtils.ts: add `custom_node` to FlowNodeData.nodeType and the
  customNode* metadata fields (display, category, inputs, outputs,
  params, param defs); round-trip `custom_node` ↔ backend `type: "node"`
  in graphConfigToFlow / flowToGraphConfig.
- CustomNode.tsx (new): schema-driven React Flow node that renders
  input/output ports with port-type colors and ComfyUI-style param
  widgets (select/boolean/number/text) from `customNodeParamDefs`.
- AddNodeModal: fetch node definitions while open and merge them into
  the catalog under a new "Plugins" category; pass full definition
  metadata through onSelectNodeType extraData.
- useNodeFactories: add `custom_node` to NodeTypeKey / NODE_DEFAULTS
  and route the add flow through extraData so picked definitions land
  in FlowNodeData.
- useGraphPersistence: after import/restore, hydrate every custom_node
  with its definition from /api/v1/nodes/definitions, preserving saved
  `customNodeParams` over definition defaults.
- connectionValidation: when either endpoint is a custom_node, enforce
  port-type equality; built-in node streams stay untyped.
- GraphEditor: register `custom_node: CustomNode` in nodeTypes.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The graph editor has always called processing units "Nodes" in the UI
(see app.daydream.live/nodes) while the code called them "Pipelines";
this PR then introduced a second unrelated concept also called "Node".
Collapse both into one hierarchy so plugin authors and users see a
single concept.

Design:
- Pipeline becomes `class Pipeline(BaseNode, ABC)` — pure inheritance
  change. Concrete pipeline subclasses (longlive, krea, LTX-2, VACE,
  streamdiffusion, …) touch zero lines. They still only implement
  `__call__` and `get_config_class`.
- Pipeline provides default `get_definition()` (derives a lightweight
  NodeDefinition from the config class) and a no-op `execute()` that
  raises NotImplementedError (pipelines are driven by `__call__` from
  PipelineProcessor; the method only exists to satisfy the abstract
  BaseNode contract).
- NodeRegistry is now the sole storage. Its register() handles both
  plain BaseNode classes (read `node_type_id` classvar) and Pipeline
  subclasses (read `pipeline_id` from the config class).
- PipelineRegistry becomes a filtering view over NodeRegistry._nodes
  that returns only Pipeline subclasses. Every pre-existing call site
  (`list_pipelines`, `get_config_class`, `chain_produces_*`, `register`,
  `unregister`, `is_registered`, `get`) keeps its old signature and
  semantics, so the server, plugin manager, OSC/DMX docs, workflows
  resolver, and tests all work unchanged.
- register_pipelines plugin hook still forwards through
  `registry.register(pipeline_id, pipeline_class)`, which now writes
  into the shared storage. Existing plugins need zero edits.
- register_nodes plugin hook uses the same storage. Both hooks can be
  used interchangeably; new plugins use register_nodes.
- GET /api/v1/nodes/definitions filters pipelines out so the frontend
  add-node catalog's Plugins category only shows plain custom nodes.
  Pipelines continue to be surfaced through the dedicated
  /api/v1/pipelines/schemas endpoint for their rich config panel.
- Frontend: the add-node modal's "Pipeline" catalog entry is renamed
  to "Node", the PipelineNode parameter selector label changes from
  "Pipeline" to "Model", and the context menu's "Pipeline" entry
  becomes "Node". The word "pipeline" disappears from the user-facing
  UI.
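A hedged sketch of the "filtering view" design: `PipelineRegistry` keeps its old call-site surface but delegates storage to the shared `NodeRegistry` and filters for `Pipeline` subclasses. Class and method bodies here are illustrative, not the PR's actual code:

```python
class BaseNode:
    node_type_id = ""

class Pipeline(BaseNode):
    """Pipelines are just nodes; concrete subclasses stay unchanged."""

class NodeRegistry:
    def __init__(self) -> None:
        self._nodes: dict[str, type] = {}

    def register(self, node_id: str, cls: type) -> None:
        self._nodes[node_id] = cls

    def list_node_types(self) -> list[str]:
        return list(self._nodes)

    def get(self, node_id: str):
        return self._nodes.get(node_id)

class PipelineRegistry:
    """Filtering view: old signatures, shared storage underneath."""
    def __init__(self, nodes: NodeRegistry) -> None:
        self._node_registry = nodes

    def register(self, pipeline_id: str, cls: type) -> None:
        self._node_registry.register(pipeline_id, cls)

    def list_pipelines(self) -> list[str]:
        return [nid for nid in self._node_registry.list_node_types()
                if issubclass(self._node_registry.get(nid), Pipeline)]

nodes = NodeRegistry()
pipelines = PipelineRegistry(nodes)

class LongLive(Pipeline): ...
class BlurNode(BaseNode): ...

pipelines.register("longlive", LongLive)  # legacy register_pipelines path
nodes.register("blur", BlurNode)          # new register_nodes path
```

Both registration paths land in the same dict, so the pipeline view and the node catalog can never drift apart.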

Backwards compatibility:
- Existing pipeline subclasses don't change.
- Existing register_pipelines plugins don't change.
- Existing PipelineRegistry.* call sites don't change.
- Existing `"type": "pipeline"` saved workflows still load.
- GraphNode.type keeps both "pipeline" and "node" literals.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Collapse pipeline and plain-node discovery into one canonical endpoint.
``GET /api/v1/nodes/definitions`` now returns every entry in the
unified ``NodeRegistry``: pipelines (Pipeline subclasses) carry the
full ``get_schema_with_metadata()`` output as ``pipeline_meta``, plain
custom nodes leave ``pipeline_meta`` ``None``. One endpoint, one shape,
one source of truth.

- core/nodes/base.py: add ``pipeline_meta: dict | None`` field to
  NodeDefinition.
- core/pipelines/interface.py: Pipeline.get_definition() populates
  pipeline_meta from get_config_class().get_schema_with_metadata().
- server/app.py:
  * /api/v1/nodes/definitions stops filtering pipelines and returns
    everything.
  * /api/v1/pipelines/schemas becomes a thin compat alias that
    derives its (legacy-shape) response by iterating
    NodeRegistry.get_all_definitions() and pulling pipeline_meta from
    entries whose pipeline_meta is set. Existing usePipelines.ts and
    cloud-mode proxy callers keep working without migration.
- frontend/lib/api.ts: NodeDefinitionDto gains the optional
  pipeline_meta field.
- frontend/components/graph/AddNodeModal.tsx: when populating the
  Plugins category from the unified endpoint, filter out entries
  where pipeline_meta != null. Pipelines are still added through the
  hardcoded "Pipeline" placeholder + dropdown UX, so this keeps the
  existing modal behavior unchanged while the backend is unified.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Remove widget-specific fields (``min_value``, ``max_value``, ``step``,
``options``) from ``NodeParam`` and put them in a free-form ``ui``
dict instead. The base schema no longer grows when new widget kinds
appear; the frontend renderer dispatches on ``param_type`` and reads
whichever ``ui`` keys apply (``min``/``max``/``step`` for number,
``options`` for select, etc.).

- core/nodes/base.py: drop ``min_value``/``max_value``/``step``/
  ``options`` fields; add ``ui: dict[str, Any] | None`` with
  documentation of the conventional keys.
- frontend lib/api.ts: ``NodeParamDef`` mirrors the new shape —
  the typed widget fields go away, ``ui`` takes their place.
- frontend CustomNode.tsx: number widget reads ``p.ui?.min`` /
  ``p.ui?.max`` / ``p.ui?.step``; select widget reads
  ``p.ui?.options``; boolean and text widgets unchanged.

Plugins that construct ``NodeParam`` are migrated to the new shape
separately — ACEStep's ``bridge.py`` needs a coordinated update, and
Scope's audio builtins (added in the execution PR) do too. Both get
the generic shape: ``ui={"min": …, "max": …, "step": …}`` for
numbers, ``ui={"options": [...]}`` for selects.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Narrow _derive_node_type_id to only catch ImportError around the
circular-import dodge, so real bugs (broken config class, typo in
pipeline_id) surface instead of silently failing registration.

Route PipelineRegistry.register through NodeRegistry.register so
built-in pipelines share the same id-derivation and debug log path
as plugin nodes, and assert pipeline_id matches the config class's
pipeline_id to catch drift between the registry key and the
definition.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
- Reuse NodePortDef/NodeParamDef types from api.ts in FlowNodeData;
  drop local ad-hoc cast in CustomNode so widget ui hints type-check.
- Route PipelineRegistry through NodeRegistry's public get/unregister/
  list_node_types API instead of poking the private _nodes dict.
- Thread an AbortController through hydrateCustomNodeDefinitions so
  stale /api/v1/nodes/definitions fetches (rapid reloads, unmount)
  can't overwrite newer setNodes state; switch the helper to the
  typed fetchNodeDefinitions and drop the any casts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
Add the execution layer for the backend node abstraction so plugin
nodes and built-ins run inside the pipeline graph alongside
PipelineProcessors. This closes the gap between the metadata layer
introduced in d209e3b and real execution.

Node framework:
- core/nodes/base.py: BaseNode gains abstract `execute(inputs, **kwargs)`
  and a ComfyUI-style `IS_CHANGED(**kwargs)` cache-key classmethod.
- core/nodes/processor.py: NodeProcessor adapter runs a BaseNode on a
  worker thread, feeds it from per-port input queues, fans outputs
  to downstream stream queues, and forwards audio outputs to a
  dedicated audio_output_queue for FrameProcessor.get_audio().
- core/nodes/loader.py: ComfyUI-style scanner for
  ~/.daydream-scope/custom_nodes/*/ exporting NODE_CLASS_MAPPINGS,
  with SCOPE_CUSTOM_NODES_DIR env override.
- core/nodes/builtins/: AudioSourceNode (WAV → 48kHz chunks,
  continuous=True) and AudioSinkNode (looping terminal audio) as
  generic built-ins so the framework is usable out of the box.
- core/nodes/__init__.py: register built-ins + load_and_register_local_nodes.
- core/pipelines/registry.py: call load_and_register_local_nodes during
  bootstrap so local custom-node packs are picked up next to plugins.
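A toy sketch of the NodeProcessor loop described above: pull pending items from per-port input queues, run ``execute()``, fan each output to every downstream queue. The real adapter does this on a worker thread with audio routing; all names besides ``execute()`` are illustrative.

```python
import queue

class MiniNodeProcessor:
    def __init__(self, node, input_queues, output_queues):
        self.node = node                    # object with execute(inputs)
        self.input_queues = input_queues    # port -> queue.Queue
        self.output_queues = output_queues  # port -> list of downstream queues

    def step(self) -> None:
        # Gather whatever has arrived on each input port.
        inputs = {port: q.get_nowait()
                  for port, q in self.input_queues.items() if not q.empty()}
        outputs = self.node.execute(inputs) or {}
        # Fan each output port's value out to all downstream edges.
        for port, value in outputs.items():
            for q in self.output_queues.get(port, []):
                q.put(value)

class Doubler:
    def execute(self, inputs):
        return {"out": inputs.get("in", 0) * 2}

in_q, out_q = queue.Queue(), queue.Queue()
proc = MiniNodeProcessor(Doubler(), {"in": in_q}, {"out": [out_q]})
in_q.put(21)
proc.step()
```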

Graph execution:
- server/graph_executor.py: create NodeProcessor for type="node" graph
  nodes, use maxsize=1 queues for node↔node stream edges, mark
  audio_input_ports on both processor kinds, skip dedicated sink
  queues for audio sink edges, fall back to the last audio-emitting
  custom node as sink_processor for node-only graphs, and validate
  node ports via NodeRegistry definitions.
- server/pipeline_processor.py: expose audio_input_ports set and fan
  audio outputs onto graph "audio" stream queues so pipeline audio
  flows into downstream custom nodes.

Session/WebRTC plumbing:
- server/frame_processor.py: allow start() to proceed when no
  pipeline_ids are present but the graph contains type="node" nodes,
  so node-only DAGs don't trip the "No pipeline IDs provided" guard.
- server/mcp_router.py: accept node-only graphs in /session/start
  (skip pipeline-required check, skip load_pipelines, set expect_audio
  when the graph has audio-emitting custom nodes).
- server/app.py: skip the "pipeline not loaded" precondition on the
  WebRTC offer when the incoming graph has type="node" nodes.
- server/webrtc.py: add `_graph_produces_audio(graph_data)` that
  inspects custom-node definitions and audio-kind edges, and use it
  in handle_offer as a fallback when no loaded pipeline reports
  produces_audio, so node-only audio graphs get a real audio track.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Two small fixes so Workflow Builder's Play button can start a graph
that contains only custom nodes (no pipeline nodes):

- Gate the pipeline-load step on loadItems.length > 0 so node-only
  graphs don't fail with "No pipeline load items provided" /
  "Failed to load pipeline, cannot start stream".
- Force initialParameters.produces_audio=true when the graph has
  any type="node" nodes, so the loaded pipeline's status (which
  only reflects registry pipelines, not custom-node audio output)
  can't stop the backend from creating an audio track.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Audio edges into a Sink node are already served through the session's
audio_output_queue (see graph_executor.py audio-edge branch), so the
AudioSinkNode builtin was only acting as an invisible pass-through —
plus a policy-loaded "loop the last chunk forever" fallback. That loop
turned out to be a leaky abstraction: it silently kept the WebRTC audio
track alive even when the upstream generator had finished, which
surprises users and is inconsistent with how Record / NDI handle EOS.

Remove the node and let the regular Sink terminate audio graphs. When
the generator stops, the track falls back to silence through the
existing AudioProcessingTrack path; users re-trigger generation to
hear the output again.

- core/nodes/builtins/audio_io.py: delete AudioSinkNode
- core/nodes/builtins/__init__.py + core/nodes/__init__.py: drop
  registration and re-export
- core/nodes/processor.py: refresh stale docstring
- server/graph_executor.py: refresh stale comment

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Make py-spy dumps readable: each NodeProcessor's worker thread is
now named NodeProcessor[<node_id>] instead of the default Thread-N.
Pure diagnostic — no behavior change.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
VAE decoders (e.g. ACEStep VAEDecodeAudio) return audio with a
leading batch dim of shape (1, C, T). AudioProcessingTrack's
channel/interleave path expects (C, T): when it sees ndim==3 the
mono/stereo test misreads the layout and the Fortran-order ravel
produces garbled samples that sound like slowed-down playback.

Drop a leading singleton batch dim in NodeProcessor._route_audio
before the tensor lands on the audio queue so the track always sees
a 2D (channels, samples) tensor.
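A pure-Python sketch of the shape rule (the real code squeezes a tensor in NodeProcessor._route_audio; the helper name here is illustrative): a leading singleton batch dim is dropped so the audio track always sees (channels, samples).

```python
def normalized_audio_shape(shape: tuple[int, ...]) -> tuple[int, ...]:
    # (1, C, T) from a VAE decoder collapses to (C, T); 2D passes through.
    if len(shape) == 3 and shape[0] == 1:
        return shape[1:]
    return shape
```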

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
IS_CHANGED was copied from ComfyUI speculatively and never wired up:
no node in core, built-ins, or any plugin (ACEStep included)
overrides it, so it always returns None. The cache-skip check in
NodeProcessor was therefore reducing to

    _cached_outputs is not None
    and None == None
    and not inputs
    and not self._continuous

which is equivalent to the expression without the change-key term.
Drop the hook from BaseNode, drop the call and _last_change_key
bookkeeping from NodeProcessor, and tighten the cache-skip check.
Nodes that genuinely depend on external state (file mtime, wall
clock) are already handled by marking them continuous=True and
returning {} when there's nothing new to emit.
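The tightened check, written out: with IS_CHANGED gone, the change-key term (``None == None``, always true) drops and only the three remaining conditions matter. Parameter names are illustrative:

```python
def should_skip_execution(has_executed: bool, inputs: dict, continuous: bool) -> bool:
    # Skip re-running a node only if it already ran once, has no fresh
    # inputs, and is not a continuous (self-clocked) node.
    return has_executed and not inputs and not continuous
```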

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
…eline

BaseNode.execute used to carry a NotImplementedError stub body so
that Pipeline subclasses (LongLive, LTX2, etc.) could inherit
BaseNode without having to implement execute themselves. That put
the "this method should never be called" guard on the wrong class:
BaseNode's contract became "every node has an execute, but you
might get NotImplementedError".

Make execute @abstractmethod on BaseNode so every non-Pipeline node
is forced to implement it, and move the NotImplementedError stub
onto Pipeline. Concrete Pipeline subclasses inherit that stub, so
nothing changes for them. The end result: BaseNode's contract now
honestly says "every node has an execute", and the "Pipelines are
invoked via __call__, not execute" exception lives on Pipeline
where it belongs.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Let the backend decide whether a graph produces audio. The existing
_graph_produces_audio() in server/webrtc.py walks the graph, looks
up each custom node's definition, and checks its declared output
ports — it's both more accurate (only flips produces_audio when
there's actually an audio output) and the authoritative source of
truth. The frontend guess was firing on any type=='node' entry,
including video-only custom nodes, and is redundant with the
backend inspection.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
src/scope/core/nodes/loader.py was speculative: it scanned
~/.daydream-scope/custom_nodes/ for Python modules exporting
NODE_CLASS_MAPPINGS, ComfyUI-style. Nothing ships content into
that directory, nothing documents it, nothing tests it, and the
plugin system (entry-point packaging) is what ACEStep and every
other custom-node producer actually uses — same end result
(NodeRegistry.register), more mature path.

Delete the loader, its `load_and_register_local_nodes` wrapper
in core/nodes/__init__.py, and the bootstrap call in
core/pipelines/registry.py. If someone later wants a "drop a
Python file in a directory" dev workflow we can re-add 90 lines
of directory walking easily.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The "pick the last custom node as the sink" branch fired only when
a graph had zero Sink nodes AND zero Pipeline nodes — a shape no
workflow actually builds. The graph editor inserts a Sink by default
and every real workflow (ACEStep audio cover included) terminates
on one. Drop the fallback so a graph with no terminating Sink fails
cleanly at frame_processor startup instead of being silently routed
through whichever custom node happens to be last in definition order.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Undo the @abstractmethod on BaseNode.execute from the Option 2
refactor. Two earlier commits dropped Pipeline.execute — which is
the right end state, but leaving BaseNode.execute abstract meant
every concrete Pipeline subclass (LongLive, LTX2, StreamDiffusionV2,
etc.) failed to instantiate because it inherited an abstract
execute that it never needed to implement.

Bring back the NotImplementedError default body on BaseNode and
drop the abstract decorator. Plain backend nodes still override
execute; Pipeline subclasses inherit the default and never reach
it because PipelineProcessor drives them via __call__.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
audio_io.py:
- drop unused self._output_mode field (set but never read)
- drop self.config.get(...) fallback chain in execute(); the graph
  executor never constructs nodes with a config kwarg, so self.config
  is always {} and the fallback defaults (including an inconsistent
  duration=60.0 vs the definition default 15.0) never fire
- inline the _wrap() static helper at its two callers
- simplify _resolve_path: drop the importlib.metadata distributions
  loop that walked every installed plugin's private dist._path to find
  bundled assets. Real workflows pass absolute paths, the convenience
  was undocumented, and the traversal used a private API. Keep the
  absolute → cwd → ~/.daydream-scope/assets fallback chain.

processor.py:
- drop self.pipeline_id (no external reader — every grep hit is a
  PipelineProcessor.pipeline_id or a GraphNode.pipeline_id)
- drop self.is_prepared (never read or written on NodeProcessor;
  PipelineProcessor uses it internally for its prepare lifecycle)
- drop self.native_fps and inline get_fps() to return the 30.0
  constant (native_fps was never assigned)
- drop the @property pipeline alias (only caller in graph_executor
  filters out node-type processors before reaching it, so the alias
  is never dereferenced on a NodeProcessor instance)
- replace self._cached_outputs (used only for truthiness) with a
  boolean self._has_executed flag
- simplify the non-continuous input-gather branch: drop the dead
  defensive except-queue.Empty path (queues are single-consumer, the
  fallback was both unreachable and silently dropped already-consumed
  inputs on the floor if it ever fired), use a dict comprehension

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Add SchedulerNode — a time-based trigger sequencer — as a built-in on
top of the node execution runtime introduced in the foundation PR.

The scheduler is a pull-based node (continuous=True, uses the
BaseNode.execute() contract from the foundation). It maintains a
~200 Hz internal clock on its own daemon thread, fires named triggers
at configured time points, and emits incrementing counters per
trigger so downstream nodes never miss a firing.

- Static inputs: start / reset (both trigger)
- Static outputs: tick (number), elapsed (seconds), is_playing
- Dynamic outputs: one per unique trigger port_name in the
  `triggers` parameter, emitting a counter each firing.
- Supports loop + duration parameters for repeating sequences.

No changes to the node abstraction, NodeProcessor, graph executor,
or frontend — this is purely one extra class registered via
`register_builtin_nodes()`.
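The counter semantics can be sketched without the clock thread: each trigger fires at its configured time points and emits an incrementing counter, so a downstream node that polls late still sees that a firing happened. The class and method names are illustrative:

```python
class TriggerCounters:
    def __init__(self, triggers: dict[str, list[float]]) -> None:
        self.triggers = triggers                       # port_name -> fire times
        self.counts = {name: 0 for name in triggers}   # monotonic per-port counters
        self._fired = {name: 0 for name in triggers}   # firings already emitted

    def advance(self, elapsed: float) -> dict[str, int]:
        """Return new counter values for ports that fired by `elapsed`."""
        out = {}
        for name, times in self.triggers.items():
            due = sum(1 for t in times if t <= elapsed)
            if due > self._fired[name]:
                self.counts[name] += due - self._fired[name]
                self._fired[name] = due
                out[name] = self.counts[name]
        return out

sched = TriggerCounters({"kick": [0.0, 1.0], "snare": [0.5]})
```

Because the counter only ever increases, a consumer can detect "missed" firings by comparing against the last value it saw instead of relying on catching a momentary pulse.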

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Route the frontend scheduler node through the backend NodeProcessor so
it actually drives downstream execution when placed in workflow builder.
Previously the backend SchedulerNode was registered but unreachable —
the frontend kept scheduler in FRONTEND_ONLY_TYPES and emitted it as a
pipeline node with no pipeline_id, silently dropping the node.

Backend fixes on SchedulerNode:
- Collapse _pending_lock into the single _lock to remove the AB/BA
  ordering hazard between the timer thread and execute().
- Edge-trigger start/reset on strict counter increments so stale values
  on the input queue don't retoggle actions, mirroring the frontend.
- Return {} when nothing changed and drop the busy-loop sleep; let the
  NodeProcessor's built-in idle handle pacing instead of flooding
  downstream queues at ~100 Hz.
- Declare triggers/loop/duration as NodeParam entries.
- Override get_dynamic_output_ports so the graph executor accepts edges
  from per-trigger ports that aren't in the static outputs list.
- Implement shutdown() to join the timer thread when the graph stops.
- Use (port, time) tuple keys instead of float-to-str round-trip.

Supporting changes:
- BaseNode: add get_dynamic_output_ports() and shutdown() hooks.
- graph_executor._validate_edge_ports: union static + dynamic outputs
  so dynamic ports pass validation.
- NodeProcessor.stop(): call node.shutdown() during teardown.
- Frontend graphUtils: drop scheduler from FRONTEND_ONLY_TYPES, emit it
  as type=node/node_type_id=scheduler with params, and rehydrate it
  back into the native scheduler nodeType on graph load so the bespoke
  widget survives a round-trip.
- AddNodeModal: skip scheduler in the discovered Plugins list to avoid
  duplicating the existing catalog entry.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
Seed prev start/reset counters at 0 so the first positive pulse from
upstream fires instead of being silently latched. Stage is_playing into
pending outputs on both pause and auto-start so downstream sees the
transition without waiting for the elapsed heartbeat. Replace the
prev-counter-is-None auto-start guard with an explicit _auto_start_done
flag so a stale zero on start/reset can't re-arm it.
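The seeding plus edge-trigger behavior reduces to a small state machine — prev counters start at 0 so the first positive pulse fires, and only strict increments trigger, so a stale value replayed from the input queue can't retoggle the action. A minimal sketch (names are illustrative):

```python
class EdgeTrigger:
    def __init__(self) -> None:
        self._prev = 0  # seeded at 0, not None, so counter=1 fires immediately

    def fired(self, counter: int) -> bool:
        rising = counter > self._prev
        if rising:
            self._prev = counter
        return rising

start = EdgeTrigger()
```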

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
coderabbitai Bot commented Apr 20, 2026

Important

Review skipped

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@github-actions
Contributor

🚀 fal.ai Preview Deployment

App ID: daydream/scope-pr-963--preview
WebSocket: wss://fal.run/daydream/scope-pr-963--preview/ws
Commit: b012b66

Livepeer Runner

App ID: daydream/scope-livepeer-pr-963--preview
WebSocket: wss://fal.run/daydream/scope-livepeer-pr-963--preview/ws
Auth: private

Testing Livepeer Mode

SCOPE_CLOUD_MODE=livepeer SCOPE_CLOUD_APP_ID="daydream/scope-livepeer-pr-963--preview/ws" uv run daydream-scope
