From 87b6ec344e124aa57ed1db0d9263c2009f5f2131 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Fri, 10 Apr 2026 09:41:00 +0000 Subject: [PATCH 01/43] feat(backend): add node abstraction base classes and discovery Introduce the fine-grained backend node framework shared by the execution-scheduler and ACEStep branches: - src/scope/core/nodes/: BaseNode ABC, NodeDefinition / NodePort / NodeParam Pydantic schemas, and NodeRegistry keyed by node_type_id. - plugins: new `register_nodes` pluggy hookspec + PluginManager `register_plugin_nodes` hook caller; bootstrap both built-in and plugin nodes from pipelines registry initialization. - graph_schema: allow `type="node"` GraphNode with `node_type_id` and `params`, plus a `get_backend_node_ids()` helper and a validation error when node_type_id is missing. - app: add `GET /api/v1/nodes/definitions` for frontend discovery of registered node types. BaseNode intentionally only requires `get_definition()`; execution contracts (push vs. pull) are layered by the specialized branches. Signed-off-by: Rafal Leszko --- src/scope/core/nodes/__init__.py | 32 ++++++++ src/scope/core/nodes/base.py | 110 +++++++++++++++++++++++++++ src/scope/core/nodes/registry.py | 53 +++++++++++++ src/scope/core/pipelines/registry.py | 22 +++++- src/scope/core/plugins/__init__.py | 2 + src/scope/core/plugins/hookspecs.py | 14 ++++ src/scope/core/plugins/manager.py | 24 ++++++ src/scope/server/app.py | 20 +++++ src/scope/server/graph_schema.py | 25 +++++- 9 files changed, 295 insertions(+), 7 deletions(-) create mode 100644 src/scope/core/nodes/__init__.py create mode 100644 src/scope/core/nodes/base.py create mode 100644 src/scope/core/nodes/registry.py diff --git a/src/scope/core/nodes/__init__.py b/src/scope/core/nodes/__init__.py new file mode 100644 index 000000000..c10025633 --- /dev/null +++ b/src/scope/core/nodes/__init__.py @@ -0,0 +1,32 @@ +"""Backend node system for Scope. + +Provides a base class and registry for defining fine-grained processing +nodes that can be wired into pipeline graphs. Nodes are simpler than full +pipelines — they declare typed input/output ports, editable parameters, +and a small execution contract. Built-in nodes and custom nodes (from +plugins or local packs) are discovered here and rendered generically by +the frontend via ``GET /api/v1/nodes/definitions``. +""" + +from .base import BaseNode, NodeDefinition, NodeParam, NodePort +from .registry import NodeRegistry + + +def register_builtin_nodes() -> None: + """Register all built-in node types. + + This is a no-op on branches that do not ship any built-in nodes; the + specialized branches (execution-scheduler, ACEStep) override or extend + this list. + """ + return None + + +__all__ = [ + "BaseNode", + "NodeDefinition", + "NodeParam", + "NodePort", + "NodeRegistry", + "register_builtin_nodes", +] diff --git a/src/scope/core/nodes/base.py b/src/scope/core/nodes/base.py new file mode 100644 index 000000000..5271dafa3 --- /dev/null +++ b/src/scope/core/nodes/base.py @@ -0,0 +1,110 @@ +"""Base classes for the Scope node system. + +Nodes are lightweight, fine-grained processing units that can be wired into +pipeline graphs alongside pipelines. Each node type declares typed input/ +output ports and editable parameters, and subclasses implement their own +execution contract (which may differ between execution backends). + +This module intentionally keeps ``BaseNode`` minimal — only a class-level +identifier and a ``get_definition()`` classmethod are required. Concrete +execution backends (graph executor integration, event-driven runtime, etc.) +layer their own abstract methods on top. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, ClassVar, Literal + +from pydantic import BaseModel, Field + + +class NodePort(BaseModel): + """Describes an input or output port on a node.""" + + name: str = Field(..., description="Port identifier (used in edge wiring)") + port_type: str = Field( + ..., + description=( + "Type of data carried by this port. Built-in types: " + "'audio', 'video', 'number', 'string', 'boolean', 'trigger'. " + "Plugins may define custom types (e.g. 'latent', 'model')." + ), + ) + required: bool = Field(default=True, description="Whether this input is required") + description: str = Field(default="", description="Human-readable description") + default_value: Any = Field(default=None, description="Default value for inputs") + + +class NodeParam(BaseModel): + """Describes an editable parameter (widget) on a node. + + Parameters are user-configurable values that live on the node card. + Like ComfyUI widgets, a parameter may be overridden by connecting an + incoming wire to the corresponding input port — the widget then + becomes an input and the default value is ignored. + """ + + name: str = Field(..., description="Parameter identifier") + param_type: Literal["number", "string", "boolean", "select"] = Field( + ..., description="Widget type for the frontend" + ) + default: Any = Field(default=None, description="Default value") + description: str = Field(default="", description="Human-readable label") + min_value: float | None = Field(default=None, description="Minimum (for number)") + max_value: float | None = Field(default=None, description="Maximum (for number)") + step: float | None = Field(default=None, description="Step size (for number)") + options: list[str] | None = Field( + default=None, description="Options (for select dropdowns)" + ) + convertible_to_input: bool = Field( + default=True, + description=( + "If True, this parameter can be overridden by connecting an " + "input wire (ComfyUI-style widget-to-input conversion)." + ), + ) + + +class NodeDefinition(BaseModel): + """Static metadata describing a node type.""" + + node_type_id: str = Field(..., description="Unique node type identifier") + display_name: str = Field(..., description="Human-readable name") + category: str = Field(default="general", description="Category for grouping") + description: str = Field(default="", description="What this node does") + inputs: list[NodePort] = Field(default_factory=list) + outputs: list[NodePort] = Field(default_factory=list) + params: list[NodeParam] = Field( + default_factory=list, + description="Editable parameters (widgets) displayed on the node card.", + ) + continuous: bool = Field( + default=False, + description=( + "If True, source nodes (no inputs) re-execute continuously " + "instead of executing once. Useful for streaming generators." + ), + ) + + +class BaseNode(ABC): + """Abstract base class for all backend node types. + + Subclasses must set ``node_type_id`` as a ``ClassVar`` and implement + ``get_definition()``. Execution contracts (e.g. ``execute(inputs)`` for + pull-based execution, or ``setup(emit_output) / update_input(...)`` for + event-driven execution) are defined by concrete execution backends and + not by this base class. + """ + + node_type_id: ClassVar[str] + + def __init__(self, node_id: str, config: dict[str, Any] | None = None): + self.node_id = node_id + self.config = config or {} + + @classmethod + @abstractmethod + def get_definition(cls) -> NodeDefinition: + """Return static metadata for this node type.""" diff --git a/src/scope/core/nodes/registry.py b/src/scope/core/nodes/registry.py new file mode 100644 index 000000000..b4a4d5b06 --- /dev/null +++ b/src/scope/core/nodes/registry.py @@ -0,0 +1,53 @@ +"""Registry for custom node types.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .base import BaseNode, NodeDefinition + +logger = logging.getLogger(__name__) + + +class NodeRegistry: + """Central registry for all available node types. + + Keyed by ``node_type_id`` (read from the registered class). + """ + + _nodes: dict[str, type[BaseNode]] = {} + + @classmethod + def register(cls, node_class: type[BaseNode]) -> None: + node_type_id = node_class.node_type_id + cls._nodes[node_type_id] = node_class + logger.debug("Registered node type: %s", node_type_id) + + @classmethod + def get(cls, node_type_id: str) -> type[BaseNode] | None: + return cls._nodes.get(node_type_id) + + @classmethod + def is_registered(cls, node_type_id: str) -> bool: + return node_type_id in cls._nodes + + @classmethod + def list_node_types(cls) -> list[str]: + return list(cls._nodes.keys()) + + @classmethod + def get_all_definitions(cls) -> list[NodeDefinition]: + return [nc.get_definition() for nc in cls._nodes.values()] + + @classmethod + def unregister(cls, node_type_id: str) -> bool: + if node_type_id in cls._nodes: + del cls._nodes[node_type_id] + return True + return False + + @classmethod + def clear(cls) -> None: + cls._nodes.clear() diff --git a/src/scope/core/pipelines/registry.py b/src/scope/core/pipelines/registry.py index 030f7a3d1..53de455ca 100644 --- a/src/scope/core/pipelines/registry.py +++ b/src/scope/core/pipelines/registry.py @@ -246,28 +246,44 @@ def _register_pipelines(): def _initialize_registry(): - """Initialize registry with built-in pipelines and plugins.""" + """Initialize registry with built-in pipelines, nodes, and plugins.""" # Register built-in pipelines first _register_pipelines() - # Load and register plugin pipelines + # Register built-in nodes (no-op on the base abstraction branch) + from scope.core.nodes import register_builtin_nodes + + register_builtin_nodes() + + # Load and register plugin pipelines and plugin nodes try: from scope.core.plugins import ( ensure_plugins_installed, load_plugins, + register_plugin_nodes, register_plugin_pipelines, ) ensure_plugins_installed() load_plugins() register_plugin_pipelines(PipelineRegistry) + + from scope.core.nodes.registry import NodeRegistry + + register_plugin_nodes(NodeRegistry) except Exception as e: logger.error( f"Failed to load plugins: {e}. Built-in pipelines are still available." ) pipeline_count = len(PipelineRegistry.list_pipelines()) - logger.info(f"Registry initialized with {pipeline_count} pipeline(s)") + from scope.core.nodes.registry import NodeRegistry + + node_count = len(NodeRegistry.list_node_types()) + logger.info( + f"Registry initialized with {pipeline_count} pipeline(s) and " + f"{node_count} node(s)" + ) # Auto-register pipelines on module import diff --git a/src/scope/core/plugins/__init__.py b/src/scope/core/plugins/__init__.py index a681707d7..652f85883 100644 --- a/src/scope/core/plugins/__init__.py +++ b/src/scope/core/plugins/__init__.py @@ -14,6 +14,7 @@ get_plugin_manager, load_plugins, pm, + register_plugin_nodes, register_plugin_pipelines, ) @@ -22,6 +23,7 @@ "ensure_plugins_installed", "load_plugins", "pm", + "register_plugin_nodes", "register_plugin_pipelines", "get_plugin_manager", "FailedPluginInfo", diff --git a/src/scope/core/plugins/hookspecs.py b/src/scope/core/plugins/hookspecs.py index 62ce4f672..2c0aa78b7 100644 --- a/src/scope/core/plugins/hookspecs.py +++ b/src/scope/core/plugins/hookspecs.py @@ -22,3 +22,17 @@ def register_pipelines(self, register): def register_pipelines(register): register(MyPipeline) """ + + @hookspec + def register_nodes(self, register): + """Register custom node types. + + Args: + register: Callback to register node classes. + Usage: register(NodeClass) + + Example: + @scope.core.hookimpl + def register_nodes(register): + register(MyCustomNode) + """ diff --git a/src/scope/core/plugins/manager.py b/src/scope/core/plugins/manager.py index 87ee5a48a..ad8cb15f6 100644 --- a/src/scope/core/plugins/manager.py +++ b/src/scope/core/plugins/manager.py @@ -575,6 +575,21 @@ def register_callback(pipeline_class: Any) -> None: # Update pipeline-to-plugin mapping by checking which plugins provide which pipelines self._update_pipeline_plugin_mapping(registry) + def register_plugin_nodes(self, registry: Any) -> None: + """Call ``register_nodes`` hook for all plugins. + + Args: + registry: ``NodeRegistry`` to register nodes with + """ + with self._lock: + + def register_callback(node_class: Any) -> None: + node_type_id = node_class.node_type_id + registry.register(node_class) + logger.info(f"Registered plugin node: {node_type_id}") + + self._pm.hook.register_nodes(register=register_callback) + def _update_pipeline_plugin_mapping(self, registry: "PipelineRegistry") -> None: """Update the mapping of pipeline IDs to plugin names.""" from importlib.metadata import distributions @@ -1665,3 +1680,12 @@ def register_plugin_pipelines(registry: "PipelineRegistry") -> None: registry: PipelineRegistry to register pipelines with """ get_plugin_manager().register_plugin_pipelines(registry) + + +def register_plugin_nodes(registry: Any) -> None: + """Call ``register_nodes`` hook for all plugins. + + Args: + registry: ``NodeRegistry`` to register nodes with + """ + get_plugin_manager().register_plugin_nodes(registry) diff --git a/src/scope/server/app.py b/src/scope/server/app.py index 470771551..5ce2599b9 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -864,6 +864,26 @@ async def get_pipeline_schemas( return response +# --------------------------------------------------------------------------- +# Node definitions +# --------------------------------------------------------------------------- + + +@app.get("/api/v1/nodes/definitions") +async def get_node_definitions(): + """Return definitions for all registered backend node types. + + Each definition includes the ``node_type_id``, ``display_name``, + ``category``, ``description``, ``continuous`` flag, input/output + ports, and editable parameters. The frontend uses this to populate + the add-node catalog and render custom nodes generically. + """ + from scope.core.nodes.registry import NodeRegistry + + definitions = NodeRegistry.get_all_definitions() + return {"nodes": [d.model_dump() for d in definitions]} + + # --------------------------------------------------------------------------- # OSC endpoints # --------------------------------------------------------------------------- diff --git a/src/scope/server/graph_schema.py b/src/scope/server/graph_schema.py index 26f95341f..105016336 100644 --- a/src/scope/server/graph_schema.py +++ b/src/scope/server/graph_schema.py @@ -29,7 +29,7 @@ from __future__ import annotations -from typing import Literal +from typing import Any, Literal from pydantic import BaseModel, Field @@ -41,14 +41,25 @@ class GraphNode(BaseModel): ..., description="Unique node id (e.g. 'input', 'yolo_plugin', 'longlive', 'output')", ) - type: Literal["source", "pipeline", "sink", "record"] = Field( + type: Literal["source", "pipeline", "sink", "record", "node"] = Field( ..., - description="source = external input, pipeline = pipeline instance, sink = output, record = file recorder", + description=( + "source = external input, pipeline = pipeline instance, " + "sink = output, record = file recorder, node = custom backend node" + ), ) pipeline_id: str | None = Field( default=None, description="Pipeline ID (registry key) when type is 'pipeline'", ) + node_type_id: str | None = Field( + default=None, + description="Node type ID (NodeRegistry key) when type is 'node'", + ) + params: dict[str, Any] | None = Field( + default=None, + description="Per-node parameter values for custom nodes", + ) source_mode: str | None = Field( default=None, description="Video source mode for source nodes: 'video', 'camera', 'spout', 'ndi', 'syphon'", @@ -110,6 +121,10 @@ def get_record_node_ids(self) -> list[str]: """Return node ids that are record nodes.""" return [n.id for n in self.nodes if n.type == "record"] + def get_backend_node_ids(self) -> list[str]: + """Return node ids that are backend (custom) nodes.""" + return [n.id for n in self.nodes if n.type == "node"] + def edges_from(self, node_id: str) -> list[GraphEdge]: """Return edges whose source is the given node.""" return [e for e in self.edges if e.from_node == node_id] @@ -145,10 +160,12 @@ def validate_structure(self) -> list[str]: if not self.get_sink_node_ids(): errors.append("Graph must have at least one sink node") - # Pipeline nodes must have pipeline_id + # Pipeline nodes must have pipeline_id; backend nodes need node_type_id for node in self.nodes: if node.type == "pipeline" and not node.pipeline_id: errors.append(f"Pipeline node '{node.id}' is missing pipeline_id") + if node.type == "node" and not node.node_type_id: + errors.append(f"Node '{node.id}' is missing node_type_id") # Edge references must point to existing nodes node_id_set = set(node_ids) From fa24295d7a63fe9be2a1cfcdfc26e3f91f887000 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Fri, 10 Apr 2026 11:09:04 +0000 Subject: [PATCH 02/43] feat(frontend): add custom node catalog and generic renderer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Surface backend node types on the frontend via the new GET /api/v1/nodes/definitions endpoint: - api.ts: extend GraphNode with `node_type_id`/`params` and add NodeDefinitionDto + fetchNodeDefinitions(). - graphUtils.ts: add `custom_node` to FlowNodeData.nodeType and the customNode* metadata fields (display, category, inputs, outputs, params, param defs); round-trip `custom_node` ↔ backend `type: "node"` in graphConfigToFlow / flowToGraphConfig. - CustomNode.tsx (new): schema-driven React Flow node that renders input/output ports with port-type colors and ComfyUI-style param widgets (select/boolean/number/text) from `customNodeParamDefs`. - AddNodeModal: fetch node definitions while open and merge them into the catalog under a new "Plugins" category; pass full definition metadata through onSelectNodeType extraData. - useNodeFactories: add `custom_node` to NodeTypeKey / NODE_DEFAULTS and route the add flow through extraData so picked definitions land in FlowNodeData. - useGraphPersistence: after import/restore, hydrate every custom_node with its definition from /api/v1/nodes/definitions, preserving saved `customNodeParams` over definition defaults. - connectionValidation: when either endpoint is a custom_node, enforce port-type equality; built-in node streams stay untyped. - GraphEditor: register `custom_node: CustomNode` in nodeTypes. Signed-off-by: Rafal Leszko --- .../src/components/graph/AddNodeModal.tsx | 91 ++++++- frontend/src/components/graph/GraphEditor.tsx | 2 + .../graph/hooks/graph/useGraphPersistence.ts | 69 +++++ .../graph/hooks/node/useNodeFactories.ts | 24 +- .../src/components/graph/nodes/CustomNode.tsx | 246 ++++++++++++++++++ .../graph/utils/connectionValidation.ts | 29 ++- frontend/src/lib/api.ts | 54 +++- frontend/src/lib/graphUtils.ts | 83 +++++- 8 files changed, 576 insertions(+), 22 deletions(-) create mode 100644 frontend/src/components/graph/nodes/CustomNode.tsx diff --git a/frontend/src/components/graph/AddNodeModal.tsx b/frontend/src/components/graph/AddNodeModal.tsx index 59e91ed89..4fece1a40 100644 --- a/frontend/src/components/graph/AddNodeModal.tsx +++ b/frontend/src/components/graph/AddNodeModal.tsx @@ -1,4 +1,4 @@ -import { useState, useMemo, useRef, useEffect } from "react"; +import { useState, useMemo, useRef, useEffect, useCallback } from "react"; import { Dialog, DialogContent, @@ -6,6 +6,7 @@ import { DialogTitle, DialogDescription, } from "../ui/dialog"; +import type { FlowNodeData } from "../../lib/graphUtils"; interface AddNodeModalProps { open: boolean; @@ -36,8 +37,10 @@ interface AddNodeModalProps { | "tempo" | "prompt_list" | "prompt_blend" - | "scheduler", - subType?: string + | "scheduler" + | "custom_node", + subType?: string, + extraData?: Partial ) => void; } @@ -67,12 +70,15 @@ interface NodeCatalogItem { | "tempo" | "prompt_list" | "prompt_blend" - | "scheduler"; + | "scheduler" + | "custom_node"; subType?: string; name: string; description: string; color: string; category: string; + /** Full definition for custom nodes (inputs/outputs/params). */ + customNodeDef?: Record; } const NODE_CATALOG: NodeCatalogItem[] = [ @@ -294,7 +300,15 @@ const NODE_CATALOG: NodeCatalogItem[] = [ }, ]; -const CATEGORIES = ["All", "I/O", "Values", "Controls", "UI", "Utility"]; +const CATEGORIES = [ + "All", + "I/O", + "Values", + "Controls", + "UI", + "Utility", + "Plugins", +]; interface TooltipState { text: string; @@ -388,10 +402,38 @@ export function AddNodeModal({ }: AddNodeModalProps) { const [searchText, setSearchText] = useState(""); const [activeCategory, setActiveCategory] = useState("All"); + const [customNodes, setCustomNodes] = useState([]); + + useEffect(() => { + if (!open) return; + fetch("/api/v1/nodes/definitions") + .then(r => r.json()) + .then(data => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const items: NodeCatalogItem[] = (data.nodes ?? []).map((n: any) => ({ + type: "custom_node" as const, + subType: n.node_type_id, + name: n.display_name || n.node_type_id, + description: n.description || "", + color: "#9ca3af", + category: "Plugins", + customNodeDef: n, + })); + setCustomNodes(items); + }) + .catch(() => { + /* ignore — custom nodes just won't appear */ + }); + }, [open]); + + const fullCatalog = useMemo( + () => [...NODE_CATALOG, ...customNodes], + [customNodes] + ); const filteredItems = useMemo(() => { const lowerSearch = searchText.toLowerCase(); - return NODE_CATALOG.filter(item => { + return fullCatalog.filter(item => { const matchesSearch = !lowerSearch || item.name.toLowerCase().includes(lowerSearch) || @@ -400,14 +442,37 @@ export function AddNodeModal({ activeCategory === "All" || item.category === activeCategory; return matchesSearch && matchesCategory; }); - }, [searchText, activeCategory]); + }, [searchText, activeCategory, fullCatalog]); - const handleSelect = (item: NodeCatalogItem) => { - onSelectNodeType(item.type, item.subType); - onClose(); - setSearchText(""); - setActiveCategory("All"); - }; + const handleSelect = useCallback( + (item: NodeCatalogItem) => { + if (item.type === "custom_node" && item.customNodeDef) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const def = item.customNodeDef as any; + onSelectNodeType("custom_node", item.subType, { + customNodeTypeId: def.node_type_id, + customNodeDisplayName: def.display_name || def.node_type_id, + customNodeCategory: def.category || "", + customNodeInputs: def.inputs || [], + customNodeOutputs: def.outputs || [], + customNodeParamDefs: def.params || [], + customNodeParams: Object.fromEntries( + (def.params || []) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .filter((p: any) => p.default != null) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .map((p: any) => [p.name, p.default]) + ), + }); + } else { + onSelectNodeType(item.type, item.subType); + } + onClose(); + setSearchText(""); + setActiveCategory("All"); + }, + [onSelectNodeType, onClose] + ); const handleClose = () => { onClose(); diff --git a/frontend/src/components/graph/GraphEditor.tsx b/frontend/src/components/graph/GraphEditor.tsx index 15446cc80..18b71338a 100644 --- a/frontend/src/components/graph/GraphEditor.tsx +++ b/frontend/src/components/graph/GraphEditor.tsx @@ -55,6 +55,7 @@ import { TempoNode } from "./nodes/TempoNode"; import { PromptListNode } from "./nodes/PromptListNode"; import { PromptBlendNode } from "./nodes/PromptBlendNode"; import { SchedulerNode } from "./nodes/SchedulerNode"; +import { CustomNode } from "./nodes/CustomNode"; import { CustomEdge } from "./CustomEdge"; import { ContextMenu } from "./ContextMenu"; import { AddNodeModal } from "./AddNodeModal"; @@ -130,6 +131,7 @@ const nodeTypes = { prompt_list: PromptListNode, prompt_blend: PromptBlendNode, scheduler: SchedulerNode, + custom_node: CustomNode, }; const edgeTypes = { diff --git a/frontend/src/components/graph/hooks/graph/useGraphPersistence.ts b/frontend/src/components/graph/hooks/graph/useGraphPersistence.ts index 8a2248d31..eb7c0a678 100644 --- a/frontend/src/components/graph/hooks/graph/useGraphPersistence.ts +++ b/frontend/src/components/graph/hooks/graph/useGraphPersistence.ts @@ -65,6 +65,71 @@ function clearGraphFromLocalStorage(): void { } } +/** + * After loading or importing a workflow, fetch `/api/v1/nodes/definitions` + * and hydrate each custom_node flow node with its inputs/outputs/params/ + * display metadata. Saved workflows only persist `node_type_id` and + * `params`, so the port definitions have to be re-attached before render. + * User-supplied param values override the defaults from the definition. + */ +function hydrateCustomNodeDefinitions( + nodes: Node[], + setNodes: React.Dispatch[]>> +): void { + const customFlowNodes = nodes.filter( + n => n.data.nodeType === "custom_node" && !n.data.customNodeInputs + ); + if (customFlowNodes.length === 0) return; + fetch("/api/v1/nodes/definitions") + .then(r => r.json()) + .then(data => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const defs = (data.nodes ?? []) as any[]; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const defMap = new Map( + defs.map(d => [d.node_type_id as string, d]) + ); + setNodes(prev => + prev.map(n => { + if ( + n.data.nodeType !== "custom_node" || + !n.data.customNodeTypeId || + n.data.customNodeInputs + ) { + return n; + } + const def = defMap.get(n.data.customNodeTypeId); + if (!def) return n; + return { + ...n, + data: { + ...n.data, + customNodeDisplayName: def.display_name, + customNodeCategory: def.category, + customNodeInputs: def.inputs ?? [], + customNodeOutputs: def.outputs ?? [], + customNodeParamDefs: def.params ?? [], + customNodeParams: { + ...Object.fromEntries( + (def.params || []) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .filter((p: any) => p.default != null) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .map((p: any) => [p.name, p.default] as const) + ), + // User-edited values take precedence over definition defaults + ...(n.data.customNodeParams || {}), + }, + }, + }; + }) + ); + }) + .catch(() => { + // custom nodes just won't be hydrated; render falls back to placeholders + }); +} + interface UseGraphPersistenceArgs { nodes: Node[]; edges: Edge[]; @@ -182,6 +247,8 @@ export function useGraphPersistence({ }, 0); } + hydrateCustomNodeDefinitions(enriched, setNodes); + // Allow async side-effects (e.g. source mode restore) to settle // before re-enabling change notifications. setTimeout(() => { @@ -374,6 +441,8 @@ export function useGraphPersistence({ } }, 0); } + + hydrateCustomNodeDefinitions(enriched, setNodes); }, [ portsMap, diff --git a/frontend/src/components/graph/hooks/node/useNodeFactories.ts b/frontend/src/components/graph/hooks/node/useNodeFactories.ts index 36982fa76..60803a265 100644 --- a/frontend/src/components/graph/hooks/node/useNodeFactories.ts +++ b/frontend/src/components/graph/hooks/node/useNodeFactories.ts @@ -44,7 +44,8 @@ type NodeTypeKey = | "tempo" | "prompt_list" | "prompt_blend" - | "scheduler"; + | "scheduler" + | "custom_node"; interface NodeDefaults { /** The React Flow node `type` */ @@ -470,6 +471,15 @@ const NODE_DEFAULTS: Record = { ], }, }, + custom_node: { + type: "custom_node", + idPrefix: "custom", + defaultX: 300, + data: { + label: "Custom Node", + nodeType: "custom_node" as const, + }, + }, }; interface UseNodeFactoriesArgs { @@ -565,8 +575,10 @@ export function useNodeFactories({ | "tempo" | "prompt_list" | "prompt_blend" - | "scheduler", - subType?: string + | "scheduler" + | "custom_node", + subType?: string, + extraData?: Partial ) => { if (!pendingNodePosition) return; @@ -597,6 +609,12 @@ export function useNodeFactories({ outputSinkType: defaultType, outputSinkName: defaultNames[defaultType] || "Scope", }); + } else if (type === "custom_node") { + addNode("custom_node", pendingNodePosition, { + customNodeTypeId: subType, + label: subType || "Custom Node", + ...extraData, + }); } else { addNode(type as NodeTypeKey, pendingNodePosition); } diff --git a/frontend/src/components/graph/nodes/CustomNode.tsx b/frontend/src/components/graph/nodes/CustomNode.tsx new file mode 100644 index 000000000..393a8e96d --- /dev/null +++ b/frontend/src/components/graph/nodes/CustomNode.tsx @@ -0,0 +1,246 @@ +import { Handle, Position } from "@xyflow/react"; +import type { NodeProps, Node } from "@xyflow/react"; +import type { FlowNodeData } from "../../../lib/graphUtils"; +import { buildHandleId } from "../../../lib/graphUtils"; +import { useNodeData } from "../hooks/node/useNodeData"; +import { useNodeCollapse } from "../hooks/node/useNodeCollapse"; +import { NodeCard, NodeHeader, NodeBody, collapsedHandleStyle } from "../ui"; + +type CustomNodeType = Node; + +/* Port type -> color mapping for custom types */ +const PORT_COLORS: Record = { + audio: "#22c55e", + video: "#eeeeee", + number: "#38bdf8", + string: "#fbbf24", + boolean: "#34d399", + trigger: "#f97316", + latent: "#a855f7", + model: "#f59e0b", + vae: "#f59e0b", + clip: "#f59e0b", + conditioning: "#3b82f6", + semantic_hints: "#06b6d4", + config: "#6b7280", + curve: "#ec4899", + mask: "#ef4444", + lora: "#f472b6", +}; + +function portColor(portType: string): string { + return PORT_COLORS[portType] ?? "#9ca3af"; +} + +export function CustomNode({ id, data, selected }: NodeProps) { + const { updateData } = useNodeData(id); + const { collapsed, toggleCollapse } = useNodeCollapse(); + + const inputs = data.customNodeInputs ?? []; + const outputs = data.customNodeOutputs ?? []; + const params = (data.customNodeParamDefs ?? []) as Array<{ + name: string; + param_type: string; + default?: unknown; + description?: string; + min_value?: number; + max_value?: number; + step?: number; + options?: string[]; + }>; + const displayName = + data.customTitle || + data.customNodeDisplayName || + data.customNodeTypeId || + "Custom Node"; + const category = data.customNodeCategory ?? ""; + + return ( + + updateData({ customTitle: t })} + collapsed={collapsed} + onCollapseToggle={toggleCollapse} + /> + {!collapsed && ( + + {/* Show category badge */} + {category && ( +
+ + {category} + +
+ )} + {/* Show input ports */} + {inputs.length > 0 && ( +
+ {inputs.map(p => ( +
+ + {p.name} + + {p.port_type} + +
+ ))} +
+ )} + {/* Show output ports */} + {outputs.length > 0 && ( +
+ {outputs.map(p => ( +
+ + {p.port_type} + + {p.name} + +
+ ))} +
+ )} + {/* Parameter widgets (ComfyUI-style editable params) */} + {params.length > 0 && ( +
+ {params.map(p => { + const val = data.customNodeParams?.[p.name] ?? p.default ?? ""; + return ( +
+ + {p.description || p.name} + + {p.param_type === "select" && p.options ? ( + + ) : p.param_type === "boolean" ? ( + + updateData({ + customNodeParams: { + ...data.customNodeParams, + [p.name]: e.target.checked, + }, + }) + } + className="accent-blue-500" + /> + ) : p.param_type === "number" ? ( + + updateData({ + customNodeParams: { + ...data.customNodeParams, + [p.name]: Number(e.target.value), + }, + }) + } + /> + ) : ( + + updateData({ + customNodeParams: { + ...data.customNodeParams, + [p.name]: e.target.value, + }, + }) + } + /> + )} +
+ ); + })} +
+ )} +
+ )} + + {/* Input handles (left side) */} + {inputs.map((p, i) => ( + + ))} + + {/* Output handles (right side) */} + {outputs.map((p, i) => ( + 0 ? inputs.length * 18 + 4 : 0) + i * 18}px`, + width: 8, + height: 8, + ...(collapsed ? collapsedHandleStyle : {}), + }} + /> + ))} +
+ ); +} diff --git a/frontend/src/components/graph/utils/connectionValidation.ts b/frontend/src/components/graph/utils/connectionValidation.ts index 1a3bf183c..475bb3a1e 100644 --- a/frontend/src/components/graph/utils/connectionValidation.ts +++ b/frontend/src/components/graph/utils/connectionValidation.ts @@ -302,9 +302,32 @@ export function validateConnection( return true; } - // Stream ↔ stream always ok - if (sourceParsed.kind === "stream" && targetParsed.kind === "stream") - return true; + // Stream ↔ stream: for custom_node edges, enforce port-type matching + // against the node's declared inputs/outputs. For built-in source / + // pipeline / sink nodes, streams are untyped (video) and always ok. + if (sourceParsed.kind === "stream" && targetParsed.kind === "stream") { + const sourceNode = nodes.find(n => n.id === connection.source); + const targetNode = nodes.find(n => n.id === connection.target); + if (!sourceNode || !targetNode) return true; + const srcIsCustom = sourceNode.data.nodeType === "custom_node"; + const tgtIsCustom = targetNode.data.nodeType === "custom_node"; + if (!srcIsCustom && !tgtIsCustom) return true; + + const srcType = srcIsCustom + ? sourceNode.data.customNodeOutputs?.find( + p => p.name === sourceParsed.name + )?.port_type + : undefined; + const tgtType = tgtIsCustom + ? targetNode.data.customNodeInputs?.find( + p => p.name === targetParsed.name + )?.port_type + : undefined; + + // If we can't look up both types, allow (assume compatible). + if (!srcType || !tgtType) return true; + return srcType === tgtType; + } // Param ↔ param if (sourceParsed.kind === "param" && targetParsed.kind === "param") { diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index fc8ce79c5..722361867 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -907,8 +907,12 @@ export const deleteApiKey = async ( export interface GraphNode { id: string; - type: "source" | "pipeline" | "sink" | "record"; + type: "source" | "pipeline" | "sink" | "record" | "node"; pipeline_id?: string | null; + /** Node type ID (NodeRegistry key) when type is "node" */ + node_type_id?: string | null; + /** Per-node parameter values for custom nodes */ + params?: Record | null; x?: number | null; y?: number | null; w?: number | null; @@ -920,6 +924,54 @@ export interface GraphNode { sink_name?: string | null; } +export interface NodePortDef { + name: string; + port_type: string; + required?: boolean; + description?: string; + default_value?: unknown; +} + +export interface NodeParamDef { + name: string; + param_type: "number" | "string" | "boolean" | "select"; + default?: unknown; + description?: string; + min_value?: number | null; + max_value?: number | null; + step?: number | null; + options?: string[] | null; + convertible_to_input?: boolean; +} + +export interface NodeDefinitionDto { + node_type_id: string; + display_name: string; + category: string; + description: string; + inputs: NodePortDef[]; + outputs: NodePortDef[]; + params: NodeParamDef[]; + continuous: boolean; +} + +export interface NodeDefinitionsResponse { + nodes: NodeDefinitionDto[]; +} + +export const fetchNodeDefinitions = + async (): Promise => { + const response = await fetch("/api/v1/nodes/definitions", { + method: "GET", + }); + if (!response.ok) { + throw new Error( + `Failed to fetch node definitions: ${response.statusText}` + ); + } + return response.json(); + }; + export interface GraphEdge { from: string; from_port: string; diff --git a/frontend/src/lib/graphUtils.ts b/frontend/src/lib/graphUtils.ts index 861df74da..bfd9040eb 100644 --- a/frontend/src/lib/graphUtils.ts +++ b/frontend/src/lib/graphUtils.ts @@ -104,7 +104,8 @@ export interface FlowNodeData { | "prompt_list" | "prompt_blend" | "scheduler" - | "audio"; + | "audio" + | "custom_node"; availablePipelineIds?: string[]; /** Declared input ports for the selected pipeline */ streamInputs?: string[]; @@ -384,6 +385,40 @@ export interface FlowNodeData { /* ── Tempo beat count offset ── */ tempoBeatCountOffset?: number; + /* ── Custom node fields ── */ + /** For custom_node: the node_type_id from the backend registry */ + customNodeTypeId?: string; + /** For custom_node: display name from node definition */ + customNodeDisplayName?: string; + /** For custom_node: category from node definition */ + customNodeCategory?: string; + /** For custom_node: input port definitions */ + customNodeInputs?: Array<{ + name: string; + port_type: string; + required?: boolean; + description?: string; + }>; + /** For custom_node: output port definitions */ + customNodeOutputs?: Array<{ + name: string; + port_type: string; + description?: string; + }>; + /** For custom_node: current parameter values (user-editable) */ + customNodeParams?: Record; + /** For custom_node: parameter definitions from API (widget metadata) */ + customNodeParamDefs?: Array<{ + name: string; + param_type: string; + default?: unknown; + description?: string; + min_value?: number; + max_value?: number; + step?: number; + options?: string[]; + }>; + /* ── Node lock / pin / collapse ── */ /** When true, parameter inputs on this node are disabled (read-only). */ locked?: boolean; @@ -779,6 +814,40 @@ export function graphConfigToFlow( }); }); + // Backend custom nodes (type="node"). Port metadata is hydrated later + // from GET /api/v1/nodes/definitions in useGraphPersistence. + const customNodes = graph.nodes.filter( + n => n.type === "node" && !isSubgraphInnerNode(n.id) + ); + customNodes.forEach((n, i) => { + const savedX = n.x ?? undefined; + const savedY = n.y ?? undefined; + const sizeProps = + n.w != null || n.h != null + ? { + width: n.w ?? undefined, + height: n.h ?? undefined, + style: { width: n.w ?? undefined, height: n.h ?? undefined }, + } + : {}; + nodes.push({ + id: n.id, + type: "custom_node", + position: { + x: savedX !== undefined ? savedX : START_X + COLUMN_GAP * 1.5, + y: + savedY !== undefined ? savedY : START_Y + i * (NODE_HEIGHT + ROW_GAP), + }, + ...sizeProps, + data: { + label: n.node_type_id || n.id, + nodeType: "custom_node", + customNodeTypeId: n.node_type_id ?? undefined, + customNodeParams: (n.params as Record) ?? undefined, + }, + }); + }); + // Convert edges - add stream: prefix to handle IDs // Skip edges that reference flattened inner subgraph nodes const edges: Edge[] = graph.edges @@ -1208,11 +1277,21 @@ export function flowToGraphConfig( ? "sink" : n.data.nodeType === "record" ? "record" - : "pipeline", + : n.data.nodeType === "custom_node" + ? "node" + : "pipeline", pipeline_id: n.data.nodeType === "pipeline" ? (n.data.pipelineId ?? null) : undefined, + node_type_id: + n.data.nodeType === "custom_node" + ? (n.data.customNodeTypeId ?? null) + : undefined, + params: + n.data.nodeType === "custom_node" && n.data.customNodeParams + ? n.data.customNodeParams + : undefined, x: n.position.x, y: n.position.y, w: w && !Number.isNaN(w) ? w : undefined, From 739cede938fd99c9ea5158f93301e7badd55d2d2 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Fri, 10 Apr 2026 13:36:57 +0000 Subject: [PATCH 03/43] refactor: unify Pipeline and Node under BaseNode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The graph editor has always called processing units "Nodes" in the UI (see app.daydream.live/nodes) while the code called them "Pipelines"; this PR then introduced a second unrelated concept also called "Node". Collapse both into one hierarchy so plugin authors and users see a single concept. Design: - Pipeline becomes `class Pipeline(BaseNode, ABC)` — pure inheritance change. Concrete pipeline subclasses (longlive, krea, LTX-2, VACE, streamdiffusion, …) touch zero lines. They still only implement `__call__` and `get_config_class`. - Pipeline provides default `get_definition()` (derives a lightweight NodeDefinition from the config class) and a no-op `execute()` that raises NotImplementedError (pipelines are driven by `__call__` from PipelineProcessor; the method only exists to satisfy the abstract BaseNode contract). - NodeRegistry is now the sole storage. Its register() handles both plain BaseNode classes (read `node_type_id` classvar) and Pipeline subclasses (read `pipeline_id` from the config class). - PipelineRegistry becomes a filtering view over NodeRegistry._nodes that returns only Pipeline subclasses. Every pre-existing call site (`list_pipelines`, `get_config_class`, `chain_produces_*`, `register`, `unregister`, `is_registered`, `get`) keeps its old signature and semantics, so the server, plugin manager, OSC/DMX docs, workflows resolver, and tests all work unchanged. - register_pipelines plugin hook still forwards through `registry.register(pipeline_id, pipeline_class)` which now plants into the shared storage. Existing plugins need zero edits. - register_nodes plugin hook uses the same storage. Both hooks can be used interchangeably; new plugins use register_nodes. - GET /api/v1/nodes/definitions filters pipelines out so the frontend add-node catalog's Plugins category only shows plain custom nodes. Pipelines continue to be surfaced through the dedicated /api/v1/pipelines/schemas endpoint for their rich config panel. - Frontend: the add-node modal's "Pipeline" catalog entry is renamed to "Node", the PipelineNode parameter selector label changes from "Pipeline" to "Model", and the context menu's "Pipeline" entry becomes "Node". The word "pipeline" disappears from the user-facing UI. Backwards compatibility: - Existing pipeline subclasses don't change. - Existing register_pipelines plugins don't change. - Existing PipelineRegistry.* call sites don't change. - Existing `"type": "pipeline"` saved workflows still load. - GraphNode.type keeps both "pipeline" and "node" literals. Signed-off-by: Rafal Leszko --- .../src/components/graph/AddNodeModal.tsx | 4 +- .../src/components/graph/contextMenuItems.tsx | 4 +- .../components/graph/nodes/PipelineNode.tsx | 6 +- src/scope/core/nodes/registry.py | 34 +++++- src/scope/core/pipelines/interface.py | 82 ++++++++------ src/scope/core/pipelines/registry.py | 105 ++++++------------ src/scope/core/plugins/manager.py | 73 +++++------- src/scope/server/app.py | 16 ++- 8 files changed, 161 insertions(+), 163 deletions(-) diff --git a/frontend/src/components/graph/AddNodeModal.tsx b/frontend/src/components/graph/AddNodeModal.tsx index 4fece1a40..7013ca15f 100644 --- a/frontend/src/components/graph/AddNodeModal.tsx +++ b/frontend/src/components/graph/AddNodeModal.tsx @@ -91,8 +91,8 @@ const NODE_CATALOG: NodeCatalogItem[] = [ }, { type: "pipeline", - name: "Pipeline", - description: "Processing pipeline node", + name: "Node", + description: "Video processing node (pick a model after dropping it)", color: "#60a5fa", category: "I/O", }, diff --git a/frontend/src/components/graph/contextMenuItems.tsx b/frontend/src/components/graph/contextMenuItems.tsx index 0d4903f9e..caeadd698 100644 --- a/frontend/src/components/graph/contextMenuItems.tsx +++ b/frontend/src/components/graph/contextMenuItems.tsx @@ -96,10 +96,10 @@ export function buildPaneMenuItems(deps: { keywords: ["input", "camera", "video"], }, { - label: "Pipeline", + label: "Node", icon: , onClick: () => handleNodeTypeSelect("pipeline"), - keywords: ["process", "effect", "filter"], + keywords: ["process", "effect", "filter", "pipeline"], }, { label: "Sink", diff --git a/frontend/src/components/graph/nodes/PipelineNode.tsx b/frontend/src/components/graph/nodes/PipelineNode.tsx index 7b8408448..4cd461886 100644 --- a/frontend/src/components/graph/nodes/PipelineNode.tsx +++ b/frontend/src/components/graph/nodes/PipelineNode.tsx @@ -84,7 +84,7 @@ export function PipelineNode({ const supportsLoRA = data.supportsLoRA ?? false; const isStreaming = data.isStreaming ?? false; - const pipelineName = data.pipelineId || "Pipeline"; + const pipelineName = data.pipelineId || "Node"; // Inject unavailable pipelineId into options const isUnavailable = @@ -187,8 +187,8 @@ export function PipelineNode({ /> {!collapsed && ( - {/* Pipeline selector */} - + {/* Node type selector (video pipeline model) */} + { diff --git a/src/scope/core/nodes/registry.py b/src/scope/core/nodes/registry.py index b4a4d5b06..285377c22 100644 --- a/src/scope/core/nodes/registry.py +++ b/src/scope/core/nodes/registry.py @@ -1,4 +1,4 @@ -"""Registry for custom node types.""" +"""Unified registry for every node type on the graph (plain nodes + pipelines).""" from __future__ import annotations @@ -11,17 +11,41 @@ logger = logging.getLogger(__name__) -class NodeRegistry: - """Central registry for all available node types. +def _derive_node_type_id(node_class: type) -> str | None: + """Return the registry key for a node class, or None if not derivable. - Keyed by ``node_type_id`` (read from the registered class). + Plain nodes carry the id as the ``node_type_id`` classvar; pipelines + keep it on their config class as ``pipeline_id``. """ + node_type_id = getattr(node_class, "node_type_id", None) + if node_type_id is not None: + return node_type_id + # Lazy import: nodes.registry is loaded before pipelines.interface. + try: + from scope.core.pipelines.interface import Pipeline + + if issubclass(node_class, Pipeline): + return node_class.get_config_class().pipeline_id + except Exception: + pass + return None + + +class NodeRegistry: + """Central registry for all available node types.""" _nodes: dict[str, type[BaseNode]] = {} @classmethod def register(cls, node_class: type[BaseNode]) -> None: - node_type_id = node_class.node_type_id + """Register a :class:`BaseNode` subclass (plain node or pipeline).""" + node_type_id = _derive_node_type_id(node_class) + if node_type_id is None: + raise ValueError( + f"Cannot determine node_type_id for {node_class.__name__}; " + "set a ClassVar[str] `node_type_id` on plain nodes or a " + "`pipeline_id` on the pipeline config class." + ) cls._nodes[node_type_id] = node_class logger.debug("Registered node type: %s", node_type_id) diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index 18312c647..667f72767 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -1,10 +1,20 @@ -"""Base interface for all pipelines.""" +"""Base interface for all pipelines. + +A :class:`Pipeline` is a :class:`scope.core.nodes.BaseNode` subclass — +the "heavy" kind that batches video frames, loads GPU models, and +carries a rich Pydantic config class. The graph editor and user-facing +docs call them *Nodes*; the name ``Pipeline`` survives as the +implementation base class so existing subclasses and plugins keep +working unchanged. +""" from abc import ABC, abstractmethod from typing import TYPE_CHECKING from pydantic import BaseModel +from scope.core.nodes.base import BaseNode, NodeDefinition, NodePort + if TYPE_CHECKING: from .schema import BasePipelineConfig @@ -15,50 +25,56 @@ class Requirements(BaseModel): input_size: int -class Pipeline(ABC): - """Abstract base class for all pipelines. - - Pipelines must implement get_config_class() to return their Pydantic config model. - This enables: - - Validation via model_validate() / model_validate_json() - - JSON Schema generation via model_json_schema() - - Type-safe configuration access - - API introspection and automatic UI generation +class Pipeline(BaseNode, ABC): + """Abstract base class for video-pipeline nodes. - See schema.py for the BasePipelineConfig model and pipeline-specific configs. - For multi-mode pipeline support (text/video), pipelines use helper functions - from defaults.py (resolve_input_mode, apply_mode_defaults_to_state, etc.). + Subclasses implement ``__call__`` (the per-chunk processing + function) and ``get_config_class`` (returning a Pydantic config + that drives validation, JSON-schema generation, the parameter + panel, and parameter defaults). Everything else — registry, + plugin hook, graph editor — is the same as for plain nodes. """ @classmethod def get_config_class(cls) -> type["BasePipelineConfig"]: """Return the Pydantic config class for this pipeline. - The config class should inherit from BasePipelineConfig and define: - - pipeline_id: Unique identifier - - pipeline_name: Human-readable name - - pipeline_description: Capabilities description - - pipeline_version: Version string - - Default parameter values for the pipeline - - Returns: - Pydantic config model class - - Note: - Subclasses should override this method to return their config class. - The default implementation returns BasePipelineConfig. - - Example: - from .schema import LongLiveConfig - - @classmethod - def get_config_class(cls) -> type[BasePipelineConfig]: - return LongLiveConfig + Subclasses override to return their concrete config; the + default returns ``BasePipelineConfig``. """ from .schema import BasePipelineConfig return BasePipelineConfig + @classmethod + def get_definition(cls) -> NodeDefinition: + """Project the pipeline's config class into a :class:`NodeDefinition`. + + This is the lightweight node-catalog view. The rich pipeline + metadata (``config_schema``, LoRA/VACE flags, mode defaults, + etc.) is still served by ``GET /api/v1/pipelines/schemas`` and + rendered by ``PipelineNode.tsx``. ``params`` is left empty + because the Pydantic schema is too structured to flatten into + ``NodeParam[]`` widgets. + """ + config = cls.get_config_class() + return NodeDefinition( + node_type_id=config.pipeline_id, + display_name=getattr(config, "pipeline_name", config.pipeline_id), + category="pipeline", + description=getattr(config, "pipeline_description", "") or "", + inputs=[ + NodePort(name=name, port_type="video") + for name in (getattr(config, "inputs", ["video"]) or ["video"]) + ], + outputs=[ + NodePort(name=name, port_type="video") + for name in (getattr(config, "outputs", ["video"]) or ["video"]) + ], + params=[], + continuous=False, + ) + @abstractmethod def __call__(self, **kwargs) -> dict: """ diff --git a/src/scope/core/pipelines/registry.py b/src/scope/core/pipelines/registry.py index 53de455ca..7ea6f6d53 100644 --- a/src/scope/core/pipelines/registry.py +++ b/src/scope/core/pipelines/registry.py @@ -1,8 +1,10 @@ -"""Pipeline registry for centralized pipeline management. +"""Pipeline registry — a filtering view over :class:`NodeRegistry`. -This module provides a registry pattern to eliminate if/elif chains when -accessing pipelines by ID. It enables dynamic pipeline discovery and -metadata retrieval. +Pipelines and plain custom nodes share the same ``NodeRegistry._nodes`` +storage. ``PipelineRegistry`` projects that storage down to entries +whose class is a :class:`Pipeline` subclass and exposes the same API +the rest of the codebase always used, so existing call sites and +plugins keep working unchanged. """ import importlib @@ -11,6 +13,8 @@ import torch +from scope.core.nodes.registry import NodeRegistry + if TYPE_CHECKING: from .interface import Pipeline from .schema import BasePipelineConfig @@ -18,83 +22,49 @@ logger = logging.getLogger(__name__) -class PipelineRegistry: - """Registry for managing available pipelines.""" +def _is_pipeline(node_class: object) -> bool: + """Return True when ``node_class`` is a :class:`Pipeline` subclass. + + Lazily imports :class:`Pipeline` to dodge the import cycle between + the pipelines and nodes packages at module load time. + """ + from .interface import Pipeline + + return isinstance(node_class, type) and issubclass(node_class, Pipeline) - _pipelines: dict[str, type["Pipeline"]] = {} + +class PipelineRegistry: + """Filtering view over :class:`NodeRegistry` for pipeline classes.""" @classmethod def register(cls, pipeline_id: str, pipeline_class: type["Pipeline"]) -> None: - """Register a pipeline class with its ID. - - Args: - pipeline_id: Unique identifier for the pipeline - pipeline_class: Pipeline class to register - """ - cls._pipelines[pipeline_id] = pipeline_class + """Plant a pipeline class into the unified :class:`NodeRegistry`.""" + NodeRegistry._nodes[pipeline_id] = pipeline_class @classmethod def get(cls, pipeline_id: str) -> type["Pipeline"] | None: - """Get a pipeline class by its ID. - - Args: - pipeline_id: Pipeline identifier - - Returns: - Pipeline class if found, None otherwise - """ - return cls._pipelines.get(pipeline_id) + node_class = NodeRegistry._nodes.get(pipeline_id) + return node_class if _is_pipeline(node_class) else None @classmethod def unregister(cls, pipeline_id: str) -> bool: - """Remove a pipeline from the registry. - - Args: - pipeline_id: Pipeline identifier to remove - - Returns: - True if pipeline was removed, False if not found - """ - if pipeline_id in cls._pipelines: - del cls._pipelines[pipeline_id] - return True - return False + if cls.get(pipeline_id) is None: + return False + NodeRegistry._nodes.pop(pipeline_id, None) + return True @classmethod def is_registered(cls, pipeline_id: str) -> bool: - """Check if a pipeline is registered. - - Args: - pipeline_id: Pipeline identifier - - Returns: - True if pipeline is registered, False otherwise - """ - return pipeline_id in cls._pipelines + return cls.get(pipeline_id) is not None @classmethod def get_config_class(cls, pipeline_id: str) -> type["BasePipelineConfig"] | None: - """Get config class for a specific pipeline. - - Args: - pipeline_id: Pipeline identifier - - Returns: - Pydantic config class if found, None otherwise - """ pipeline_class = cls.get(pipeline_id) - if pipeline_class is None: - return None - return pipeline_class.get_config_class() + return pipeline_class.get_config_class() if pipeline_class else None @classmethod def list_pipelines(cls) -> list[str]: - """Get list of all registered pipeline IDs. - - Returns: - List of pipeline IDs - """ - return list(cls._pipelines.keys()) + return [pid for pid, c in NodeRegistry._nodes.items() if _is_pipeline(c)] @classmethod def chain_produces_video(cls, pipeline_ids: list[str]) -> bool: @@ -255,30 +225,27 @@ def _initialize_registry(): register_builtin_nodes() - # Load and register plugin pipelines and plugin nodes + # Load and register plugins. The unified register_plugin_nodes fires + # both register_pipelines and register_nodes hooks, so old and new + # plugins are picked up in one call. try: from scope.core.plugins import ( ensure_plugins_installed, load_plugins, register_plugin_nodes, - register_plugin_pipelines, ) ensure_plugins_installed() load_plugins() - register_plugin_pipelines(PipelineRegistry) - - from scope.core.nodes.registry import NodeRegistry - - register_plugin_nodes(NodeRegistry) + register_plugin_nodes() except Exception as e: logger.error( f"Failed to load plugins: {e}. Built-in pipelines are still available." ) - pipeline_count = len(PipelineRegistry.list_pipelines()) from scope.core.nodes.registry import NodeRegistry + pipeline_count = len(PipelineRegistry.list_pipelines()) node_count = len(NodeRegistry.list_node_types()) logger.info( f"Registry initialized with {pipeline_count} pipeline(s) and " diff --git a/src/scope/core/plugins/manager.py b/src/scope/core/plugins/manager.py index ad8cb15f6..d18d7118f 100644 --- a/src/scope/core/plugins/manager.py +++ b/src/scope/core/plugins/manager.py @@ -550,45 +550,36 @@ def get_failed_plugins(self) -> list[FailedPluginInfo]: with self._lock: return list(self._failed_plugins) - def register_plugin_pipelines(self, registry: "PipelineRegistry") -> None: - """Call register_pipelines hook for all plugins. - - Args: - registry: PipelineRegistry to register pipelines with + def register_plugin_nodes(self, registry: Any = None) -> None: + """Fire ``register_nodes`` and ``register_pipelines`` hooks. + + Both hooks plant into the unified :class:`NodeRegistry` storage, + so existing plugins using ``register_pipelines(register)`` keep + working unchanged alongside new ones using ``register_nodes``. + The ``registry`` argument is accepted for legacy callers but + ignored — the unified storage is always used. """ - with self._lock: - # Clear previous mappings - self._pipeline_to_plugin.clear() - - def register_callback(pipeline_class: Any) -> None: - """Callback function passed to plugins.""" - config_class = pipeline_class.get_config_class() - pipeline_id = config_class.pipeline_id - registry.register(pipeline_id, pipeline_class) - - # Track which plugin owns this pipeline - # We'll update this mapping after the hook call - logger.info(f"Registered plugin pipeline: {pipeline_id}") - - self._pm.hook.register_pipelines(register=register_callback) - - # Update pipeline-to-plugin mapping by checking which plugins provide which pipelines - self._update_pipeline_plugin_mapping(registry) + from scope.core.nodes.registry import NodeRegistry + from scope.core.pipelines.registry import PipelineRegistry - def register_plugin_nodes(self, registry: Any) -> None: - """Call ``register_nodes`` hook for all plugins. + del registry # legacy parameter, kept for callsite compat - Args: - registry: ``NodeRegistry`` to register nodes with - """ with self._lock: + self._pipeline_to_plugin.clear() def register_callback(node_class: Any) -> None: - node_type_id = node_class.node_type_id - registry.register(node_class) - logger.info(f"Registered plugin node: {node_type_id}") + NodeRegistry.register(node_class) + node_id = getattr(node_class, "node_type_id", None) or ( + node_class.get_config_class().pipeline_id + ) + logger.info(f"Registered plugin node: {node_id}") self._pm.hook.register_nodes(register=register_callback) + self._pm.hook.register_pipelines(register=register_callback) + self._update_pipeline_plugin_mapping(PipelineRegistry) + + # Backwards-compat alias for internal callers using the legacy name. + register_plugin_pipelines = register_plugin_nodes def _update_pipeline_plugin_mapping(self, registry: "PipelineRegistry") -> None: """Update the mapping of pipeline IDs to plugin names.""" @@ -1673,19 +1664,15 @@ def load_plugins() -> None: get_plugin_manager().load_plugins() -def register_plugin_pipelines(registry: "PipelineRegistry") -> None: - """Call register_pipelines hook for all plugins. +def register_plugin_nodes(registry: Any = None) -> None: + """Fire ``register_nodes`` + ``register_pipelines`` hooks. - Args: - registry: PipelineRegistry to register pipelines with + Both hookspecs plant into the unified :class:`NodeRegistry` storage, + so old and new plugins coexist. The ``registry`` argument is kept + for legacy callers and ignored. """ - get_plugin_manager().register_plugin_pipelines(registry) - + get_plugin_manager().register_plugin_nodes(registry) -def register_plugin_nodes(registry: Any) -> None: - """Call ``register_nodes`` hook for all plugins. - Args: - registry: ``NodeRegistry`` to register nodes with - """ - get_plugin_manager().register_plugin_nodes(registry) +# Backwards-compat alias for internal callers using the legacy name. +register_plugin_pipelines = register_plugin_nodes diff --git a/src/scope/server/app.py b/src/scope/server/app.py index 5ce2599b9..a20664075 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -871,16 +871,20 @@ async def get_pipeline_schemas( @app.get("/api/v1/nodes/definitions") async def get_node_definitions(): - """Return definitions for all registered backend node types. + """Return definitions for plain (non-pipeline) custom nodes. - Each definition includes the ``node_type_id``, ``display_name``, - ``category``, ``description``, ``continuous`` flag, input/output - ports, and editable parameters. The frontend uses this to populate - the add-node catalog and render custom nodes generically. + Pipelines share the unified :class:`NodeRegistry` but are surfaced + via ``GET /api/v1/pipelines/schemas`` because their rich + ``config_schema`` doesn't fit the compact :class:`NodeDefinition`. """ from scope.core.nodes.registry import NodeRegistry + from scope.core.pipelines.interface import Pipeline - definitions = NodeRegistry.get_all_definitions() + definitions = [ + node_class.get_definition() + for node_class in NodeRegistry._nodes.values() + if not (isinstance(node_class, type) and issubclass(node_class, Pipeline)) + ] return {"nodes": [d.model_dump() for d in definitions]} From 89c3517ef91d8783e8c9929007c8021a16b732a3 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 13 Apr 2026 09:29:03 +0000 Subject: [PATCH 04/43] refactor: unify node discovery endpoint with pipeline_meta Collapse pipeline and plain-node discovery into one canonical endpoint. ``GET /api/v1/nodes/definitions`` now returns every entry in the unified ``NodeRegistry``: pipelines (Pipeline subclasses) carry the full ``get_schema_with_metadata()`` output as ``pipeline_meta``, plain custom nodes leave ``pipeline_meta`` ``None``. One endpoint, one shape, one source of truth. - core/nodes/base.py: add ``pipeline_meta: dict | None`` field to NodeDefinition. - core/pipelines/interface.py: Pipeline.get_definition() populates pipeline_meta from get_config_class().get_schema_with_metadata(). - server/app.py: * /api/v1/nodes/definitions stops filtering pipelines and returns everything. * /api/v1/pipelines/schemas becomes a thin compat alias that derives its (legacy-shape) response by iterating NodeRegistry.get_all_definitions() and pulling pipeline_meta from entries whose pipeline_meta is set. Existing usePipelines.ts and cloud-mode proxy callers keep working without migration. - frontend/lib/api.ts: NodeDefinitionDto gains the optional pipeline_meta field. - frontend/components/graph/AddNodeModal.tsx: when populating the Plugins category from the unified endpoint, filter out entries where pipeline_meta != null. Pipelines are still added through the hardcoded "Pipeline" placeholder + dropdown UX, so this keeps the existing modal behavior unchanged while the backend is unified. Signed-off-by: Rafal Leszko --- .../src/components/graph/AddNodeModal.tsx | 27 +++++---- frontend/src/lib/api.ts | 6 ++ src/scope/core/nodes/base.py | 10 ++++ src/scope/core/pipelines/interface.py | 9 +-- src/scope/server/app.py | 59 ++++++++----------- 5 files changed, 63 insertions(+), 48 deletions(-) diff --git a/frontend/src/components/graph/AddNodeModal.tsx b/frontend/src/components/graph/AddNodeModal.tsx index 7013ca15f..65588a9d2 100644 --- a/frontend/src/components/graph/AddNodeModal.tsx +++ b/frontend/src/components/graph/AddNodeModal.tsx @@ -409,16 +409,23 @@ export function AddNodeModal({ fetch("/api/v1/nodes/definitions") .then(r => r.json()) .then(data => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const items: NodeCatalogItem[] = (data.nodes ?? []).map((n: any) => ({ - type: "custom_node" as const, - subType: n.node_type_id, - name: n.display_name || n.node_type_id, - description: n.description || "", - color: "#9ca3af", - category: "Plugins", - customNodeDef: n, - })); + // The unified endpoint returns both pipelines (pipeline_meta != null) + // and plain custom nodes. Pipelines are still added via the hardcoded + // "Pipeline" catalog entry (placeholder + dropdown), so we filter + // them out of the plugin listing here to avoid duplication. + const items: NodeCatalogItem[] = (data.nodes ?? []) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .filter((n: any) => n.pipeline_meta == null) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .map((n: any) => ({ + type: "custom_node" as const, + subType: n.node_type_id, + name: n.display_name || n.node_type_id, + description: n.description || "", + color: "#9ca3af", + category: "Plugins", + customNodeDef: n, + })); setCustomNodes(items); }) .catch(() => { diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 722361867..92739704d 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -953,6 +953,12 @@ export interface NodeDefinitionDto { outputs: NodePortDef[]; params: NodeParamDef[]; continuous: boolean; + /** + * Rich pipeline-only metadata (config_schema, mode_defaults, + * supports_lora, supports_vace, etc.) populated for entries whose + * underlying class is a Pipeline subclass. ``null`` for plain nodes. + */ + pipeline_meta?: Record | null; } export interface NodeDefinitionsResponse { diff --git a/src/scope/core/nodes/base.py b/src/scope/core/nodes/base.py index 5271dafa3..870cefb9a 100644 --- a/src/scope/core/nodes/base.py +++ b/src/scope/core/nodes/base.py @@ -86,6 +86,16 @@ class NodeDefinition(BaseModel): "instead of executing once. Useful for streaming generators." ), ) + pipeline_meta: dict[str, Any] | None = Field( + default=None, + description=( + "Rich pipeline-only metadata (config_schema, mode_defaults, " + "supports_lora, supports_vace, etc.) for nodes that are " + ":class:`Pipeline` subclasses. ``None`` for plain nodes. " + "Populated by ``Pipeline.get_definition()`` from the config " + "class's ``get_schema_with_metadata()``." + ), + ) class BaseNode(ABC): diff --git a/src/scope/core/pipelines/interface.py b/src/scope/core/pipelines/interface.py index 667f72767..acfbebe87 100644 --- a/src/scope/core/pipelines/interface.py +++ b/src/scope/core/pipelines/interface.py @@ -50,10 +50,10 @@ def get_config_class(cls) -> type["BasePipelineConfig"]: def get_definition(cls) -> NodeDefinition: """Project the pipeline's config class into a :class:`NodeDefinition`. - This is the lightweight node-catalog view. The rich pipeline - metadata (``config_schema``, LoRA/VACE flags, mode defaults, - etc.) is still served by ``GET /api/v1/pipelines/schemas`` and - rendered by ``PipelineNode.tsx``. ``params`` is left empty + Populates the compact node-catalog fields (id, ports, etc.) + and stuffs the full ``get_schema_with_metadata()`` output into + ``pipeline_meta``, which is the rich data ``PipelineNode.tsx`` + renders in the parameter panel. ``params`` is left empty because the Pydantic schema is too structured to flatten into ``NodeParam[]`` widgets. """ @@ -73,6 +73,7 @@ def get_definition(cls) -> NodeDefinition: ], params=[], continuous=False, + pipeline_meta=config.get_schema_with_metadata(), ) @abstractmethod diff --git a/src/scope/server/app.py b/src/scope/server/app.py index a20664075..dbf079bac 100644 --- a/src/scope/server/app.py +++ b/src/scope/server/app.py @@ -823,41 +823,36 @@ async def get_pipeline_schemas( http_request: Request, cloud_manager: ScopeCloudBackend = Depends(get_scope_cloud), ): - """Get configuration schemas and defaults for all available pipelines. + """Compat alias for the pipeline-rich subset of the unified node catalog. - Returns the output of each pipeline's get_schema_with_metadata() method, - which includes: - - Pipeline metadata (id, name, description, version) - - supported_modes: List of supported input modes ("text", "video") - - default_mode: Default input mode for this pipeline - - mode_defaults: Mode-specific default overrides (if any) - - config_schema: Full JSON schema with defaults + Derives its response from the unified :class:`NodeRegistry` — + every entry whose :attr:`NodeDefinition.pipeline_meta` is set is a + pipeline, and ``pipeline_meta`` already holds the full output of + ``get_schema_with_metadata()`` (config_schema, mode_defaults, + supports_lora, supports_vace, etc.). Kept so existing frontend + callers in ``usePipelines.ts`` keep working without migration; new + code should read from ``GET /api/v1/nodes/definitions`` instead. - The frontend should use this as the source of truth for parameter defaults. - - In cloud mode (when connected to cloud), this proxies the request to the - cloud-hosted scope backend to get the available pipelines there. + In cloud mode this proxies to the cloud-hosted scope backend. """ global _pipeline_schemas_cache if _pipeline_schemas_cache is not None: return _pipeline_schemas_cache - from scope.core.pipelines.registry import PipelineRegistry + from scope.core.nodes.registry import NodeRegistry from scope.core.plugins import get_plugin_manager plugin_manager = get_plugin_manager() pipelines: dict = {} - for pipeline_id in PipelineRegistry.list_pipelines(): - config_class = PipelineRegistry.get_config_class(pipeline_id) - if config_class: - # get_schema_with_metadata() includes supported_modes, default_mode, - # and mode_defaults directly from the config class - schema_data = config_class.get_schema_with_metadata() - schema_data["plugin_name"] = plugin_manager.get_plugin_for_pipeline( - pipeline_id - ) - pipelines[pipeline_id] = schema_data + for definition in NodeRegistry.get_all_definitions(): + if definition.pipeline_meta is None: + continue + schema_data = dict(definition.pipeline_meta) + schema_data["plugin_name"] = plugin_manager.get_plugin_for_pipeline( + definition.node_type_id + ) + pipelines[definition.node_type_id] = schema_data response = PipelineSchemasResponse(pipelines=pipelines) _pipeline_schemas_cache = response @@ -871,21 +866,17 @@ async def get_pipeline_schemas( @app.get("/api/v1/nodes/definitions") async def get_node_definitions(): - """Return definitions for plain (non-pipeline) custom nodes. + """Return definitions for every registered node — pipelines included. - Pipelines share the unified :class:`NodeRegistry` but are surfaced - via ``GET /api/v1/pipelines/schemas`` because their rich - ``config_schema`` doesn't fit the compact :class:`NodeDefinition`. + The unified discovery endpoint. Pipelines (Pipeline subclasses) + appear with ``pipeline_meta`` populated; plain custom nodes leave + ``pipeline_meta`` ``None``. Frontend consumers that only want plain + nodes filter on ``pipeline_meta == null`` client-side; consumers + that want rich pipeline data read from ``pipeline_meta`` directly. """ from scope.core.nodes.registry import NodeRegistry - from scope.core.pipelines.interface import Pipeline - definitions = [ - node_class.get_definition() - for node_class in NodeRegistry._nodes.values() - if not (isinstance(node_class, type) and issubclass(node_class, Pipeline)) - ] - return {"nodes": [d.model_dump() for d in definitions]} + return {"nodes": [d.model_dump() for d in NodeRegistry.get_all_definitions()]} # --------------------------------------------------------------------------- From 19c44f68122c550d903af0cc7fbca75ca9759b1e Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 13 Apr 2026 12:06:35 +0000 Subject: [PATCH 05/43] refactor: NodeParam widget hints in generic `ui` dict MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove widget-specific fields (``min_value``, ``max_value``, ``step``, ``options``) from ``NodeParam`` and put them in a free-form ``ui`` dict instead. The base schema no longer grows when new widget kinds appear; the frontend renderer dispatches on ``param_type`` and reads whichever ``ui`` keys apply (``min``/``max``/``step`` for number, ``options`` for select, etc.). - core/nodes/base.py: drop ``min_value``/``max_value``/``step``/ ``options`` fields; add ``ui: dict[str, Any] | None`` with documentation of the conventional keys. - frontend lib/api.ts: ``NodeParamDef`` mirrors the new shape — the typed widget fields go away, ``ui`` takes their place. - frontend CustomNode.tsx: number widget reads ``p.ui?.min`` / ``p.ui?.max`` / ``p.ui?.step``; select widget reads ``p.ui?.options``; boolean and text widgets unchanged. Plugins that construct ``NodeParam`` are migrated to the new shape separately — ACEStep's ``bridge.py`` needs a coordinated update, and Scope's audio builtins (added in the execution PR) do too. Both get the generic shape: ``ui={"min": …, "max": …, "step": …}`` for numbers, ``ui={"options": [...]}`` for selects. Signed-off-by: Rafal Leszko --- .../src/components/graph/nodes/CustomNode.tsx | 16 ++++++-------- frontend/src/lib/api.ts | 11 ++++++---- src/scope/core/nodes/base.py | 22 +++++++++++++------ 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/frontend/src/components/graph/nodes/CustomNode.tsx b/frontend/src/components/graph/nodes/CustomNode.tsx index 393a8e96d..0ee594fd5 100644 --- a/frontend/src/components/graph/nodes/CustomNode.tsx +++ b/frontend/src/components/graph/nodes/CustomNode.tsx @@ -43,10 +43,7 @@ export function CustomNode({ id, data, selected }: NodeProps) { param_type: string; default?: unknown; description?: string; - min_value?: number; - max_value?: number; - step?: number; - options?: string[]; + ui?: Record | null; }>; const displayName = data.customTitle || @@ -133,7 +130,8 @@ export function CustomNode({ id, data, selected }: NodeProps) { > {p.description || p.name} - {p.param_type === "select" && p.options ? ( + {p.param_type === "select" && + Array.isArray(p.ui?.options) ? ( - updateData({ - customNodeParams: { - ...data.customNodeParams, - [p.name]: e.target.value, - }, - }) - } + onChange={e => setParam(p.name, e.target.value)} > {(p.ui?.options as string[]).map(o => (