From 1e1fce8ad8892a4b936a63628006e1c5e441e8e0 Mon Sep 17 00:00:00 2001 From: "Tessa (livepeer-tessa)" Date: Fri, 10 Apr 2026 06:20:20 +0000 Subject: [PATCH 1/2] fix: accept legacy source/target edge keys in GraphConfig for backwards-compat (#895) Old Scope clients send edge definitions using the deprecated 'source'/'target' key names instead of the current 'from'/'from_port'/ 'to_node'/'to_port' fields. This caused a pydantic ValidationError in scope.server.frame_processor, resulting in sessions starting but processing 0 frames (silent failure for users). Error was observed 4x in a 12 h window. Changes: - Add a model_validator(mode='before') on GraphEdge that detects legacy 'source'/'target' keys and transparently maps them to the current schema. - When port fields are absent, default from_port to 'video' and to_port to 'video' (matching the existing convention for simple linear graphs). - Log a WARNING with the raw edge data whenever legacy keys are coerced so operators can identify outdated clients in logs. - Add tests/test_graph_schema.py covering legacy-only, mixed, and current-only edge formats plus the deprecation warning emission. Fixes #895 Signed-off-by: Tessa (livepeer-tessa) --- src/scope/server/graph_schema.py | 64 ++++++++++++- tests/test_graph_schema.py | 159 +++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 tests/test_graph_schema.py diff --git a/src/scope/server/graph_schema.py b/src/scope/server/graph_schema.py index 26f95341f..2ca773bfc 100644 --- a/src/scope/server/graph_schema.py +++ b/src/scope/server/graph_schema.py @@ -29,9 +29,12 @@ from __future__ import annotations -from typing import Literal +import logging +from typing import Any, Literal -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator + +logger = logging.getLogger(__name__) class GraphNode(BaseModel): @@ -72,7 +75,13 @@ class GraphNode(BaseModel): class GraphEdge(BaseModel): - """An edge connecting an output port to an input port.""" + """An edge connecting an output port to an input port. + + Accepts both the current schema (``from``, ``from_port``, ``to_node``, + ``to_port``) and the legacy schema (``source``, ``target``) for backwards + compatibility with older Scope desktop clients. When the legacy keys are + present the port names default to ``"video"``. + """ from_node: str = Field(..., alias="from", description="Source node id") from_port: str = Field( @@ -87,6 +96,55 @@ class GraphEdge(BaseModel): model_config = {"populate_by_name": True} + @model_validator(mode="before") + @classmethod + def _coerce_legacy_edge(cls, data: Any) -> Any: + """Map legacy ``source``/``target`` keys to the current schema. + + Older clients send edges as:: + + {"source": "input", "target": "pipeline"} + + The current schema requires ``from``, ``from_port``, ``to_node``, + ``to_port``. This validator accepts any mix of legacy and current keys, + mapping them where the canonical field is absent. Port names default to + ``"video"`` when the legacy payload omits port information. + """ + if not isinstance(data, dict): + return data + + has_legacy = "source" in data or "target" in data + if not has_legacy: + return data + + logger.warning( + "GraphEdge: received legacy edge schema (source/target). " + "Please update the Scope client to send 'from'/'to_node' edges. " + "Coercing automatically. source=%r target=%r", + data.get("source"), + data.get("target"), + ) + + data = dict(data) # make a mutable copy + + # Map source → from (only when 'from' is absent) + if "source" in data and "from" not in data: + data["from"] = data.pop("source") + else: + data.pop("source", None) + + # Map target → to_node (only when 'to_node' is absent) + if "target" in data and "to_node" not in data: + data["to_node"] = data.pop("target") + else: + data.pop("target", None) + + # Apply port defaults when the caller omitted them + data.setdefault("from_port", data.pop("source_port", "video")) + data.setdefault("to_port", data.pop("target_port", "video")) + + return data + class GraphConfig(BaseModel): """Root graph configuration (graph definition).""" diff --git a/tests/test_graph_schema.py b/tests/test_graph_schema.py new file mode 100644 index 000000000..e8450a5fb --- /dev/null +++ b/tests/test_graph_schema.py @@ -0,0 +1,159 @@ +"""Tests for graph_schema backwards-compatibility (issue #895). + +Verifies that GraphEdge and GraphConfig accept both the legacy +``source``/``target`` edge format and the current ``from``/``to_node`` format. +""" + +from __future__ import annotations + +import logging + +import pytest + +from scope.server.graph_schema import GraphConfig, GraphEdge, GraphNode + + +# --------------------------------------------------------------------------- +# GraphEdge unit tests +# --------------------------------------------------------------------------- + + +class TestGraphEdgeLegacyKeys: + """GraphEdge should accept the old source/target format.""" + + def test_legacy_source_target_minimal(self): + """Basic source/target without ports → defaults applied.""" + edge = GraphEdge.model_validate({"source": "input", "target": "pipeline"}) + assert edge.from_node == "input" + assert edge.to_node == "pipeline" + assert edge.from_port == "video" # default + assert edge.to_port == "video" # default + assert edge.kind == "stream" # default + + def test_legacy_source_target_with_ports(self): + """Legacy source/target alongside explicit port names.""" + edge = GraphEdge.model_validate( + { + "source": "input", + "target": "pipeline", + "from_port": "video", + "to_port": "video", + } + ) + assert edge.from_node == "input" + assert edge.to_node == "pipeline" + assert edge.from_port == "video" + assert edge.to_port == "video" + + def test_legacy_emits_deprecation_warning(self, caplog): + with caplog.at_level(logging.WARNING, logger="scope.server.graph_schema"): + GraphEdge.model_validate({"source": "a", "target": "b"}) + assert any("legacy edge schema" in r.message for r in caplog.records) + + def test_legacy_only_source(self): + """Only 'source' provided (no 'target') — should still parse.""" + edge = GraphEdge.model_validate( + {"source": "a", "to_node": "b", "from_port": "video", "to_port": "video"} + ) + assert edge.from_node == "a" + assert edge.to_node == "b" + + def test_legacy_only_target(self): + """Only 'target' provided (no 'source') — should still parse.""" + edge = GraphEdge.model_validate( + {"from": "a", "target": "b", "from_port": "video", "to_port": "video"} + ) + assert edge.from_node == "a" + assert edge.to_node == "b" + + +class TestGraphEdgeCurrentKeys: + """Existing schema (from/from_port/to_node/to_port) must still work.""" + + def test_current_format(self): + edge = GraphEdge.model_validate( + { + "from": "input", + "from_port": "video", + "to_node": "pipeline", + "to_port": "video", + "kind": "stream", + } + ) + assert edge.from_node == "input" + assert edge.from_port == "video" + assert edge.to_node == "pipeline" + assert edge.to_port == "video" + assert edge.kind == "stream" + + def test_current_format_no_warning(self, caplog): + with caplog.at_level(logging.WARNING, logger="scope.server.graph_schema"): + GraphEdge.model_validate( + { + "from": "input", + "from_port": "video", + "to_node": "pipeline", + "to_port": "video", + } + ) + assert not any("Deprecated" in r.message for r in caplog.records) + + +# --------------------------------------------------------------------------- +# GraphConfig integration test +# --------------------------------------------------------------------------- + + +class TestGraphConfigLegacyEdges: + """GraphConfig should parse correctly even when edges use legacy keys.""" + + def _make_config(self, edges): + return GraphConfig.model_validate( + { + "nodes": [ + {"id": "input", "type": "source"}, + {"id": "pipeline", "type": "pipeline", "pipeline_id": "my_pipe"}, + {"id": "output", "type": "sink"}, + ], + "edges": edges, + } + ) + + def test_legacy_edges_in_graph_config(self): + cfg = self._make_config( + [ + {"source": "input", "target": "pipeline"}, + {"source": "pipeline", "target": "output"}, + ] + ) + assert len(cfg.edges) == 2 + assert cfg.edges[0].from_node == "input" + assert cfg.edges[0].to_node == "pipeline" + assert cfg.edges[1].from_node == "pipeline" + assert cfg.edges[1].to_node == "output" + + def test_mixed_edges_in_graph_config(self): + """Mix of legacy and current edge formats in the same config.""" + cfg = self._make_config( + [ + {"source": "input", "target": "pipeline"}, + { + "from": "pipeline", + "from_port": "video", + "to_node": "output", + "to_port": "video", + }, + ] + ) + assert cfg.edges[0].from_node == "input" + assert cfg.edges[1].from_node == "pipeline" + + def test_validate_structure_passes(self): + cfg = self._make_config( + [ + {"source": "input", "target": "pipeline"}, + {"source": "pipeline", "target": "output"}, + ] + ) + errors = cfg.validate_structure() + assert errors == [] From 9f099d5fe6d9e808c399e30ba8d8457604e9d0ea Mon Sep 17 00:00:00 2001 From: "Tessa (livepeer-tessa)" Date: Fri, 10 Apr 2026 18:16:38 +0000 Subject: [PATCH 2/2] fix(vace): auto-convert 3-channel RGB depth masks to grayscale (#908) When video-depth-anything output is wired to longlive's vace_input_frames port the depth map arrives as a [B, 3, F, H, W] tensor (depth value replicated across RGB channels) instead of the expected [B, 1, F, H, W] single-channel mask. This caused a hard ValueError that fired on every chunk (~160+ times per session), silently breaking the entire pipeline. Fix: detect the 3-channel case in _encode_with_conditioning and automatically collapse it to single-channel via channel-average (mean(dim=1, keepdim=True)) before the rest of the validation runs. A logger.warning is emitted so operators can see the conversion happened and optionally fix the graph to wire a true grayscale source instead. All other unexpected channel counts still raise a clear ValueError. Fixes #908 Signed-off-by: Tessa (livepeer-tessa) --- .../wan2_1/vace/blocks/vace_encoding.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/scope/core/pipelines/wan2_1/vace/blocks/vace_encoding.py b/src/scope/core/pipelines/wan2_1/vace/blocks/vace_encoding.py index db54ace9a..e90419d1c 100644 --- a/src/scope/core/pipelines/wan2_1/vace/blocks/vace_encoding.py +++ b/src/scope/core/pipelines/wan2_1/vace/blocks/vace_encoding.py @@ -764,9 +764,23 @@ def _encode_with_conditioning(self, components, block_state, current_start): input_masks_data.shape ) if mask_channels != 1: - raise ValueError( - f"VaceEncodingBlock._encode_with_conditioning: vace_input_masks must have 1 channel, got {mask_channels}" - ) + if mask_channels == 3: + # Depth maps from video-depth-anything arrive as 3-channel RGB + # (depth value replicated across R/G/B). Convert to single-channel + # grayscale by averaging so downstream VACE encoding works correctly. + import logging + logging.getLogger(__name__).warning( + "VaceEncodingBlock._encode_with_conditioning: vace_input_masks has 3 " + "channels (likely an RGB depth map). Auto-converting to single-channel " + "by averaging. Wire a grayscale source to avoid this conversion." + ) + # Shape: [B, 3, F, H, W] -> [B, 1, F, H, W] + input_masks_data = input_masks_data.mean(dim=1, keepdim=True) + mask_channels = 1 + else: + raise ValueError( + f"VaceEncodingBlock._encode_with_conditioning: vace_input_masks must have 1 channel, got {mask_channels}" + ) if ( mask_frames != num_frames or mask_height != height