From 24ed897ce9477085989737e3df308d0f5d972c6b Mon Sep 17 00:00:00 2001 From: Haim Elisha Date: Thu, 21 May 2026 15:42:11 +0300 Subject: [PATCH 1/6] attribution_inline_processing_stop_repeated_failures --- .../unit/run_dispatch_test_parallel.py | 168 +++++++++ .../run_log_analyzer_dispatch_four_cycles.py | 338 ++++++++++++++++++ .../run_log_writer_dispatch_four_cycles.py | 174 +++++++++ 3 files changed, 680 insertions(+) create mode 100644 tests/attribution/unit/run_dispatch_test_parallel.py create mode 100644 tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py create mode 100644 tests/attribution/unit/run_log_writer_dispatch_four_cycles.py diff --git a/tests/attribution/unit/run_dispatch_test_parallel.py b/tests/attribution/unit/run_dispatch_test_parallel.py new file mode 100644 index 00000000..8be607f6 --- /dev/null +++ b/tests/attribution/unit/run_dispatch_test_parallel.py @@ -0,0 +1,168 @@ +"""Run the writer + dispatch analyzer in parallel. + +Launches ``run_log_writer_dispatch_four_cycles.py`` and +``run_log_analyzer_dispatch_four_cycles.py`` as concurrent subprocesses, +sharing the same log directory. Both pace themselves so cycles align: + + - Writer: 5 chunks × ``--chunk-interval`` (default 60s) = 300s per cycle + - Analyzer: polls every ``--poll-interval`` (60s) for ``--cycle-duration`` + (300s) per cycle, then runs the end phase over + ``tail + --end-window-minutes`` of history (default 2 min) + +Each subprocess's stdout is forwarded line-by-line with a ``[writer]`` / +``[reader]`` prefix so the output is greppable. + +The driver exits with the worst of the two return codes. + +Usage: + # Realistic 20-min run: + python tests/attribution/unit/run_dispatch_test_parallel.py + # Fast iteration (5s polls → ~100 s total): + python tests/attribution/unit/run_dispatch_test_parallel.py \\ + --chunk-interval 5 --poll-interval 5 --cycle-duration 25 +""" + +import argparse +import os +import subprocess +import sys +import threading +import time + +_HERE = os.path.dirname(os.path.abspath(__file__)) +WRITER_SCRIPT = os.path.join(_HERE, "run_log_writer_dispatch_four_cycles.py") +ANALYZER_SCRIPT = os.path.join(_HERE, "run_log_analyzer_dispatch_four_cycles.py") + + +def _stream(proc: subprocess.Popen, label: str) -> None: + """Forward proc.stdout to our stdout, prefixed by ``label``.""" + assert proc.stdout is not None + for raw in proc.stdout: + line = raw.decode("utf-8", errors="replace").rstrip("\n") + # Each subprocess already prefixes its lines (``[writer] ...``, + # ``[dispatch] ...``) — add the outer label so it's obvious + # which subprocess emitted the line in interleaved output. + print(f"{label} {line}", flush=True) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--log-dir", default=_HERE) + parser.add_argument("--num-cycles", type=int, default=4) + parser.add_argument( + "--chunk-interval", type=float, default=60.0, + help="Seconds between writer chunks (default 60s = 1 min)", + ) + parser.add_argument( + "--poll-interval", type=float, default=60.0, + help="Seconds between analyzer polls (default 60s = 1 min)", + ) + parser.add_argument( + "--cycle-duration", type=float, default=300.0, + help="Analyzer's per-cycle deadline (default 300s = 5 min)", + ) + parser.add_argument( + "--end-window-minutes", type=float, default=2.0, + help="Minutes of history the end phase reuses (default 2 = " + "tail + last 2 minutes)", + ) + parser.add_argument( + "--checkpoint-cycles", default="last", + help="Which cycle indices include a 'Saved checkpoint' line. " + "Default 'last' = only the final cycle. The end phase's " + "checkpoint_saved circuit-breaker then prevents the " + "repeated-issue STOP from firing on that cycle.", + ) + parser.add_argument( + "--writer-head-start", type=float, default=0.5, + help="Seconds the writer leads the analyzer so the first poll " + "finds a non-empty file (default 0.5s)", + ) + args = parser.parse_args() + + os.makedirs(args.log_dir, exist_ok=True) + # Clean stale cycle files so the analyzer's first read sees fresh + # writer output rather than a prior run's content. + for cycle in range(args.num_cycles): + path = os.path.join(args.log_dir, f"nvrx_{cycle}.log") + if os.path.exists(path): + os.remove(path) + # Drop the pickled state file too if a previous run left one. + stale = os.path.join(args.log_dir, "nvrx_four_cycles.state.pkl") + if os.path.exists(stale): + os.remove(stale) + + common_env = os.environ.copy() + # Force unbuffered Python so the line streamer sees output promptly. + common_env["PYTHONUNBUFFERED"] = "1" + + writer_cmd = [ + sys.executable, WRITER_SCRIPT, + "--log-dir", args.log_dir, + "--num-cycles", str(args.num_cycles), + "--chunk-interval", str(args.chunk_interval), + "--checkpoint-cycles", args.checkpoint_cycles, + ] + analyzer_cmd = [ + sys.executable, ANALYZER_SCRIPT, + "--log-dir", args.log_dir, + "--num-cycles", str(args.num_cycles), + "--cycle-duration", str(args.cycle_duration), + "--poll-interval", str(args.poll_interval), + "--end-window-minutes", str(args.end_window_minutes), + ] + + print(f"[driver] writer: {' '.join(writer_cmd)}", flush=True) + print(f"[driver] analyzer: {' '.join(analyzer_cmd)}", flush=True) + print( + f"[driver] launching in parallel, writer head start " + f"{args.writer_head_start:.1f}s", + flush=True, + ) + + writer = subprocess.Popen( + writer_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=common_env, + ) + # Brief head start so the writer creates / writes to nvrx_0.log + # before the analyzer's first read. + if args.writer_head_start > 0: + time.sleep(args.writer_head_start) + analyzer = subprocess.Popen( + analyzer_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=common_env, + ) + + threads = [ + threading.Thread(target=_stream, args=(writer, "[W]"), daemon=True), + threading.Thread(target=_stream, args=(analyzer, "[R]"), daemon=True), + ] + for t in threads: + t.start() + + try: + writer_rc = writer.wait() + analyzer_rc = analyzer.wait() + except KeyboardInterrupt: + print("\n[driver] interrupted; terminating children", flush=True) + writer.terminate() + analyzer.terminate() + writer_rc = writer.wait() + analyzer_rc = analyzer.wait() + + for t in threads: + t.join(timeout=2.0) + + print( + f"\n[driver] writer exit={writer_rc}, analyzer exit={analyzer_rc}", + flush=True, + ) + return max(writer_rc, analyzer_rc) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py b/tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py new file mode 100644 index 00000000..2c8677eb --- /dev/null +++ b/tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py @@ -0,0 +1,338 @@ +"""Script 2/2: drive ``NVRxLogAnalyzer._analyze_logs_rt_dispatch`` over 4 cycles. + +Pairs with ``run_log_writer_dispatch_four_cycles.py``. A single analyzer +instance is reused so ``attribution_dict`` / ``cycle_counter_dict`` +persist across cycles. For each cycle ``i`` in ``[0..3]`` the script: + + 1. Sets ``cfg["job_stage"] = "start"`` and calls + ``_analyze_logs_rt_dispatch`` → routes to + ``analyze_logs_rt_start``. Polls the cycle's file every + ``--poll-interval`` seconds for ``--cycle-duration`` seconds total + (default 60s / 300s = poll 1×/min for 5 min), then returns + ``None``. + 2. Sets ``cfg["job_stage"] = "end"`` and calls it again → routes to + ``analyze_logs_rt_end``. The end phase rebuilds its LLM input as + ``tail + last --end-window-minutes of history`` (default 2 min), + re-runs extraction, and returns an ``ErrorAttribution``. + +With the writer producing identical errors across cycles and default +``repeated_amount=3``, the third identical attribution should override +``auto_resume`` to ``STOP - DONT RESTART IMMEDIATE``. + +Workarounds applied (each is a current source quirk in ``nvrx_logsage.py``): + + - ``__init__`` assigns ``self.stop_accumulating_count``, + ``self.chunks_per_time``, ``self.logs_minutes_before_job_end``, and + ``self.repeated_amount`` with trailing commas → 1-tuples. We + rewrite them to scalar ints after construction. + - ``analyze_logs_rt_start`` appends to ``self.job_inline_data_dict[path]`` + without initializing the list — we pre-seed it. + - ``analyze_logs_rt_end`` does ``cycle_counter_dict[ck] += 1`` without + a ``setdefault`` — we pre-seed the shared key to 0. + - The start branch's ``while True`` polling loop is bounded by + patching ``time.sleep`` with a deadline (matches the existing + ``run_log_analyzer_start_four_cycles.py`` pattern). When the + deadline elapses the sleep raises ``_Phase1Deadline``, the start + coroutine exits, and the script moves on to the end phase. + +Usage: + # Real timing (60s polls, 300s cycles = 20 min total): + python tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py + # Fast local iteration (5s polls, 25s cycles): + python tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py \\ + --poll-interval 5 --cycle-duration 25 +""" + +import argparse +import asyncio +import importlib.util +import os +import sys +import time +from unittest.mock import patch + +_SRC = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "src")) +if os.path.isdir(_SRC) and _SRC not in sys.path: + sys.path.insert(0, _SRC) + +_HERE = os.path.dirname(os.path.abspath(__file__)) +DEFAULT_LOG_DIR = _HERE +NUM_CYCLES = 4 + + +def cycle_log_path(log_dir: str, cycle: int) -> str: + return os.path.join(log_dir, f"nvrx_{cycle}.log") + + +def _logsage_available() -> bool: + return importlib.util.find_spec("logsage") is not None + + +def _api_key_available() -> bool: + try: + from nvidia_resiliency_ext.attribution.api_keys import load_llm_api_key + except ImportError: + return False + return bool(load_llm_api_key()) + + +class _Phase1Deadline(Exception): + """Raised inside the patched sleep when the per-cycle budget expires.""" + + +def _make_deadline_sleep(start_duration_sec: float, poll_interval_sec: float): + real_sleep = time.sleep + deadline = time.monotonic() + start_duration_sec + + def deadline_sleep(*_args, **_kwargs): + remaining = deadline - time.monotonic() + if remaining <= 0: + raise _Phase1Deadline() + real_sleep(min(poll_interval_sec, remaining)) + + return deadline_sleep + + +def _override_scalar_config( + analyzer, + poll_interval_sec: float, + end_window_minutes: float, +) -> None: + """Undo the 1-tuple bug in ``NVRxLogAnalyzer.__init__`` and pin the + polling cadence + end-phase history window. + + ``end_window_minutes`` is how many minutes of history the end phase + glues to the tail before re-running extraction. With the writer's + 1-chunk-per-minute pacing and the OOM in the last chunk, a value of + 2 means the end phase always sees the OOM (last minute) plus the + minute before it. + """ + # poll_interval is the wall-clock sleep between reads. The source + # uses ``chunks_per_time * 60`` seconds, so chunks_per_time is in + # minutes. + analyzer.chunks_per_time = max(poll_interval_sec / 60.0, 0.0) + # Generous counter — the deadline patch is what bounds the loop. + analyzer.stop_accumulating_count = 1000 + # End-phase grabs the last + # ``int(logs_minutes_before_job_end / chunks_per_time)`` history + # entries and concatenates them with the freshly read tail. + analyzer.logs_minutes_before_job_end = end_window_minutes + analyzer.repeated_amount = 3 + + +def _dispatch(analyzer, path: str, job_stage: str, cycle: int): + """Configure cfg + minimal state, then call _analyze_logs_rt_dispatch.""" + analyzer._init_config["log_path"] = path + analyzer._init_config["job_stage"] = job_stage + analyzer._init_config["cycle_counter"] = cycle + analyzer._init_config["attribution"] = True + + from nvidia_resiliency_ext.attribution.log_analyzer import nvrx_logsage + analyzer.job_inline_data_dict.setdefault(path, []) + ck = nvrx_logsage._cycle_counter_key(path) + analyzer.cycle_counter_dict.setdefault(ck, 0) + + return asyncio.run(analyzer._analyze_logs_rt_dispatch()) + + +def _wait_for_file(path: str, timeout_sec: float = 30.0) -> bool: + """Block until ``path`` exists, up to ``timeout_sec``.""" + deadline = time.monotonic() + timeout_sec + while time.monotonic() < deadline: + if os.path.isfile(path): + return True + time.sleep(0.5) + return False + + +def run_dispatch_all_cycles( + log_dir: str, + num_cycles: int, + cycle_duration_sec: float, + poll_interval_sec: float, + end_window_minutes: float, +) -> None: + from nvidia_resiliency_ext.attribution.log_analyzer import nvrx_logsage + + first_path = cycle_log_path(log_dir, 0) + # Writer truncates files up front, so the file is expected to exist + # by the time we start dispatching cycle 0. Be tolerant of a small + # startup race when launched in parallel. + if not _wait_for_file(first_path, timeout_sec=30): + print( + f"[dispatch] {first_path} did not appear within 30s — start " + "the writer first or pass --log-dir.", + file=sys.stderr, + ) + sys.exit(2) + + analyzer = nvrx_logsage.NVRxLogAnalyzer( + { + "log_path": first_path, + "job_stage": "start", + "is_streaming_logs": True, + } + ) + _override_scalar_config(analyzer, poll_interval_sec, end_window_minutes) + + per_cycle_result: list[tuple[int, str, object]] = [] + + for cycle in range(num_cycles): + path = cycle_log_path(log_dir, cycle) + if not _wait_for_file(path, timeout_sec=poll_interval_sec): + print( + f"[dispatch] cycle {cycle}: {path} missing; skipping", + file=sys.stderr, + flush=True, + ) + continue + + print( + f"\n[dispatch] === cycle {cycle} " + f"({os.path.basename(path)}) ===", + flush=True, + ) + + # Phase 1 — start branch via dispatcher, bounded by deadline. + print( + f"[dispatch] cycle {cycle}: job_stage=start → " + f"analyze_logs_rt_start (deadline {cycle_duration_sec:.0f}s, " + f"poll {poll_interval_sec:.0f}s)", + flush=True, + ) + start_t0 = time.monotonic() + with patch.object( + nvrx_logsage.time, + "sleep", + _make_deadline_sleep(cycle_duration_sec, poll_interval_sec), + ): + try: + _dispatch(analyzer, path, "start", cycle) + except _Phase1Deadline: + print( + f"[dispatch] cycle {cycle}: start deadline reached", + flush=True, + ) + start_elapsed = time.monotonic() - start_t0 + + history_len = len(analyzer.job_inline_data_dict.get(path, [])) + print( + f"[dispatch] cycle {cycle}: start took {start_elapsed:.2f}s, " + f"job_inline_data_dict[{os.path.basename(path)}] len=" + f"{history_len}", + flush=True, + ) + + # Phase 2 — end branch via dispatcher. + print( + f"[dispatch] cycle {cycle}: job_stage=end → " + "analyze_logs_rt_end", + flush=True, + ) + end_t0 = time.monotonic() + end_result = _dispatch(analyzer, path, "end", cycle) + end_elapsed = time.monotonic() - end_t0 + print( + f"[dispatch] cycle {cycle}: end took {end_elapsed:.2f}s", + flush=True, + ) + # The end phase OR-reduces checkpoint_saved across all per-poll + # entries in job_inline_data_dict[path]; recompute the same way + # for visibility. + ckpt_saved_in_history = any( + getattr(item[2], "checkpoint_saved", False) + for item in analyzer.job_inline_data_dict.get(path, []) + ) + + if end_result is None: + print( + f"[dispatch] cycle {cycle}: end returned None " + f"(checkpoint_saved_in_history={ckpt_saved_in_history})", + file=sys.stderr, + flush=True, + ) + else: + auto_resume = getattr(end_result, "auto_resume", "?") + attribution = getattr(end_result, "attribution", "?") + verbose = getattr(end_result, "auto_resume_verbose", "") + ckpt_on_result = getattr(end_result, "checkpoint_saved", None) + print( + f"[dispatch] cycle {cycle}: end → " + f"auto_resume={auto_resume!r}, " + f"attribution={attribution!r}, " + f"verbose={verbose!r}, " + f"checkpoint_saved={ckpt_on_result!r} " + f"(history={ckpt_saved_in_history})", + flush=True, + ) + per_cycle_result.append( + (cycle, path, end_result, ckpt_saved_in_history, + start_elapsed, end_elapsed) + ) + + print("\n[dispatch] ====== summary ======", flush=True) + print("[dispatch] attribution_dict (per path):", flush=True) + for path, attribution in analyzer.attribution_dict.items(): + print(f" {os.path.basename(path)}: {attribution!r}", flush=True) + + print("[dispatch] cycle_counter_dict (per stripped key):", flush=True) + for key, counter in analyzer.cycle_counter_dict.items(): + print(f" {os.path.basename(key)}: {counter}", flush=True) + + print("[dispatch] per-cycle auto_resume:", flush=True) + for cycle, _path, result, ckpt, start_s, end_s in per_cycle_result: + ar = getattr(result, "auto_resume", None) if result else None + verb = getattr(result, "auto_resume_verbose", "") if result else "" + ckpt_tag = " [checkpoint_saved]" if ckpt else "" + print( + f" cycle {cycle}: {ar!r} ({verb!r}){ckpt_tag} " + f"[start={start_s:.2f}s, end={end_s:.2f}s]", + flush=True, + ) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--log-dir", default=DEFAULT_LOG_DIR) + parser.add_argument( + "--num-cycles", type=int, default=NUM_CYCLES, + help=f"Number of cycle files (default {NUM_CYCLES})", + ) + parser.add_argument( + "--cycle-duration", type=float, default=300.0, + help="Seconds the start phase polls per cycle before exiting " + "(default 300s = 5 min)", + ) + parser.add_argument( + "--poll-interval", type=float, default=60.0, + help="Seconds between polls inside the start phase (default 60s = 1 min)", + ) + parser.add_argument( + "--end-window-minutes", type=float, default=2.0, + help="Minutes of streaming history the end phase glues to the " + "freshly read tail before re-running extraction. With the " + "writer at 1 chunk/min, '2' = tail + last 2 minutes " + "(default 2)", + ) + args = parser.parse_args() + + if not _logsage_available(): + print("logsage package not installed; aborting.", file=sys.stderr) + return 2 + if not _api_key_available(): + print("LLM API key not configured; aborting.", file=sys.stderr) + return 2 + + run_dispatch_all_cycles( + args.log_dir, + args.num_cycles, + args.cycle_duration, + args.poll_interval, + args.end_window_minutes, + ) + print("[dispatch] done.", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/attribution/unit/run_log_writer_dispatch_four_cycles.py b/tests/attribution/unit/run_log_writer_dispatch_four_cycles.py new file mode 100644 index 00000000..8aed36ef --- /dev/null +++ b/tests/attribution/unit/run_log_writer_dispatch_four_cycles.py @@ -0,0 +1,174 @@ +"""Script 1/2: write four 5-minute per-cycle log files for the dispatch test. + +Pairs with ``run_log_analyzer_dispatch_four_cycles.py``. For each cycle +``i`` in ``[0..3]``: + + - Truncates ``nvrx_{i}.log`` + - Appends 5 chunks at ``--chunk-interval`` second intervals + (default 60s → cycle is ``chunks_per_cycle * chunk_interval`` = + 300s = 5 minutes) + - Sleeps after the *last* chunk too, so the per-cycle window is + fully 5 minutes before the next cycle's file begins + +Total wall time for the default 4 × 5 = 20 minutes. All four files end +up with the same OOM error, which is what drives the repeated-issue +stop guard in the analyzer. + +Usage: + python tests/attribution/unit/run_log_writer_dispatch_four_cycles.py + # Shorter intervals for local iteration: + python tests/attribution/unit/run_log_writer_dispatch_four_cycles.py \\ + --chunk-interval 5 +""" + +import argparse +import datetime +import os +import sys +import time + +_HERE = os.path.dirname(os.path.abspath(__file__)) +NUM_CYCLES = 4 +CHUNKS_PER_CYCLE = 5 +CHECKPOINT_CHUNK_INDEX = 3 # minute 3 — the late-training chunk + +# 5 chunks per cycle, one per minute → 5-minute cycle window. The OOM +# lands in the last chunk so it falls inside the end phase's +# tail + 2-minute history slice. +CHUNK_TEMPLATES = [ + # minute 0 — boot / cycle marker + "[{ts}] starting distributed training on 8 GPUs\n" + "[{ts}] FT: initialized\n" + "[{ts}] Cycle: {cycle} begin\n", + # minute 1 — early training + "[{ts}] step 100 loss=0.512 lr=1e-4\n" + "[{ts}] step 200 loss=0.487 lr=1e-4\n", + # minute 2 — mid training + "[{ts}] step 300 loss=0.461 lr=1e-4\n" + "[{ts}] step 400 loss=0.439 lr=1e-4\n", + # minute 3 — late training (last clean window before failure). + # In "checkpoint cycles" we append CHECKPOINT_LINE here so the + # end phase observes checkpoint_saved=True alongside the OOM. + "[{ts}] step 500 loss=0.421 lr=1e-4\n" + "[{ts}] step 600 loss=0.408 lr=1e-4\n", + # minute 4 — failure (this chunk must land inside the end phase's + # tail + 2-minute window) + "[{ts}] ERROR: torch.cuda.OutOfMemoryError: CUDA out of memory.\n" + "Traceback (most recent call last):\n" + " File 'train.py', line 142, in train_step\n" + " loss = model(input_ids).loss\n" + "torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 2.00 GiB.\n", +] + +# Sentinel line the LLM extraction layer recognises as a successful +# checkpoint save. Appended into minute 3 of any cycle listed in +# ``--checkpoint-cycles``. Wording is intentionally explicit so the +# extraction prompt has no ambiguity. +CHECKPOINT_LINE = ( + "[{ts}] [checkpointing.py:142] Saved global checkpoint at step 600 " + "to /ckpt/global_step_600.pt (rank 0)\n" +) + + +def _parse_checkpoint_cycles(value: str, num_cycles: int) -> set[int]: + """Resolve the ``--checkpoint-cycles`` CLI value to a set of indices.""" + v = (value or "").strip().lower() + if v in ("", "none"): + return set() + if v == "last": + return {num_cycles - 1} + if v == "all": + return set(range(num_cycles)) + out: set[int] = set() + for piece in value.split(","): + piece = piece.strip() + if not piece: + continue + out.add(int(piece)) + return out + + +def cycle_log_path(log_dir: str, cycle: int) -> str: + return os.path.join(log_dir, f"nvrx_{cycle}.log") + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--log-dir", default=_HERE) + parser.add_argument( + "--num-cycles", type=int, default=NUM_CYCLES, + help=f"Number of cycle files (default {NUM_CYCLES})", + ) + parser.add_argument( + "--chunk-interval", type=float, default=60.0, + help="Seconds between chunks; also the gap after the last chunk " + "of a cycle, so each cycle is chunks_per_cycle * " + "chunk_interval long (default 60s).", + ) + parser.add_argument( + "--checkpoint-cycles", default="last", + help="Which cycle indices include a 'Saved checkpoint' line in " + "their minute-3 chunk. Accepts 'last' (default — only the " + "final cycle), 'all', 'none', or a comma-separated list " + "like '0,2'.", + ) + args = parser.parse_args() + + checkpoint_cycles = _parse_checkpoint_cycles( + args.checkpoint_cycles, args.num_cycles + ) + + os.makedirs(args.log_dir, exist_ok=True) + # Truncate up front so the analyzer's first read against any cycle + # file isn't tripped by stale content from a prior run. + paths = [cycle_log_path(args.log_dir, c) for c in range(args.num_cycles)] + for p in paths: + open(p, "w").close() + + chunks_per_cycle = len(CHUNK_TEMPLATES) + cycle_seconds = chunks_per_cycle * args.chunk_interval + ckpt_summary = ( + ",".join(str(c) for c in sorted(checkpoint_cycles)) + if checkpoint_cycles else "none" + ) + print( + f"[writer] {args.num_cycles} cycles × {chunks_per_cycle} chunks @ " + f"{args.chunk_interval:.0f}s = {cycle_seconds:.0f}s per cycle " + f"({args.num_cycles * cycle_seconds:.0f}s total); " + f"checkpoint cycles: {ckpt_summary}", + flush=True, + ) + + try: + for cycle, path in enumerate(paths): + print(f"[writer] cycle {cycle}: {path}", flush=True) + for chunk_idx, tmpl in enumerate(CHUNK_TEMPLATES): + ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + content = tmpl.format(ts=ts, cycle=cycle) + ckpt_emitted = False + if ( + chunk_idx == CHECKPOINT_CHUNK_INDEX + and cycle in checkpoint_cycles + ): + content += CHECKPOINT_LINE.format(ts=ts) + ckpt_emitted = True + with open(path, "a", encoding="utf-8") as f: + f.write(content) + suffix = " + checkpoint" if ckpt_emitted else "" + print( + f"[writer] cycle {cycle} chunk " + f"{chunk_idx + 1}/{chunks_per_cycle} @ {ts}{suffix}", + flush=True, + ) + # Sleep after every chunk including the last so the + # cycle window is fully `chunks_per_cycle * + # chunk_interval` seconds before the next file begins. + time.sleep(args.chunk_interval) + except KeyboardInterrupt: + print("\n[writer] interrupted; stopping.", flush=True) + print("[writer] done.", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 53f1e70d6cae9b9a7ea8347242714eb3e66b05e4 Mon Sep 17 00:00:00 2001 From: Haim Elisha Date: Thu, 21 May 2026 15:43:51 +0300 Subject: [PATCH 2/6] attribution_inline_processing_stop_repeated_failures --- pyproject.toml | 4 +- .../attribution/analyzer/engine.py | 98 +++- .../attribution/log_analyzer/nvrx_logsage.py | 455 +++++++++++++++++- .../attribution/orchestration/config.py | 11 +- .../attribution/orchestration/log_analyzer.py | 60 ++- 5 files changed, 590 insertions(+), 38 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0c5bc325..9b7a66ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ build-backend = "poetry_dynamic_versioning.backend" [tool.poetry.dependencies] torch = ">=2.3.0" packaging = "*" -python = ">=3.10" +python = ">=3.10,<4.0" psutil = ">=6.0.0" pyyaml = "*" numpy = ">=1.26" @@ -52,7 +52,7 @@ langchain-core = { version = ">=0.3.51,<0.4.0", optional = true } langchain-openai = { version = ">=0.3.0,<1.0.0", optional = true } mcp = { version = ">=1.15.0", optional = true } setproctitle = { version = ">=1.3.0", optional = true } -logsage = { version = ">=0.1.7", optional = true } +logsage = { version = ">=0.1.8", optional = true } fastapi = { version = ">=0.100.0", optional = true } uvicorn = { version = ">=0.20.0", extras = ["standard"], optional = true } pydantic = { version = ">=2.0.0", optional = true } diff --git a/src/nvidia_resiliency_ext/attribution/analyzer/engine.py b/src/nvidia_resiliency_ext/attribution/analyzer/engine.py index af12dcbd..d8f72cd6 100644 --- a/src/nvidia_resiliency_ext/attribution/analyzer/engine.py +++ b/src/nvidia_resiliency_ext/attribution/analyzer/engine.py @@ -307,6 +307,8 @@ async def submit( self._schedule_progressive_analysis(result.normalized_path, user, job_id) elif intent == ANALYSIS_INTENT_TERMINAL: self._schedule_terminal_analysis(result.normalized_path) + elif intent == ANALYSIS_INTENT_TRACK_ONLY and self._log.is_streaming_logs: + self._schedule_start_analysis(result.normalized_path) return result @@ -375,7 +377,7 @@ async def _start_progressive_analysis( async def _run_terminal_analysis(self, normalized_path: str) -> None: """Run the normal result-gathering path after an explicit terminal signal.""" - result = await self.analyze(normalized_path) + result = await self.analyze(normalized_path, job_stage="end") if isinstance(result, LogAnalyzerError): logger.warning( "Terminal analysis failed for %s: %s: %s", @@ -388,48 +390,108 @@ async def _run_terminal_analysis(self, normalized_path: str) -> None: def _schedule_terminal_analysis(self, normalized_path: str) -> None: """Start final analysis without delaying POST /logs terminal responses.""" + self._schedule_stage_analysis( + normalized_path, + self._run_terminal_analysis, + stage_label="terminal", + ) + + async def _run_start_analysis(self, normalized_path: str) -> None: + """Kick off streaming LogSage at job start. + + Bypasses the coalescer: ``analyze_logs_rt_start`` is a long-running reader + that accumulates per-path state but produces no cacheable attribution + payload; coalescing here would deadlock the slot until the streamer exits, + preventing the eventual terminal analysis from running. + """ + try: + await self._log.fetch_log_result(normalized_path, job_stage="start") + except Exception as e: + logger.warning( + "Start analysis failed for %s: %s: %s", + normalized_path, + type(e).__name__, + e, + ) + else: + logger.debug("Start analysis completed for %s", normalized_path) + + def _schedule_start_analysis(self, normalized_path: str) -> None: + """Kick off streaming LogSage start without delaying POST /logs responses.""" + self._schedule_stage_analysis( + normalized_path, + self._run_start_analysis, + stage_label="start", + ) + + def _schedule_stage_analysis( + self, + normalized_path: str, + coro_factory: Any, + *, + stage_label: str, + ) -> None: + """Schedule a fire-and-forget per-stage analysis task on the running loop.""" try: loop = asyncio.get_running_loop() except RuntimeError: if self._main_loop is None or self._main_loop.is_closed(): logger.error( - "Event loop not set — skipping terminal analysis start for %s", + "Event loop not set — skipping %s analysis start for %s", + stage_label, normalized_path, ) return future = asyncio.run_coroutine_threadsafe( - self._run_terminal_analysis(normalized_path), + coro_factory(normalized_path), self._main_loop, ) future.add_done_callback( - lambda done: self._log_terminal_analysis_failure(normalized_path, done) + lambda done: self._log_stage_analysis_failure( + normalized_path, done, stage_label=stage_label + ) ) else: - task = loop.create_task(self._run_terminal_analysis(normalized_path)) + task = loop.create_task(coro_factory(normalized_path)) task.add_done_callback( - lambda done: self._log_terminal_analysis_failure(normalized_path, done) + lambda done: self._log_stage_analysis_failure( + normalized_path, done, stage_label=stage_label + ) ) - logger.debug("Scheduled terminal analysis start for %s", normalized_path) + logger.debug("Scheduled %s analysis start for %s", stage_label, normalized_path) @staticmethod - def _log_terminal_analysis_failure(normalized_path: str, done: Any) -> None: + def _log_stage_analysis_failure( + normalized_path: str, done: Any, *, stage_label: str + ) -> None: try: done.result() except (asyncio.CancelledError, concurrent.futures.CancelledError): - logger.debug("Terminal analysis task cancelled for %s", normalized_path) + logger.debug( + "%s analysis task cancelled for %s", stage_label.capitalize(), normalized_path + ) except Exception as e: logger.warning( - "Terminal analysis task failed for %s: %s: %s", + "%s analysis task failed for %s: %s: %s", + stage_label.capitalize(), normalized_path, type(e).__name__, e, ) + @staticmethod + def _log_terminal_analysis_failure(normalized_path: str, done: Any) -> None: + Analyzer._log_stage_analysis_failure( + normalized_path, done, stage_label="terminal" + ) + async def analyze( self, log_path: str, file: Optional[str] = None, wl_restart: Optional[int] = None, + *, + job_stage: Optional[str] = None, ) -> LogAnalyzerOutcome: """ Analyze a log file using LLM. @@ -492,7 +554,10 @@ async def analyze( f"job.job_id={getattr(job, 'job_id', 'N/A') if job else 'N/A'}" ) coalesced_raw = await self._coalescer.get_or_compute( - validated, lambda: self._run_llm_analysis(validated, user=user, job_id=job_id) + validated, + lambda: self._run_llm_analysis( + validated, user=user, job_id=job_id, job_stage=job_stage + ), ) bundle = coalesced_from_cache(coalesced_raw) fr_dump = bundle.fr_dump_path @@ -591,10 +656,17 @@ def read_file_preview( # ─── Internal methods ─── async def _run_llm_analysis( - self, path: str, user: str = "unknown", job_id: Optional[str] = None + self, + path: str, + user: str = "unknown", + job_id: Optional[str] = None, + *, + job_stage: Optional[str] = None, ) -> LogAnalysisCoalesced: """On cache miss: delegate to :meth:`LogAnalyzer.run_attribution_for_path`.""" - return await self._log.run_attribution_for_path(path, user=user, job_id=job_id) + return await self._log.run_attribution_for_path( + path, user=user, job_id=job_id, job_stage=job_stage + ) async def _analyze_splitlog_mode( self, diff --git a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py index 99a896ab..46ac806e 100644 --- a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py +++ b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py @@ -7,10 +7,23 @@ import time from typing import Any, Dict, Mapping, Optional, Tuple, Union +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +from langchain_core.runnables import RunnablePassthrough from langchain_openai import ChatOpenAI -from logsage.auto_resume_policy.attribution_classes import ApplicationData, LRUCache -from logsage.auto_resume_policy.error_attribution import get_proposed_solution_cat -from logsage.auto_resume_policy.error_extraction import return_application_errors +from logsage.auto_resume_policy.attribution_classes import * +from logsage.auto_resume_policy.error_attribution import ( + CONTEXT_SIZE, + get_proposed_solution_cat, + +) + +from logsage.auto_resume_policy.error_attribution import get_attribution, get_auto_resume, get_proposed_solution_policies +from logsage.auto_resume_policy.util_postprocessing import get_auto_resume_postprocessing +from logsage.auto_resume_policy.prompts import template_post_error_check +from logsage.auto_resume_policy.error_extraction import return_application_errors, \ + return_application_errors_rt, finished_validation +from logsage.auto_resume_policy.utils import chunk_indices from nvidia_resiliency_ext.attribution.base import ( AttributionState, @@ -65,6 +78,31 @@ MARKER_NEW_RUN_DIR_ADDED = "[sbatch_script]: New run dir added:" +def _previous_path(path: str) -> str | None: + """Return ``path`` with its last digit-run decremented by one. + + Returns None if the path has no digits or the last number is 0. + """ + match = re.search(r"(\d+)(?!.*\d)", path) + if not match: + return None + num = int(match.group(1)) + if num <= 0: + return None + start, end = match.span(1) + return path[:start] + str(num - 1) + path[end:] + + +def _cycle_counter_key(path: str) -> str: + """Strip a trailing ``_`` from the filename stem so per-cycle + paths (``nvrx_0.log``, ``nvrx_1.log``, ...) share one + ``cycle_counter_dict`` entry. + """ + stem, ext = os.path.splitext(path) + stem = re.sub(r"_\d+$", "", stem) + return stem + ext + + def _action_from_logsage_head(head: str) -> str: normalized = head.strip().upper() if STOP_NO_RESTART in normalized or normalized.startswith("STOP"): @@ -191,6 +229,97 @@ def _result_item_from_logsage_fields( ) +def attribution_from_finished_status( + app_data, + application_errors_list_unique, +) -> ErrorAttribution: + """Build ErrorAttribution when no application errors were found, + based solely on app_data.finished status. + """ + finished = app_data.finished + + if finished == FinishedStatus.LLM_FAILURE: + logger.info("LLM failure") + return ErrorAttribution( + application_errors_full=[], + application_errors_unique=application_errors_list_unique, + auto_resume=AutoResumeAction.LLM_FAILURE, + auto_resume_verbose=AutoResumeAction.LLM_FAILURE, + attribution=Attribution.LLM_FAILURE, + infra_category="", + temp_category="", + single_multiple="", + cor_category="", + ) + + if finished == FinishedStatus.SLURM_CANCELLED: + logger.info("Slurm cancelled") + return ErrorAttribution( + application_errors_full=[], + application_errors_unique=application_errors_list_unique, + auto_resume=AutoResumeAction.RESTART_IMMEDIATE, + auto_resume_verbose="", + attribution=Attribution.SLURM_STEP_CANCELLED, + infra_category="", + temp_category="", + single_multiple="", + cor_category="", + ) + + if finished == FinishedStatus.SLURM_CANCELLED_JOB_REQUEUE: + logger.info("Slurm cancelled due to job requeue") + return ErrorAttribution( + application_errors_full=[], + application_errors_unique=application_errors_list_unique, + auto_resume=AutoResumeAction.RESTART_IMMEDIATE, + auto_resume_verbose="", + attribution=Attribution.SLURM_STEP_CANCELLED_JOB_REQUEUE, + infra_category="", + temp_category="", + single_multiple="", + cor_category="", + ) + + if FinishedStatus.SLURM_CANCELLED_TIME_LIMIT in finished: + logger.info("Slurm cancelled due to time limit") + return ErrorAttribution( + application_errors_full=[], + application_errors_unique=application_errors_list_unique, + auto_resume=AutoResumeAction.STOP_NO_RESTART, + auto_resume_verbose="", + attribution=finished.replace("_", " "), + infra_category="", + temp_category="", + single_multiple="", + cor_category="", + ) + + if finished == FinishedStatus.APPLICATION_DONE: + logger.info(Attribution.APPLICATION_DONE) + return ErrorAttribution( + application_errors_full=[], + application_errors_unique=application_errors_list_unique, + auto_resume=AutoResumeAction.STOP_NO_RESTART, + auto_resume_verbose="", + attribution=Attribution.APPLICATION_DONE, + infra_category="", + temp_category="", + single_multiple="", + cor_category="", + ) + + return ErrorAttribution( + application_errors_full=[], + application_errors_unique=application_errors_list_unique, + auto_resume=AutoResumeAction.ERRORS_NOT_FOUND, + auto_resume_verbose=AutoResumeAction.ERRORS_NOT_FOUND, + attribution=Attribution.ERRORS_NOT_FOUND, + infra_category="", + temp_category="", + single_multiple="", + cor_category="", + ) + def lines_after(lines, needle): for i, line in enumerate(lines): if needle in line: @@ -310,6 +439,33 @@ def _retry_return_application_errors( return app_data +def _retry_return_application_errors_rt( + llm: ChatOpenAI, lines: list[str], cache_dict: LRUCache, temporal_cache: dict[str, str] +) -> ApplicationData: + retries, initial_backoff, max_backoff, jitter = _log_analysis_retry_config() + backoff = initial_backoff + last_status = None + + for attempt in range(1, retries + 1): + app_data = return_application_errors_rt(llm, lines, cache_dict, temporal_cache) + status_name = _finished_status_name(app_data.finished) + if status_name != FINISHED_STATUS_LLM_FAILURE: + return app_data + + last_status = status_name + if attempt == retries: + logger.error( + "Log-analysis extraction failed after %d attempts; last status: %s", + retries, + last_status, + ) + return app_data + + backoff = _sleep_with_backoff(attempt, retries, backoff, max_backoff, jitter) + + return app_data + + def _with_exponential_backoff(llm_call, checkpoint_saved: bool) -> tuple[str, str, str, str, str]: retries, initial_backoff, max_backoff, jitter = _log_analysis_retry_config() backoff = initial_backoff @@ -369,18 +525,301 @@ def __init__(self, args: Union[argparse.Namespace, Mapping[str, Any]]): api_key=self.api_key, **llm_kwargs, ) + self.temporal_cache_dict = {} + self.cycle_counter_dict = {} + self.job_inline_data_dict = {} + self.attribution_dict = {} self.exclude_nvrx_logs = bool(self._init_config.get("exclude_nvrx_logs", False)) self.is_per_cycle = bool(self._init_config.get("is_per_cycle", False)) - super().__init__( - preprocess_input=self.analyze_logs, - attribution=self.llm_analyze, - output_handler=self.print_output, - ) + self.is_streaming_logs = bool(self._init_config.get("is_streaming_logs", False)) + self.repeated_amount = int(self._init_config.get("repeated_amount", 3)), + self.stop_accumulating_count = int(self._init_config.get("stop_accumulating_count", 3)), + self.logs_minutes_before_job_end = int(self._init_config.get("logs_minutes_before_job_end", 20)), + self.chunks_per_time = int(self._init_config.get("chunks_per_time", 5)), + if self.is_streaming_logs: + super().__init__( + preprocess_input=self._analyze_logs_rt_dispatch, + attribution=self.llm_analyze_rt, + output_handler=self.print_output, + ) + else: + super().__init__( + preprocess_input=self.analyze_logs, + attribution=self.llm_analyze, + output_handler=self.print_output, + ) @property def init_config(self) -> Dict[str, Any]: return dict(self._init_config) + + async def _analyze_logs_rt_dispatch(self) -> ErrorAttribution | None: + cfg = effective_run_or_init_config(self._init_config) + if cfg.get("job_stage") == "end": + return await self.analyze_logs_rt_end() + return await self.analyze_logs_rt_start() + + + async def llm_analyze_rt(self, rt_result: ErrorAttribution | None) -> list[LogSageCycleFields]: + if rt_result is None: + return [] + return [( + str(rt_result.auto_resume), + str(rt_result.auto_resume_verbose), + f"Attribution: Primary issues: [{rt_result.attribution}], Secondary issues: []", + "", + str(getattr(rt_result, "checkpoint_saved", False)), + )] + + + async def analyze_logs_rt_start(self) -> list[ApplicationData]: + """ + Analyzes the logs and returns the application errors. + + Args: + input_data: The input data to analyze. + + Returns: + application_errors_list_full_purified: The application errors list full purified. + application_errors_list_full: The application errors list full. + application_errors_list_full_purified_with_rank: The application errors list full purified with rank. + application_errors_list_full_with_rank: The application errors list full with rank. + error_type: The error type. + error_type_with_rank: The error type with rank. + error_type_with_rank_and_rank: The error type with rank and rank. + error_type_with_rank_and_rank_and_rank: The error type with rank and rank and rank. + + """ + cfg = effective_run_or_init_config(self._init_config) + path = cfg["log_path"] + + llm = self.llm + cache_dict = self.lru_cache + + cycle_counter = int(cfg.get("cycle_counter", 0)) + cycle_counter_key = _cycle_counter_key(path) + if cycle_counter == 0: + self.cycle_counter_dict[cycle_counter_key] = cycle_counter + + if path not in self.temporal_cache_dict: + self.temporal_cache_dict[path] = {} + file_offset = 0 + log_lines: list[str] = [] + empty_logs_stop = self.stop_accumulating_count + + application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = None, None, None, None + + while True: + try: + with open(path, 'r', encoding='utf-8') as f: + f.seek(file_offset) + new_lines = f.readlines() + file_offset = f.tell() + except UnicodeDecodeError: + with open(path, 'r', encoding='latin-1') as f: + f.seek(file_offset) + new_lines = f.readlines() + file_offset = f.tell() + + if len(new_lines): + empty_logs_stop = self.stop_accumulating_count + else: + empty_logs_stop -= 1 + + if empty_logs_stop <= 0: + break + + log_lines.extend(new_lines) + attribution_list = [] + + chunk_data = _retry_return_application_errors_rt(llm, new_lines, cache_dict, self.temporal_cache_dict[path]) + app_data = chunk_data + if chunk_data.application_errors_list_full: + application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = get_attribution( + llm, app_data, True) + attribution_list.append(attribution_raw_chunk) + + self.job_inline_data_dict[path].append((file_offset, new_lines, chunk_data, application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk)) + + time.sleep(self.chunks_per_time*60) + + return None + + + async def analyze_logs_rt_end(self) -> list[ApplicationData]: + """ + Analyzes the logs and returns the application errors. + + Args: + input_data: The input data to analyze. + + Returns: + application_errors_list_full_purified: The application errors list full purified. + application_errors_list_full: The application errors list full. + application_errors_list_full_purified_with_rank: The application errors list full purified with rank. + application_errors_list_full_with_rank: The application errors list full with rank. + error_type: The error type. + error_type_with_rank: The error type with rank. + error_type_with_rank_and_rank: The error type with rank and rank. + error_type_with_rank_and_rank_and_rank: The error type with rank and rank and rank. + + """ + cfg = effective_run_or_init_config(self._init_config) + path = cfg["log_path"] + + s_time = time.time() + + llm = self.llm + cache_dict = self.lru_cache + + path_previous = _previous_path(path) + attribution_previous = ( + self.attribution_dict.get(path_previous, '') if path_previous else '' + ) + cycle_counter = int(cfg.get("cycle_counter", 0)) + cycle_counter_key = _cycle_counter_key(path) + if cycle_counter == 0: + self.cycle_counter_dict[cycle_counter_key] = cycle_counter + + if path not in self.temporal_cache_dict: + self.temporal_cache_dict[path] = {} + + if len(self.job_inline_data_dict[path]): + file_offset = self.job_inline_data_dict[path][-1][0] + else: + file_offset = 0 + + try: + with open(path, 'r', encoding='utf-8') as f: + f.seek(file_offset) + new_lines = f.readlines() + file_offset = f.tell() + except UnicodeDecodeError: + with open(path, 'r', encoding='latin-1') as f: + f.seek(file_offset) + new_lines = f.readlines() + file_offset = f.tell() + + history = self.job_inline_data_dict.get(path, []) + num_chunks = int(self.logs_minutes_before_job_end / self.chunks_per_time) + chunk = [] + for item in history[-num_chunks:]: + chunk = chunk + item[1] + chunk = chunk + new_lines + + chunk_data = _retry_return_application_errors_rt(llm, chunk, cache_dict, self.temporal_cache_dict[path]) + + last_with_errors = chunk_data + if last_with_errors.application_errors_list_full: + indices = [error[2] for error in last_with_errors.application_errors_list_full] + error_groups = chunk_indices(indices, len(last_with_errors.original_text)) + + n_lines = len(last_with_errors.original_text) + prompt_post_error = ChatPromptTemplate.from_template(template_post_error_check) + post_error_chain = ( + {"question": RunnablePassthrough()} + | prompt_post_error + | llm + | StrOutputParser() + ) + + post_error_texts = [] + checked_groups = [] + for group in error_groups: + if len(group) == 0: + continue + first_idx = int(group[0]) + last_idx = int(group[-1]) + if first_idx < n_lines - last_idx and len(last_with_errors.original_text) > last_idx + 50: + post_error_lines = last_with_errors.original_text[last_idx + 1: last_idx + 51] + post_error_texts.append("\n".join(post_error_lines)[:CONTEXT_SIZE]) + checked_groups.append(group) + + post_error_results = post_error_chain.batch(post_error_texts) if post_error_texts else [] + + indices_to_remove = set() + for group, result in zip(checked_groups, post_error_results): + if result.strip().lower() == "no": + indices_to_remove.update(int(idx) for idx in group) + + last_with_errors.application_errors_list_full = [ + error + for error in last_with_errors.application_errors_list_full + if error[2] not in indices_to_remove + ] + if last_with_errors.application_errors_list_full: + application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = get_attribution( + llm, last_with_errors, True) + last_attribution_dict_chunk = attribution_dict_chunk + last_attribution_raw_chunk = attribution_raw_chunk + last_application_log_chunk = application_log + last_hw_category_chunk = hw_category_chunk + + if last_with_errors is None: + history = self.job_inline_data_dict.get(path, []) + last_with_errors = (history[-1][2] if len(history) >= 1 else None) + + last_with_errors.checkpoint_saved = any([item[2].checkpoint_saved for item in self.job_inline_data_dict[path]]) + + last_with_errors = finished_validation(llm, last_with_errors) + + logger.info("error extraction latency: %s", time.time() - s_time) + application_errors_full = [error[0] for error in last_with_errors.application_errors_list_full] + + self.temporal_cache_dict.pop(path, None) + + if ( + len(last_with_errors.application_errors_list_full) == 0 + or last_with_errors.finished == FinishedStatus.APPLICATION_DONE + ): + attribution_finished = attribution_from_finished_status(last_with_errors, last_with_errors.application_errors_list_unique) + return attribution_finished + + attribution_output = last_attribution_dict_chunk["attribution"] + self.attribution_dict[path] = attribution_output + auto_resume_output, auto_resume_verbose = get_auto_resume( + llm, + last_with_errors, + last_attribution_raw_chunk, + last_attribution_dict_chunk, + last_hw_category_chunk, + last_application_log_chunk, + ) + + is_attribution_current_last = get_auto_resume_postprocessing( + attribution_output, + attribution_previous, + cycle_counter, + llm, + ) + + if last_with_errors.checkpoint_saved and cycle_counter > 0: + is_attribution_current_last = False + + if is_attribution_current_last and self.cycle_counter_dict[cycle_counter_key] == self.repeated_amount - 1: + auto_resume_output = 'STOP - DONT RESTART IMMEDIATE' + auto_resume_verbose = "Stop job due to repeated issue" + + if is_attribution_current_last: + if auto_resume_verbose != "Stop job due to repeated issue": + self.cycle_counter_dict[cycle_counter_key]+=1 + else: + self.cycle_counter_dict[cycle_counter_key]=1 + + logger.info("Policy suggestion and Error attribution started") + + logger.info(auto_resume_output) + + return ErrorAttribution( + application_errors_full=application_errors_full, + application_errors_unique=last_with_errors.application_errors_list_unique, + auto_resume=auto_resume_output, + auto_resume_verbose=auto_resume_verbose, + attribution=attribution_output, + ) + + async def analyze_logs(self) -> list[ApplicationData]: """ Analyzes the logs and returns the application errors. diff --git a/src/nvidia_resiliency_ext/attribution/orchestration/config.py b/src/nvidia_resiliency_ext/attribution/orchestration/config.py index ebe6383d..5c27226d 100644 --- a/src/nvidia_resiliency_ext/attribution/orchestration/config.py +++ b/src/nvidia_resiliency_ext/attribution/orchestration/config.py @@ -22,11 +22,8 @@ from enum import Enum from typing import Any, Mapping -# LLM defaults — override with NVRX_LLM_MODEL / NVRX_LLM_BASE_URL env vars. -# Default endpoint is build.nvidia.com (publicly accessible). -# Internal NVIDIA users can override to inference.nvidia.com via NVRX_LLM_BASE_URL. -DEFAULT_LLM_MODEL = os.environ.get("NVRX_LLM_MODEL", "nvidia/nemotron-3-super-120b-a12b") -DEFAULT_LLM_BASE_URL = os.environ.get("NVRX_LLM_BASE_URL", "https://integrate.api.nvidia.com/v1") +DEFAULT_LLM_MODEL = os.environ.get("NVRX_LLM_MODEL", "nvidia/qwen/qwen-235b") +DEFAULT_LLM_BASE_URL = os.environ.get("NVRX_LLM_BASE_URL", "https://inference-api.nvidia.com/v1") DEFAULT_LLM_TEMPERATURE = 0.2 DEFAULT_LLM_TOP_P = 0.7 DEFAULT_LLM_MAX_TOKENS = 8192 @@ -65,6 +62,10 @@ class LogSageExecutionConfig: llm_temperature: float | None = None llm_top_p: float | None = None llm_max_tokens: int | None = None + #: When True, NVRxLogAnalyzer runs in streaming mode (start kicks off a per-path + #: background reader, end finalizes with the accumulated state). When False, only + #: terminal analysis runs and the start signal is a no-op at the LogSage layer. + is_streaming_logs: bool = False def llm_runtime_overrides(self) -> dict[str, Any]: """LLM kwargs for LogSage/MCP/runtime calls, omitting unset overrides.""" diff --git a/src/nvidia_resiliency_ext/attribution/orchestration/log_analyzer.py b/src/nvidia_resiliency_ext/attribution/orchestration/log_analyzer.py index 6d067d39..5c75a52f 100644 --- a/src/nvidia_resiliency_ext/attribution/orchestration/log_analyzer.py +++ b/src/nvidia_resiliency_ext/attribution/orchestration/log_analyzer.py @@ -154,21 +154,39 @@ async def _get_lib_log_analyzer(self, run_kwargs: Dict[str, Any]) -> Any: raise return self._lib_log_analyzer - async def _fetch_log_result_lib(self, path: str) -> Dict[str, Any]: + def _streaming_kwargs(self) -> Dict[str, Any]: + """Streaming-mode flags forwarded to NVRxLogAnalyzer init/run kwargs.""" + if not self.config.is_streaming_logs: + return {} + return {"is_streaming_logs": True} + + @property + def is_streaming_logs(self) -> bool: + """Whether streaming-mode LogSage is enabled for this runner.""" + return bool(self.config.is_streaming_logs) + + async def _fetch_log_result_lib( + self, path: str, *, job_stage: Optional[str] = None + ) -> Dict[str, Any]: is_per_cycle = bool(re.search(CYCLE_LOG_PATTERN, path)) run_kwargs = { "log_path": path, "exclude_nvrx_logs": False, "is_per_cycle": is_per_cycle, + **self._streaming_kwargs(), **self.config.llm_runtime_overrides(), } + if job_stage is not None: + run_kwargs["job_stage"] = job_stage analyzer = await self._get_lib_log_analyzer(run_kwargs) async with self._log_analysis_lock: result = await analyzer.run(run_kwargs) return nvrx_run_result_to_log_dict(result, path) - async def _fetch_log_result_mcp(self, path: str) -> Dict[str, Any]: + async def _fetch_log_result_mcp( + self, path: str, *, job_stage: Optional[str] = None + ) -> Dict[str, Any]: self._ensure_mcp_ready() is_per_cycle = bool(re.search(CYCLE_LOG_PATTERN, path)) @@ -176,8 +194,11 @@ async def _fetch_log_result_mcp(self, path: str) -> Dict[str, Any]: "log_path": path, "exclude_nvrx_logs": False, "is_per_cycle": is_per_cycle, + **self._streaming_kwargs(), **self.config.llm_runtime_overrides(), } + if job_stage is not None: + run_kwargs["job_stage"] = job_stage async with self._log_analysis_lock: log_result = await self._mcp_client.run_module_resilient( @@ -185,11 +206,18 @@ async def _fetch_log_result_mcp(self, path: str) -> Dict[str, Any]: ) return log_result - async def fetch_log_result(self, path: str) -> Dict[str, Any]: - """Run LogSage for ``path``; return result items plus a derived recommendation.""" + async def fetch_log_result( + self, path: str, *, job_stage: Optional[str] = None + ) -> Dict[str, Any]: + """Run LogSage for ``path``; return result items plus a derived recommendation. + + ``job_stage`` (``"start"`` / ``"end"``) is forwarded to NVRxLogAnalyzer's + run kwargs and read by its streaming dispatcher; ignored when streaming + mode is off. + """ if self.config.use_lib_log_analysis: - return await self._fetch_log_result_lib(path) - return await self._fetch_log_result_mcp(path) + return await self._fetch_log_result_lib(path, job_stage=job_stage) + return await self._fetch_log_result_mcp(path, job_stage=job_stage) async def _start_progressive_analysis_lib( self, @@ -431,8 +459,15 @@ async def check_mcp_health(self, timeout_seconds: float = 5.0) -> tuple[str, str async def reconnect_mcp(self) -> bool: return await self._runner.reconnect_mcp() - async def fetch_log_result(self, path: str) -> Dict[str, Any]: - return await self._runner.fetch_log_result(path) + async def fetch_log_result( + self, path: str, *, job_stage: Optional[str] = None + ) -> Dict[str, Any]: + return await self._runner.fetch_log_result(path, job_stage=job_stage) + + @property + def is_streaming_logs(self) -> bool: + """True when the LogSage runner is configured for streaming start/end stages.""" + return self._runner.is_streaming_logs async def start_progressive_analysis( self, @@ -529,7 +564,12 @@ def _schedule_post_analysis_results( task.add_done_callback(self._post_tasks.discard) async def run_attribution_for_path( - self, path: str, user: str = "unknown", job_id: Optional[str] = None + self, + path: str, + user: str = "unknown", + job_id: Optional[str] = None, + *, + job_stage: Optional[str] = None, ) -> LogAnalysisCoalesced: """Run LogSage and optional FR pipeline for ``path``; return coalescer payload.""" if not os.access(path, os.R_OK): @@ -565,7 +605,7 @@ async def _mcp_combined( path, mode=mode, run_logsage=( - (lambda: self.fetch_log_result(path)) + (lambda: self.fetch_log_result(path, job_stage=job_stage)) if mode != AnalysisPipelineMode.TRACE_ONLY else None ), From 1b85a2bd97f1cacccc6ffabaadbfcd214736529e Mon Sep 17 00:00:00 2001 From: Haim Elisha Date: Thu, 21 May 2026 16:02:59 +0300 Subject: [PATCH 3/6] attribution_inline_processing_stop_repeated_failures --- .../attribution/analyzer/engine.py | 8 +- .../attribution/log_analyzer/nvrx_logsage.py | 136 +++++++++++------- .../unit/run_dispatch_test_parallel.py | 70 +++++---- .../run_log_analyzer_dispatch_four_cycles.py | 35 +++-- .../run_log_writer_dispatch_four_cycles.py | 44 +++--- 5 files changed, 173 insertions(+), 120 deletions(-) diff --git a/src/nvidia_resiliency_ext/attribution/analyzer/engine.py b/src/nvidia_resiliency_ext/attribution/analyzer/engine.py index d8f72cd6..a9b6190e 100644 --- a/src/nvidia_resiliency_ext/attribution/analyzer/engine.py +++ b/src/nvidia_resiliency_ext/attribution/analyzer/engine.py @@ -461,9 +461,7 @@ def _schedule_stage_analysis( logger.debug("Scheduled %s analysis start for %s", stage_label, normalized_path) @staticmethod - def _log_stage_analysis_failure( - normalized_path: str, done: Any, *, stage_label: str - ) -> None: + def _log_stage_analysis_failure(normalized_path: str, done: Any, *, stage_label: str) -> None: try: done.result() except (asyncio.CancelledError, concurrent.futures.CancelledError): @@ -481,9 +479,7 @@ def _log_stage_analysis_failure( @staticmethod def _log_terminal_analysis_failure(normalized_path: str, done: Any) -> None: - Analyzer._log_stage_analysis_failure( - normalized_path, done, stage_label="terminal" - ) + Analyzer._log_stage_analysis_failure(normalized_path, done, stage_label="terminal") async def analyze( self, diff --git a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py index 46ac806e..f117834e 100644 --- a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py +++ b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py @@ -14,15 +14,18 @@ from logsage.auto_resume_policy.attribution_classes import * from logsage.auto_resume_policy.error_attribution import ( CONTEXT_SIZE, + get_attribution, + get_auto_resume, get_proposed_solution_cat, - + get_proposed_solution_policies, +) +from logsage.auto_resume_policy.error_extraction import ( + finished_validation, + return_application_errors, + return_application_errors_rt, ) - -from logsage.auto_resume_policy.error_attribution import get_attribution, get_auto_resume, get_proposed_solution_policies -from logsage.auto_resume_policy.util_postprocessing import get_auto_resume_postprocessing from logsage.auto_resume_policy.prompts import template_post_error_check -from logsage.auto_resume_policy.error_extraction import return_application_errors, \ - return_application_errors_rt, finished_validation +from logsage.auto_resume_policy.util_postprocessing import get_auto_resume_postprocessing from logsage.auto_resume_policy.utils import chunk_indices from nvidia_resiliency_ext.attribution.base import ( @@ -320,6 +323,7 @@ def attribution_from_finished_status( cor_category="", ) + def lines_after(lines, needle): for i, line in enumerate(lines): if needle in line: @@ -532,10 +536,12 @@ def __init__(self, args: Union[argparse.Namespace, Mapping[str, Any]]): self.exclude_nvrx_logs = bool(self._init_config.get("exclude_nvrx_logs", False)) self.is_per_cycle = bool(self._init_config.get("is_per_cycle", False)) self.is_streaming_logs = bool(self._init_config.get("is_streaming_logs", False)) - self.repeated_amount = int(self._init_config.get("repeated_amount", 3)), - self.stop_accumulating_count = int(self._init_config.get("stop_accumulating_count", 3)), - self.logs_minutes_before_job_end = int(self._init_config.get("logs_minutes_before_job_end", 20)), - self.chunks_per_time = int(self._init_config.get("chunks_per_time", 5)), + self.repeated_amount = (int(self._init_config.get("repeated_amount", 3)),) + self.stop_accumulating_count = (int(self._init_config.get("stop_accumulating_count", 3)),) + self.logs_minutes_before_job_end = ( + int(self._init_config.get("logs_minutes_before_job_end", 20)), + ) + self.chunks_per_time = (int(self._init_config.get("chunks_per_time", 5)),) if self.is_streaming_logs: super().__init__( preprocess_input=self._analyze_logs_rt_dispatch, @@ -553,25 +559,24 @@ def __init__(self, args: Union[argparse.Namespace, Mapping[str, Any]]): def init_config(self) -> Dict[str, Any]: return dict(self._init_config) - async def _analyze_logs_rt_dispatch(self) -> ErrorAttribution | None: cfg = effective_run_or_init_config(self._init_config) if cfg.get("job_stage") == "end": return await self.analyze_logs_rt_end() return await self.analyze_logs_rt_start() - async def llm_analyze_rt(self, rt_result: ErrorAttribution | None) -> list[LogSageCycleFields]: if rt_result is None: return [] - return [( - str(rt_result.auto_resume), - str(rt_result.auto_resume_verbose), - f"Attribution: Primary issues: [{rt_result.attribution}], Secondary issues: []", - "", - str(getattr(rt_result, "checkpoint_saved", False)), - )] - + return [ + ( + str(rt_result.auto_resume), + str(rt_result.auto_resume_verbose), + f"Attribution: Primary issues: [{rt_result.attribution}], Secondary issues: []", + "", + str(getattr(rt_result, "checkpoint_saved", False)), + ) + ] async def analyze_logs_rt_start(self) -> list[ApplicationData]: """ @@ -608,7 +613,12 @@ async def analyze_logs_rt_start(self) -> list[ApplicationData]: log_lines: list[str] = [] empty_logs_stop = self.stop_accumulating_count - application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = None, None, None, None + application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = ( + None, + None, + None, + None, + ) while True: try: @@ -633,20 +643,35 @@ async def analyze_logs_rt_start(self) -> list[ApplicationData]: log_lines.extend(new_lines) attribution_list = [] - chunk_data = _retry_return_application_errors_rt(llm, new_lines, cache_dict, self.temporal_cache_dict[path]) + chunk_data = _retry_return_application_errors_rt( + llm, new_lines, cache_dict, self.temporal_cache_dict[path] + ) app_data = chunk_data if chunk_data.application_errors_list_full: - application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = get_attribution( - llm, app_data, True) + ( + application_log, + attribution_raw_chunk, + attribution_dict_chunk, + hw_category_chunk, + ) = get_attribution(llm, app_data, True) attribution_list.append(attribution_raw_chunk) - self.job_inline_data_dict[path].append((file_offset, new_lines, chunk_data, application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk)) + self.job_inline_data_dict[path].append( + ( + file_offset, + new_lines, + chunk_data, + application_log, + attribution_raw_chunk, + attribution_dict_chunk, + hw_category_chunk, + ) + ) - time.sleep(self.chunks_per_time*60) + time.sleep(self.chunks_per_time * 60) return None - async def analyze_logs_rt_end(self) -> list[ApplicationData]: """ Analyzes the logs and returns the application errors. @@ -674,9 +699,7 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: cache_dict = self.lru_cache path_previous = _previous_path(path) - attribution_previous = ( - self.attribution_dict.get(path_previous, '') if path_previous else '' - ) + attribution_previous = self.attribution_dict.get(path_previous, '') if path_previous else '' cycle_counter = int(cfg.get("cycle_counter", 0)) cycle_counter_key = _cycle_counter_key(path) if cycle_counter == 0: @@ -708,7 +731,9 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: chunk = chunk + item[1] chunk = chunk + new_lines - chunk_data = _retry_return_application_errors_rt(llm, chunk, cache_dict, self.temporal_cache_dict[path]) + chunk_data = _retry_return_application_errors_rt( + llm, chunk, cache_dict, self.temporal_cache_dict[path] + ) last_with_errors = chunk_data if last_with_errors.application_errors_list_full: @@ -718,10 +743,7 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: n_lines = len(last_with_errors.original_text) prompt_post_error = ChatPromptTemplate.from_template(template_post_error_check) post_error_chain = ( - {"question": RunnablePassthrough()} - | prompt_post_error - | llm - | StrOutputParser() + {"question": RunnablePassthrough()} | prompt_post_error | llm | StrOutputParser() ) post_error_texts = [] @@ -731,12 +753,17 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: continue first_idx = int(group[0]) last_idx = int(group[-1]) - if first_idx < n_lines - last_idx and len(last_with_errors.original_text) > last_idx + 50: - post_error_lines = last_with_errors.original_text[last_idx + 1: last_idx + 51] + if ( + first_idx < n_lines - last_idx + and len(last_with_errors.original_text) > last_idx + 50 + ): + post_error_lines = last_with_errors.original_text[last_idx + 1 : last_idx + 51] post_error_texts.append("\n".join(post_error_lines)[:CONTEXT_SIZE]) checked_groups.append(group) - post_error_results = post_error_chain.batch(post_error_texts) if post_error_texts else [] + post_error_results = ( + post_error_chain.batch(post_error_texts) if post_error_texts else [] + ) indices_to_remove = set() for group, result in zip(checked_groups, post_error_results): @@ -749,8 +776,9 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: if error[2] not in indices_to_remove ] if last_with_errors.application_errors_list_full: - application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = get_attribution( - llm, last_with_errors, True) + application_log, attribution_raw_chunk, attribution_dict_chunk, hw_category_chunk = ( + get_attribution(llm, last_with_errors, True) + ) last_attribution_dict_chunk = attribution_dict_chunk last_attribution_raw_chunk = attribution_raw_chunk last_application_log_chunk = application_log @@ -758,22 +786,28 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: if last_with_errors is None: history = self.job_inline_data_dict.get(path, []) - last_with_errors = (history[-1][2] if len(history) >= 1 else None) + last_with_errors = history[-1][2] if len(history) >= 1 else None - last_with_errors.checkpoint_saved = any([item[2].checkpoint_saved for item in self.job_inline_data_dict[path]]) + last_with_errors.checkpoint_saved = any( + [item[2].checkpoint_saved for item in self.job_inline_data_dict[path]] + ) last_with_errors = finished_validation(llm, last_with_errors) logger.info("error extraction latency: %s", time.time() - s_time) - application_errors_full = [error[0] for error in last_with_errors.application_errors_list_full] + application_errors_full = [ + error[0] for error in last_with_errors.application_errors_list_full + ] self.temporal_cache_dict.pop(path, None) if ( - len(last_with_errors.application_errors_list_full) == 0 - or last_with_errors.finished == FinishedStatus.APPLICATION_DONE + len(last_with_errors.application_errors_list_full) == 0 + or last_with_errors.finished == FinishedStatus.APPLICATION_DONE ): - attribution_finished = attribution_from_finished_status(last_with_errors, last_with_errors.application_errors_list_unique) + attribution_finished = attribution_from_finished_status( + last_with_errors, last_with_errors.application_errors_list_unique + ) return attribution_finished attribution_output = last_attribution_dict_chunk["attribution"] @@ -797,15 +831,18 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: if last_with_errors.checkpoint_saved and cycle_counter > 0: is_attribution_current_last = False - if is_attribution_current_last and self.cycle_counter_dict[cycle_counter_key] == self.repeated_amount - 1: + if ( + is_attribution_current_last + and self.cycle_counter_dict[cycle_counter_key] == self.repeated_amount - 1 + ): auto_resume_output = 'STOP - DONT RESTART IMMEDIATE' auto_resume_verbose = "Stop job due to repeated issue" if is_attribution_current_last: if auto_resume_verbose != "Stop job due to repeated issue": - self.cycle_counter_dict[cycle_counter_key]+=1 + self.cycle_counter_dict[cycle_counter_key] += 1 else: - self.cycle_counter_dict[cycle_counter_key]=1 + self.cycle_counter_dict[cycle_counter_key] = 1 logger.info("Policy suggestion and Error attribution started") @@ -819,7 +856,6 @@ async def analyze_logs_rt_end(self) -> list[ApplicationData]: attribution=attribution_output, ) - async def analyze_logs(self) -> list[ApplicationData]: """ Analyzes the logs and returns the application errors. diff --git a/tests/attribution/unit/run_dispatch_test_parallel.py b/tests/attribution/unit/run_dispatch_test_parallel.py index 8be607f6..e43528a4 100644 --- a/tests/attribution/unit/run_dispatch_test_parallel.py +++ b/tests/attribution/unit/run_dispatch_test_parallel.py @@ -50,33 +50,43 @@ def main() -> int: parser.add_argument("--log-dir", default=_HERE) parser.add_argument("--num-cycles", type=int, default=4) parser.add_argument( - "--chunk-interval", type=float, default=60.0, + "--chunk-interval", + type=float, + default=60.0, help="Seconds between writer chunks (default 60s = 1 min)", ) parser.add_argument( - "--poll-interval", type=float, default=60.0, + "--poll-interval", + type=float, + default=60.0, help="Seconds between analyzer polls (default 60s = 1 min)", ) parser.add_argument( - "--cycle-duration", type=float, default=300.0, + "--cycle-duration", + type=float, + default=300.0, help="Analyzer's per-cycle deadline (default 300s = 5 min)", ) parser.add_argument( - "--end-window-minutes", type=float, default=2.0, - help="Minutes of history the end phase reuses (default 2 = " - "tail + last 2 minutes)", + "--end-window-minutes", + type=float, + default=2.0, + help="Minutes of history the end phase reuses (default 2 = " "tail + last 2 minutes)", ) parser.add_argument( - "--checkpoint-cycles", default="last", + "--checkpoint-cycles", + default="last", help="Which cycle indices include a 'Saved checkpoint' line. " - "Default 'last' = only the final cycle. The end phase's " - "checkpoint_saved circuit-breaker then prevents the " - "repeated-issue STOP from firing on that cycle.", + "Default 'last' = only the final cycle. The end phase's " + "checkpoint_saved circuit-breaker then prevents the " + "repeated-issue STOP from firing on that cycle.", ) parser.add_argument( - "--writer-head-start", type=float, default=0.5, + "--writer-head-start", + type=float, + default=0.5, help="Seconds the writer leads the analyzer so the first poll " - "finds a non-empty file (default 0.5s)", + "finds a non-empty file (default 0.5s)", ) args = parser.parse_args() @@ -97,26 +107,36 @@ def main() -> int: common_env["PYTHONUNBUFFERED"] = "1" writer_cmd = [ - sys.executable, WRITER_SCRIPT, - "--log-dir", args.log_dir, - "--num-cycles", str(args.num_cycles), - "--chunk-interval", str(args.chunk_interval), - "--checkpoint-cycles", args.checkpoint_cycles, + sys.executable, + WRITER_SCRIPT, + "--log-dir", + args.log_dir, + "--num-cycles", + str(args.num_cycles), + "--chunk-interval", + str(args.chunk_interval), + "--checkpoint-cycles", + args.checkpoint_cycles, ] analyzer_cmd = [ - sys.executable, ANALYZER_SCRIPT, - "--log-dir", args.log_dir, - "--num-cycles", str(args.num_cycles), - "--cycle-duration", str(args.cycle_duration), - "--poll-interval", str(args.poll_interval), - "--end-window-minutes", str(args.end_window_minutes), + sys.executable, + ANALYZER_SCRIPT, + "--log-dir", + args.log_dir, + "--num-cycles", + str(args.num_cycles), + "--cycle-duration", + str(args.cycle_duration), + "--poll-interval", + str(args.poll_interval), + "--end-window-minutes", + str(args.end_window_minutes), ] print(f"[driver] writer: {' '.join(writer_cmd)}", flush=True) print(f"[driver] analyzer: {' '.join(analyzer_cmd)}", flush=True) print( - f"[driver] launching in parallel, writer head start " - f"{args.writer_head_start:.1f}s", + f"[driver] launching in parallel, writer head start " f"{args.writer_head_start:.1f}s", flush=True, ) diff --git a/tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py b/tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py index 2c8677eb..e2a5ba48 100644 --- a/tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py +++ b/tests/attribution/unit/run_log_analyzer_dispatch_four_cycles.py @@ -128,6 +128,7 @@ def _dispatch(analyzer, path: str, job_stage: str, cycle: int): analyzer._init_config["attribution"] = True from nvidia_resiliency_ext.attribution.log_analyzer import nvrx_logsage + analyzer.job_inline_data_dict.setdefault(path, []) ck = nvrx_logsage._cycle_counter_key(path) analyzer.cycle_counter_dict.setdefault(ck, 0) @@ -188,8 +189,7 @@ def run_dispatch_all_cycles( continue print( - f"\n[dispatch] === cycle {cycle} " - f"({os.path.basename(path)}) ===", + f"\n[dispatch] === cycle {cycle} " f"({os.path.basename(path)}) ===", flush=True, ) @@ -225,8 +225,7 @@ def run_dispatch_all_cycles( # Phase 2 — end branch via dispatcher. print( - f"[dispatch] cycle {cycle}: job_stage=end → " - "analyze_logs_rt_end", + f"[dispatch] cycle {cycle}: job_stage=end → " "analyze_logs_rt_end", flush=True, ) end_t0 = time.monotonic() @@ -266,8 +265,7 @@ def run_dispatch_all_cycles( flush=True, ) per_cycle_result.append( - (cycle, path, end_result, ckpt_saved_in_history, - start_elapsed, end_elapsed) + (cycle, path, end_result, ckpt_saved_in_history, start_elapsed, end_elapsed) ) print("\n[dispatch] ====== summary ======", flush=True) @@ -295,24 +293,31 @@ def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--log-dir", default=DEFAULT_LOG_DIR) parser.add_argument( - "--num-cycles", type=int, default=NUM_CYCLES, + "--num-cycles", + type=int, + default=NUM_CYCLES, help=f"Number of cycle files (default {NUM_CYCLES})", ) parser.add_argument( - "--cycle-duration", type=float, default=300.0, - help="Seconds the start phase polls per cycle before exiting " - "(default 300s = 5 min)", + "--cycle-duration", + type=float, + default=300.0, + help="Seconds the start phase polls per cycle before exiting " "(default 300s = 5 min)", ) parser.add_argument( - "--poll-interval", type=float, default=60.0, + "--poll-interval", + type=float, + default=60.0, help="Seconds between polls inside the start phase (default 60s = 1 min)", ) parser.add_argument( - "--end-window-minutes", type=float, default=2.0, + "--end-window-minutes", + type=float, + default=2.0, help="Minutes of streaming history the end phase glues to the " - "freshly read tail before re-running extraction. With the " - "writer at 1 chunk/min, '2' = tail + last 2 minutes " - "(default 2)", + "freshly read tail before re-running extraction. With the " + "writer at 1 chunk/min, '2' = tail + last 2 minutes " + "(default 2)", ) args = parser.parse_args() diff --git a/tests/attribution/unit/run_log_writer_dispatch_four_cycles.py b/tests/attribution/unit/run_log_writer_dispatch_four_cycles.py index 8aed36ef..b6063136 100644 --- a/tests/attribution/unit/run_log_writer_dispatch_four_cycles.py +++ b/tests/attribution/unit/run_log_writer_dispatch_four_cycles.py @@ -30,7 +30,7 @@ _HERE = os.path.dirname(os.path.abspath(__file__)) NUM_CYCLES = 4 CHUNKS_PER_CYCLE = 5 -CHECKPOINT_CHUNK_INDEX = 3 # minute 3 — the late-training chunk +CHECKPOINT_CHUNK_INDEX = 3 # minute 3 — the late-training chunk # 5 chunks per cycle, one per minute → 5-minute cycle window. The OOM # lands in the last chunk so it falls inside the end phase's @@ -41,16 +41,13 @@ "[{ts}] FT: initialized\n" "[{ts}] Cycle: {cycle} begin\n", # minute 1 — early training - "[{ts}] step 100 loss=0.512 lr=1e-4\n" - "[{ts}] step 200 loss=0.487 lr=1e-4\n", + "[{ts}] step 100 loss=0.512 lr=1e-4\n" "[{ts}] step 200 loss=0.487 lr=1e-4\n", # minute 2 — mid training - "[{ts}] step 300 loss=0.461 lr=1e-4\n" - "[{ts}] step 400 loss=0.439 lr=1e-4\n", + "[{ts}] step 300 loss=0.461 lr=1e-4\n" "[{ts}] step 400 loss=0.439 lr=1e-4\n", # minute 3 — late training (last clean window before failure). # In "checkpoint cycles" we append CHECKPOINT_LINE here so the # end phase observes checkpoint_saved=True alongside the OOM. - "[{ts}] step 500 loss=0.421 lr=1e-4\n" - "[{ts}] step 600 loss=0.408 lr=1e-4\n", + "[{ts}] step 500 loss=0.421 lr=1e-4\n" "[{ts}] step 600 loss=0.408 lr=1e-4\n", # minute 4 — failure (this chunk must land inside the end phase's # tail + 2-minute window) "[{ts}] ERROR: torch.cuda.OutOfMemoryError: CUDA out of memory.\n" @@ -96,27 +93,30 @@ def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--log-dir", default=_HERE) parser.add_argument( - "--num-cycles", type=int, default=NUM_CYCLES, + "--num-cycles", + type=int, + default=NUM_CYCLES, help=f"Number of cycle files (default {NUM_CYCLES})", ) parser.add_argument( - "--chunk-interval", type=float, default=60.0, + "--chunk-interval", + type=float, + default=60.0, help="Seconds between chunks; also the gap after the last chunk " - "of a cycle, so each cycle is chunks_per_cycle * " - "chunk_interval long (default 60s).", + "of a cycle, so each cycle is chunks_per_cycle * " + "chunk_interval long (default 60s).", ) parser.add_argument( - "--checkpoint-cycles", default="last", + "--checkpoint-cycles", + default="last", help="Which cycle indices include a 'Saved checkpoint' line in " - "their minute-3 chunk. Accepts 'last' (default — only the " - "final cycle), 'all', 'none', or a comma-separated list " - "like '0,2'.", + "their minute-3 chunk. Accepts 'last' (default — only the " + "final cycle), 'all', 'none', or a comma-separated list " + "like '0,2'.", ) args = parser.parse_args() - checkpoint_cycles = _parse_checkpoint_cycles( - args.checkpoint_cycles, args.num_cycles - ) + checkpoint_cycles = _parse_checkpoint_cycles(args.checkpoint_cycles, args.num_cycles) os.makedirs(args.log_dir, exist_ok=True) # Truncate up front so the analyzer's first read against any cycle @@ -128,8 +128,7 @@ def main() -> int: chunks_per_cycle = len(CHUNK_TEMPLATES) cycle_seconds = chunks_per_cycle * args.chunk_interval ckpt_summary = ( - ",".join(str(c) for c in sorted(checkpoint_cycles)) - if checkpoint_cycles else "none" + ",".join(str(c) for c in sorted(checkpoint_cycles)) if checkpoint_cycles else "none" ) print( f"[writer] {args.num_cycles} cycles × {chunks_per_cycle} chunks @ " @@ -146,10 +145,7 @@ def main() -> int: ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") content = tmpl.format(ts=ts, cycle=cycle) ckpt_emitted = False - if ( - chunk_idx == CHECKPOINT_CHUNK_INDEX - and cycle in checkpoint_cycles - ): + if chunk_idx == CHECKPOINT_CHUNK_INDEX and cycle in checkpoint_cycles: content += CHECKPOINT_LINE.format(ts=ts) ckpt_emitted = True with open(path, "a", encoding="utf-8") as f: From 8e9bd41468424de56c661767d8f1797802f4bb91 Mon Sep 17 00:00:00 2001 From: Haim Elisha Date: Thu, 21 May 2026 16:06:08 +0300 Subject: [PATCH 4/6] attribution_inline_processing_stop_repeated_failures --- .../attribution/log_analyzer/nvrx_logsage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py index f117834e..d521a7ab 100644 --- a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py +++ b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py @@ -17,7 +17,6 @@ get_attribution, get_auto_resume, get_proposed_solution_cat, - get_proposed_solution_policies, ) from logsage.auto_resume_policy.error_extraction import ( finished_validation, From 66013698894dae4da14cc8d2c16657f68fff1ec2 Mon Sep 17 00:00:00 2001 From: Haim Elisha Date: Thu, 21 May 2026 16:11:01 +0300 Subject: [PATCH 5/6] attribution_inline_processing_stop_repeated_failures --- .../attribution/log_analyzer/nvrx_logsage.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py index d521a7ab..94b46a88 100644 --- a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py +++ b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py @@ -11,7 +11,15 @@ from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from langchain_openai import ChatOpenAI -from logsage.auto_resume_policy.attribution_classes import * +from logsage.auto_resume_policy.attribution_classes import ( + ApplicationData, + Attribution, + AutoResumeAction, + ErrorAttribution, + FinishedStatus, + LogSageCycleFields, + LRUCache, +) from logsage.auto_resume_policy.error_attribution import ( CONTEXT_SIZE, get_attribution, From ac8a94b45942afd590701a7be9183fde869559eb Mon Sep 17 00:00:00 2001 From: Haim Elisha Date: Thu, 21 May 2026 16:13:51 +0300 Subject: [PATCH 6/6] attribution_inline_processing_stop_repeated_failures --- .../attribution/log_analyzer/nvrx_logsage.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py index 94b46a88..4910468d 100644 --- a/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py +++ b/src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py @@ -62,8 +62,6 @@ logger = logging.getLogger(__name__) -LogSageCycleFields = tuple[str, str, str, str, str] - FINISHED_STATUS_LLM_FAILURE = "LLM_FAILURE" FINISHED_STATUS_SLURM_CANCELLED = "SLURM_CANCELLED" FINISHED_STATUS_SLURM_CANCELLED_JOB_REQUEUE = "SLURM_CANCELLED_JOB_REQUEUE"