From 25b521a89463c4de9aae7012768f66e17984a4fa Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Mon, 27 Apr 2026 13:59:48 +0900 Subject: [PATCH 1/2] feat(common.dto): add KernelStatusData Pydantic model for unified status_data envelope MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a typed Pydantic envelope for the kernel/session ``status_data`` payload (kernel, session, scheduler, error branches) under ``common/dto/manager/v2/status_data/``. The model tolerantly parses both legacy error shapes — single dict and ``MultiAgentError`` ``collection`` form — into a flat ``errors: list[ErrorDetailInfo]``, which becomes the canonical field going forward. This is purely additive: no producer or consumer is wired to it yet. Producer/consumer migration follows in #11337. Tracks #679 (BA-253). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../dto/manager/v2/status_data/__init__.py | 22 +++ .../dto/manager/v2/status_data/types.py | 120 +++++++++++++++ .../common/dto/manager/v2/status_data/BUILD | 3 + .../dto/manager/v2/status_data/__init__.py | 0 .../dto/manager/v2/status_data/test_types.py | 144 ++++++++++++++++++ 5 files changed, 289 insertions(+) create mode 100644 src/ai/backend/common/dto/manager/v2/status_data/__init__.py create mode 100644 src/ai/backend/common/dto/manager/v2/status_data/types.py create mode 100644 tests/unit/common/dto/manager/v2/status_data/BUILD create mode 100644 tests/unit/common/dto/manager/v2/status_data/__init__.py create mode 100644 tests/unit/common/dto/manager/v2/status_data/test_types.py diff --git a/src/ai/backend/common/dto/manager/v2/status_data/__init__.py b/src/ai/backend/common/dto/manager/v2/status_data/__init__.py new file mode 100644 index 00000000000..7c5d7aa86ac --- /dev/null +++ b/src/ai/backend/common/dto/manager/v2/status_data/__init__.py @@ -0,0 +1,22 @@ +"""DTO v2 for unified kernel/session status_data payload. + +See https://github.com/lablup/backend.ai/issues/679 for context. +""" + +from ai.backend.common.dto.manager.v2.status_data.types import ( + ErrorDetailInfo, + KernelStatusBranch, + KernelStatusData, + SchedulerStatusBranch, + SchedulingPredicateInfo, + SessionStatusBranch, +) + +__all__ = ( + "ErrorDetailInfo", + "KernelStatusBranch", + "KernelStatusData", + "SchedulerStatusBranch", + "SchedulingPredicateInfo", + "SessionStatusBranch", +) diff --git a/src/ai/backend/common/dto/manager/v2/status_data/types.py b/src/ai/backend/common/dto/manager/v2/status_data/types.py new file mode 100644 index 00000000000..0fe34750c01 --- /dev/null +++ b/src/ai/backend/common/dto/manager/v2/status_data/types.py @@ -0,0 +1,120 @@ +"""Pydantic models for the unified kernel/session ``status_data`` payload. + +Tracks https://github.com/lablup/backend.ai/issues/679 (BA-253). The legacy +``status_data`` dict has four branches (``kernel``, ``session``, ``scheduler``, +``error``); the ``error`` branch was emitted in two incompatible shapes (single +dict vs. ``{name: 'MultiAgentError', collection: [...]}``) which forced +consumers to shape-sniff at runtime. This module defines a typed envelope that +exposes a single canonical ``errors`` list and tolerantly parses both legacy +shapes during the deprecation window. + +This PR only introduces the model. Producers and consumers are migrated in a +follow-up sub-issue (#11337). +""" + +from __future__ import annotations + +from typing import Any + +from pydantic import Field, model_validator + +from ai.backend.common.api_handlers import BaseResponseModel + +__all__ = ( + "ErrorDetailInfo", + "KernelStatusBranch", + "KernelStatusData", + "SchedulerStatusBranch", + "SchedulingPredicateInfo", + "SessionStatusBranch", +) + + +class KernelStatusBranch(BaseResponseModel): + """Kernel-level status fields written on container termination.""" + + exit_code: int | None = Field( + default=None, + description="Process exit code reported by the kernel container.", + ) + + +class SessionStatusBranch(BaseResponseModel): + """Session-level status fields.""" + + status: str | None = Field( + default=None, + description="Free-form session status string set by the manager.", + ) + + +class SchedulingPredicateInfo(BaseResponseModel): + """Single scheduling predicate evaluation result.""" + + name: str = Field(description="Predicate identifier (e.g., 'reserved_time').") + msg: str | None = Field( + default=None, + description="Failure message; null when the predicate passed.", + ) + + +class SchedulerStatusBranch(BaseResponseModel): + """Scheduler retry / predicate state at the time of the last attempt.""" + + msg: str | None = Field(default=None, description="Last scheduling message.") + retries: int | None = Field(default=None, description="Number of retries attempted.") + last_try: str | None = Field( + default=None, + description="ISO-8601 timestamp of the last scheduling attempt.", + ) + passed_predicates: list[SchedulingPredicateInfo] = Field(default_factory=list) + failed_predicates: list[SchedulingPredicateInfo] = Field(default_factory=list) + + +class ErrorDetailInfo(BaseResponseModel): + """One error detail entry. Mirrors ``manager.exceptions.ErrorDetail``.""" + + src: str = Field(description="Origin of the error (e.g., 'agent', 'other').") + name: str = Field(description="Exception class name.") + repr: str = Field(description="``repr()`` of the exception.") + agent_id: str | None = Field( + default=None, + description="Agent that raised the error (populated in debug mode).", + ) + traceback: str | None = Field( + default=None, + description="Traceback (populated in debug mode).", + ) + + +class KernelStatusData(BaseResponseModel): + """Unified ``status_data`` envelope for kernels and sessions. + + Tolerantly parses legacy shapes by normalizing the single-``error`` and + ``MultiAgentError`` ``collection`` forms into a flat ``errors`` list before + Pydantic field validation. Producers populate only the relevant branches. + """ + + kernel: KernelStatusBranch | None = Field(default=None) + session: SessionStatusBranch | None = Field(default=None) + scheduler: SchedulerStatusBranch | None = Field(default=None) + errors: list[ErrorDetailInfo] = Field( + default_factory=list, + description="Flat list of error details. Canonical going forward.", + ) + + @model_validator(mode="before") + @classmethod + def _normalize_legacy_error(cls, data: Any) -> Any: + if not isinstance(data, dict): + return data + if data.get("errors"): + return data + legacy = data.get("error") + if not isinstance(legacy, dict): + return data + if legacy.get("name") == "MultiAgentError" and isinstance(legacy.get("collection"), list): + normalized = list(legacy["collection"]) + else: + normalized = [{k: v for k, v in legacy.items() if k != "collection"}] + return {**data, "errors": normalized} diff --git a/tests/unit/common/dto/manager/v2/status_data/BUILD b/tests/unit/common/dto/manager/v2/status_data/BUILD new file mode 100644 index 00000000000..57341b1358b --- /dev/null +++ b/tests/unit/common/dto/manager/v2/status_data/BUILD @@ -0,0 +1,3 @@ +python_tests( + name="tests", +) diff --git a/tests/unit/common/dto/manager/v2/status_data/__init__.py b/tests/unit/common/dto/manager/v2/status_data/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/common/dto/manager/v2/status_data/test_types.py b/tests/unit/common/dto/manager/v2/status_data/test_types.py new file mode 100644 index 00000000000..ffa32a6a25f --- /dev/null +++ b/tests/unit/common/dto/manager/v2/status_data/test_types.py @@ -0,0 +1,144 @@ +"""Tests for ai.backend.common.dto.manager.v2.status_data.types.""" + +from __future__ import annotations + +from ai.backend.common.dto.manager.v2.status_data import ( + ErrorDetailInfo, + KernelStatusBranch, + KernelStatusData, + SchedulerStatusBranch, + SchedulingPredicateInfo, + SessionStatusBranch, +) + + +class TestKernelStatusBranch: + def test_parse_exit_code(self) -> None: + branch = KernelStatusBranch.model_validate({"exit_code": 0}) + assert branch.exit_code == 0 + + def test_default_exit_code_is_none(self) -> None: + assert KernelStatusBranch().exit_code is None + + +class TestSessionStatusBranch: + def test_parse_status(self) -> None: + branch = SessionStatusBranch.model_validate({"status": "RUNNING"}) + assert branch.status == "RUNNING" + + +class TestSchedulerStatusBranch: + def test_full_payload_round_trip(self) -> None: + payload = { + "msg": "no available agent", + "retries": 3, + "last_try": "2026-04-27T12:00:00Z", + "passed_predicates": [{"name": "reserved_time"}], + "failed_predicates": [{"name": "concurrency", "msg": "limit exceeded"}], + } + branch = SchedulerStatusBranch.model_validate(payload) + assert branch.retries == 3 + assert branch.passed_predicates[0] == SchedulingPredicateInfo(name="reserved_time") + assert branch.failed_predicates[0].msg == "limit exceeded" + + def test_empty_predicates_default_to_lists(self) -> None: + branch = SchedulerStatusBranch() + assert branch.passed_predicates == [] + assert branch.failed_predicates == [] + + +class TestKernelStatusDataLegacyErrorParsing: + """The model must tolerantly parse both legacy error shapes (#679).""" + + def test_legacy_single_error_is_normalized_to_list(self) -> None: + legacy = { + "error": { + "src": "agent", + "name": "ContainerCreationError", + "repr": "ContainerCreationError('oom')", + "agent_id": "agent-001", + } + } + data = KernelStatusData.model_validate(legacy) + assert data.errors == [ + ErrorDetailInfo( + src="agent", + name="ContainerCreationError", + repr="ContainerCreationError('oom')", + agent_id="agent-001", + ) + ] + + def test_legacy_multi_agent_error_collection_is_flattened(self) -> None: + legacy = { + "error": { + "src": "agent", + "name": "MultiAgentError", + "repr": "MultiAgentError(2)", + "collection": [ + {"src": "agent", "name": "E1", "repr": "E1()", "agent_id": "a1"}, + {"src": "agent", "name": "E2", "repr": "E2()", "agent_id": "a2"}, + ], + } + } + data = KernelStatusData.model_validate(legacy) + assert [e.name for e in data.errors] == ["E1", "E2"] + assert [e.agent_id for e in data.errors] == ["a1", "a2"] + + def test_new_errors_field_is_preserved_as_is(self) -> None: + new_shape = { + "errors": [ + {"src": "agent", "name": "X", "repr": "X()"}, + ] + } + data = KernelStatusData.model_validate(new_shape) + assert len(data.errors) == 1 + assert data.errors[0].name == "X" + + def test_new_errors_takes_precedence_over_legacy_error(self) -> None: + mixed = { + "errors": [{"src": "other", "name": "New", "repr": "New()"}], + "error": {"src": "agent", "name": "Old", "repr": "Old()"}, + } + data = KernelStatusData.model_validate(mixed) + assert [e.name for e in data.errors] == ["New"] + + def test_no_error_branch_yields_empty_list(self) -> None: + data = KernelStatusData.model_validate({}) + assert data.errors == [] + + +class TestKernelStatusDataAllBranches: + def test_full_envelope_parses_all_branches(self) -> None: + payload = { + "kernel": {"exit_code": 137}, + "session": {"status": "TERMINATED"}, + "scheduler": { + "msg": "ok", + "retries": 1, + "last_try": "2026-04-27T12:00:00Z", + "passed_predicates": [], + "failed_predicates": [], + }, + "error": {"src": "agent", "name": "OOM", "repr": "OOM()"}, + } + data = KernelStatusData.model_validate(payload) + assert data.kernel == KernelStatusBranch(exit_code=137) + assert data.session == SessionStatusBranch(status="TERMINATED") + assert data.scheduler is not None + assert data.scheduler.retries == 1 + assert len(data.errors) == 1 + assert data.errors[0].name == "OOM" + + def test_serialization_emits_canonical_errors_only(self) -> None: + data = KernelStatusData(errors=[ErrorDetailInfo(src="agent", name="X", repr="X()")]) + dumped = data.model_dump(exclude_none=True) + assert "error" not in dumped + assert dumped["errors"] == [{"src": "agent", "name": "X", "repr": "X()"}] + + def test_unknown_top_level_keys_are_ignored(self) -> None: + # DB rows may carry extra keys we have not modeled yet; do not crash. + payload = {"kernel": {"exit_code": 0}, "future_field": {"x": 1}} + data = KernelStatusData.model_validate(payload) + assert data.kernel is not None + assert data.kernel.exit_code == 0 From ccdc14405f13f834c33a05b428e83a8bfb59f906 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Mon, 27 Apr 2026 14:00:20 +0900 Subject: [PATCH 2/2] docs: add news fragment for #11338 Co-Authored-By: Claude Opus 4.7 (1M context) --- changes/11338.feature.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/11338.feature.md diff --git a/changes/11338.feature.md b/changes/11338.feature.md new file mode 100644 index 00000000000..c4bb18453ae --- /dev/null +++ b/changes/11338.feature.md @@ -0,0 +1 @@ +Add `KernelStatusData` Pydantic model in `common.dto.manager.v2.status_data` providing a unified, typed envelope for kernel/session status payloads with tolerant parsing of legacy single-error and `MultiAgentError` `collection` shapes.