diff --git a/docs/source/fault_tolerance/usage_guide.rst b/docs/source/fault_tolerance/usage_guide.rst index 09fe9b3b..02138675 100644 --- a/docs/source/fault_tolerance/usage_guide.rst +++ b/docs/source/fault_tolerance/usage_guide.rst @@ -229,6 +229,7 @@ service dependencies. - ``--ft-attribution-llm-api-key-file `` (alias: ``--ft_attribution_llm_api_key_file``) - ``--ft-attribution-llm-base-url `` (alias: ``--ft_attribution_llm_base_url``) - ``--ft-attribution-llm-model `` (alias: ``--ft_attribution_llm_model``) + - ``--ft-attribution-analysis-backend mcp`` (alias: ``--ft_attribution_analysis_backend``) - ``--ft-attribution-startup-timeout `` (alias: ``--ft_attribution_startup_timeout``), default ``20`` - ``--ft-attribution-decision-timeout `` (alias: ``--ft_attribution_decision_timeout``), default ``60`` - ``--ft-attribution-export-url `` (alias: ``--ft_attribution_export_url``) @@ -245,6 +246,10 @@ service dependencies. To export managed attribution results, pass ``--ft-attribution-export-url`` or set ``attribution_export_url`` in the fault tolerance YAML config. + Launcher-managed attribution supports only the ``mcp`` analysis backend. The backend flag may be + left unset, which uses the service default, or set explicitly to ``mcp``. The in-process ``lib`` + backend is available only when running ``nvrx-attrsvc`` as a standalone service. + ``--ft-attribution-decision-timeout`` is the total launcher-side budget for one attribution decision, measured from the terminal analysis request until the rendezvous host fetches the STOP/RESTART recommendation. 
@staticmethod
def _run_lib_log_analyzer_sync(analyzer: Any, run_kwargs: Dict[str, Any]) -> Any:
    """Run the in-process LogSage coroutine on this worker thread's event loop.

    Synchronous entry point meant to be handed to ``asyncio.to_thread``.
    ``asyncio.run`` creates a fresh event loop in the calling thread, drives
    ``analyzer.run(...)`` to completion on it, and closes the loop again.

    Args:
        analyzer: Object exposing an awaitable ``run(kwargs)`` method.
        run_kwargs: Keyword mapping forwarded to ``analyzer.run``; a shallow
            copy is passed so the caller's dict is not handed to the worker.

    Returns:
        Whatever ``analyzer.run`` returns.
    """
    return asyncio.run(analyzer.run(dict(run_kwargs)))

async def _run_lib_log_analyzer_serialized(
    self, analyzer: Any, run_kwargs: Dict[str, Any]
) -> Any:
    """Run in-process LogSage without blocking the caller's event loop.

    In lib mode, ``NVRxLogAnalyzer.run`` is async-shaped but can spend most
    of its wall time inside synchronous LogSage/LLM work. If that runs on
    the attrsvc/coalescer event loop, ``asyncio.wait_for`` cannot fire until
    the work has already completed. Offloading the full lib run gives the
    coalescer an actual await point to time out on.

    The serialization lock is released when the worker task finishes, not
    when the waiting request is cancelled, so a timed-out lib analysis does
    not overlap a retry on the same singleton analyzer.

    Args:
        analyzer: Analyzer instance whose ``run`` coroutine is executed on a
            worker thread.
        run_kwargs: Keyword mapping forwarded to the analyzer.

    Returns:
        The result produced by the worker task.

    Raises:
        asyncio.CancelledError: Re-raised when the awaiting caller is
            cancelled; the worker task itself keeps running.
        Exception: Any exception raised by the worker propagates to a caller
            that is still waiting.
    """
    # Acquired manually (not ``async with``) because the release happens in
    # the worker's done-callback rather than when this coroutine exits.
    await self._log_analysis_lock.acquire()
    try:
        worker_task = asyncio.create_task(
            asyncio.to_thread(self._run_lib_log_analyzer_sync, analyzer, run_kwargs)
        )
    except BaseException:
        # No task exists, so no done-callback will ever release the lock.
        self._log_analysis_lock.release()
        raise

    # Hold a reference to the task until its done-callback discards it.
    self._lib_log_analysis_tasks.add(worker_task)
    # Read by the nested callback through the closure; flipped below when the
    # awaiting caller is cancelled.
    waiter_cancelled = False

    def _finish_lib_run(task: "asyncio.Task[Any]") -> None:
        """Release bookkeeping and the lock once the worker task is done."""
        self._lib_log_analysis_tasks.discard(task)
        if self._log_analysis_lock.locked():
            self._log_analysis_lock.release()
        try:
            # Task.exception() raises CancelledError for a cancelled task.
            exc = task.exception()
        except asyncio.CancelledError:
            return
        # A caller that is still waiting receives the exception from its own
        # await; only log when nobody is left to observe the failure.
        if exc is not None and waiter_cancelled:
            logger.warning(
                "Lib LogSage analysis worker failed after caller cancellation or timeout",
                exc_info=(type(exc), exc, exc.__traceback__),
            )

    worker_task.add_done_callback(_finish_lib_run)
    try:
        # shield() keeps a cancellation of this await from cancelling the worker.
        return await asyncio.shield(worker_task)
    except asyncio.CancelledError:
        waiter_cancelled = True
        raise
@@ -163,8 +217,7 @@ async def _fetch_log_result_lib(self, path: str) -> Dict[str, Any]: **self.config.llm_runtime_overrides(), } analyzer = await self._get_lib_log_analyzer(run_kwargs) - async with self._log_analysis_lock: - result = await analyzer.run(run_kwargs) + result = await self._run_lib_log_analyzer_serialized(analyzer, run_kwargs) return nvrx_run_result_to_log_dict(result, path) diff --git a/src/nvidia_resiliency_ext/fault_tolerance/attribution_manager.py b/src/nvidia_resiliency_ext/fault_tolerance/attribution_manager.py index e494d436..d42b969a 100644 --- a/src/nvidia_resiliency_ext/fault_tolerance/attribution_manager.py +++ b/src/nvidia_resiliency_ext/fault_tolerance/attribution_manager.py @@ -59,6 +59,13 @@ class AttributionConfig: log_level: Optional[str] = None export_url: Optional[str] = None + def __post_init__(self) -> None: + object.__setattr__( + self, + "analysis_backend", + _normalize_analysis_backend(self.analysis_backend), + ) + @property def is_enabled(self) -> bool: return self.endpoint is not None @@ -415,6 +422,20 @@ def _resolve_decision_timeout(args: Any, ft_cfg: FaultToleranceConfig) -> Option return value +def _normalize_analysis_backend(value: Optional[str]) -> Optional[str]: + if value is None: + return None + normalized = str(value).strip().lower() + if not normalized: + return None + if normalized != "mcp": + raise ValueError( + "--ft-attribution-analysis-backend supports only 'mcp'; " + f"'lib' is no longer supported by launcher-managed attrsvc, got {value!r}" + ) + return normalized + + def _validate_export_url(url: str) -> None: parsed = urlparse(url) if parsed.scheme not in _EXPORT_URL_SCHEMES or not parsed.netloc: diff --git a/src/nvidia_resiliency_ext/fault_tolerance/cli_args.py b/src/nvidia_resiliency_ext/fault_tolerance/cli_args.py index f1521486..56a58332 100644 --- a/src/nvidia_resiliency_ext/fault_tolerance/cli_args.py +++ b/src/nvidia_resiliency_ext/fault_tolerance/cli_args.py @@ -384,11 +384,15 @@ def 
_add_attribution_args(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--ft-attribution-analysis-backend", "--ft_attribution_analysis_backend", - type=str, + type=str.lower, default=None, dest="ft_attribution_analysis_backend", - choices=("mcp", "lib"), - help="Analysis backend for launcher-managed attribution service: mcp or lib.", + choices=("mcp",), + metavar="mcp", + help=( + "Analysis backend for launcher-managed attribution service. " + "Only mcp is supported; use standalone nvrx-attrsvc for lib backend experiments." + ), ) parser.add_argument( "--ft-attribution-decision-timeout", diff --git a/tests/attribution/unit/test_log_analyzer_observability.py b/tests/attribution/unit/test_log_analyzer_observability.py index 2d80e155..0b2ff30f 100644 --- a/tests/attribution/unit/test_log_analyzer_observability.py +++ b/tests/attribution/unit/test_log_analyzer_observability.py @@ -3,12 +3,14 @@ import asyncio import importlib +import logging import sys import threading import time import types from typing import Any, Dict +from nvidia_resiliency_ext.attribution.coalescing.coalescer import RequestCoalescer from nvidia_resiliency_ext.attribution.orchestration.analysis_pipeline import AnalysisPipelineMode from nvidia_resiliency_ext.attribution.orchestration.config import LogSageExecutionConfig @@ -34,19 +36,72 @@ def _import_log_analyzer_with_optional_dependency_stubs(monkeypatch): langchain_openai = _stub_module(monkeypatch, "langchain_openai") langchain_openai.ChatOpenAI = object + output_parsers = _stub_module(monkeypatch, "langchain_core.output_parsers") + output_parsers.StrOutputParser = object + prompts = _stub_module(monkeypatch, "langchain_core.prompts") + prompts.ChatPromptTemplate = types.SimpleNamespace( + from_template=lambda *_args, **_kwargs: object() + ) + runnables = _stub_module(monkeypatch, "langchain_core.runnables") + runnables.RunnablePassthrough = object + _stub_module(monkeypatch, "logsage") _stub_module(monkeypatch, 
"logsage.auto_resume_policy") attribution_classes = _stub_module( monkeypatch, "logsage.auto_resume_policy.attribution_classes" ) + + class StubErrorAttribution: + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + stub_attribution = types.SimpleNamespace( + APPLICATION_DONE="APPLICATION_DONE", + ERRORS_NOT_FOUND="ERRORS_NOT_FOUND", + LLM_FAILURE="LLM_FAILURE", + SLURM_STEP_CANCELLED="SLURM_STEP_CANCELLED", + SLURM_STEP_CANCELLED_JOB_REQUEUE="SLURM_STEP_CANCELLED_JOB_REQUEUE", + ) + stub_auto_resume = types.SimpleNamespace( + ERRORS_NOT_FOUND="ERRORS_NOT_FOUND", + LLM_FAILURE="LLM_FAILURE", + RESTART_IMMEDIATE="RESTART IMMEDIATE", + STOP_NO_RESTART="STOP - DONT RESTART IMMEDIATE", + ) + stub_finished = types.SimpleNamespace( + APPLICATION_DONE="APPLICATION_DONE", + LLM_FAILURE="LLM_FAILURE", + SLURM_CANCELLED="SLURM_CANCELLED", + SLURM_CANCELLED_JOB_REQUEUE="SLURM_CANCELLED_JOB_REQUEUE", + SLURM_CANCELLED_TIME_LIMIT="SLURM_CANCELLED_TIME_LIMIT", + ) attribution_classes.ApplicationData = object + attribution_classes.Attribution = stub_attribution + attribution_classes.AutoResumeAction = stub_auto_resume + attribution_classes.ErrorAttribution = StubErrorAttribution + attribution_classes.FinishedStatus = stub_finished attribution_classes.LRUCache = object error_attribution = _stub_module(monkeypatch, "logsage.auto_resume_policy.error_attribution") + error_attribution.CONTEXT_SIZE = 4096 + error_attribution.get_attribution = lambda *args, **kwargs: (None, None, None, None) + error_attribution.get_auto_resume = lambda *args, **kwargs: ("", "") error_attribution.get_proposed_solution_cat = lambda *args, **kwargs: None error_extraction = _stub_module(monkeypatch, "logsage.auto_resume_policy.error_extraction") + error_extraction.finished_validation = lambda _llm, data: data error_extraction.return_application_errors = lambda *args, **kwargs: [] + error_extraction.return_application_errors_rt = lambda *args, **kwargs: types.SimpleNamespace( + 
def test_log_sage_runner_lib_run_allows_coalescer_timeout(monkeypatch):
    """A blocking lib-mode analysis must not prevent the coalescer timeout from firing.

    The fake analyzer blocks its thread for 0.5 s inside an ``async def``;
    the coalescer is given a 0.05 s compute timeout. The test expects a
    timeout result well before the blocking call ends, and expects the
    runner's analysis lock to become acquirable once the worker finishes.
    """
    _import_log_analyzer_with_optional_dependency_stubs(monkeypatch)
    module = importlib.import_module("nvidia_resiliency_ext.attribution.orchestration.log_analyzer")
    runner = module.LogSageRunner(LogSageExecutionConfig(use_lib_log_analysis=True))
    # Thread-safe flags set from inside the fake analyzer's run().
    started = threading.Event()
    finished = threading.Event()

    class BlockingFakeLogAnalyzer:
        async def run(self, kwargs: dict[str, Any]) -> list[Any]:
            started.set()
            # Deliberately blocking (not awaited) to mimic synchronous work
            # inside an async-shaped API.
            time.sleep(0.5)
            finished.set()
            return []

    async def fake_get_lib_log_analyzer(kwargs: dict[str, Any]) -> BlockingFakeLogAnalyzer:
        return BlockingFakeLogAnalyzer()

    # Bypass real analyzer construction.
    runner._get_lib_log_analyzer = fake_get_lib_log_analyzer

    async def run() -> None:
        coalescer = RequestCoalescer(compute_timeout=0.05)

        t0 = time.monotonic()
        result = await coalescer.get_or_compute(
            "/tmp/job.log",
            lambda: runner._fetch_log_result_lib("/tmp/job.log"),
        )
        elapsed = time.monotonic() - t0

        assert started.is_set()
        assert result["state"] == "timeout"
        assert result["recommendation"]["action"] == "TIMEOUT"
        # Returned before the 0.5 s blocking call could have completed.
        assert elapsed < 0.3

        # The worker keeps running after the timeout; wait for it off-loop.
        assert await asyncio.to_thread(finished.wait, 1.0)
        # Once the worker is done the lock must be free again.
        await asyncio.wait_for(runner._log_analysis_lock.acquire(), timeout=0.1)
        runner._log_analysis_lock.release()

    asyncio.run(run())


def test_log_sage_runner_logs_worker_exception_after_timeout(monkeypatch, caplog):
    """A worker failure that happens after the caller timed out is logged as a warning.

    The fake analyzer blocks for 0.2 s and then raises; the coalescer times
    out after 0.05 s. The test waits until the runner's task set is empty
    and then checks the captured log text.
    """
    _import_log_analyzer_with_optional_dependency_stubs(monkeypatch)
    module = importlib.import_module("nvidia_resiliency_ext.attribution.orchestration.log_analyzer")
    runner = module.LogSageRunner(LogSageExecutionConfig(use_lib_log_analysis=True))

    class FailingFakeLogAnalyzer:
        async def run(self, kwargs: dict[str, Any]) -> list[Any]:
            # Outlast the 0.05 s coalescer timeout before failing.
            time.sleep(0.2)
            raise RuntimeError("logsage worker failed")

    async def fake_get_lib_log_analyzer(kwargs: dict[str, Any]) -> FailingFakeLogAnalyzer:
        return FailingFakeLogAnalyzer()

    runner._get_lib_log_analyzer = fake_get_lib_log_analyzer

    async def run() -> None:
        coalescer = RequestCoalescer(compute_timeout=0.05)
        result = await coalescer.get_or_compute(
            "/tmp/job.log",
            lambda: runner._fetch_log_result_lib("/tmp/job.log"),
        )

        assert result["state"] == "timeout"

        # Poll (bounded to 1 s) until the worker task has been discarded from
        # the runner's task set.
        deadline = time.monotonic() + 1.0
        while runner._lib_log_analysis_tasks and time.monotonic() < deadline:
            await asyncio.sleep(0.01)
        assert not runner._lib_log_analysis_tasks

    with caplog.at_level(logging.WARNING, logger=module.logger.name):
        asyncio.run(run())

    assert "Lib LogSage analysis worker failed after caller cancellation or timeout" in caplog.text
    assert "logsage worker failed" in caplog.text
def test_attribution_config_rejects_lib_analysis_backend(tmp_path):
    """Requesting the ``lib`` backend through launcher args raises ``ValueError``."""
    expect_rejection = pytest.raises(ValueError, match="only 'mcp'")
    with expect_rejection:
        # Everything stays inside the context so the same calls are guarded
        # by the expectation as before.
        launcher_args = _args(
            ft_attribution_endpoint="localhost",
            ft_attribution_analysis_backend="lib",
        )
        log_path = str(tmp_path / "train.log")
        AttributionConfig.from_args(launcher_args, log_path, FaultToleranceConfig())