Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ build-backend = "poetry_dynamic_versioning.backend"
[tool.poetry.dependencies]
torch = ">=2.3.0"
packaging = "*"
python = ">=3.10"
python = ">=3.10,<4.0"
psutil = ">=6.0.0"
pyyaml = "*"
numpy = ">=1.26"
Expand All @@ -52,7 +52,7 @@ langchain-core = { version = ">=0.3.51,<0.4.0", optional = true }
langchain-openai = { version = ">=0.3.0,<1.0.0", optional = true }
mcp = { version = ">=1.15.0", optional = true }
setproctitle = { version = ">=1.3.0", optional = true }
logsage = { version = ">=0.1.7", optional = true }
logsage = { version = ">=0.1.8", optional = true }
fastapi = { version = ">=0.100.0", optional = true }
uvicorn = { version = ">=0.20.0", extras = ["standard"], optional = true }
pydantic = { version = ">=2.0.0", optional = true }
Expand Down
98 changes: 85 additions & 13 deletions src/nvidia_resiliency_ext/attribution/analyzer/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ async def submit(
self._schedule_progressive_analysis(result.normalized_path, user, job_id)
elif intent == ANALYSIS_INTENT_TERMINAL:
self._schedule_terminal_analysis(result.normalized_path)
elif intent == ANALYSIS_INTENT_TRACK_ONLY and self._log.is_streaming_logs:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the changes to this file are needed.

ANALYSIS_INTENT_PROGRESSIVE : this is the start of a cycle
ANALYSIS_INTENT_TERMINAL : this is the end of a cycle

submit(..., analysis_intent="progressive")
-> self._log.submit(...) # tracks job/submission
-> _schedule_progressive_analysis(...)
-> _start_progressive_analysis(...)
-> self._log.start_progressive_analysis(...)
-> LogAnalyzerRunner.start_progressive_analysis(...)
-> MCP/lib progressive-start adapter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See:
async def run(self, _arguments: Mapping[str, Any]) -> dict[str, str | None]:
"""Return status metadata without running terminal attribution."""
payload = ProgressiveStartResult(
status=PROGRESSIVE_STATUS_UNSUPPORTED,
message="LogSage progressive start API is not configured",
).as_payload()
payload["module"] = MODULE_LOG_ANALYZER_PROGRESSIVE_START
return payload

This is what gets invoked after the MCP. It is currently stubbed (it returns PROGRESSIVE_STATUS_UNSUPPORTED) and needs to be hooked up to the LogSage start handler.

self._schedule_start_analysis(result.normalized_path)

return result

Expand Down Expand Up @@ -375,7 +377,7 @@ async def _start_progressive_analysis(

async def _run_terminal_analysis(self, normalized_path: str) -> None:
"""Run the normal result-gathering path after an explicit terminal signal."""
result = await self.analyze(normalized_path)
result = await self.analyze(normalized_path, job_stage="end")
if isinstance(result, LogAnalyzerError):
logger.warning(
"Terminal analysis failed for %s: %s: %s",
Expand All @@ -388,48 +390,108 @@ async def _run_terminal_analysis(self, normalized_path: str) -> None:

def _schedule_terminal_analysis(self, normalized_path: str) -> None:
    """Queue the final (terminal) analysis without delaying POST /logs terminal responses.

    Hands ``_run_terminal_analysis`` to ``_schedule_stage_analysis`` as the
    coroutine factory, labelled ``"terminal"`` for log messages.
    """
    terminal_runner = self._run_terminal_analysis
    self._schedule_stage_analysis(normalized_path, terminal_runner, stage_label="terminal")

async def _run_start_analysis(self, normalized_path: str) -> None:
    """Launch the job-start (``job_stage="start"``) streaming LogSage pass for one path.

    The request goes directly to ``self._log.fetch_log_result`` instead of
    through the coalescer. Per the author's note, ``analyze_logs_rt_start`` is
    a long-running reader that accumulates per-path state but yields no
    cacheable attribution payload; coalescing it would deadlock the slot until
    the streamer exits and keep the eventual terminal analysis from running.

    Errors derived from ``Exception`` are logged as warnings rather than raised.
    """
    try:
        await self._log.fetch_log_result(normalized_path, job_stage="start")
    except Exception as err:
        # Best-effort background work: record the failure and stop here.
        logger.warning(
            "Start analysis failed for %s: %s: %s",
            normalized_path,
            type(err).__name__,
            err,
        )
        return

    logger.debug("Start analysis completed for %s", normalized_path)

def _schedule_start_analysis(self, normalized_path: str) -> None:
    """Queue the streaming LogSage start pass without delaying POST /logs responses.

    Hands ``_run_start_analysis`` to ``_schedule_stage_analysis`` as the
    coroutine factory, labelled ``"start"`` for log messages.
    """
    start_runner = self._run_start_analysis
    self._schedule_stage_analysis(normalized_path, start_runner, stage_label="start")

def _schedule_stage_analysis(
self,
normalized_path: str,
coro_factory: Any,
*,
stage_label: str,
) -> None:
"""Schedule a fire-and-forget per-stage analysis task on the running loop."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
if self._main_loop is None or self._main_loop.is_closed():
logger.error(
"Event loop not set — skipping terminal analysis start for %s",
"Event loop not set — skipping %s analysis start for %s",
stage_label,
normalized_path,
)
return
future = asyncio.run_coroutine_threadsafe(
self._run_terminal_analysis(normalized_path),
coro_factory(normalized_path),
self._main_loop,
)
future.add_done_callback(
lambda done: self._log_terminal_analysis_failure(normalized_path, done)
lambda done: self._log_stage_analysis_failure(
normalized_path, done, stage_label=stage_label
)
)
else:
task = loop.create_task(self._run_terminal_analysis(normalized_path))
task = loop.create_task(coro_factory(normalized_path))
task.add_done_callback(
lambda done: self._log_terminal_analysis_failure(normalized_path, done)
lambda done: self._log_stage_analysis_failure(
normalized_path, done, stage_label=stage_label
)
)
logger.debug("Scheduled terminal analysis start for %s", normalized_path)
logger.debug("Scheduled %s analysis start for %s", stage_label, normalized_path)

@staticmethod
def _log_terminal_analysis_failure(normalized_path: str, done: Any) -> None:
def _log_stage_analysis_failure(
normalized_path: str, done: Any, *, stage_label: str
) -> None:
try:
done.result()
except (asyncio.CancelledError, concurrent.futures.CancelledError):
logger.debug("Terminal analysis task cancelled for %s", normalized_path)
logger.debug(
"%s analysis task cancelled for %s", stage_label.capitalize(), normalized_path
)
except Exception as e:
logger.warning(
"Terminal analysis task failed for %s: %s: %s",
"%s analysis task failed for %s: %s: %s",
stage_label.capitalize(),
normalized_path,
type(e).__name__,
e,
)

@staticmethod
def _log_terminal_analysis_failure(normalized_path: str, done: Any) -> None:
    """Log the outcome of a completed terminal-analysis task or future.

    Forwards to ``Analyzer._log_stage_analysis_failure`` with ``stage_label``
    fixed to ``"terminal"``.
    """
    Analyzer._log_stage_analysis_failure(normalized_path, done, stage_label="terminal")

async def analyze(
self,
log_path: str,
file: Optional[str] = None,
wl_restart: Optional[int] = None,
*,
job_stage: Optional[str] = None,
) -> LogAnalyzerOutcome:
"""
Analyze a log file using LLM.
Expand Down Expand Up @@ -492,7 +554,10 @@ async def analyze(
f"job.job_id={getattr(job, 'job_id', 'N/A') if job else 'N/A'}"
)
coalesced_raw = await self._coalescer.get_or_compute(
validated, lambda: self._run_llm_analysis(validated, user=user, job_id=job_id)
validated,
lambda: self._run_llm_analysis(
validated, user=user, job_id=job_id, job_stage=job_stage
),
)
bundle = coalesced_from_cache(coalesced_raw)
fr_dump = bundle.fr_dump_path
Expand Down Expand Up @@ -591,10 +656,17 @@ def read_file_preview(
# ─── Internal methods ───

async def _run_llm_analysis(
self, path: str, user: str = "unknown", job_id: Optional[str] = None
self,
path: str,
user: str = "unknown",
job_id: Optional[str] = None,
*,
job_stage: Optional[str] = None,
) -> LogAnalysisCoalesced:
"""On cache miss: delegate to :meth:`LogAnalyzer.run_attribution_for_path`."""
return await self._log.run_attribution_for_path(path, user=user, job_id=job_id)
return await self._log.run_attribution_for_path(
path, user=user, job_id=job_id, job_stage=job_stage
)

async def _analyze_splitlog_mode(
self,
Expand Down
Loading
Loading