46 changes: 33 additions & 13 deletions src/scope/cloud/fal_app.py
@@ -177,6 +177,12 @@ def is_running(self) -> bool:
# Persistent shared directory for sample LoRAs (survives session cleanup)
SHARED_LORA_DIR = "/data/models/lora"

# Persistent user LoRA directory: stored on the /data volume so LoRAs installed
# by the user survive fal.ai worker resets between jobs. This is distinct from
# SHARED_LORA_DIR (which holds pre-bundled sample LoRAs) so the two can be
# managed and cleaned up independently.
USER_LORA_DIR = "/data/models/user-loras"


# Gates the "ready" WebSocket message until the previous session's cleanup completes.
# Initialized lazily to ensure an event loop is available.
@@ -286,25 +292,36 @@ def cleanup_session_data():
This prevents data leakage between users on fal.ai by clearing:
- Assets directory (uploaded images, videos)
- Recording files in temp directory
- User-installed LoRA directory (persistent volume, cleared per-session for isolation)
"""
from pathlib import Path

def _rmdir_contents(path: Path, label: str) -> None:
"""Delete all children of *path* without removing the directory itself."""
if not path.exists():
return
for item in path.iterdir():
try:
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
except Exception as e:
print(f"Warning: Failed to delete {item}: {e}")
print(f"Cleaned up {label}: {path}")

try:
# Clean assets directory (matches DAYDREAM_SCOPE_ASSETS_DIR set in setup)
assets_dir = Path(ASSETS_DIR_PATH).expanduser()
if assets_dir.exists():
for item in assets_dir.iterdir():
try:
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
except Exception as e:
print(f"Warning: Failed to delete {item}: {e}")
print(f"Cleaned up assets directory: {assets_dir}")
_rmdir_contents(Path(ASSETS_DIR_PATH).expanduser(), "assets directory")
except Exception as e:
print(f"Warning: Assets cleanup failed: {e}")

try:
# Clean user LoRA directory (persistent volume — must be wiped between
# sessions to prevent LoRA files from one user leaking to the next).
_rmdir_contents(Path(USER_LORA_DIR), "user LoRA directory")
except Exception as e:
print(f"Warning: Session cleanup failed: {e}")
print(f"Warning: User LoRA cleanup failed: {e}")


async def cleanup_installed_plugins():
@@ -479,7 +496,10 @@ def setup(self):
# not shared between users
scope_env["DAYDREAM_SCOPE_LOGS_DIR"] = ASSETS_DIR_PATH + "/logs"
scope_env["DAYDREAM_SCOPE_ASSETS_DIR"] = ASSETS_DIR_PATH
scope_env["DAYDREAM_SCOPE_LORA_DIR"] = ASSETS_DIR_PATH + "/lora"
# Store user-installed LoRAs on the persistent /data volume so they
# survive fal.ai worker resets between jobs. The tmp-based path was
# cleared on every new job, causing pipeline load failures (#923).
scope_env["DAYDREAM_SCOPE_LORA_DIR"] = USER_LORA_DIR
scope_env["DAYDREAM_SCOPE_LORA_SHARED_DIR"] = "/data/models/lora"
scope_env["UV_CACHE_DIR"] = "/tmp/uv-cache"

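As a quick illustration of the new cleanup behaviour, here is a minimal, self-contained sketch of the _rmdir_contents logic added above (re-declared locally, since the real helper is nested inside cleanup_session_data): it clears a directory's children but leaves the directory itself in place, so the persistent /data mount point stays valid for the next session. The demo directory and file names below are hypothetical.

import shutil
import tempfile
from pathlib import Path

def _rmdir_contents(path: Path, label: str) -> None:
    # Same logic as the nested helper above: delete children, keep the directory.
    if not path.exists():
        return
    for item in path.iterdir():
        try:
            if item.is_file():
                item.unlink()
            elif item.is_dir():
                shutil.rmtree(item)
        except Exception as e:
            print(f"Warning: Failed to delete {item}: {e}")
    print(f"Cleaned up {label}: {path}")

# Throwaway directory standing in for /data/models/user-loras:
demo_dir = Path(tempfile.mkdtemp())
(demo_dir / "style.safetensors").touch()
(demo_dir / "nested").mkdir()
_rmdir_contents(demo_dir, "user LoRA directory")
assert demo_dir.exists() and not any(demo_dir.iterdir())  # directory kept, contents gone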
37 changes: 28 additions & 9 deletions src/scope/cloud/livepeer_app.py
@@ -54,6 +54,9 @@
REMOTE_VIDEO_CLOCK_RATE = 90_000
REMOTE_VIDEO_TIME_BASE = fractions.Fraction(1, REMOTE_VIDEO_CLOCK_RATE)
ASSETS_DIR_PATH = os.getenv("DAYDREAM_SCOPE_ASSETS_DIR", "/tmp/.daydream-scope/assets")
# User LoRA dir lives on the persistent volume; read from env so it matches the
# value injected by the outer fal app at worker start (#923).
USER_LORA_DIR = os.getenv("DAYDREAM_SCOPE_LORA_DIR", "/data/models/user-loras")


@asynccontextmanager
@@ -1178,16 +1181,15 @@ async def _cleanup_plugins_via_scope_client() -> dict[str, Any]:
}


def _cleanup_assets_dir() -> dict[str, Any]:
"""Delete all files and directories inside the configured assets directory."""
assets_dir = Path(ASSETS_DIR_PATH).expanduser()
def _cleanup_dir_contents(dir_path: Path) -> dict[str, Any]:
"""Delete all files and directories inside *dir_path* without removing it."""
deleted = 0
errors: list[dict[str, str]] = []

if not assets_dir.exists():
return {"path": str(assets_dir), "deleted": deleted, "errors": errors}
if not dir_path.exists():
return {"path": str(dir_path), "deleted": deleted, "errors": errors}

for item in assets_dir.iterdir():
for item in dir_path.iterdir():
try:
if item.is_file():
item.unlink()
@@ -1198,24 +1200,41 @@ def _cleanup_assets_dir() -> dict[str, Any]:
except Exception as exc:
errors.append({"path": str(item), "error": str(exc)})

return {"path": str(assets_dir), "deleted": deleted, "errors": errors}
return {"path": str(dir_path), "deleted": deleted, "errors": errors}


def _cleanup_assets_dir() -> dict[str, Any]:
"""Delete all files and directories inside the configured assets directory."""
return _cleanup_dir_contents(Path(ASSETS_DIR_PATH).expanduser())


def _cleanup_user_lora_dir() -> dict[str, Any]:
"""Delete user-installed LoRAs from the persistent volume.

User LoRAs are stored on /data (not /tmp) so they survive worker resets
within a session (#923). They must still be wiped at session end to
prevent one user's LoRAs from leaking to the next user on the same worker.
"""
return _cleanup_dir_contents(Path(USER_LORA_DIR))


@app.post("/internal/cleanup-session")
async def cleanup_session() -> dict[str, Any]:
"""Cleanup plugins and assets after the outer fal websocket disconnects."""
"""Cleanup plugins, assets and user LoRAs after the outer fal websocket disconnects."""
try:
plugins = await _cleanup_plugins_via_scope_client()
assets = _cleanup_assets_dir()
loras = _cleanup_user_lora_dir()
except RuntimeError as exc:
raise HTTPException(status_code=503, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc

return {
"ok": not plugins["failed"] and not assets["errors"],
"ok": not plugins["failed"] and not assets["errors"] and not loras["errors"],
"plugins": plugins,
"assets": assets,
"loras": loras,
}


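For reference, a hedged sketch of the JSON the extended /internal/cleanup-session handler now returns. The assets and loras entries mirror the dict built by _cleanup_dir_contents; the plugins shape beyond its "failed" key is assumed for illustration, as are the example paths and counts.

# Hypothetical response payload after a clean session teardown:
example_response = {
    "ok": True,
    "plugins": {"failed": []},  # assumed shape; the handler only checks "failed"
    "assets": {"path": "/tmp/.daydream-scope/assets", "deleted": 3, "errors": []},
    "loras": {"path": "/data/models/user-loras", "deleted": 1, "errors": []},
}

# "ok" is the conjunction of the three per-step results:
assert example_response["ok"] == (
    not example_response["plugins"]["failed"]
    and not example_response["assets"]["errors"]
    and not example_response["loras"]["errors"]
)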
8 changes: 7 additions & 1 deletion src/scope/cloud/livepeer_fal_app.py
@@ -28,6 +28,10 @@
RUNNER_FAILURE_WINDOW_SECONDS = 60.0
ASSETS_DIR_PATH = "/tmp/.daydream-scope/assets"

# Persistent user LoRA directory (matches fal_app.py). Stored on the /data
# volume so user-installed LoRAs survive fal.ai worker resets (#923).
USER_LORA_DIR = "/data/models/user-loras"

# Gates startup cleanup so only one cleanup run executes at a time.
_cleanup_event: asyncio.Event | None = None

@@ -272,7 +276,9 @@ def setup(self):
runner_env.setdefault("DAYDREAM_SCOPE_MODELS_DIR", "/data/models")
runner_env.setdefault("DAYDREAM_SCOPE_LORA_SHARED_DIR", "/data/models/lora")
runner_env.setdefault("DAYDREAM_SCOPE_ASSETS_DIR", ASSETS_DIR_PATH)
runner_env.setdefault("DAYDREAM_SCOPE_LORA_DIR", ASSETS_DIR_PATH + "/lora")
# Store user-installed LoRAs on the persistent /data volume so they
# survive fal.ai worker resets between jobs (#923).
runner_env.setdefault("DAYDREAM_SCOPE_LORA_DIR", USER_LORA_DIR)
runner_env.setdefault("DAYDREAM_SCOPE_LOGS_DIR", ASSETS_DIR_PATH + "/logs")
runner_env.setdefault(
"DAYDREAM_SCOPE_PLUGINS_DIR", ASSETS_DIR_PATH + "/plugins"
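One note on the setdefault calls above: any DAYDREAM_SCOPE_LORA_DIR already present in the runner environment still wins, and /data/models/user-loras is only the fallback, which is what lets livepeer_app.py read the same path back via os.getenv. A tiny sketch of that resolution (paths as in the diff; the override value is hypothetical):

import os

USER_LORA_DIR = "/data/models/user-loras"
runner_env = dict(os.environ)

# setdefault: keep any value already injected, otherwise fall back to /data.
runner_env.setdefault("DAYDREAM_SCOPE_LORA_DIR", USER_LORA_DIR)

# The inner app resolves the same value from its environment at import time.
resolved = runner_env.get("DAYDREAM_SCOPE_LORA_DIR", USER_LORA_DIR)
print(resolved)  # "/data/models/user-loras" unless an operator overrides it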
17 changes: 17 additions & 0 deletions src/scope/core/pipelines/wan2_1/lora/utils.py
@@ -410,6 +410,23 @@ def parse_lora_weights(
f"parse_lora_weights: Matched base_key='{base_key}' -> model_key='{model_key}'"
)

# Validate LoRA dimensions against the model weight before injecting.
# lora_A shape: [rank, in_features] — in_features must match model weight dim 1
# lora_B shape: [out_features, rank] — out_features must match model weight dim 0
# (model weight shape is [out_features, in_features] for nn.Linear)
model_weight = model_state.get(model_key)
if model_weight is not None and lora_A.ndim == 2 and lora_B.ndim == 2:
lora_in = lora_A.shape[1] # LoRA expects this input dimension
lora_out = lora_B.shape[0] # LoRA expects this output dimension
model_out, model_in = model_weight.shape[0], model_weight.shape[1]
if lora_in != model_in or lora_out != model_out:
raise ValueError(
f"LoRA dimension mismatch at layer '{base_key}': "
f"LoRA expects ({lora_out}×{lora_in}) but model layer is ({model_out}×{model_in}). "
f"This LoRA was likely trained for a different model size (e.g. Wan2.1-5B vs 1.3B). "
f"Please use a LoRA that matches the loaded model architecture."
)

# Extract alpha and rank
alpha = None
if alpha_key and alpha_key in lora_state:
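To make the shape bookkeeping concrete, a small worked example with the sizes from issue #922 (1536 for the 1.3B transformer, 5120 for 5B, as in the test fixtures below; the 256-wide layer is hypothetical):

import torch

# nn.Linear weight in the model state dict is [out_features, in_features].
model_weight = torch.zeros(256, 1536)   # a 1.3B-style layer

# LoRA pair trained against a 5B-style layer (in_features = 5120):
lora_A = torch.zeros(32, 5120)          # [rank, in_features]
lora_B = torch.zeros(256, 32)           # [out_features, rank]

# The LoRA delta lora_B @ lora_A has shape [out_features, in_features].
delta = lora_B @ lora_A
print(tuple(delta.shape), tuple(model_weight.shape))  # (256, 5120) vs (256, 1536)

# The new check compares these dimensions up front and raises a ValueError
# naming the layer, instead of letting the mismatch surface later as a
# RuntimeError deep inside inference.
assert (lora_A.shape[1], lora_B.shape[0]) != (model_weight.shape[1], model_weight.shape[0])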
87 changes: 87 additions & 0 deletions tests/test_lora_dimension_validation.py
@@ -0,0 +1,87 @@
"""Tests for LoRA dimension validation in parse_lora_weights.

Regression test for issue #922: a LoRA trained for Wan2.1-5B (in_features=5120)
was silently loaded into the Wan2.1-1.3B model (in_features=1536) and only
failed later, at inference time, with 156 inscrutable RuntimeErrors.
"""

import pytest
import torch

from scope.core.pipelines.wan2_1.lora.utils import parse_lora_weights


def _make_model_state(in_features: int, out_features: int = 256) -> dict:
"""Minimal model state dict with one linear layer."""
return {
"blocks.0.self_attn.q.weight": torch.zeros(out_features, in_features),
}


def _make_lora_state(rank: int, in_features: int, out_features: int = 256) -> dict:
"""Minimal PEFT-format LoRA state targeting the same layer."""
return {
"diffusion_model.blocks.0.self_attn.q.lora_A.weight": torch.zeros(rank, in_features),
"diffusion_model.blocks.0.self_attn.q.lora_B.weight": torch.zeros(out_features, rank),
}


class TestLoRADimensionValidation:
"""Verify parse_lora_weights raises a clear error on dimension mismatch."""

def test_compatible_lora_loads_successfully(self):
"""LoRA matching the model's dimensions should parse without error."""
model_state = _make_model_state(in_features=1536)
lora_state = _make_lora_state(rank=32, in_features=1536)

mapping = parse_lora_weights(lora_state, model_state)

assert len(mapping) == 1
key = "blocks.0.self_attn.q.weight"
assert key in mapping
assert mapping[key]["rank"] == 32

def test_incompatible_lora_raises_value_error(self):
"""LoRA trained for 5B (in_features=5120) must not silently load into 1.3B (in_features=1536)."""
model_state = _make_model_state(in_features=1536) # 1.3B model
lora_state = _make_lora_state(rank=32, in_features=5120) # 5B LoRA

with pytest.raises(ValueError, match="LoRA dimension mismatch"):
parse_lora_weights(lora_state, model_state)

def test_error_message_is_user_friendly(self):
"""The error message should name the layer and the dimension sizes."""
model_state = _make_model_state(in_features=1536)
lora_state = _make_lora_state(rank=32, in_features=5120)

with pytest.raises(ValueError) as exc_info:
parse_lora_weights(lora_state, model_state)

msg = str(exc_info.value)
assert "blocks.0.self_attn.q" in msg, "Layer name should appear in error"
assert "5120" in msg, "LoRA in_features should appear in error"
assert "1536" in msg, "Model in_features should appear in error"
assert "model size" in msg.lower() or "architecture" in msg.lower(), (
"Error should hint at model size mismatch"
)

def test_out_features_mismatch_also_caught(self):
"""LoRA with wrong output dimension should also be rejected."""
model_state = _make_model_state(in_features=1536, out_features=256)
# LoRA with matching in_features but wrong out_features
lora_state = {
"diffusion_model.blocks.0.self_attn.q.lora_A.weight": torch.zeros(32, 1536),
"diffusion_model.blocks.0.self_attn.q.lora_B.weight": torch.zeros(512, 32), # wrong
}

with pytest.raises(ValueError, match="LoRA dimension mismatch"):
parse_lora_weights(lora_state, model_state)

def test_compatible_5b_lora_on_5b_model(self):
"""LoRA trained for 5B on a 5B model should load fine."""
model_state = _make_model_state(in_features=5120, out_features=5120)
lora_state = _make_lora_state(rank=32, in_features=5120, out_features=5120)

mapping = parse_lora_weights(lora_state, model_state)

assert len(mapping) == 1