From 0d581c3a004282958258ffe91dcf377b17223ac7 Mon Sep 17 00:00:00 2001 From: "Tessa (livepeer-tessa)" Date: Tue, 14 Apr 2026 06:25:06 +0000 Subject: [PATCH] fix: distinguish transient plugin-not-installed from truly invalid pipeline ID (#936) Signed-off-by: Tessa (livepeer-tessa) --- src/scope/server/pipeline_manager.py | 43 +++++- tests/test_pipeline_race_condition.py | 183 ++++++++++++++++++++++++++ 2 files changed, 225 insertions(+), 1 deletion(-) create mode 100644 tests/test_pipeline_race_condition.py diff --git a/src/scope/server/pipeline_manager.py b/src/scope/server/pipeline_manager.py index 22ac2588e..5701aef59 100644 --- a/src/scope/server/pipeline_manager.py +++ b/src/scope/server/pipeline_manager.py @@ -27,6 +27,22 @@ class PipelineNotAvailableException(Exception): pass +class PipelineNotYetRegisteredException(ValueError): + """Exception raised when a pipeline ID is not in the registry yet. + + This is a *transient* error — it typically occurs during cloud session + initialization when the frontend concurrently requests a plugin install + and a pipeline load. The pipeline load may arrive before the plugin has + finished installing and registering itself, so the registry lookup returns + ``None`` even though the pipeline ID will eventually become valid. + + Callers should treat this as a retriable condition rather than a hard + error. + """ + + pass + + class PipelineStatus(Enum): """Pipeline loading status enumeration.""" @@ -336,6 +352,29 @@ def _load_pipeline_by_id_sync( ) return True + except PipelineNotYetRegisteredException: + # Transient race condition: the pipeline plugin hasn't finished + # installing yet. Log at WARN (not ERROR) and leave the status as + # NOT_LOADED so the frontend doesn't show an error state and the + # load can be retried transparently once the plugin is registered. + self.set_loading_stage(None) + logger.warning( + f"Pipeline '{key}' is not registered — the plugin may still be " + f"installing. This is likely a transient race condition and will " + f"resolve once the plugin is installed." + ) + with self._lock: + self._pipeline_statuses[key] = PipelineStatus.NOT_LOADED + if key in self._pipelines: + del self._pipelines[key] + if key in self._pipeline_load_params: + del self._pipeline_load_params[key] + if key in self._pipeline_registry_ids: + del self._pipeline_registry_ids[key] + if key in self._load_events: + self._load_events[key].set() + return False + except Exception as e: self.set_loading_stage(None) from .models_config import get_models_dir @@ -1385,7 +1424,9 @@ def _load_pipeline_implementation( logger.info("OpticalFlow pipeline initialized") return pipeline else: - raise ValueError(f"Invalid pipeline ID: {pipeline_id}") + raise PipelineNotYetRegisteredException( + f"Invalid pipeline ID: {pipeline_id}. Plugin may not be installed yet." + ) def is_loaded(self) -> bool: """Check if pipeline is loaded and ready (thread-safe).""" diff --git a/tests/test_pipeline_race_condition.py b/tests/test_pipeline_race_condition.py new file mode 100644 index 000000000..a2d9f0728 --- /dev/null +++ b/tests/test_pipeline_race_condition.py @@ -0,0 +1,183 @@ +"""Tests for issue #936 — transient race condition when a plugin isn't yet installed. + +Verifies that: +1. ``_load_pipeline_implementation`` raises ``PipelineNotYetRegisteredException`` + (not a plain ``ValueError``) when the pipeline ID is unknown and not a builtin. +2. ``_load_pipeline_by_id_sync`` returns ``False`` **without** setting the + pipeline status to ``ERROR`` when it catches that exception — it must leave + the status as ``NOT_LOADED`` so the load can be retried once the plugin + finishes installing. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from scope.server.pipeline_manager import ( + PipelineManager, + PipelineNotYetRegisteredException, + PipelineStatus, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_manager() -> PipelineManager: + return PipelineManager() + + +# --------------------------------------------------------------------------- +# Unit tests for _load_pipeline_implementation +# --------------------------------------------------------------------------- + + +class TestLoadPipelineImplementationUnknownId: + """_load_pipeline_implementation must raise PipelineNotYetRegisteredException + for a pipeline ID that is neither a builtin nor in the registry.""" + + def test_raises_for_unregistered_non_builtin(self): + manager = _make_manager() + + # Patch PipelineRegistry.get to return None (plugin not installed yet) + with patch( + "scope.core.pipelines.registry.PipelineRegistry.get", return_value=None + ): + with pytest.raises(PipelineNotYetRegisteredException) as exc_info: + manager._load_pipeline_implementation("yolo_mask") + + assert "yolo_mask" in str(exc_info.value) + + def test_exception_is_subclass_of_value_error(self): + """PipelineNotYetRegisteredException must be a ValueError subclass.""" + assert issubclass(PipelineNotYetRegisteredException, ValueError) + + def test_does_not_raise_for_builtin(self): + """Built-in pipeline IDs should NOT raise PipelineNotYetRegisteredException + — they fall through to their own initialisation logic (which may succeed + or fail for unrelated reasons, but must never raise the transient + plugin-not-yet-registered exception).""" + manager = _make_manager() + + # "passthrough" is a builtin; a missing registry entry is fine for it. + # The implementation either returns successfully or raises something that + # is NOT PipelineNotYetRegisteredException. + with patch( + "scope.core.pipelines.registry.PipelineRegistry.get", return_value=None + ): + try: + manager._load_pipeline_implementation("passthrough") + # Completed without exception — that's also fine. + except PipelineNotYetRegisteredException: + pytest.fail( + "Built-in pipelines should never raise PipelineNotYetRegisteredException" + ) + except Exception: + # Any other exception (ImportError, etc.) is acceptable. + pass + + +# --------------------------------------------------------------------------- +# Integration tests for _load_pipeline_by_id_sync +# --------------------------------------------------------------------------- + + +class TestLoadPipelineByIdSyncRaceCondition: + """_load_pipeline_by_id_sync must handle PipelineNotYetRegisteredException + gracefully — returning False and leaving status as NOT_LOADED (not ERROR).""" + + def _sync_load_with_not_yet_registered( + self, pipeline_id: str = "yolo_mask" + ) -> tuple[PipelineManager, bool]: + """Run _load_pipeline_by_id_sync where _load_pipeline_implementation + raises PipelineNotYetRegisteredException, and return (manager, result).""" + manager = _make_manager() + + def fake_impl(pid, load_params=None, stage_callback=None): + raise PipelineNotYetRegisteredException( + f"Invalid pipeline ID: {pid}. Plugin may not be installed yet." + ) + + with patch.object(manager, "_load_pipeline_implementation", side_effect=fake_impl): + result = manager._load_pipeline_by_id_sync(pipeline_id) + + return manager, result + + def test_returns_false(self): + _, result = self._sync_load_with_not_yet_registered() + assert result is False + + def test_status_is_not_loaded_not_error(self): + """Status must be NOT_LOADED so the frontend never sees ERROR.""" + manager, _ = self._sync_load_with_not_yet_registered() + status = manager._pipeline_statuses.get("yolo_mask") + assert status == PipelineStatus.NOT_LOADED, ( + f"Expected NOT_LOADED, got {status!r}" + ) + + def test_pipeline_not_stored(self): + """No pipeline instance should be stored after a transient failure.""" + manager, _ = self._sync_load_with_not_yet_registered() + assert "yolo_mask" not in manager._pipelines + + def test_load_event_is_set(self): + """The load event must be signalled so any waiting threads are unblocked.""" + manager = _make_manager() + + def fake_impl(pid, load_params=None, stage_callback=None): + raise PipelineNotYetRegisteredException( + f"Invalid pipeline ID: {pid}. Plugin may not be installed yet." + ) + + with patch.object(manager, "_load_pipeline_implementation", side_effect=fake_impl): + manager._load_pipeline_by_id_sync("yolo_mask") + + event = manager._load_events.get("yolo_mask") + # The event should have been set (or cleaned up — either is acceptable, + # but it must not be left unset and blocking). + if event is not None: + assert event.is_set(), "Load event must be set after transient failure" + + def test_no_error_log_emitted(self, caplog): + """No ERROR-level log message should be emitted for a transient failure.""" + import logging + + manager = _make_manager() + + def fake_impl(pid, load_params=None, stage_callback=None): + raise PipelineNotYetRegisteredException( + f"Invalid pipeline ID: {pid}. Plugin may not be installed yet." + ) + + with patch.object(manager, "_load_pipeline_implementation", side_effect=fake_impl): + with caplog.at_level(logging.WARNING, logger="scope.server.pipeline_manager"): + manager._load_pipeline_by_id_sync("yolo_mask") + + error_records = [r for r in caplog.records if r.levelno >= logging.ERROR] + assert not error_records, ( + f"Unexpected ERROR log(s): {[r.message for r in error_records]}" + ) + + def test_warning_log_emitted(self, caplog): + """A WARNING-level log should be emitted to explain the transient state.""" + import logging + + manager = _make_manager() + + def fake_impl(pid, load_params=None, stage_callback=None): + raise PipelineNotYetRegisteredException( + f"Invalid pipeline ID: yolo_mask. Plugin may not be installed yet." + ) + + with patch.object(manager, "_load_pipeline_implementation", side_effect=fake_impl): + with caplog.at_level(logging.WARNING, logger="scope.server.pipeline_manager"): + manager._load_pipeline_by_id_sync("yolo_mask") + + warn_records = [r for r in caplog.records if r.levelno == logging.WARNING] + assert warn_records, "Expected at least one WARNING log for transient failure" + combined = " ".join(r.message for r in warn_records) + assert "plugin" in combined.lower() or "installing" in combined.lower(), ( + "Warning should mention plugin/installing" + )