feat(graph): streaming audio ring buffer + dynamic-port gate fixes#976

Open
ryanontheinside wants to merge 43 commits into rafal/acestep-2 from ryanontheinside/acestep

Conversation

@ryanontheinside
Collaborator

Stacked on #856. See commit message for details.

leszko and others added 30 commits April 14, 2026 07:47
Introduce the fine-grained backend node framework shared by the
execution-scheduler and ACEStep branches:

- src/scope/core/nodes/: BaseNode ABC, NodeDefinition / NodePort /
  NodeParam Pydantic schemas, and NodeRegistry keyed by node_type_id.
- plugins: new `register_nodes` pluggy hookspec + PluginManager
  `register_plugin_nodes` hook caller; bootstrap both built-in and
  plugin nodes from pipelines registry initialization.
- graph_schema: allow `type="node"` GraphNode with `node_type_id`
  and `params`, plus a `get_backend_node_ids()` helper and a
  validation error when node_type_id is missing.
- app: add `GET /api/v1/nodes/definitions` for frontend discovery
  of registered node types.

BaseNode intentionally only requires `get_definition()`; execution
contracts (push vs. pull) are layered by the specialized branches.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
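The framework above can be sketched as a minimal self-contained example. Only `BaseNode`, `NodeRegistry`, `node_type_id`, `get_definition()`, and the missing-id validation error come from the commit; `GainNode`, the registry internals, and the definition dict shape are hypothetical stand-ins:

```python
from abc import ABC, abstractmethod


class BaseNode(ABC):
    """Sketch of the BaseNode ABC: only get_definition() is required."""

    node_type_id: str = ""  # registry key, set as a classvar on subclasses

    @abstractmethod
    def get_definition(self) -> dict:
        """Return the node's metadata (ports, params, display name)."""


class NodeRegistry:
    """Registry keyed by node_type_id; the internal layout is an assumption."""

    def __init__(self) -> None:
        self._nodes: dict[str, type] = {}

    def register(self, node_class: type) -> None:
        if not getattr(node_class, "node_type_id", ""):
            raise ValueError("node_type_id is missing")  # mirrors the validation error
        self._nodes[node_class.node_type_id] = node_class

    def get(self, node_type_id: str):
        return self._nodes.get(node_type_id)


class GainNode(BaseNode):  # hypothetical example node
    node_type_id = "example.gain"

    def get_definition(self) -> dict:
        return {"node_type_id": self.node_type_id,
                "inputs": ["audio"], "outputs": ["audio"]}


registry = NodeRegistry()
registry.register(GainNode)
```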
Surface backend node types on the frontend via the new
GET /api/v1/nodes/definitions endpoint:

- api.ts: extend GraphNode with `node_type_id`/`params` and add
  NodeDefinitionDto + fetchNodeDefinitions().
- graphUtils.ts: add `custom_node` to FlowNodeData.nodeType and the
  customNode* metadata fields (display, category, inputs, outputs,
  params, param defs); round-trip `custom_node` ↔ backend `type: "node"`
  in graphConfigToFlow / flowToGraphConfig.
- CustomNode.tsx (new): schema-driven React Flow node that renders
  input/output ports with port-type colors and ComfyUI-style param
  widgets (select/boolean/number/text) from `customNodeParamDefs`.
- AddNodeModal: fetch node definitions while open and merge them into
  the catalog under a new "Plugins" category; pass full definition
  metadata through onSelectNodeType extraData.
- useNodeFactories: add `custom_node` to NodeTypeKey / NODE_DEFAULTS
  and route the add flow through extraData so picked definitions land
  in FlowNodeData.
- useGraphPersistence: after import/restore, hydrate every custom_node
  with its definition from /api/v1/nodes/definitions, preserving saved
  `customNodeParams` over definition defaults.
- connectionValidation: when either endpoint is a custom_node, enforce
  port-type equality; built-in node streams stay untyped.
- GraphEditor: register `custom_node: CustomNode` in nodeTypes.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The graph editor has always called processing units "Nodes" in the UI
(see app.daydream.live/nodes) while the code called them "Pipelines";
this PR then introduced a second unrelated concept also called "Node".
Collapse both into one hierarchy so plugin authors and users see a
single concept.

Design:
- Pipeline becomes `class Pipeline(BaseNode, ABC)` — pure inheritance
  change. Concrete pipeline subclasses (longlive, krea, LTX-2, VACE,
  streamdiffusion, …) touch zero lines. They still only implement
  `__call__` and `get_config_class`.
- Pipeline provides default `get_definition()` (derives a lightweight
  NodeDefinition from the config class) and a no-op `execute()` that
  raises NotImplementedError (pipelines are driven by `__call__` from
  PipelineProcessor; the method only exists to satisfy the abstract
  BaseNode contract).
- NodeRegistry is now the sole storage. Its register() handles both
  plain BaseNode classes (read `node_type_id` classvar) and Pipeline
  subclasses (read `pipeline_id` from the config class).
- PipelineRegistry becomes a filtering view over NodeRegistry._nodes
  that returns only Pipeline subclasses. Every pre-existing call site
  (`list_pipelines`, `get_config_class`, `chain_produces_*`, `register`,
  `unregister`, `is_registered`, `get`) keeps its old signature and
  semantics, so the server, plugin manager, OSC/DMX docs, workflows
  resolver, and tests all work unchanged.
- register_pipelines plugin hook still forwards through
  `registry.register(pipeline_id, pipeline_class)`, which now writes
  into the shared storage. Existing plugins need zero edits.
- register_nodes plugin hook uses the same storage. Both hooks can be
  used interchangeably; new plugins use register_nodes.
- GET /api/v1/nodes/definitions filters pipelines out so the frontend
  add-node catalog's Plugins category only shows plain custom nodes.
  Pipelines continue to be surfaced through the dedicated
  /api/v1/pipelines/schemas endpoint for their rich config panel.
- Frontend: the add-node modal's "Pipeline" catalog entry is renamed
  to "Node", the PipelineNode parameter selector label changes from
  "Pipeline" to "Model", and the context menu's "Pipeline" entry
  becomes "Node". The word "pipeline" disappears from the user-facing
  UI.

Backwards compatibility:
- Existing pipeline subclasses don't change.
- Existing register_pipelines plugins don't change.
- Existing PipelineRegistry.* call sites don't change.
- Existing `"type": "pipeline"` saved workflows still load.
- GraphNode.type keeps both "pipeline" and "node" literals.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
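The inheritance collapse described above can be sketched as follows. The `Pipeline(BaseNode, ABC)` shape, the derived `get_definition()`, and the no-op `execute()` stub come from the commit; `DemoConfig`, `DemoPipeline`, and every method body are illustrative assumptions:

```python
from abc import ABC, abstractmethod


class BaseNode(ABC):
    node_type_id: str = ""

    @abstractmethod
    def get_definition(self) -> dict: ...


class Pipeline(BaseNode, ABC):
    """Pipelines are driven via __call__; execute() only exists to satisfy
    the BaseNode contract and is never reached in normal operation."""

    @classmethod
    def get_config_class(cls):
        raise NotImplementedError

    def get_definition(self) -> dict:
        # Derive a lightweight definition from the config class.
        cfg = self.get_config_class()
        return {"node_type_id": getattr(cfg, "pipeline_id", ""),
                "is_pipeline": True}

    def execute(self, inputs, **kwargs):
        raise NotImplementedError("pipelines are invoked via __call__")


class DemoConfig:  # hypothetical config class
    pipeline_id = "demo"


class DemoPipeline(Pipeline):  # concrete subclasses implement only these two
    @classmethod
    def get_config_class(cls):
        return DemoConfig

    def __call__(self, frame):
        return frame
```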
Collapse pipeline and plain-node discovery into one canonical endpoint.
``GET /api/v1/nodes/definitions`` now returns every entry in the
unified ``NodeRegistry``: pipelines (Pipeline subclasses) carry the
full ``get_schema_with_metadata()`` output as ``pipeline_meta``, plain
custom nodes leave ``pipeline_meta`` ``None``. One endpoint, one shape,
one source of truth.

- core/nodes/base.py: add ``pipeline_meta: dict | None`` field to
  NodeDefinition.
- core/pipelines/interface.py: Pipeline.get_definition() populates
  pipeline_meta from get_config_class().get_schema_with_metadata().
- server/app.py:
  * /api/v1/nodes/definitions stops filtering pipelines and returns
    everything.
  * /api/v1/pipelines/schemas becomes a thin compat alias that
    derives its (legacy-shape) response by iterating
    NodeRegistry.get_all_definitions() and pulling pipeline_meta from
    entries whose pipeline_meta is set. Existing usePipelines.ts and
    cloud-mode proxy callers keep working without migration.
- frontend/lib/api.ts: NodeDefinitionDto gains the optional
  pipeline_meta field.
- frontend/components/graph/AddNodeModal.tsx: when populating the
  Plugins category from the unified endpoint, filter out entries
  where pipeline_meta != null. Pipelines are still added through the
  hardcoded "Pipeline" placeholder + dropdown UX, so this keeps the
  existing modal behavior unchanged while the backend is unified.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Remove widget-specific fields (``min_value``, ``max_value``, ``step``,
``options``) from ``NodeParam`` and put them in a free-form ``ui``
dict instead. The base schema no longer grows when new widget kinds
appear; the frontend renderer dispatches on ``param_type`` and reads
whichever ``ui`` keys apply (``min``/``max``/``step`` for number,
``options`` for select, etc.).

- core/nodes/base.py: drop ``min_value``/``max_value``/``step``/
  ``options`` fields; add ``ui: dict[str, Any] | None`` with
  documentation of the conventional keys.
- frontend lib/api.ts: ``NodeParamDef`` mirrors the new shape —
  the typed widget fields go away, ``ui`` takes their place.
- frontend CustomNode.tsx: number widget reads ``p.ui?.min`` /
  ``p.ui?.max`` / ``p.ui?.step``; select widget reads
  ``p.ui?.options``; boolean and text widgets unchanged.

Plugins that construct ``NodeParam`` are migrated to the new shape
separately — ACEStep's ``bridge.py`` needs a coordinated update, and
Scope's audio builtins (added in the execution PR) do too. Both get
the generic shape: ``ui={"min": …, "max": …, "step": …}`` for
numbers, ``ui={"options": [...]}`` for selects.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Narrow _derive_node_type_id to only catch ImportError around the
circular-import dodge, so real bugs (broken config class, typo in
pipeline_id) surface instead of silently failing registration.

Route PipelineRegistry.register through NodeRegistry.register so
built-in pipelines share the same id-derivation and debug log path
as plugin nodes, and assert pipeline_id matches the config class's
pipeline_id to catch drift between the registry key and the
definition.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
- Reuse NodePortDef/NodeParamDef types from api.ts in FlowNodeData;
  drop local ad-hoc cast in CustomNode so widget ui hints type-check.
- Route PipelineRegistry through NodeRegistry's public get/unregister/
  list_node_types API instead of poking the private _nodes dict.
- Thread an AbortController through hydrateCustomNodeDefinitions so
  stale /api/v1/nodes/definitions fetches (rapid reloads, unmount)
  can't overwrite newer setNodes state; switch the helper to the
  typed fetchNodeDefinitions and drop the any casts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
Add the execution layer for the backend node abstraction so plugin
nodes and built-ins run inside the pipeline graph alongside
PipelineProcessors. This closes the gap between the metadata layer
introduced in d209e3b and real execution.

Node framework:
- core/nodes/base.py: BaseNode gains abstract `execute(inputs, **kwargs)`
  and a ComfyUI-style `IS_CHANGED(**kwargs)` cache-key classmethod.
- core/nodes/processor.py: NodeProcessor adapter runs a BaseNode on a
  worker thread, feeds it from per-port input queues, fans outputs
  to downstream stream queues, and forwards audio outputs to a
  dedicated audio_output_queue for FrameProcessor.get_audio().
- core/nodes/loader.py: ComfyUI-style scanner for
  ~/.daydream-scope/custom_nodes/*/ exporting NODE_CLASS_MAPPINGS,
  with SCOPE_CUSTOM_NODES_DIR env override.
- core/nodes/builtins/: AudioSourceNode (WAV → 48kHz chunks,
  continuous=True) and AudioSinkNode (looping terminal audio) as
  generic built-ins so the framework is usable out of the box.
- core/nodes/__init__.py: register built-ins + load_and_register_local_nodes.
- core/pipelines/registry.py: call load_and_register_local_nodes during
  bootstrap so local custom-node packs are picked up next to plugins.

Graph execution:
- server/graph_executor.py: create NodeProcessor for type="node" graph
  nodes, use maxsize=1 queues for node↔node stream edges, mark
  audio_input_ports on both processor kinds, skip dedicated sink
  queues for audio sink edges, fall back to the last audio-emitting
  custom node as sink_processor for node-only graphs, and validate
  node ports via NodeRegistry definitions.
- server/pipeline_processor.py: expose audio_input_ports set and fan
  audio outputs onto graph "audio" stream queues so pipeline audio
  flows into downstream custom nodes.

Session/WebRTC plumbing:
- server/frame_processor.py: allow start() to proceed when no
  pipeline_ids are present but the graph contains type="node" nodes,
  so node-only DAGs don't trip the "No pipeline IDs provided" guard.
- server/mcp_router.py: accept node-only graphs in /session/start
  (skip pipeline-required check, skip load_pipelines, set expect_audio
  when the graph has audio-emitting custom nodes).
- server/app.py: skip the "pipeline not loaded" precondition on the
  WebRTC offer when the incoming graph has type="node" nodes.
- server/webrtc.py: add `_graph_produces_audio(graph_data)` that
  inspects custom-node definitions and audio-kind edges, and use it
  in handle_offer as a fallback when no loaded pipeline reports
  produces_audio, so node-only audio graphs get a real audio track.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Two small fixes so Workflow Builder's Play button can start a graph
that contains only custom nodes (no pipeline nodes):

- Gate the pipeline-load step on loadItems.length > 0 so node-only
  graphs don't fail with "No pipeline load items provided" /
  "Failed to load pipeline, cannot start stream".
- Force initialParameters.produces_audio=true when the graph has
  any type="node" nodes, so the loaded pipeline's status (which
  only reflects registry pipelines, not custom-node audio output)
  can't stop the backend from creating an audio track.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Audio edges into a Sink node are already served through the session's
audio_output_queue (see graph_executor.py audio-edge branch), so the
AudioSinkNode builtin was only acting as an invisible pass-through —
plus a policy-loaded "loop the last chunk forever" fallback. That loop
turned out to be a leaky abstraction: it silently kept the WebRTC audio
track alive even when the upstream generator had finished, which
surprises users and is inconsistent with how Record / NDI handle EOS.

Remove the node and let the regular Sink terminate audio graphs. When
the generator stops, the track falls back to silence through the
existing AudioProcessingTrack path; users re-trigger generation to
hear the output again.

- core/nodes/builtins/audio_io.py: delete AudioSinkNode
- core/nodes/builtins/__init__.py + core/nodes/__init__.py: drop
  registration and re-export
- core/nodes/processor.py: refresh stale docstring
- server/graph_executor.py: refresh stale comment

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Make py-spy dumps readable: each NodeProcessor's worker thread is
now named NodeProcessor[<node_id>] instead of the default Thread-N.
Pure diagnostic — no behavior change.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
VAE decoders (e.g. ACEStep VAEDecodeAudio) return audio with a
leading batch dim of shape (1, C, T). AudioProcessingTrack's
channel/interleave path expects (C, T): when it sees ndim==3 the
mono/stereo test misreads the layout and the Fortran-order ravel
produces garbled samples that sound like slowed-down playback.

Drop a leading singleton batch dim in NodeProcessor._route_audio
before the tensor lands on the audio queue so the track always sees
a 2D (channels, samples) tensor.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
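On shapes alone, the fix looks like this (a pure-Python sketch over shape tuples; the actual code squeezes the tensor itself in `NodeProcessor._route_audio` before it lands on the audio queue):

```python
def squeeze_batch(shape: tuple) -> tuple:
    """Drop a leading singleton batch dim so the audio track always
    sees a 2-D (channels, samples) layout."""
    if len(shape) == 3 and shape[0] == 1:
        return shape[1:]  # (1, C, T) -> (C, T)
    return shape


assert squeeze_batch((1, 2, 48000)) == (2, 48000)  # VAE output with batch dim
assert squeeze_batch((2, 48000)) == (2, 48000)     # already 2-D: unchanged
```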
IS_CHANGED was copied from ComfyUI speculatively and never wired up:
no node in core, built-ins, or any plugin (ACEStep included)
overrides it, so it always returns None. The cache-skip check in
NodeProcessor was therefore reducing to

    _cached_outputs is not None
    and None == None
    and not inputs
    and not self._continuous

which is equivalent to the expression without the change-key term.
Drop the hook from BaseNode, drop the call and _last_change_key
bookkeeping from NodeProcessor, and tighten the cache-skip check.
Nodes that genuinely depend on external state (file mtime, wall
clock) are already handled by marking them continuous=True and
returning {} when there's nothing new to emit.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
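With the always-None change-key term dropped, the tightened cache-skip check reduces to a three-term predicate, sketched here with assumed names:

```python
def should_skip_execution(has_executed: bool, inputs: dict, continuous: bool) -> bool:
    """A non-continuous node that already ran and has no fresh inputs
    is skipped; continuous nodes always run."""
    return has_executed and not inputs and not continuous


assert should_skip_execution(True, {}, False)            # idle tick: skip
assert not should_skip_execution(True, {"a": 1}, False)  # fresh input: run
assert not should_skip_execution(False, {}, False)       # never ran: run
assert not should_skip_execution(True, {}, True)         # continuous: always run
```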
…eline

BaseNode.execute used to carry a NotImplementedError stub body so
that Pipeline subclasses (LongLive, LTX2, etc.) could inherit
BaseNode without having to implement execute themselves. That put
the "this method should never be called" guard on the wrong class:
BaseNode's contract became "every node has an execute, but you
might get NotImplementedError".

Make execute @abstractmethod on BaseNode so every non-Pipeline node
is forced to implement it, and move the NotImplementedError stub
onto Pipeline. Concrete Pipeline subclasses inherit that stub, so
nothing changes for them. The end result: BaseNode's contract now
honestly says "every node has an execute", and the "Pipelines are
invoked via __call__, not execute" exception lives on Pipeline
where it belongs.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Let the backend decide whether a graph produces audio. The existing
_graph_produces_audio() in server/webrtc.py walks the graph, looks
up each custom node's definition, and checks its declared output
ports — it's both more accurate (only flips produces_audio when
there's actually an audio output) and the authoritative source of
truth. The frontend guess was firing on any type=='node' entry,
including video-only custom nodes, and is redundant with the
backend inspection.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
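The backend-side decision can be sketched as a small walk over the graph; the dict shapes for graph nodes and definitions here are assumptions, not the actual `_graph_produces_audio` code:

```python
def graph_produces_audio(graph_nodes: list, definitions: dict) -> bool:
    """Flip produces_audio only when a custom node's declared output
    ports actually include an audio port."""
    for node in graph_nodes:
        if node.get("type") != "node":
            continue  # pipelines / built-ins are handled elsewhere
        definition = definitions.get(node.get("node_type_id", ""), {})
        if any(port.get("port_type") == "audio"
               for port in definition.get("outputs", [])):
            return True
    return False


defs = {
    "example.audio_src": {"outputs": [{"port_type": "audio"}]},
    "example.video_fx": {"outputs": [{"port_type": "video"}]},
}
audio_graph = [{"type": "node", "node_type_id": "example.audio_src"}]
video_graph = [{"type": "node", "node_type_id": "example.video_fx"}]
```

A video-only custom node no longer trips the flag, which is exactly the over-firing the frontend guess had.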
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
src/scope/core/nodes/loader.py was speculative: it scanned
~/.daydream-scope/custom_nodes/ for Python modules exporting
NODE_CLASS_MAPPINGS, ComfyUI-style. Nothing ships content into
that directory, nothing documents it, nothing tests it, and the
plugin system (entry-point packaging) is what ACEStep and every
other custom-node producer actually uses — same end result
(NodeRegistry.register), more mature path.

Delete the loader, its `load_and_register_local_nodes` wrapper
in core/nodes/__init__.py, and the bootstrap call in
core/pipelines/registry.py. If someone later wants a "drop a
Python file in a directory" dev workflow we can re-add 90 lines
of directory walking easily.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The "pick the last custom node as the sink" branch fired only when
a graph had zero Sink nodes AND zero Pipeline nodes — a shape no
workflow actually builds. The graph editor inserts a Sink by default
and every real workflow (ACEStep audio cover included) terminates
on one. Drop the fallback so a graph with no terminating Sink fails
cleanly at frame_processor startup instead of being silently routed
through whichever custom node happens to be last in definition order.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Undo the @abstractmethod on BaseNode.execute from the Option 2
refactor. Two earlier commits dropped Pipeline.execute — which is
the right end state, but leaving BaseNode.execute abstract meant
every concrete Pipeline subclass (LongLive, LTX2, StreamDiffusionV2,
etc.) failed to instantiate because it inherited an abstract
execute that it never needed to implement.

Bring back the NotImplementedError default body on BaseNode and
drop the abstract decorator. Plain backend nodes still override
execute; Pipeline subclasses inherit the default and never reach
it because PipelineProcessor drives them via __call__.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
audio_io.py:
- drop unused self._output_mode field (set but never read)
- drop self.config.get(...) fallback chain in execute(); the graph
  executor never constructs nodes with a config kwarg, so self.config
  is always {} and the fallback defaults (including an inconsistent
  duration=60.0 vs the definition default 15.0) never fire
- inline the _wrap() static helper at its two callers
- simplify _resolve_path: drop the importlib.metadata distributions
  loop that walked every installed plugin's private dist._path to find
  bundled assets. Real workflows pass absolute paths, the convenience
  was undocumented, and the traversal used a private API. Keep the
  absolute → cwd → ~/.daydream-scope/assets fallback chain.

processor.py:
- drop self.pipeline_id (no external reader — every grep hit is a
  PipelineProcessor.pipeline_id or a GraphNode.pipeline_id)
- drop self.is_prepared (never read or written on NodeProcessor;
  PipelineProcessor uses it internally for its prepare lifecycle)
- drop self.native_fps and inline get_fps() to return the 30.0
  constant (native_fps was never assigned)
- drop the @property pipeline alias (only caller in graph_executor
  filters out node-type processors before reaching it, so the alias
  is never dereferenced on a NodeProcessor instance)
- replace self._cached_outputs (used only for truthiness) with a
  boolean self._has_executed flag
- simplify the non-continuous input-gather branch: drop the dead
  defensive except-queue.Empty path (queues are single-consumer, the
  fallback was both unreachable and silently dropped already-consumed
  inputs on the floor if it ever fired), use a dict comprehension

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
When a graph has only custom nodes (no pipeline nodes), the frontend
was failing with "No pipeline load items provided". Four fixes:

1. Frontend: skip loadPipeline step when loadItems is empty (node-only
   graphs don't need pipeline loading)
2. Frontend: for node-only graphs, set produces_audio=true so the
   backend creates an AudioProcessingTrack
3. Backend: FrameProcessor.start() allows empty pipeline_ids when a
   graph config is provided (node-only execution)
4. Backend: WebRTC offer handler respects explicit produces_audio hint
   from frontend for node-only graphs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
- Relax graph validator: sink node not required for node-only graphs
- Graph executor: find sink processor from last audio-outputting custom
  node when no sink/pipeline nodes exist
- Remove unnecessary Sink node from ACEStep workflow — AudioSink is
  the terminal node that feeds WebRTC audio directly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Add stream:audio input handle to SinkNode so audio edges from custom
nodes (like AudioSink) render correctly and create the WebRTC audio
track. The Sink node is required for WebRTC output — it creates the
peer connection that streams audio to the browser.

Restored Sink node in ACEStep workflow with the audio_output → output
edge that feeds the WebRTC audio track.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
…imported params

Two fixes for audio playback in Workflow Builder:

1. Frontend always sets produces_audio=true when the graph contains
   custom node types, regardless of the loaded pipeline's status.
   Previously longlive's produces_audio=false was overriding the
   graph's audio capability.

2. Workflow import enrichment now merges definition defaults WITH
   the imported params (keeping baked-in values like file_id path)
   instead of overwriting them with empty defaults.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
When an audio track arrives without video (audio-only workflow like
ACEStep), automatically unmute the Sink node so the user hears audio
immediately without needing to click the unmute button.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The <video> element with zero dimensions doesn't reliably play audio
in Chrome. Add a separate <audio autoPlay> element when the stream
has audio but no video tracks. This ensures audio playback works
for audio-only workflows like ACEStep.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Use a persistent hidden <audio ref> element that's always in the DOM.
Attach the remote stream and call play() explicitly when the stream
arrives. This avoids Chrome's autoplay policy issues where a
dynamically-created <audio> element gets blocked.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
This avoids the soundfile dependency, though taking that dependency might be preferable.

Signed-off-by: RyanOnTheInside <7623207+ryanontheinside@users.noreply.github.com>
leszko and others added 13 commits April 14, 2026 11:34
The persistent <audio> element added for reliable WebRTC audio
playback was never wired to the Sink node's mute button: only the
(hidden) <video> element was being muted. In audio-only workflows
the video element is invisible, so clicking mute had no audible
effect — audio kept playing through the unmuted <audio> element.

Bind muted={isMuted} on the audio element and update
audioRef.current.muted in the mute-effect alongside
videoRef.current.muted so both elements track the button state.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Remove two near-identical blocks that forced produces_audio=true
whenever the graph contained any type="node" entry. The backend
already inspects the graph in webrtc._graph_produces_audio() and
flips produces_audio itself when it sees an audio-emitting custom
node — the frontend guess is redundant and, since it triggered on
*any* custom node, was also firing for video-only custom nodes.
Leave the decision to the backend which actually knows each node's
declared output ports.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The frontend used to send produces_audio=true explicitly for
node-only graphs; that override was removed and the backend's
_graph_produces_audio() now handles the ACEStep case directly.
The remaining `initial_parameters.get("produces_audio")` check in
handle_offer has no caller that sends True: the frontend only
passes through the loaded pipeline's registry flag, which
PipelineRegistry.chain_produces_audio(pipeline_ids) already
reports one line up. Drop the dead third check.

The cloud relay path (handle_offer_with_relay) still reads
produces_audio from initial_parameters because the cloud pipeline
may not be registered locally — that path is untouched.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Non-continuous custom nodes (ACEStep's TextEncode, DiffusionConfig,
Generate, etc.) execute exactly once when all their inputs arrive,
then sit idle waiting for new inputs that never come in a batch
DAG. Parameter changes routed through NodeProcessor.update_parameters
used to just mutate self.parameters silently — the worker never
re-ran, so tweaks the user made in the UI never reached the output.

Fix:
- Cache the last consumed inputs dict as self._last_inputs after each
  successful execution.
- Flip a self._needs_rerun flag in update_parameters().
- In _process_once(), when a non-continuous node has no new inputs
  but _needs_rerun is set, replay the cached inputs against the
  current parameters and route the refreshed output downstream.

Continuous nodes (AudioSource streaming, scheduler) already re-read
self.parameters every tick, so the flag is cleared for them too but
otherwise harmless. Source nodes treat _needs_rerun the same way
as non-source nodes — they re-emit with fresh params.

Unit smoke test: Node(y=x*scale), initial scale=2, input=5 → output
10; update_parameters({"scale": 3.0}) → next tick replays input=5
with new scale → output 15; subsequent idle ticks do not re-run.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
CustomNode.tsx only called updateData({customNodeParams: {...}}) on
each input change — that updates local React Flow state but never
reaches the running stream. PipelineNode has an onParameterChange
callback for this same job, injected by nodeEnrichment.ts for nodes
of type "pipeline" only; custom nodes were never wired up.

- nodeEnrichment.ts: add a "custom_node" branch that injects
  onParameterChange (same handleNodeParameterChange used by
  PipelineNode) and isStreaming.
- CustomNode.tsx: extract a setParam(name, value) helper that
  updates local data AND calls onParameterChange?.(id, name, value).
  All four widget onChange handlers now route through it, which
  also de-duplicates the four nearly-identical updateData calls.

The handleNodeParameterChange callback forwards through
sendParameterUpdate → WebRTC data channel → FrameProcessor.
update_parameters → _processors_by_node_id[node_id].
update_parameters. Combined with the NodeProcessor rerun fix,
tweaks to custom-node parameters now reflect in the running output.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
The previous "replay cached inputs on param update" commit fixed
the single-node case but not a DAG: when a downstream node sees a
fresh output from the re-run node on one port, its other ports
are still empty (consumed on the first run), so the
all-queues-non-empty wait would deadlock and the change never
propagated past one hop.

Restructure input gathering on non-continuous nodes as a latch
pattern:

- First run still requires every input port to have received data
  at least once, so _last_inputs is fully populated before we start
  falling back to cached values.
- Subsequent runs fire when EITHER any port has fresh data in its
  queue OR update_parameters() flagged _needs_rerun. For each port,
  consume the fresh value if available, otherwise reuse the cached
  value from _last_inputs.

This makes a parameter change cascade down the DAG correctly:
re-run emits to downstream queues, downstream sees fresh on the
affected port and latched values on the rest, re-runs, cascades
further, and so on.

Also clean up the decision tree: is_source / continuous / normal
are now three disjoint branches at the top of _process_once
instead of scattered checks after the input-gather step, and the
previous `consume_rerun` bookkeeping is gone since _needs_rerun is
simply cleared after every run.

Verified with a small two-node DAG unit smoke test that covers:
upstream param change propagating to downstream; downstream-only
param change using fully latched inputs; and idle ticks not
re-executing.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
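The latch-style gather can be sketched as a standalone function (names and queue shapes are assumptions):

```python
import queue


def gather_inputs(port_queues: dict, last_inputs: dict, needs_rerun: bool):
    """Fire when any port has fresh data OR a rerun is flagged; per port,
    consume the fresh value if available, else reuse the cached one."""
    fresh = {}
    for name, q in port_queues.items():
        try:
            fresh[name] = q.get_nowait()
        except queue.Empty:
            pass
    if not fresh and not needs_rerun:
        return None  # nothing new anywhere: stay idle this tick
    return {name: fresh.get(name, last_inputs[name]) for name in port_queues}


qs = {"a": queue.Queue(), "b": queue.Queue()}
qs["a"].put(10)  # only port "a" received fresh data this tick
latched = gather_inputs(qs, last_inputs={"a": 1, "b": 2}, needs_rerun=False)
idle = gather_inputs(qs, last_inputs={"a": 10, "b": 2}, needs_rerun=False)
```

A downstream node thus re-runs with one fresh port and latched values on the rest, which is what lets a parameter change cascade past one hop.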
AudioSourceNode is continuous=True because in mode=stream it needs
to be called every tick to emit the next 100ms chunk. In mode=full
that same setting was causing it to re-push the entire clip on
every worker tick. The old non-continuous-latch behaviour masked
this as backpressure (downstream queues filled, route_outputs
blocked). With the new latch-fallback pattern downstream no longer
deadlocks on empty queues, so those redundant emissions now
cascade through the whole ACEStep DAG and the audio track gets
flooded with 15s clips at ~1500× real-time.

Track the (file, mode=full) pair that's already been emitted and
return {} on subsequent ticks until either the file changes or the
mode flips back to stream. Stream mode clears the flag so switching
back to full later re-emits the clip once.

Smoke test: first full call emits, second full call returns {},
stream call emits a chunk and clears the flag, full again emits,
full again returns {}.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
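The one-shot guard for mode=full can be sketched like this; the class name and payloads are hypothetical, only the (file, mode) tracking and the stream-mode reset come from the commit:

```python
class FullModeGate:
    """Emit a full clip once per (file, "full") pair; stream clears the flag."""

    def __init__(self) -> None:
        self._emitted = None  # (file, "full") pair already sent downstream

    def execute(self, file: str, mode: str) -> dict:
        if mode == "full":
            key = (file, "full")
            if self._emitted == key:
                return {}  # same file, still full: nothing new to emit
            self._emitted = key
            return {"audio": f"full clip of {file}"}
        self._emitted = None  # stream mode clears the flag
        return {"audio": f"chunk of {file}"}


gate = FullModeGate()
first = gate.execute("song.wav", "full")    # emits the clip
second = gate.execute("song.wav", "full")   # suppressed
chunk = gate.execute("song.wav", "stream")  # streams and clears the flag
again = gate.execute("song.wav", "full")    # full re-emits after stream
```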
New boolean param on AudioSourceNode, default false. The generic
CustomNode.tsx renderer already handles param_type="boolean" as a
checkbox, so no frontend changes are needed — the checkbox appears
automatically and dispatches live updates via the
handleNodeParameterChange / _needs_rerun path.

Behavior by mode:

- stream + loop=true : wrap at end of clip (previous default)
- stream + loop=false: emit chunks until end, then {} forever
- full   + loop=false: emit the clip once per (file, mode) pair
                       (unchanged from the previous one-shot fix)
- full   + loop=true : re-emit the full clip once every clip-duration
                       seconds on the wall clock, so downstream
                       (e.g. ACEStep) re-processes at roughly real
                       time without flooding the audio track

Unit smoke-tested all four combinations.

Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Three related fixes that together stop audio from being dropped,
distorted, or truncated in graphs built from custom nodes (exposed
by the ACEStep workflow).

1. NodeProcessor.update_parameters only sets _needs_rerun when the
incoming dict touches a declared parameter AND the value differs
from the current one. FrameProcessor broadcasts global updates
(no node_id) to every processor, so the previous unconditional
flag caused one global tweak to cascade a full DAG rerun on every
custom node. On ACEStep, the frontend's four stream-start globals
(quantize_mode, modulations, beat_cache_reset_rate, input_source)
turned into four rapid audio_decode emissions once torch.compile
warmed up (~500ms/run), piling 5.76M samples into
AudioProcessingTrack's 60s buffer in under a second.

2. Real backpressure on the audio output path. AudioProcessingTrack
was greedily draining the whole audio_output_queue into its local
deque every recv() tick, so upstream never saw backpressure and a
silent drop-and-warn fallback was capping the buffer.
- audio_track.recv() now drains lazily: pulls only until the local
  buffer has enough samples for the next 20ms frame.
- NodeProcessor.audio_output_queue is shrunk from maxsize=10 to
  maxsize=1 and _route_audio uses a blocking put-with-retry loop.
  A stall cascades upstream through every node-to-node edge queue
  (already maxsize=1 blocking), rate-limiting batch generators to
  playback speed instead of racing ahead.

3. Only route audio to audio_output_queue when the port is wired
to a sink. The previous code used audio_output_ports (derived from
node definition), so every audio-producing node — including
intermediate ones like AudioSource (audio → audio_encode) — pushed
each chunk into its own audio_output_queue. Nothing drains those
intermediate queues; before (2) the put_nowait drop hid it, after
(2) the blocking retry deadlocked the AudioSource worker after the
second loop iteration and broke continuous playback with
loop=true. Replaced with a runtime audio_sink_ports set populated
by graph_executor at the exact spot where it already handles
audio-to-sink edges.

Verified:
- Synthetic NodeProcessor + AudioSource(loop=true, duration=3s):
  4 chunks emitted at t≈0/3/6/9 over 10s.
- ACEStep workflow with loop=true: continuous playback, zero
  "Audio buffer overflow" warnings.
- 30 burst parameter updates across three nodes: zero overflow
  warnings.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
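Fixes (1) and (2) hinge on two small patterns: flag a rerun only when a declared parameter actually changes value, and push audio through a blocking put-with-retry into a maxsize=1 queue so a stall propagates upstream. A runnable sketch under assumed names (these are not the real NodeProcessor methods):

```python
import queue
import threading


def needs_rerun(declared: set, current: dict, incoming: dict) -> bool:
    """Fix (1): only a declared parameter whose value actually differs
    sets the rerun flag, so global broadcasts to every processor no
    longer cascade full-DAG reruns."""
    return any(
        k in declared and current.get(k) != v
        for k, v in incoming.items()
    )


def route_audio(out_q: queue.Queue, chunk, stop: threading.Event) -> bool:
    """Fix (2): blocking put-with-retry into a maxsize=1 queue. A full
    queue stalls the producer until downstream drains it, so the
    backpressure cascades through every maxsize=1 node-to-node edge."""
    while not stop.is_set():
        try:
            out_q.put(chunk, timeout=0.1)
            return True
        except queue.Full:
            continue  # retry; upstream is now rate-limited to playback
    return False
```

Because the retry loop also checks a stop event, a shutdown request still unblocks a producer stuck behind a full queue.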
Adds a pacing param with 'realtime' (default, unchanged) and 'downstream'
options. In downstream mode the wall-clock gate is skipped and the source
is rate-limited by the maxsize=1 edge queue backpressure, so batch/offline
pipelines run at GPU speed instead of 1x clip duration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Non-continuous NodeProcessors previously re-fired on every fresh-port
arrival and latched the rest from cache, which could double or triple
GPU work per upstream cycle when sibling producers finished at
different times.

The new gate drains fresh values into a pending buffer (so upstream
producers still unblock immediately) and only fires once every dynamic
input port — the ports whose upstream is transitively reachable from a
continuous source — has caught up. Static ports (one-shot handles like
MODEL/VAE/CONFIG) still latch from cache; parameter changes still
bypass the gate via _needs_rerun.

graph_executor propagates the "dynamic" marking forward from
continuous=True nodes and populates dynamic_input_ports on each
NodeProcessor at build time. Empty set falls back to the legacy
"fire on any fresh input" behaviour, so static-only subgraphs and
unit tests built without the executor are unaffected.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
Signed-off-by: RyanOnTheInside <7623207+ryanontheinside@users.noreply.github.com>
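The gate logic amounts to draining fresh values into a pending buffer and firing only once every dynamic port is represented; a hedged sketch (class and method names are illustrative, not the actual NodeProcessor API):

```python
class DynamicPortGate:
    """Fire only when every dynamic input port has a fresh value.

    Fresh arrivals go into a pending buffer so upstream producers
    unblock immediately. An empty dynamic-port set falls back to the
    legacy "fire on any fresh input" behaviour, matching subgraphs
    built without the executor."""

    def __init__(self, dynamic_ports: set):
        self.dynamic_ports = set(dynamic_ports)
        self.pending: dict = {}

    def offer(self, port: str, value):
        """Buffer a fresh value; return the inputs to fire with, or
        None if sibling dynamic ports haven't caught up yet."""
        self.pending[port] = value
        if not self.dynamic_ports or self.dynamic_ports.issubset(self.pending):
            fired, self.pending = self.pending, {}
            return fired
        return None
```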
…ocessingTrack

torch.Tensor.numpy() raises TypeError on half-precision tensors, which killed the
audio track on the first chunk from pipelines whose VAE decoder returns bfloat16
(e.g. ACE-Step). The browser heard initial silence while the queue was empty,
then the track ended as soon as real audio arrived.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Rafal Leszko <rafal@livepeer.org>
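The fix pattern is simply upcasting before the numpy conversion, since `Tensor.numpy()` rejects half-precision dtypes. A minimal illustrative helper, not the actual AudioProcessingTrack code; the duck-typed fallback only exists so the sketch runs without torch installed:

```python
import numpy as np


def to_float32_numpy(audio):
    """Convert an audio tensor to a float32 numpy array safely.

    torch.Tensor.numpy() raises TypeError on float16/bfloat16 tensors,
    so upcast to float32 (and move to CPU) before converting."""
    if hasattr(audio, "detach"):  # looks like a torch.Tensor
        return audio.detach().to("cpu").float().numpy()
    return np.asarray(audio, dtype=np.float32)
```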
@coderabbitai

coderabbitai Bot commented Apr 22, 2026

Review skipped: auto reviews are disabled on this repository. To trigger a single review, invoke the @coderabbitai review command.

Copilot AI left a comment

Pull request overview

Adds unified backend “node” support alongside pipelines, enabling audio-only/custom-node graphs to stream audio reliably and exposing a node-definition catalog for the frontend to render custom nodes.

Changes:

  • Backend: introduce NodeRegistry + NodeProcessor, extend graph schema/executor to run custom nodes and support audio edges/backpressure.
  • Backend/RTC: improve audio detection for node-only graphs; adjust audio track buffering; add node-definition discovery endpoint and plugin registration for nodes.
  • Frontend: add custom_node rendering/hydration, typed port validation for custom nodes, and improve audio-only playback via a dedicated <audio> element + sink audio handle.

Reviewed changes

Copilot reviewed 32 out of 32 changed files in this pull request and generated 4 comments.

Summary per file:

  • src/scope/server/webrtc.py: Detect audio-producing graphs that use custom nodes; add optional inbound param tracing.
  • src/scope/server/pipeline_processor.py: Add audio edge fan-out; introduce (currently unused) audio-input port bookkeeping.
  • src/scope/server/mcp_router.py: Allow node-only graphs and set expect_audio based on custom-node outputs.
  • src/scope/server/graph_schema.py: Add type="node" graph nodes with node_type_id + params; validate structure.
  • src/scope/server/graph_executor.py: Build/run graphs with NodeProcessor; dynamic-port gating; sink audio edge handling.
  • src/scope/server/frame_processor.py: Allow node-only graphs to start; add inbound param tracing.
  • src/scope/server/audio_track.py: Implement lazy audio draining/backpressure; add global playhead lookup helper.
  • src/scope/server/app.py: Derive pipeline schemas via NodeRegistry; add /api/v1/nodes/definitions.
  • src/scope/core/plugins/manager.py: Add unified plugin node registration (nodes + pipelines) with compat aliases.
  • src/scope/core/plugins/hookspecs.py: Add register_nodes hookspec.
  • src/scope/core/plugins/__init__.py: Export register_plugin_nodes.
  • src/scope/core/pipelines/registry.py: Make PipelineRegistry a filtered view over NodeRegistry; init built-in nodes.
  • src/scope/core/pipelines/interface.py: Make Pipeline a BaseNode; provide get_definition() projection.
  • src/scope/core/nodes/*: Add node base types, registry, processor, and built-in AudioSourceNode.
  • frontend/src/pages/StreamPage.tsx: Skip pipeline-load / pipeline_ids for node-only graphs.
  • frontend/src/lib/graphUtils.ts: Add custom-node serialization + flow conversion support.
  • frontend/src/lib/api.ts: Add DTOs + fetcher for node definitions; extend GraphNode for type="node".
  • frontend/src/components/graph/utils/nodeEnrichment.ts: Wire parameter-change handler into custom nodes.
  • frontend/src/components/graph/utils/connectionValidation.ts: Enforce port-type matching for custom-node stream connections.
  • frontend/src/components/graph/nodes/SinkNode.tsx: Add audio element for audio-only playback; add stream:audio sink handle.
  • frontend/src/components/graph/nodes/PipelineNode.tsx: Rename some UI labels ("Pipeline" → "Model"); display name tweak.
  • frontend/src/components/graph/nodes/CustomNode.tsx: New custom node renderer with typed ports and simple param widgets.
  • frontend/src/components/graph/hooks/node/useNodeFactories.ts: Add factory support for creating custom_node nodes.
  • frontend/src/components/graph/hooks/graph/useGraphPersistence.ts: Hydrate custom nodes via /api/v1/nodes/definitions with abort support.
  • frontend/src/components/graph/contextMenuItems.tsx: Rename "Pipeline" menu item to "Node".
  • frontend/src/components/graph/GraphEditor.tsx: Register custom_node type.
  • frontend/src/components/graph/AddNodeModal.tsx: Populate catalog with plugin custom nodes from /api/v1/nodes/definitions.


Comment on lines +31 to +49
# Registry of live audio tracks so graph nodes that need the playhead (e.g.
# ACE-Step's StreamVAEDecode skip gate, which mirrors the realtime demo's
# ``audio_eng.position / SAMPLE_RATE`` read in pipeline.py) can query it
# without a hard dependency on FrameProcessor. Weak refs so closed tracks
# drop out automatically.
_PLAYHEAD_LOCK = threading.Lock()
_PLAYHEAD_TRACKS: "weakref.WeakSet[AudioProcessingTrack]" = weakref.WeakSet()


def get_current_playhead_seconds() -> float | None:
    """Return the playhead position of the first live audio track, in seconds.

    Returns None if no track is registered yet or none are live. Callers
    should treat None as "skip gate disabled this tick".
    """
    with _PLAYHEAD_LOCK:
        for track in _PLAYHEAD_TRACKS:
            if track.readyState == "live":
                return track.playhead_seconds
Copilot AI Apr 24, 2026
get_current_playhead_seconds() returns the playhead from the first live AudioProcessingTrack in a global WeakSet. The server supports multiple concurrent WebRTC sessions, so this can return a playhead from a different user/session and make any playhead-driven gating nondeterministic under concurrency.

To avoid cross-session leakage, scope playhead lookup to a specific session/FrameProcessor (e.g. store the track on the Session / FrameProcessor and query via that), or key the registry by session/connection id and require callers to pass that key.

Suggested change
# Registry of live audio tracks so graph nodes that need the playhead (e.g.
# ACE-Step's StreamVAEDecode skip gate, which mirrors the realtime demo's
# ``audio_eng.position / SAMPLE_RATE`` read in pipeline.py) can query it
# without a hard dependency on FrameProcessor. Weak refs so closed tracks
# drop out automatically.
_PLAYHEAD_LOCK = threading.Lock()
_PLAYHEAD_TRACKS: "weakref.WeakSet[AudioProcessingTrack]" = weakref.WeakSet()
def get_current_playhead_seconds() -> float | None:
    """Return the playhead position of the first live audio track, in seconds.

    Returns None if no track is registered yet or none are live. Callers
    should treat None as "skip gate disabled this tick".
    """
    with _PLAYHEAD_LOCK:
        for track in _PLAYHEAD_TRACKS:
            if track.readyState == "live":
                return track.playhead_seconds

# Registry of live audio tracks. Weak refs ensure closed tracks drop out
# automatically. Playhead reads must be scoped to a specific track; callers
# must not query an arbitrary live track from this process-global registry.
_PLAYHEAD_LOCK = threading.Lock()
_PLAYHEAD_TRACKS: "weakref.WeakSet[AudioProcessingTrack]" = weakref.WeakSet()


def get_current_playhead_seconds(
    track: "AudioProcessingTrack | None",
) -> float | None:
    """Return the playhead position for the specified live audio track.

    Returns None if ``track`` is None or not live. Callers should pass the
    session-specific ``AudioProcessingTrack`` instance they own and treat
    None as "skip gate disabled this tick".
    """
    if track is None:
        return None
    with _PLAYHEAD_LOCK:
        if track.readyState == "live" and track in _PLAYHEAD_TRACKS:
            return track.playhead_seconds

sink_ports = getattr(feeder_proc, "audio_sink_ports", None)
if sink_ports is not None:
    sink_ports.add(e.from_port)
break
Copilot AI Apr 24, 2026

In the sink-wiring loop, encountering an audio edge triggers a break, which stops processing any other stream edges into the same sink. If a sink has both video and audio edges (or if the audio edge appears first in edges_to() order), this will skip creating the dedicated video sink queue and can break WebRTC preview / hardware sinks for that sink.

Instead of breaking out of the edge loop on audio edges, skip only the queue-allocation for that edge (e.g. continue) and still allow other (video) edges to be processed; only break once the sink’s video queue(s) have been successfully attached.

Suggested change
break
continue

Comment on lines +87 to +90
# Input ports that carry audio (tensor, sample_rate) tuples instead
# of video frame tensors. Set by graph_executor when wiring audio
# edges so audio inputs don't get accumulated into a video chunk.
self.audio_input_ports: set[str] = set()
Copilot AI Apr 24, 2026

audio_input_ports is introduced as a way to mark ports that carry (audio_tensor, sample_rate) tuples, but PipelineProcessor never uses it. As a result, if a graph wires a stream edge carrying audio into a pipeline input port (e.g. to_port == "audio"), process_chunk() will still treat that queue as a video frame queue (qsize >= chunk_size + ensure_video_packet(...)), which will misbehave or crash when it receives audio tuples.

Either implement audio-port handling in PipelineProcessor (exclude audio_input_ports from chunk accumulation and pass audio values through separately), or explicitly disallow audio stream edges into pipeline nodes at validation time until support is complete.

Comment on lines +164 to +168
value={Number(val)}
min={(p.ui?.min as number | undefined) ?? undefined}
max={(p.ui?.max as number | undefined) ?? undefined}
step={(p.ui?.step as number | undefined) ?? undefined}
onChange={e => setParam(p.name, Number(e.target.value))}
Copilot AI Apr 24, 2026

The numeric input uses value={Number(val)} where val falls back to "" when unset. Number("") becomes 0, so an unset numeric param will render as 0 and can get persisted/sent back as an explicit 0, overriding the backend/defaults unintentionally.

Consider keeping the input controlled with an empty string when the value is unset/invalid (and only coercing to a number on commit), or initialize customNodeParams with numeric defaults so val is never "" for number params.

Suggested change
value={Number(val)}
min={(p.ui?.min as number | undefined) ?? undefined}
max={(p.ui?.max as number | undefined) ?? undefined}
step={(p.ui?.step as number | undefined) ?? undefined}
onChange={e => setParam(p.name, Number(e.target.value))}
value={
  val === "" ||
  val == null ||
  (typeof val === "string" && val.trim() === "") ||
  Number.isNaN(Number(val))
    ? ""
    : String(val)
}
min={(p.ui?.min as number | undefined) ?? undefined}
max={(p.ui?.max as number | undefined) ?? undefined}
step={(p.ui?.step as number | undefined) ?? undefined}
onChange={e =>
  setParam(
    p.name,
    e.target.value === "" ? "" : e.target.valueAsNumber,
  )
}
