Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/fault_tolerance/usage_guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ service dependencies.
- ``--ft-attribution-llm-api-key-file <PATH>`` (alias: ``--ft_attribution_llm_api_key_file``)
- ``--ft-attribution-llm-base-url <URL>`` (alias: ``--ft_attribution_llm_base_url``)
- ``--ft-attribution-llm-model <MODEL>`` (alias: ``--ft_attribution_llm_model``)
- ``--ft-attribution-analysis-backend mcp`` (alias: ``--ft_attribution_analysis_backend``)
- ``--ft-attribution-startup-timeout <SECONDS>`` (alias: ``--ft_attribution_startup_timeout``), default ``20``
- ``--ft-attribution-decision-timeout <SECONDS>`` (alias: ``--ft_attribution_decision_timeout``), default ``60``
- ``--ft-attribution-export-url <URL>`` (alias: ``--ft_attribution_export_url``)
Expand All @@ -245,6 +246,10 @@ service dependencies.
To export managed attribution results, pass ``--ft-attribution-export-url`` or
set ``attribution_export_url`` in the fault tolerance YAML config.

Launcher-managed attribution supports only the ``mcp`` analysis backend. The backend flag may be
left unset, which uses the service default, or set explicitly to ``mcp``. The in-process ``lib``
backend is available only when running ``nvrx-attrsvc`` as a standalone service.

``--ft-attribution-decision-timeout`` is the total launcher-side budget for one
attribution decision, measured from the terminal analysis request until the rendezvous
host fetches the STOP/RESTART recommendation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, config: LogSageExecutionConfig):
# Set on first failed NVRxLogAnalyzer() so we do not retry init every request.
self._lib_log_analyzer_init_error: Optional[BaseException] = None
self._log_analysis_lock = asyncio.Lock()
self._lib_log_analysis_tasks: set[asyncio.Task[Any]] = set()
self._mcp_client: Any = None
if not config.use_lib_log_analysis:
try:
Expand Down Expand Up @@ -122,6 +123,59 @@ async def reconnect_mcp(self) -> bool:
return True
return await self._mcp_client.reconnect()

@staticmethod
def _run_lib_log_analyzer_sync(analyzer: Any, run_kwargs: Dict[str, Any]) -> Any:
"""Run the in-process LogSage coroutine on this worker thread's event loop."""
return asyncio.run(analyzer.run(dict(run_kwargs)))

async def _run_lib_log_analyzer_serialized(
self, analyzer: Any, run_kwargs: Dict[str, Any]
) -> Any:
"""Run in-process LogSage without blocking the caller's event loop.

In lib mode, ``NVRxLogAnalyzer.run`` is async-shaped but can spend most
of its wall time inside synchronous LogSage/LLM work. If that runs on
the attrsvc/coalescer event loop, ``asyncio.wait_for`` cannot fire until
the work has already completed. Offloading the full lib run gives the
coalescer an actual await point to time out on.

The serialization lock is released when the worker task finishes, not
when the waiting request is cancelled, so a timed-out lib analysis does
not overlap a retry on the same singleton analyzer.
"""
await self._log_analysis_lock.acquire()
try:
worker_task = asyncio.create_task(
asyncio.to_thread(self._run_lib_log_analyzer_sync, analyzer, run_kwargs)
)
except BaseException:
self._log_analysis_lock.release()
raise

self._lib_log_analysis_tasks.add(worker_task)
waiter_cancelled = False

def _finish_lib_run(task: "asyncio.Task[Any]") -> None:
self._lib_log_analysis_tasks.discard(task)
if self._log_analysis_lock.locked():
self._log_analysis_lock.release()
try:
exc = task.exception()
except asyncio.CancelledError:
return
if exc is not None and waiter_cancelled:
logger.warning(
"Lib LogSage analysis worker failed after caller cancellation or timeout",
exc_info=(type(exc), exc, exc.__traceback__),
)

worker_task.add_done_callback(_finish_lib_run)
try:
return await asyncio.shield(worker_task)
except asyncio.CancelledError:
waiter_cancelled = True
raise

async def _get_lib_log_analyzer(self, run_kwargs: Dict[str, Any]) -> Any:
"""Return the cached in-process :class:`NVRxLogAnalyzer`, creating it on first success.

Expand Down Expand Up @@ -163,8 +217,7 @@ async def _fetch_log_result_lib(self, path: str) -> Dict[str, Any]:
**self.config.llm_runtime_overrides(),
}
analyzer = await self._get_lib_log_analyzer(run_kwargs)
async with self._log_analysis_lock:
result = await analyzer.run(run_kwargs)
result = await self._run_lib_log_analyzer_serialized(analyzer, run_kwargs)

return nvrx_run_result_to_log_dict(result, path)

Expand Down
21 changes: 21 additions & 0 deletions src/nvidia_resiliency_ext/fault_tolerance/attribution_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ class AttributionConfig:
log_level: Optional[str] = None
export_url: Optional[str] = None

def __post_init__(self) -> None:
object.__setattr__(
self,
"analysis_backend",
_normalize_analysis_backend(self.analysis_backend),
)

@property
def is_enabled(self) -> bool:
return self.endpoint is not None
Expand Down Expand Up @@ -415,6 +422,20 @@ def _resolve_decision_timeout(args: Any, ft_cfg: FaultToleranceConfig) -> Option
return value


def _normalize_analysis_backend(value: Optional[str]) -> Optional[str]:
if value is None:
return None
normalized = str(value).strip().lower()
if not normalized:
return None
if normalized != "mcp":
raise ValueError(
"--ft-attribution-analysis-backend supports only 'mcp'; "
f"'lib' is no longer supported by launcher-managed attrsvc, got {value!r}"
)
return normalized


def _validate_export_url(url: str) -> None:
parsed = urlparse(url)
if parsed.scheme not in _EXPORT_URL_SCHEMES or not parsed.netloc:
Expand Down
10 changes: 7 additions & 3 deletions src/nvidia_resiliency_ext/fault_tolerance/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,15 @@ def _add_attribution_args(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--ft-attribution-analysis-backend",
"--ft_attribution_analysis_backend",
type=str,
type=str.lower,
default=None,
dest="ft_attribution_analysis_backend",
choices=("mcp", "lib"),
help="Analysis backend for launcher-managed attribution service: mcp or lib.",
choices=("mcp",),
metavar="mcp",
help=(
"Analysis backend for launcher-managed attribution service. "
"Only mcp is supported; use standalone nvrx-attrsvc for lib backend experiments."
),
)
parser.add_argument(
"--ft-attribution-decision-timeout",
Expand Down
132 changes: 132 additions & 0 deletions tests/attribution/unit/test_log_analyzer_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

import asyncio
import importlib
import logging
import sys
import threading
import time
import types
from typing import Any, Dict

from nvidia_resiliency_ext.attribution.coalescing.coalescer import RequestCoalescer
from nvidia_resiliency_ext.attribution.orchestration.analysis_pipeline import AnalysisPipelineMode
from nvidia_resiliency_ext.attribution.orchestration.config import LogSageExecutionConfig

Expand All @@ -34,19 +36,72 @@ def _import_log_analyzer_with_optional_dependency_stubs(monkeypatch):
langchain_openai = _stub_module(monkeypatch, "langchain_openai")
langchain_openai.ChatOpenAI = object

output_parsers = _stub_module(monkeypatch, "langchain_core.output_parsers")
output_parsers.StrOutputParser = object
prompts = _stub_module(monkeypatch, "langchain_core.prompts")
prompts.ChatPromptTemplate = types.SimpleNamespace(
from_template=lambda *_args, **_kwargs: object()
)
runnables = _stub_module(monkeypatch, "langchain_core.runnables")
runnables.RunnablePassthrough = object

_stub_module(monkeypatch, "logsage")
_stub_module(monkeypatch, "logsage.auto_resume_policy")
attribution_classes = _stub_module(
monkeypatch, "logsage.auto_resume_policy.attribution_classes"
)

class StubErrorAttribution:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)

stub_attribution = types.SimpleNamespace(
APPLICATION_DONE="APPLICATION_DONE",
ERRORS_NOT_FOUND="ERRORS_NOT_FOUND",
LLM_FAILURE="LLM_FAILURE",
SLURM_STEP_CANCELLED="SLURM_STEP_CANCELLED",
SLURM_STEP_CANCELLED_JOB_REQUEUE="SLURM_STEP_CANCELLED_JOB_REQUEUE",
)
stub_auto_resume = types.SimpleNamespace(
ERRORS_NOT_FOUND="ERRORS_NOT_FOUND",
LLM_FAILURE="LLM_FAILURE",
RESTART_IMMEDIATE="RESTART IMMEDIATE",
STOP_NO_RESTART="STOP - DONT RESTART IMMEDIATE",
)
stub_finished = types.SimpleNamespace(
APPLICATION_DONE="APPLICATION_DONE",
LLM_FAILURE="LLM_FAILURE",
SLURM_CANCELLED="SLURM_CANCELLED",
SLURM_CANCELLED_JOB_REQUEUE="SLURM_CANCELLED_JOB_REQUEUE",
SLURM_CANCELLED_TIME_LIMIT="SLURM_CANCELLED_TIME_LIMIT",
)
attribution_classes.ApplicationData = object
attribution_classes.Attribution = stub_attribution
attribution_classes.AutoResumeAction = stub_auto_resume
attribution_classes.ErrorAttribution = StubErrorAttribution
attribution_classes.FinishedStatus = stub_finished
attribution_classes.LRUCache = object

error_attribution = _stub_module(monkeypatch, "logsage.auto_resume_policy.error_attribution")
error_attribution.CONTEXT_SIZE = 4096
error_attribution.get_attribution = lambda *args, **kwargs: (None, None, None, None)
error_attribution.get_auto_resume = lambda *args, **kwargs: ("", "")
error_attribution.get_proposed_solution_cat = lambda *args, **kwargs: None

error_extraction = _stub_module(monkeypatch, "logsage.auto_resume_policy.error_extraction")
error_extraction.finished_validation = lambda _llm, data: data
error_extraction.return_application_errors = lambda *args, **kwargs: []
error_extraction.return_application_errors_rt = lambda *args, **kwargs: types.SimpleNamespace(
checkpoint_saved=False
)
prompts_mod = _stub_module(monkeypatch, "logsage.auto_resume_policy.prompts")
prompts_mod.template_post_error_check = ""
util_postprocessing = _stub_module(
monkeypatch, "logsage.auto_resume_policy.util_postprocessing"
)
util_postprocessing.get_auto_resume_postprocessing = lambda *args, **kwargs: False
utils = _stub_module(monkeypatch, "logsage.auto_resume_policy.utils")
utils.chunk_indices = lambda *args, **kwargs: []

httpx = _stub_module(monkeypatch, "httpx")
httpx.post = lambda *args, **kwargs: types.SimpleNamespace(status_code=201, text="created")
Expand Down Expand Up @@ -213,6 +268,83 @@ async def run() -> None:
assert kwargs["max_tokens"] == 0


def test_log_sage_runner_lib_run_allows_coalescer_timeout(monkeypatch):
_import_log_analyzer_with_optional_dependency_stubs(monkeypatch)
module = importlib.import_module("nvidia_resiliency_ext.attribution.orchestration.log_analyzer")
runner = module.LogSageRunner(LogSageExecutionConfig(use_lib_log_analysis=True))
started = threading.Event()
finished = threading.Event()

class BlockingFakeLogAnalyzer:
async def run(self, kwargs: dict[str, Any]) -> list[Any]:
started.set()
time.sleep(0.5)
finished.set()
return []

async def fake_get_lib_log_analyzer(kwargs: dict[str, Any]) -> BlockingFakeLogAnalyzer:
return BlockingFakeLogAnalyzer()

runner._get_lib_log_analyzer = fake_get_lib_log_analyzer

async def run() -> None:
coalescer = RequestCoalescer(compute_timeout=0.05)

t0 = time.monotonic()
result = await coalescer.get_or_compute(
"/tmp/job.log",
lambda: runner._fetch_log_result_lib("/tmp/job.log"),
)
elapsed = time.monotonic() - t0

assert started.is_set()
assert result["state"] == "timeout"
assert result["recommendation"]["action"] == "TIMEOUT"
assert elapsed < 0.3

assert await asyncio.to_thread(finished.wait, 1.0)
await asyncio.wait_for(runner._log_analysis_lock.acquire(), timeout=0.1)
runner._log_analysis_lock.release()

asyncio.run(run())


def test_log_sage_runner_logs_worker_exception_after_timeout(monkeypatch, caplog):
_import_log_analyzer_with_optional_dependency_stubs(monkeypatch)
module = importlib.import_module("nvidia_resiliency_ext.attribution.orchestration.log_analyzer")
runner = module.LogSageRunner(LogSageExecutionConfig(use_lib_log_analysis=True))

class FailingFakeLogAnalyzer:
async def run(self, kwargs: dict[str, Any]) -> list[Any]:
time.sleep(0.2)
raise RuntimeError("logsage worker failed")

async def fake_get_lib_log_analyzer(kwargs: dict[str, Any]) -> FailingFakeLogAnalyzer:
return FailingFakeLogAnalyzer()

runner._get_lib_log_analyzer = fake_get_lib_log_analyzer

async def run() -> None:
coalescer = RequestCoalescer(compute_timeout=0.05)
result = await coalescer.get_or_compute(
"/tmp/job.log",
lambda: runner._fetch_log_result_lib("/tmp/job.log"),
)

assert result["state"] == "timeout"

deadline = time.monotonic() + 1.0
while runner._lib_log_analysis_tasks and time.monotonic() < deadline:
await asyncio.sleep(0.01)
assert not runner._lib_log_analysis_tasks

with caplog.at_level(logging.WARNING, logger=module.logger.name):
asyncio.run(run())

assert "Lib LogSage analysis worker failed after caller cancellation or timeout" in caplog.text
assert "logsage worker failed" in caplog.text


def test_log_fr_analyzer_mcp_uses_top_level_log_contract(monkeypatch):
_import_log_analyzer_with_optional_dependency_stubs(monkeypatch)
module = importlib.import_module("nvidia_resiliency_ext.attribution.orchestration.log_analyzer")
Expand Down
16 changes: 14 additions & 2 deletions tests/fault_tolerance/unit/test_attribution_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def test_attribution_config_maps_launcher_args(tmp_path):
ft_attribution_llm_api_key_file=str(api_key_file),
ft_attribution_llm_base_url="https://llm.example/v1",
ft_attribution_llm_model="model-a",
ft_attribution_analysis_backend="lib",
ft_attribution_analysis_backend="mcp",
ft_attribution_decision_timeout=12.5,
ft_attribution_log_level="DEBUG",
ft_attribution_export_url=(
Expand All @@ -198,7 +198,7 @@ def test_attribution_config_maps_launcher_args(tmp_path):
assert env["NVRX_ATTRSVC_ALLOWED_ROOT"] == str(applog_dir)
assert env["NVRX_ATTRSVC_LLM_BASE_URL"] == "https://llm.example/v1"
assert env["NVRX_ATTRSVC_LLM_MODEL"] == "model-a"
assert env["NVRX_ATTRSVC_ANALYSIS_BACKEND"] == "lib"
assert env["NVRX_ATTRSVC_ANALYSIS_BACKEND"] == "mcp"
assert cfg.client_endpoint.decision_timeout == 12.5
assert env["NVRX_ATTRSVC_LOG_LEVEL"] == "DEBUG"
assert (
Expand All @@ -207,6 +207,18 @@ def test_attribution_config_maps_launcher_args(tmp_path):
)


def test_attribution_config_rejects_lib_analysis_backend(tmp_path):
with pytest.raises(ValueError, match="only 'mcp'"):
AttributionConfig.from_args(
_args(
ft_attribution_endpoint="localhost",
ft_attribution_analysis_backend="lib",
),
str(tmp_path / "train.log"),
FaultToleranceConfig(),
)


def test_yaml_attribution_decision_timeout_is_used(tmp_path):
cfg = AttributionConfig.from_args(
_args(ft_attribution_endpoint="localhost"),
Expand Down
Loading