Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b2a035e
feat(skills): add nvrx-attr skill bundle
sbak5 Apr 22, 2026
df5f5c7
chore(skills): remove extraneous nvrx-attr artifacts
sbak5 Apr 22, 2026
a146d54
feat(skills): harden nvrx-attr fault injection workflow
sbak5 Apr 23, 2026
f73ff8b
chore(skills): add slurm defaults template
sbak5 Apr 23, 2026
2275c6a
feat(skills): add local env support for fault loop
sbak5 Apr 24, 2026
3f2016d
chore(skills): reduce torch cpp log verbosity
sbak5 Apr 24, 2026
43e39f7
style(skills): format changed python files
sbak5 Apr 24, 2026
251df4e
fix(skills): wire feedback-loop analysis outputs
sbak5 Apr 24, 2026
3b791bc
fix(skills): refine feedback-loop scoring config
sbak5 Apr 24, 2026
af9c325
feat(skills): add n3 super fault-loop workload
sbak5 Apr 24, 2026
9a12531
fix(skills): harden n3 fault-loop analysis
sbak5 Apr 24, 2026
1e23cb9
fix(skills): repair fr-analysis wrapper symlink
sbak5 Apr 24, 2026
5e30a25
docs(skills): clarify local env configuration
sbak5 Apr 24, 2026
810c534
refactor(skills): simplify local config wiring
sbak5 Apr 27, 2026
d0fac00
fix(fr): restore logger level on analysis errors
sbak5 Apr 27, 2026
950b97b
refactor(skills): require local env for fault loop
sbak5 Apr 27, 2026
269a3e3
fix(skills): normalize FR segment scoring
sbak5 Apr 27, 2026
59ebefe
fix(log-analysis): handle zero LLM retries
sbak5 Apr 28, 2026
0e1d6db
fix(skills): use LLM API key names
sbak5 Apr 28, 2026
26e6429
fix(skills): resolve user env for spooled jobs
sbak5 Apr 28, 2026
d4a3999
style(tests): sort nvrx logsage retry imports
sbak5 Apr 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ ft_state.json
*_pb2.pyi
*_pb2_grpc.py
.idea/
src/nvidia_resiliency_ext/skills/nvrx-attr/scripts/user.env
120 changes: 117 additions & 3 deletions src/nvidia_resiliency_ext/attribution/log_analyzer/nvrx_logsage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import argparse
import logging
import os
import random
import re
import time
from typing import Any, Dict, Mapping, Union

from langchain_openai import ChatOpenAI
Expand Down Expand Up @@ -37,6 +39,7 @@
ATTR_ERRORS_NOT_FOUND = "ERRORS NOT FOUND"
ATTR_NO_LOGS = "NO LOGS"
ATTR_SLURM_CANCELLED_DUE_TO_PREEMPTION = "SLURM CANCELLED DUE TO PREEMPTION"
LOGSAGE_LLM_ENDPOINT_FAILED = "LLM ENDPOINT FAILED"


MARKER_NEW_RUN_DIR_ADDED = "[sbatch_script]: New run dir added:"
Expand Down Expand Up @@ -108,6 +111,99 @@ def chunk_logs_strict(lines):
return final_chunks


def _log_analysis_retry_config() -> tuple[int, float, float, float]:
retries = int(os.getenv("NVRX_LOG_ANALYSIS_LLM_RETRIES", "3"))
initial_backoff = float(os.getenv("NVRX_LOG_ANALYSIS_LLM_INITIAL_BACKOFF_SEC", "1.0"))
max_backoff = float(os.getenv("NVRX_LOG_ANALYSIS_LLM_MAX_BACKOFF_SEC", "8.0"))
jitter = float(os.getenv("NVRX_LOG_ANALYSIS_LLM_JITTER_SEC", "0.25"))
return retries, initial_backoff, max_backoff, jitter


def _finished_status_name(status: Any) -> str:
return getattr(status, "name", status)


Comment thread
sbak5 marked this conversation as resolved.
def _sleep_with_backoff(
attempt: int, retries: int, backoff: float, max_backoff: float, jitter: float
) -> float:
sleep_for = min(backoff, max_backoff) + random.uniform(0.0, jitter)
logger.info(
"Retrying log-analysis LLM in %.2fs after attempt %d/%d",
sleep_for,
attempt,
retries,
)
time.sleep(sleep_for)
return min(backoff * 2, max_backoff)


def _retry_return_application_errors(
llm: ChatOpenAI, lines: list[str], cache_dict: LRUCache
) -> ApplicationData:
retries, initial_backoff, max_backoff, jitter = _log_analysis_retry_config()
backoff = initial_backoff
last_status = None

for attempt in range(1, retries + 1):
app_data = return_application_errors(llm, lines, cache_dict)
status_name = _finished_status_name(app_data.finished)
if status_name != FINISHED_STATUS_LLM_FAILURE:
return app_data

last_status = status_name
if attempt == retries:
logger.error(
"Log-analysis extraction failed after %d attempts; last status: %s",
retries,
last_status,
)
return app_data

backoff = _sleep_with_backoff(attempt, retries, backoff, max_backoff, jitter)

return app_data


def _with_exponential_backoff(llm_call, checkpoint_saved: bool) -> tuple[str, str, str, str, str]:
retries, initial_backoff, max_backoff, jitter = _log_analysis_retry_config()
backoff = initial_backoff
last_error = "no attempts made (retries=0)"
fallback = (
ATTR_LLM_FAILURE,
ATTR_LLM_FAILURE,
ATTR_LLM_FAILURE,
ATTR_LLM_FAILURE,
str(checkpoint_saved),
)

for attempt in range(1, retries + 1):
try:
result = llm_call()
if result and not any(field == LOGSAGE_LLM_ENDPOINT_FAILED for field in result[:4]):
return result
last_error = LOGSAGE_LLM_ENDPOINT_FAILED
except Exception as exc:
last_error = str(exc)
logger.warning("Log-analysis LLM attempt %d/%d failed: %s", attempt, retries, exc)

if attempt == retries:
logger.error(
"Log-analysis LLM failed after %d attempts; last error: %s",
retries,
last_error,
)
return fallback

backoff = _sleep_with_backoff(attempt, retries, backoff, max_backoff, jitter)

logger.error(
"Log-analysis LLM failed after %d attempts; last error: %s",
retries,
last_error,
)
return fallback


class NVRxLogAnalyzer(NVRxAttribution):
def __init__(self, args: Union[argparse.Namespace, Mapping[str, Any]]):
from nvidia_resiliency_ext.attribution.api_keys import load_llm_api_key
Expand Down Expand Up @@ -213,7 +309,7 @@ async def analyze_logs(self) -> list[ApplicationData]:
current_chunk.append(line)

output_list = [
return_application_errors(self.llm, lines, self.lru_cache)
_retry_return_application_errors(self.llm, lines, self.lru_cache)
for cycle, lines in chunks.items()
]
return output_list
Expand Down Expand Up @@ -248,7 +344,12 @@ async def llm_analyze(self, output_list: list[ApplicationData]) -> list[str]:
)
else:
if len(output.application_errors_list_full):
result.append(get_proposed_solution_cat(self.llm, output))
result.append(
_with_exponential_backoff(
lambda: get_proposed_solution_cat(self.llm, output),
checkpoint_saved=output.checkpoint_saved,
)
)
else:
if output.finished == FINISHED_STATUS_LLM_FAILURE:
result.append(
Expand Down Expand Up @@ -361,11 +462,24 @@ def main():
action='store_true',
help='Input is already per-cycle data (skip filtering and chunking)',
)
parser.add_argument(
'--emit-stdout',
action='store_true',
help='Print final attribution payload to stdout for machine consumers',
)

args = parser.parse_args()

analyzer = NVRxLogAnalyzer(args)
analyzer.run_sync(args)
results = analyzer.run_sync(args)

if args.emit_stdout:
for result in results:
if not result:
continue
payload = result[0] if isinstance(result, tuple) else result
if payload:
print(payload)


if __name__ == "__main__":
Expand Down
124 changes: 83 additions & 41 deletions src/nvidia_resiliency_ext/attribution/trace_analyzer/fr_attribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,42 @@ def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)


def _parse_rank_list(rank_text: str) -> List[int]:
ranks = []
for token in rank_text.split(','):
token = token.strip()
if not token:
continue
try:
ranks.append(int(token))
except ValueError:
continue
return ranks


def _extract_missing_ranks_from_table(text: str) -> List[int]:
hanging_ranks = set()
capture = False

for line in text.splitlines():
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("PGID") and "Missing Ranks" in stripped:
capture = True
continue
if not capture or "|" not in stripped:
continue

columns = [col.strip() for col in stripped.split("|")]
if len(columns) < 6:
continue
for rank in _parse_rank_list(columns[-1]):
hanging_ranks.add(rank)

return sorted(hanging_ranks)


@dataclass
class Collective:
"""
Expand Down Expand Up @@ -134,12 +170,7 @@ async def print_output(self, attribution_result: Optional[str]):
hanging_ranks_str = hanging_ranks.group(1).strip()
hanging_ranks_list = list(map(int, hanging_ranks_str.split(',')))
else:
for idx, line in enumerate(text.split('\n')):
line_list = line.split('|')
if len(line_list) >= 5:
logger.info(line)
if idx >= 1:
hanging_ranks_list.append(line_list[5])
hanging_ranks_list = _extract_missing_ranks_from_table(text)
hanging_ranks = f"hanging ranks: {hanging_ranks_list}"
# Dict form preserves collective table text for MCP clients and FRAnalysisResult parity.
return (
Expand Down Expand Up @@ -218,20 +249,18 @@ def build_collectives_to_order():
# analyze collectives to find process groups with missing and completed ranks
completed_pg, missing_pg = self.analyze_matches(verbose=bool(cfg.get("verbose")))
grouped_missing_pgs = {}
grouped_completed_pgs = {}

# if the dump file contains health check results, parse the health check results
# and print them in a format
if cfg.get("health_check"):
self.print_node_health_status(verbose=bool(cfg.get("verbose")))

# group the process groups with missing and completed ranks
# by finding longest paths in the graph
# Group only process groups with missing ranks.
# Completed-rank summaries are not actionable for attribution and create
# misleading output in the feedback loop.
grouped_missing_pgs = self.group_pgs(missing_pg)
if len(grouped_missing_pgs) == 0:
grouped_completed_pgs = self.group_pgs(completed_pg)

# gather the head node of each group with missing and completed ranks
# gather the head node of each group with missing ranks
# the head node is the first node in the group
# the missing ranks in the head node of the missing process groups
# are considered to cause the other nodes in the group to hang
Expand All @@ -242,41 +271,40 @@ def gather_head_nodes(grouped_pgs):
return head_nodes

head_nodes_missing = None
head_nodes_completed = None
# Gather the head node of each group
# Gather the head node of each missing-rank group.
if len(grouped_missing_pgs) > 0:
head_nodes_missing = gather_head_nodes(grouped_missing_pgs)
logger.debug(f"head_nodes of missing_pg: {head_nodes_missing}")
else:
head_nodes_completed = gather_head_nodes(grouped_completed_pgs)
logger.debug(f"head_nodes of completed_pg: {head_nodes_completed}")
# Print the analysis output
with capture_logs() as output:
original_level = logger.level
if logger.getEffectiveLevel() > logging.INFO:
logger.setLevel(logging.INFO)
try:
with capture_logs(logger.name) as output:

def print_ranks_in_pgs(head_nodes, pg_dict, missing_or_completed="Missing"):
logger.info(
f"{'PGID':<6} | {'Process Group Desc':<25} | {'Op Type':<10} | {'Size':<8} \
| {'Dtype':<8} | {missing_or_completed} Ranks"
)
for pg_idx in head_nodes:
entry = list(pg_dict[pg_idx][0])
entry.remove(entry[-2])
if missing_or_completed == "Missing":
ranks_to_print = entry[6]
else:
ranks_to_print = entry[5]
def print_ranks_in_pgs(head_nodes, pg_dict, missing_or_completed="Missing"):
logger.info(
f"{entry[0]:<6} | {entry[1]:<25} | {entry[2]:<10} | {entry[3]:<8} \
| {entry[4]:<8} | {ranks_to_print}"
f"{'PGID':<6} | {'Process Group Desc':<25} | {'Op Type':<10} | {'Size':<8} \
| {'Dtype':<8} | {missing_or_completed} Ranks"
)
for pg_idx in head_nodes:
entry = list(pg_dict[pg_idx][0])
entry.remove(entry[-2])
if missing_or_completed == "Missing":
ranks_to_print = entry[6]
else:
ranks_to_print = entry[5]
logger.info(
f"{entry[0]:<6} | {entry[1]:<25} | {entry[2]:<10} | {entry[3]:<8} \
| {entry[4]:<8} | {ranks_to_print}"
)

if head_nodes_missing:
logger.debug(f"head_nodes_missing: {head_nodes_missing}")
print_ranks_in_pgs(head_nodes_missing, missing_pg, "Missing")
# TODO: using this completed pg needs to be updated with new algorithm for isolation
if head_nodes_completed:
print_ranks_in_pgs(head_nodes_completed, completed_pg, "Completed")
analysis_output = output.getvalue()
if head_nodes_missing:
logger.debug(f"head_nodes_missing: {head_nodes_missing}")
print_ranks_in_pgs(head_nodes_missing, missing_pg, "Missing")
analysis_output = output.getvalue()
finally:
logger.setLevel(original_level)
return analysis_output
Comment thread
sbak5 marked this conversation as resolved.

async def collective_analysis(self, analysis_output: str) -> Optional[str]:
Expand Down Expand Up @@ -1117,7 +1145,7 @@ def main():
'--fr-path', type=str, help='Path to JSON files or directories containing JSON files'
)
parser.add_argument(
'-p', '--pattern', default="*.json", help='File pattern to match (default: *.json)'
'-p', '--pattern', default="_dump_*", help='File pattern to match (default: _dump_*)'
)
parser.add_argument('-v', '--verbose', action='store_true', help='verbose output')
parser.add_argument(
Expand All @@ -1143,11 +1171,25 @@ def main():
action='store_true',
help='Convert the trace file to json file, if the trace is binary, for debugging',
)
parser.add_argument(
'--emit-stdout',
action='store_true',
help='Print final FR summary table to stdout for machine consumers',
)

args = parser.parse_args()

analyzer = CollectiveAnalyzer(args)
analyzer.run_sync(args)
result = analyzer.run_sync(args)

if args.emit_stdout and isinstance(result, tuple) and result:
payload = result[0]
if isinstance(payload, dict):
text = payload.get("analysis_text", "")
if text:
print(text)
elif payload:
print(payload)


if __name__ == "__main__":
Comment on lines 1171 to 1195

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 --emit-stdout prints Python list repr instead of analysis text

run_sync() returns (output_list, overall_state) (from output_handler), so result[0] is a Python list of (text, AttributionState) tuples, not a string. print(payload) therefore outputs something like [('PGID | ...\n...', <AttributionState.CONTINUE: 1>)] — the list's repr() with embedded \n escaped — rather than the clean table text.

watch_and_analyze.sh captures this as FR_OUT and passes it to score_attribution.py, which calls parse_fr_missing_ranks(). Because the newlines are escaped in the repr, splitlines() sees a single line that matches the "Missing Ranks" in line guard and is skipped; the function returns an empty set and silently falls back to "no_dumps". FR rank-correctness is never scored.

The isinstance(payload, dict) branch is also dead code because result[0] is always a list.

Fix: unwrap the inner text from the first item in output_list:

if args.emit_stdout and isinstance(result, tuple) and result:
    output_list = result[0]
    if isinstance(output_list, list) and output_list:
        text = output_list[0][0] if isinstance(output_list[0], (list, tuple)) else output_list[0]
        if isinstance(text, dict):
            text = text.get("analysis_text", "")
        if text:
            print(text)
    elif isinstance(output_list, dict):
        text = output_list.get("analysis_text", "")
        if text:
            print(text)
    elif output_list:
        print(output_list)

Expand Down
1 change: 1 addition & 0 deletions src/nvidia_resiliency_ext/skills/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Agent skills bundled with nvidia_resiliency_ext."""
Loading
Loading