Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11522.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Project ``EndpointRow`` directly to ``ModelDeploymentData`` for the API path so deployment responses no longer pass through a fragile ``DeploymentInfo`` conversion, and split no-scope deployment reads into a dedicated admin repository/service/processor.
15 changes: 15 additions & 0 deletions src/ai/backend/common/data/model_deployment/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any, Self

from ai.backend.common.data.endpoint.types import EndpointLifecycle
from ai.backend.common.types import CIStrEnum


Expand Down Expand Up @@ -48,6 +49,20 @@ def _missing_(cls, value: Any) -> Self | None:
return cls(alias)
return super()._missing_(value)

@classmethod
def from_lifecycle(cls, lifecycle: EndpointLifecycle) -> "ModelDeploymentStatus":
match lifecycle:
case EndpointLifecycle.PENDING | EndpointLifecycle.CREATED:
return cls.PENDING
case EndpointLifecycle.READY | EndpointLifecycle.SCALING:
return cls.READY
case EndpointLifecycle.DEPLOYING:
return cls.DEPLOYING
case EndpointLifecycle.DESTROYING:
return cls.STOPPING
case EndpointLifecycle.DESTROYED:
return cls.STOPPED


class DeploymentStrategy(CIStrEnum):
ROLLING = "ROLLING"
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/common/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ class LayerType(enum.StrEnum):
AUDIT_LOG_REPOSITORY = "audit_log_repository"
CONTAINER_REGISTRY_REPOSITORY = "container_registry_repository"
DEPLOYMENT_REPOSITORY = "deployment_repository"
DEPLOYMENT_ADMIN_REPOSITORY = "deployment_admin_repository"
DOMAIN_REPOSITORY = "domain_repository"
DOTFILE_REPOSITORY = "dotfile_repository"
ETCD_CONFIG_REPOSITORY = "etcd_config_repository"
Expand Down
63 changes: 40 additions & 23 deletions src/ai/backend/manager/api/adapters/deployment/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from typing import TYPE_CHECKING
from uuid import UUID

import sqlalchemy as sa

if TYPE_CHECKING:
from ai.backend.manager.services.processors import Processors
from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator
Expand Down Expand Up @@ -228,6 +226,10 @@
combine_conditions_or,
negate_conditions,
)
from ai.backend.manager.repositories.deployment.types import (
ProjectDeploymentSearchScope,
UserDeploymentSearchScope,
)
from ai.backend.manager.repositories.deployment.updaters import (
DeploymentMetadataUpdaterSpec,
DeploymentNetworkSpecUpdaterSpec,
Expand All @@ -249,6 +251,9 @@
from ai.backend.manager.services.deployment.actions.access_token.search_access_tokens import (
SearchAccessTokensAction,
)
from ai.backend.manager.services.deployment.actions.admin_search_deployments import (
AdminSearchDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.auto_scaling_rule.bulk_delete_auto_scaling_rules import (
BulkDeleteAutoScalingRulesAction,
)
Expand Down Expand Up @@ -311,10 +316,16 @@
from ai.backend.manager.services.deployment.actions.route.update_route_traffic_status import (
UpdateRouteTrafficStatusAction,
)
from ai.backend.manager.services.deployment.actions.search_deployments import (
SearchDeploymentsAction,
from ai.backend.manager.services.deployment.actions.search_legacy_deployments import (
SearchLegacyDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.search_project_deployments import (
SearchProjectDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.search_replicas import SearchReplicasAction
from ai.backend.manager.services.deployment.actions.search_user_deployments import (
SearchUserDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.sync_replicas import SyncReplicaAction
from ai.backend.manager.services.deployment.actions.update_deployment import UpdateDeploymentAction
from ai.backend.manager.types import OptionalState, TriState
Expand Down Expand Up @@ -579,8 +590,10 @@ async def admin_search(
) -> AdminSearchDeploymentsPayload:
"""Search deployments (admin, no scope)."""
querier = self._build_deployment_querier(input)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = (
await self._processors.deployment_admin.admin_search_deployments.wait_for_complete(
AdminSearchDeploymentsAction(querier=querier)
)
)
return AdminSearchDeploymentsPayload(
items=[self._deployment_data_to_dto(item) for item in action_result.data],
Expand All @@ -597,16 +610,13 @@ async def my_search(
user = current_user()
if user is None:
raise RuntimeError("No authenticated user in context")
scope = UserDeploymentSearchScope(user_id=user.user_id)
conditions: list[QueryCondition] = []
if input.filter:
conditions.extend(self._convert_deployment_filter(input.filter))
orders: list[QueryOrder] = (
self._convert_deployment_orders(input.order) if input.order else []
)

def _by_created_user() -> sa.sql.expression.ColumnElement[bool]:
return EndpointRow.created_user == user.user_id

querier = self._build_querier(
conditions=conditions,
orders=orders,
Expand All @@ -617,10 +627,9 @@ def _by_created_user() -> sa.sql.expression.ColumnElement[bool]:
before=input.before,
limit=input.limit,
offset=input.offset,
base_conditions=[_by_created_user],
)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = await self._processors.deployment.search_user_deployments.wait_for_complete(
SearchUserDeploymentsAction(scope=scope, querier=querier)
)
return AdminSearchDeploymentsPayload(
items=[self._deployment_data_to_dto(item) for item in action_result.data],
Expand All @@ -635,16 +644,13 @@ async def project_search(
input: AdminSearchDeploymentsInput,
) -> AdminSearchDeploymentsPayload:
"""Search deployments within a specific project."""
scope = ProjectDeploymentSearchScope(project_id=project_id)
conditions: list[QueryCondition] = []
if input.filter:
conditions.extend(self._convert_deployment_filter(input.filter))
orders: list[QueryOrder] = (
self._convert_deployment_orders(input.order) if input.order else []
)

def _by_project_id() -> sa.sql.expression.ColumnElement[bool]:
return EndpointRow.project == project_id

querier = self._build_querier(
conditions=conditions,
orders=orders,
Expand All @@ -655,10 +661,11 @@ def _by_project_id() -> sa.sql.expression.ColumnElement[bool]:
before=input.before,
limit=input.limit,
offset=input.offset,
base_conditions=[_by_project_id],
)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = (
await self._processors.deployment.search_project_deployments.wait_for_complete(
SearchProjectDeploymentsAction(scope=scope, querier=querier)
)
)
return AdminSearchDeploymentsPayload(
items=[self._deployment_data_to_dto(item) for item in action_result.data],
Expand Down Expand Up @@ -1285,16 +1292,26 @@ async def batch_load_by_ids(
) -> list[DeploymentNode | None]:
"""Batch load deployments by ID for DataLoader use.

Returns DeploymentNode DTOs in the same order as the input deployment_ids list.
Routed through the regular ``search_legacy_deployments`` processor
on purpose: the admin processor is reserved for admin-authorised
callers, but a DataLoader fires under whatever user triggered the
parent GraphQL query (admin or not). The ``by_ids`` filter is the
bound on the result set; the parent resolver has already authorised
access to whatever entity references these IDs.

Output is aligned with the input ``deployment_ids`` order; missing
IDs come back as ``None``.
"""
if not deployment_ids:
return []
querier = BatchQuerier(
pagination=OffsetPagination(limit=len(deployment_ids)),
conditions=[DeploymentConditions.by_ids(deployment_ids)],
)
action_result = await self._processors.deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = (
await self._processors.deployment.search_legacy_deployments.wait_for_complete(
SearchLegacyDeploymentsAction(querier=querier)
)
)
deployment_map = {
data.id: self._deployment_data_to_dto(data) for data in action_result.data
Expand Down
8 changes: 4 additions & 4 deletions src/ai/backend/manager/api/rest/deployment/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@
SearchRoutesAction,
UpdateRouteTrafficStatusAction,
)
from ai.backend.manager.services.deployment.actions.search_deployments import (
SearchDeploymentsAction,
from ai.backend.manager.services.deployment.actions.search_legacy_deployments import (
SearchLegacyDeploymentsAction,
)
from ai.backend.manager.services.deployment.actions.update_deployment import (
UpdateDeploymentAction,
Expand Down Expand Up @@ -201,8 +201,8 @@ async def search_deployments(
querier = self._deployment_adapter.build_querier(body.parsed)

# Call service action
action_result = await self._deployment.search_deployments.wait_for_complete(
SearchDeploymentsAction(querier=querier)
action_result = await self._deployment.search_legacy_deployments.wait_for_complete(
SearchLegacyDeploymentsAction(querier=querier)
)

# Build response
Expand Down
14 changes: 14 additions & 0 deletions src/ai/backend/manager/data/deployment/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,20 @@ class DeploymentInfoSearchResult:
has_previous_page: bool


@dataclass
class ModelDeploymentDataSearchResult:
    """Search result with pagination for the API-shaped ``ModelDeploymentData``.

    Returned by repository methods that project ``EndpointRow`` straight to
    ``ModelDeploymentData`` without going through ``DeploymentInfo``.
    """

    # One page of deployment records, already shaped for the API layer.
    items: list[ModelDeploymentData]
    # Total number of rows matching the query across all pages, not just
    # the rows present in ``items``.
    total_count: int
    # Pagination flags: whether more matching rows exist after / before
    # the page represented by ``items``.
    has_next_page: bool
    has_previous_page: bool


@dataclass
class AutoScalingRuleSearchResult:
"""Search result with pagination for auto-scaling rules."""
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/manager/models/endpoint/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
from ai.backend.manager.models.routing import RoutingRow
from ai.backend.manager.models.user import UserRow


__all__ = (
"EndpointAutoScalingRuleRow",
"EndpointLifecycle",
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/manager/repositories/deployment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from ai.backend.manager.models.endpoint.conditions import DeploymentConditions
from ai.backend.manager.models.routing.conditions import RouteConditions

from .admin_repository import DeploymentAdminRepository
from .repository import DeploymentRepository

__all__ = [
"DeploymentAdminRepository",
"DeploymentRepository",
"DeploymentConditions",
"RouteConditions",
Expand Down
78 changes: 78 additions & 0 deletions src/ai/backend/manager/repositories/deployment/admin_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Admin-only repository for deployment queries that span every scope.

Holds reads that are not bounded by a project / user / domain scope. The
regular ``DeploymentRepository`` keeps the scoped variants; admin-style
"see everything" queries live here so the layering is explicit and the
GraphQL/REST ``admin_*`` paths have a clearly-named home.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from ai.backend.common.exception import BackendAIError
from ai.backend.common.metrics.metric import DomainType, LayerType
from ai.backend.common.resilience.policies.metrics import MetricArgs, MetricPolicy
from ai.backend.common.resilience.policies.retry import BackoffStrategy, RetryArgs, RetryPolicy
from ai.backend.common.resilience.resilience import Resilience
from ai.backend.manager.data.deployment.types import ModelDeploymentDataSearchResult
from ai.backend.manager.repositories.base import BatchQuerier
from ai.backend.manager.repositories.deployment.db_source.db_source import DeploymentDBSource

if TYPE_CHECKING:
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine

__all__ = ("DeploymentAdminRepository",)


# Resilience stack applied to every DeploymentAdminRepository method:
# metric emission tagged with the admin-repository layer, plus a small
# fixed-backoff retry loop that skips known application errors.
_admin_metric_policy = MetricPolicy(
    MetricArgs(
        domain=DomainType.REPOSITORY,
        layer=LayerType.DEPLOYMENT_ADMIN_REPOSITORY,
    )
)
_admin_retry_policy = RetryPolicy(
    RetryArgs(
        max_retries=3,
        retry_delay=0.1,
        backoff_strategy=BackoffStrategy.FIXED,
        non_retryable_exceptions=(BackendAIError,),
    )
)
deployment_admin_repository_resilience = Resilience(
    policies=[_admin_metric_policy, _admin_retry_policy]
)


class DeploymentAdminRepository:
    """Admin (no-scope) reads against the ``endpoints`` table.

    Owns a private ``DeploymentDBSource`` bound to the shared engine: the
    DBSource is a stateless query namespace, so each repository in the
    deployment package creates its own instance. Keeping the unscoped
    reads in this separate class makes the caller's admin intent explicit,
    while the SQL-level helpers (e.g. ``_endpoint_row_to_deployment_data``)
    stay in ``db_source/`` and are shared through that single module.
    """

    _db_source: DeploymentDBSource

    def __init__(self, db: ExtendedAsyncSAEngine) -> None:
        self._db_source = DeploymentDBSource(db)

    @deployment_admin_repository_resilience.apply()
    async def search_deployments(self, querier: BatchQuerier) -> ModelDeploymentDataSearchResult:
        """Search every deployment with no scope restriction applied.

        Arbitrary ``conditions`` carried by *querier* still take effect;
        what makes this method admin-only is that it adds no scope filter
        of its own.
        """
        return await self._db_source.search_deployments(querier)
Loading
Loading