From 15fb53f64df53032151cde6c52ecb20c9754bc89 Mon Sep 17 00:00:00 2001 From: Rafal Leszko Date: Mon, 20 Apr 2026 09:24:21 +0000 Subject: [PATCH] fix: stop active pipeline workers before unload to free VRAM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When /pipeline/load triggered a swap while a PipelineProcessor worker was still producing frames, the unload path dropped pipeline_manager's reference and called gc.collect()/empty_cache(), but the worker thread kept the pipeline object alive through its closure and continued allocating CUDA memory. The next load (e.g. longlive after ltx2) OOMed with ~30 GiB still in use despite logging "CUDA cache cleared". Add a pre-unload hook registry on PipelineManager. graph_executor registers each processor's stop() under its node_id at creation time (this relies on node_id matching the pipeline_id that _unload_pipeline_by_id_unsafe later uses to look the hook up — if the two identifiers ever diverge, the hook silently never fires), and FrameProcessor.stop() unregisters on normal teardown. The hook fires synchronously inside _unload_pipeline_by_id_unsafe BEFORE the pipeline reference is dropped, so the worker exits and releases its tensors first — then gc/empty_cache can actually reclaim VRAM. Verified: loading ltx2, running a session, then POSTing /pipeline/load {longlive, passthrough} without a session stop now succeeds. Log sequence is Unloading → PipelineProcessor stopped → CUDA cache cleared, and the next session starts cleanly. 
Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Rafal Leszko --- src/scope/server/frame_processor.py | 5 ++++ src/scope/server/graph_executor.py | 9 +++++++ src/scope/server/pipeline_manager.py | 35 ++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+) diff --git a/src/scope/server/frame_processor.py b/src/scope/server/frame_processor.py index 712966274..f072f990c 100644 --- a/src/scope/server/frame_processor.py +++ b/src/scope/server/frame_processor.py @@ -292,6 +292,11 @@ def stop(self, error_message: str = None): for processor in self.pipeline_processors: processor.stop() + # Remove the pre-unload hooks registered in graph_executor so the + # pipeline_manager doesn't retain stale closures over stopped processors. + for node_id in list(self._processors_by_node_id.keys()): + self.pipeline_manager.unregister_pre_unload_hook(node_id) + # Clear pipeline processors self.pipeline_processors.clear() diff --git a/src/scope/server/graph_executor.py b/src/scope/server/graph_executor.py index 3939f510f..2a71fe6e4 100644 --- a/src/scope/server/graph_executor.py +++ b/src/scope/server/graph_executor.py @@ -137,6 +137,15 @@ def build_graph( node_processors[node.id] = processor pipeline_ids.append(node.pipeline_id) + # If this pipeline gets unloaded mid-session (e.g. a user-triggered + # pipeline swap), the worker thread must exit before the manager drops + # its reference — otherwise the worker keeps allocating GPU memory + # right through the "unload" and OOMs the next load. Capture processor + # by default-arg to avoid the late-binding closure pitfall. 
+ pipeline_manager.register_pre_unload_hook( + node.id, lambda _key, p=processor: p.stop() + ) + for e in graph.edges_to(node.id): if e.kind != "stream": continue diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index 22ac2588e..0d0aee0a8 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -5,6 +5,7 @@ import logging import threading import time +from collections.abc import Callable from enum import Enum from typing import Any @@ -64,11 +65,33 @@ def __init__(self): # Loading stage for frontend display (e.g., "Loading diffusion model...") self._loading_stage: str | None = None + # Pre-unload hooks keyed by instance_key. Invoked before a pipeline is + # removed so callers (e.g. PipelineProcessor workers) can release their + # pipeline reference and stop allocating GPU memory. Without this, a + # swap-while-active leaves worker threads holding live CUDA tensors and + # the next load OOMs. + self._pre_unload_hooks: dict[str, Callable[[str], None]] = {} + def set_loading_stage(self, stage: str | None) -> None: """Set the current loading stage (thread-safe).""" with self._lock: self._loading_stage = stage + def register_pre_unload_hook(self, key: str, hook: Callable[[str], None]) -> None: + """Register a callback invoked before *key* is unloaded. + + Used by PipelineProcessor to ensure its worker thread is stopped — and + therefore releases its pipeline reference and GPU tensors — before the + next load begins. The hook MUST block until its worker has exited. 
+ """ + with self._lock: + self._pre_unload_hooks[key] = hook + + def unregister_pre_unload_hook(self, key: str) -> None: + """Remove a previously registered hook (idempotent).""" + with self._lock: + self._pre_unload_hooks.pop(key, None) + @property def status(self) -> PipelineStatus: """Get current pipeline status.""" @@ -824,6 +847,18 @@ def _unload_pipeline_by_id_unsafe( logger.info(f"Unloading pipeline: {pipeline_id}") + # Stop any active worker (e.g. PipelineProcessor thread) bound to this + # key BEFORE dropping our reference. If we skip this, the worker keeps + # running during the subsequent gc/empty_cache, holds the pipeline alive + # via its closure, and continues allocating CUDA memory — causing the + # next load to OOM despite our "cleanup". + hook = self._pre_unload_hooks.pop(pipeline_id, None) + if hook is not None: + try: + hook(pipeline_id) + except Exception as e: + logger.warning(f"Pre-unload hook for {pipeline_id} raised: {e}") + # Remove from tracked pipelines if pipeline_id in self._pipelines: del self._pipelines[pipeline_id]