Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/scope/server/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ def schedule_quantized_update(self, params: dict):

def update_parameters(self, parameters: dict[str, Any]):
"""Update parameters that will be used in the next pipeline call."""
# Sanitize foreign-OS / stale asset paths (e.g. i2v_image, first_frame_image)
# before they reach the pipeline. This mirrors what _load_pipeline_implementation
# does for load-time params but also covers mid-session parameter updates sent
# by the client over the WebSocket (e.g. user picking a Reference Image while
# already streaming). Without this, a path like
# "/tmp/.daydream-scope/assets/foo.png" from a *different* machine ends up on
# a fal.ai worker where it doesn't exist, causing a FileNotFoundError on every
# processed chunk.
parameters = PipelineManager._sanitize_initial_params(parameters)

# Always strip tempo-control keys so they never leak into pipelines,
# even when the corresponding helper (scheduler/engine/tempo_sync) is absent.

Expand Down
107 changes: 106 additions & 1 deletion src/scope/server/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,109 @@ def _apply_load_params(
# Pass merge_mode directly to mixin, not via config
config["_lora_merge_mode"] = lora_merge_mode

# ------------------------------------------------------------------
# Asset path sanitization
# ------------------------------------------------------------------

@staticmethod
def _sanitize_asset_path(path: str) -> str:
"""Normalize a single asset path so it is resolvable on the current system.

When a client sends ``i2v_image`` (or similar params) to a cloud worker,
the path may reference a location on the *client's* machine that does not
exist on the worker. Two cases are handled:

1. **Windows absolute paths** (``X:\\...`` / ``X:/...`` / UNC
``\\\\server\\...``): always rewritten — meaningless on Linux.
2. **Unix absolute paths outside the configured assets directory**: also
rewritten. This catches paths like ``/tmp/.daydream-scope/assets/…``
that came from a *different* Linux machine (e.g. client sends its
local ``/tmp/`` path to a fal.ai worker whose ``/tmp/`` is separate).

In both cases the bare filename is extracted and rejoined against
``get_assets_dir()`` — the location where the server has already
downloaded the uploaded asset.

Relative paths and already-valid absolute paths (inside ``assets_dir``)
are returned unchanged.

Args:
path: Asset path string to normalise.

Returns:
Normalised path string, or the original if no rewrite was needed.
"""
import re
from pathlib import Path, PurePosixPath, PureWindowsPath

from .models_config import get_assets_dir

assets_dir = get_assets_dir()
_windows_abs_re = re.compile(r"^(?:[A-Za-z]:[/\\]|\\\\)", re.ASCII)

needs_rewrite = False

if _windows_abs_re.match(path):
needs_rewrite = True
filename = PureWindowsPath(path).name
elif path.startswith("/"):
try:
abs_path = Path(path).resolve()
if not abs_path.is_relative_to(assets_dir.resolve()):
needs_rewrite = True
filename = Path(path).name
except Exception:
needs_rewrite = True
filename = Path(path).name

if needs_rewrite:
new_path = (assets_dir / filename).as_posix()
logger.warning(
"_sanitize_asset_path: asset path %r appears to be a local "
"absolute path from a different OS or filesystem. "
"Rewriting to %r.",
path,
new_path,
)
return new_path
return path

@staticmethod
def _sanitize_initial_params(params: dict) -> dict:
"""Sanitize known asset path parameters in *params*.

Rewrites Windows / foreign-OS absolute paths for these keys so the
cloud worker can locate the assets that were already uploaded to
``get_assets_dir()``:

* ``i2v_image`` – str or None
* ``first_frame_image`` – str or None
* ``last_frame_image`` – str or None
* ``images`` – list[str] or None (each item sanitized)
* ``vace_ref_images`` – list[str] or None (each item sanitized)

Args:
params: Pipeline parameter dict (will not be mutated).

Returns:
A shallow copy of *params* with the above keys sanitized.
"""
result = dict(params)

_str_keys = ("i2v_image", "first_frame_image", "last_frame_image")
for key in _str_keys:
if key in result and result[key] is not None:
result[key] = PipelineManager._sanitize_asset_path(result[key])

_list_keys = ("images", "vace_ref_images")
for key in _list_keys:
if key in result and result[key] is not None:
result[key] = [
PipelineManager._sanitize_asset_path(p) for p in result[key]
]

return result

def unload_pipeline_by_id(
self,
pipeline_id: str,
Expand Down Expand Up @@ -902,8 +1005,10 @@ def _load_pipeline_implementation(
for name, field in config_class.model_fields.items():
if field.default is not None:
schema_defaults[name] = field.default
# Sanitize foreign-OS asset paths before passing to the pipeline.
sanitized_params = self._sanitize_initial_params(load_params or {})
# Merge: load_params override schema defaults
merged_params = {**schema_defaults, **(load_params or {})}
merged_params = {**schema_defaults, **sanitized_params}
return pipeline_class(**merged_params)

# Fall through to built-in pipeline initialization
Expand Down
23 changes: 23 additions & 0 deletions src/scope/server/pipeline_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ def __init__(
self._beat_cache_reset_rate: str = "none"
self._last_reset_boundary: int = -1

# Rate-limit repeated FileNotFoundError messages for the same missing path.
# Maps path -> last-logged timestamp so we only emit one ERROR per path
# per _FNF_LOG_INTERVAL seconds rather than flooding the log on every chunk.
self._fnf_last_logged: dict[str, float] = {}
self._FNF_LOG_INTERVAL: float = 30.0 # seconds between repeated log entries

# Native frame rate reported by the pipeline (e.g. 24fps for LTX-2).
# When set, get_fps() returns this instead of the measured production rate,
# giving the video track a stable playback speed for A/V sync.
Expand Down Expand Up @@ -608,6 +614,23 @@ def process_chunk(self):
seen.add(proc_id)
consumer_proc.update_parameters(extra_params)

except FileNotFoundError as e:
# Missing asset files (e.g. i2v_image path that hasn't been downloaded
# yet, or a stale /tmp path from a different machine) cause a
# FileNotFoundError on every single chunk — easily 2500+ per session.
# Rate-limit to one log entry per missing path per 30 s to avoid
# flooding Grafana / CloudWatch while still making the error visible.
missing_path = str(e.filename) if e.filename else str(e)
now = time.monotonic()
last = self._fnf_last_logged.get(missing_path, 0.0)
if now - last >= self._FNF_LOG_INTERVAL:
self._fnf_last_logged[missing_path] = now
logger.error(
f"Error processing chunk for {self.pipeline_id}: {e} "
f"(further identical errors suppressed for {self._FNF_LOG_INTERVAL:.0f}s)",
exc_info=False,
)
# Continue processing — the next param update may correct the path.
except Exception as e:
if self._is_recoverable(e):
logger.error(
Expand Down
127 changes: 127 additions & 0 deletions tests/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,130 @@ def test_find_reusable_pipeline_allows_self_reserved(self):
"node_a", "longlive", {}, claimed_keys=set(), reserved_keys={"node_a"}
)
assert result == "node_a"


# ---------------------------------------------------------------------------
# Tests for _sanitize_asset_path and _sanitize_initial_params
# ---------------------------------------------------------------------------


class TestSanitizeAssetPath:
"""Tests for PipelineManager._sanitize_asset_path and _sanitize_initial_params."""

def _mock_assets_dir(self, tmp_path, monkeypatch):
"""Patch get_assets_dir to return tmp_path/assets."""
assets_dir = tmp_path / "assets"
assets_dir.mkdir(parents=True, exist_ok=True)
monkeypatch.setattr(
"scope.server.pipeline_manager.PipelineManager._sanitize_asset_path.__func__",
None,
raising=False,
)
return assets_dir

def test_windows_backslash_path_is_rewritten(self, tmp_path, monkeypatch):
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
result = PipelineManager._sanitize_asset_path(
r"C:\Users\Joshu\.daydream-scope\assets\ShinraFireForce.webp"
)
assert result == (assets_dir / "ShinraFireForce.webp").as_posix()

def test_windows_forward_slash_drive_path_is_rewritten(self, tmp_path, monkeypatch):
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
result = PipelineManager._sanitize_asset_path(
"C:/Users/Joshu/.daydream-scope/assets/ShinraFireForce.webp"
)
assert result == (assets_dir / "ShinraFireForce.webp").as_posix()

def test_foreign_linux_tmp_path_is_rewritten(self, tmp_path, monkeypatch):
"""A /tmp/.daydream-scope/assets/… path from a different Linux machine is rewritten."""
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
result = PipelineManager._sanitize_asset_path(
"/tmp/.daydream-scope/assets/hakoniwa_abc.png"
)
assert result == (assets_dir / "hakoniwa_abc.png").as_posix()

def test_relative_path_unchanged(self, tmp_path, monkeypatch):
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
result = PipelineManager._sanitize_asset_path("image.png")
assert result == "image.png"

def test_sanitize_initial_params_none_value(self, tmp_path, monkeypatch):
"""_sanitize_initial_params should leave None values as None."""
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
result = PipelineManager._sanitize_initial_params({"i2v_image": None})
assert result["i2v_image"] is None

def test_sanitize_initial_params_i2v_image_windows_path(self, tmp_path, monkeypatch):
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
params = {
"prompts": [{"text": "test"}],
"i2v_image": r"C:\Users\Joshu\.daydream-scope\assets\ShinraFireForce.webp",
}
result = PipelineManager._sanitize_initial_params(params)
assert result["i2v_image"] == (assets_dir / "ShinraFireForce.webp").as_posix()

def test_sanitize_initial_params_i2v_image_linux_tmp_path(self, tmp_path, monkeypatch):
"""Linux /tmp path from a different machine is rewritten (issue #916)."""
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
params = {
"i2v_image": "/tmp/.daydream-scope/assets/hakoniwa_abc.png",
}
result = PipelineManager._sanitize_initial_params(params)
assert result["i2v_image"] == (assets_dir / "hakoniwa_abc.png").as_posix()

def test_sanitize_initial_params_images_list(self, tmp_path, monkeypatch):
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
params = {
"images": [
r"C:\Users\Joshu\.daydream-scope\assets\foo.webp",
r"C:\Users\Joshu\.daydream-scope\assets\bar.png",
]
}
result = PipelineManager._sanitize_initial_params(params)
assert result["images"] == [
(assets_dir / "foo.webp").as_posix(),
(assets_dir / "bar.png").as_posix(),
]

def test_sanitize_initial_params_no_asset_params_unchanged(self, tmp_path, monkeypatch):
assets_dir = tmp_path / "assets"
assets_dir.mkdir()
monkeypatch.setattr(
"scope.server.models_config.get_assets_dir", lambda: assets_dir
)
params = {"prompts": [{"text": "test"}], "seed": 42}
result = PipelineManager._sanitize_initial_params(params)
assert result == params
Loading