diff --git a/changes/11522.enhance.md b/changes/11522.enhance.md new file mode 100644 index 00000000000..21dc0d7e279 --- /dev/null +++ b/changes/11522.enhance.md @@ -0,0 +1 @@ +Project ``EndpointRow`` directly to ``ModelDeploymentData`` for the API path so deployment responses no longer pass through a fragile ``DeploymentInfo`` conversion, and split no-scope deployment reads into a dedicated admin repository/service/processor. diff --git a/src/ai/backend/common/data/model_deployment/types.py b/src/ai/backend/common/data/model_deployment/types.py index 11f5b1525b9..b54310b7d13 100644 --- a/src/ai/backend/common/data/model_deployment/types.py +++ b/src/ai/backend/common/data/model_deployment/types.py @@ -1,5 +1,6 @@ from typing import Any, Self +from ai.backend.common.data.endpoint.types import EndpointLifecycle from ai.backend.common.types import CIStrEnum @@ -48,6 +49,20 @@ def _missing_(cls, value: Any) -> Self | None: return cls(alias) return super()._missing_(value) + @classmethod + def from_lifecycle(cls, lifecycle: EndpointLifecycle) -> "ModelDeploymentStatus": + match lifecycle: + case EndpointLifecycle.PENDING | EndpointLifecycle.CREATED: + return cls.PENDING + case EndpointLifecycle.READY | EndpointLifecycle.SCALING: + return cls.READY + case EndpointLifecycle.DEPLOYING: + return cls.DEPLOYING + case EndpointLifecycle.DESTROYING: + return cls.STOPPING + case EndpointLifecycle.DESTROYED: + return cls.STOPPED + class DeploymentStrategy(CIStrEnum): ROLLING = "ROLLING" diff --git a/src/ai/backend/common/metrics/metric.py b/src/ai/backend/common/metrics/metric.py index 68062786edf..1779e6cd586 100644 --- a/src/ai/backend/common/metrics/metric.py +++ b/src/ai/backend/common/metrics/metric.py @@ -420,6 +420,7 @@ class LayerType(enum.StrEnum): AUDIT_LOG_REPOSITORY = "audit_log_repository" CONTAINER_REGISTRY_REPOSITORY = "container_registry_repository" DEPLOYMENT_REPOSITORY = "deployment_repository" + DEPLOYMENT_ADMIN_REPOSITORY = "deployment_admin_repository" 
DOMAIN_REPOSITORY = "domain_repository" DOTFILE_REPOSITORY = "dotfile_repository" ETCD_CONFIG_REPOSITORY = "etcd_config_repository" diff --git a/src/ai/backend/manager/api/adapters/deployment/adapter.py b/src/ai/backend/manager/api/adapters/deployment/adapter.py index efec8fbaeee..5e744b7dda3 100644 --- a/src/ai/backend/manager/api/adapters/deployment/adapter.py +++ b/src/ai/backend/manager/api/adapters/deployment/adapter.py @@ -9,8 +9,6 @@ from typing import TYPE_CHECKING from uuid import UUID -import sqlalchemy as sa - if TYPE_CHECKING: from ai.backend.manager.services.processors import Processors from ai.backend.manager.sokovan.deployment.coordinator import DeploymentCoordinator @@ -228,6 +226,10 @@ combine_conditions_or, negate_conditions, ) +from ai.backend.manager.repositories.deployment.types import ( + ProjectDeploymentSearchScope, + UserDeploymentSearchScope, +) from ai.backend.manager.repositories.deployment.updaters import ( DeploymentMetadataUpdaterSpec, DeploymentNetworkSpecUpdaterSpec, @@ -249,6 +251,9 @@ from ai.backend.manager.services.deployment.actions.access_token.search_access_tokens import ( SearchAccessTokensAction, ) +from ai.backend.manager.services.deployment.actions.admin_search_deployments import ( + AdminSearchDeploymentsAction, +) from ai.backend.manager.services.deployment.actions.auto_scaling_rule.bulk_delete_auto_scaling_rules import ( BulkDeleteAutoScalingRulesAction, ) @@ -311,10 +316,16 @@ from ai.backend.manager.services.deployment.actions.route.update_route_traffic_status import ( UpdateRouteTrafficStatusAction, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, +) +from ai.backend.manager.services.deployment.actions.search_project_deployments import ( + SearchProjectDeploymentsAction, ) from 
ai.backend.manager.services.deployment.actions.search_replicas import SearchReplicasAction +from ai.backend.manager.services.deployment.actions.search_user_deployments import ( + SearchUserDeploymentsAction, +) from ai.backend.manager.services.deployment.actions.sync_replicas import SyncReplicaAction from ai.backend.manager.services.deployment.actions.update_deployment import UpdateDeploymentAction from ai.backend.manager.types import OptionalState, TriState @@ -579,8 +590,10 @@ async def admin_search( ) -> AdminSearchDeploymentsPayload: """Search deployments (admin, no scope).""" querier = self._build_deployment_querier(input) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = ( + await self._processors.deployment_admin.admin_search_deployments.wait_for_complete( + AdminSearchDeploymentsAction(querier=querier) + ) ) return AdminSearchDeploymentsPayload( items=[self._deployment_data_to_dto(item) for item in action_result.data], @@ -597,16 +610,13 @@ async def my_search( user = current_user() if user is None: raise RuntimeError("No authenticated user in context") + scope = UserDeploymentSearchScope(user_id=user.user_id) conditions: list[QueryCondition] = [] if input.filter: conditions.extend(self._convert_deployment_filter(input.filter)) orders: list[QueryOrder] = ( self._convert_deployment_orders(input.order) if input.order else [] ) - - def _by_created_user() -> sa.sql.expression.ColumnElement[bool]: - return EndpointRow.created_user == user.user_id - querier = self._build_querier( conditions=conditions, orders=orders, @@ -617,10 +627,9 @@ def _by_created_user() -> sa.sql.expression.ColumnElement[bool]: before=input.before, limit=input.limit, offset=input.offset, - base_conditions=[_by_created_user], ) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = await 
self._processors.deployment.search_user_deployments.wait_for_complete( + SearchUserDeploymentsAction(scope=scope, querier=querier) ) return AdminSearchDeploymentsPayload( items=[self._deployment_data_to_dto(item) for item in action_result.data], @@ -635,16 +644,13 @@ async def project_search( input: AdminSearchDeploymentsInput, ) -> AdminSearchDeploymentsPayload: """Search deployments within a specific project.""" + scope = ProjectDeploymentSearchScope(project_id=project_id) conditions: list[QueryCondition] = [] if input.filter: conditions.extend(self._convert_deployment_filter(input.filter)) orders: list[QueryOrder] = ( self._convert_deployment_orders(input.order) if input.order else [] ) - - def _by_project_id() -> sa.sql.expression.ColumnElement[bool]: - return EndpointRow.project == project_id - querier = self._build_querier( conditions=conditions, orders=orders, @@ -655,10 +661,11 @@ def _by_project_id() -> sa.sql.expression.ColumnElement[bool]: before=input.before, limit=input.limit, offset=input.offset, - base_conditions=[_by_project_id], ) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = ( + await self._processors.deployment.search_project_deployments.wait_for_complete( + SearchProjectDeploymentsAction(scope=scope, querier=querier) + ) ) return AdminSearchDeploymentsPayload( items=[self._deployment_data_to_dto(item) for item in action_result.data], @@ -1285,7 +1292,15 @@ async def batch_load_by_ids( ) -> list[DeploymentNode | None]: """Batch load deployments by ID for DataLoader use. - Returns DeploymentNode DTOs in the same order as the input deployment_ids list. + Routed through the regular ``search_legacy_deployments`` processor + on purpose: the admin processor is reserved for admin-authorised + callers, but a DataLoader fires under whatever user triggered the + parent GraphQL query (admin or not). 
The ``by_ids`` filter is the + bound on the result set; the parent resolver has already authorised + access to whatever entity references these IDs. + + Output is aligned with the input ``deployment_ids`` order; missing + IDs come back as ``None``. """ if not deployment_ids: return [] @@ -1293,8 +1308,10 @@ async def batch_load_by_ids( pagination=OffsetPagination(limit=len(deployment_ids)), conditions=[DeploymentConditions.by_ids(deployment_ids)], ) - action_result = await self._processors.deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = ( + await self._processors.deployment.search_legacy_deployments.wait_for_complete( + SearchLegacyDeploymentsAction(querier=querier) + ) ) deployment_map = { data.id: self._deployment_data_to_dto(data) for data in action_result.data diff --git a/src/ai/backend/manager/api/rest/deployment/handler.py b/src/ai/backend/manager/api/rest/deployment/handler.py index 6a9b84040e2..b6b8aa76482 100644 --- a/src/ai/backend/manager/api/rest/deployment/handler.py +++ b/src/ai/backend/manager/api/rest/deployment/handler.py @@ -89,8 +89,8 @@ SearchRoutesAction, UpdateRouteTrafficStatusAction, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, ) from ai.backend.manager.services.deployment.actions.update_deployment import ( UpdateDeploymentAction, @@ -201,8 +201,8 @@ async def search_deployments( querier = self._deployment_adapter.build_querier(body.parsed) # Call service action - action_result = await self._deployment.search_deployments.wait_for_complete( - SearchDeploymentsAction(querier=querier) + action_result = await self._deployment.search_legacy_deployments.wait_for_complete( + SearchLegacyDeploymentsAction(querier=querier) ) # Build response diff --git a/src/ai/backend/manager/data/deployment/types.py 
b/src/ai/backend/manager/data/deployment/types.py index 6725661f57b..477a1ef8c01 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -1203,6 +1203,20 @@ class DeploymentInfoSearchResult: has_previous_page: bool +@dataclass +class ModelDeploymentDataSearchResult: + """Search result with pagination for the API-shaped ``ModelDeploymentData``. + + Returned by repository methods that project ``EndpointRow`` straight to + ``ModelDeploymentData`` without going through ``DeploymentInfo``. + """ + + items: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @dataclass class AutoScalingRuleSearchResult: """Search result with pagination for auto-scaling rules.""" diff --git a/src/ai/backend/manager/models/endpoint/row.py b/src/ai/backend/manager/models/endpoint/row.py index f4840bff368..3cf255c8e4d 100644 --- a/src/ai/backend/manager/models/endpoint/row.py +++ b/src/ai/backend/manager/models/endpoint/row.py @@ -108,6 +108,7 @@ from ai.backend.manager.models.routing import RoutingRow from ai.backend.manager.models.user import UserRow + __all__ = ( "EndpointAutoScalingRuleRow", "EndpointLifecycle", diff --git a/src/ai/backend/manager/repositories/deployment/__init__.py b/src/ai/backend/manager/repositories/deployment/__init__.py index aba78cccffc..bc7240657d5 100644 --- a/src/ai/backend/manager/repositories/deployment/__init__.py +++ b/src/ai/backend/manager/repositories/deployment/__init__.py @@ -3,9 +3,11 @@ from ai.backend.manager.models.endpoint.conditions import DeploymentConditions from ai.backend.manager.models.routing.conditions import RouteConditions +from .admin_repository import DeploymentAdminRepository from .repository import DeploymentRepository __all__ = [ + "DeploymentAdminRepository", "DeploymentRepository", "DeploymentConditions", "RouteConditions", diff --git a/src/ai/backend/manager/repositories/deployment/admin_repository.py 
b/src/ai/backend/manager/repositories/deployment/admin_repository.py new file mode 100644 index 00000000000..5b93d8f9920 --- /dev/null +++ b/src/ai/backend/manager/repositories/deployment/admin_repository.py @@ -0,0 +1,78 @@ +"""Admin-only repository for deployment queries that span every scope. + +Holds reads that are not bounded by a project / user / domain scope. The +regular ``DeploymentRepository`` keeps the scoped variants; admin-style +"see everything" queries live here so the layering is explicit and the +GraphQL/REST ``admin_*`` paths have a clearly-named home. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from ai.backend.common.exception import BackendAIError +from ai.backend.common.metrics.metric import DomainType, LayerType +from ai.backend.common.resilience.policies.metrics import MetricArgs, MetricPolicy +from ai.backend.common.resilience.policies.retry import BackoffStrategy, RetryArgs, RetryPolicy +from ai.backend.common.resilience.resilience import Resilience +from ai.backend.manager.data.deployment.types import ModelDeploymentDataSearchResult +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.repositories.deployment.db_source.db_source import DeploymentDBSource + +if TYPE_CHECKING: + from ai.backend.manager.models.utils import ExtendedAsyncSAEngine + +__all__ = ("DeploymentAdminRepository",) + + +deployment_admin_repository_resilience = Resilience( + policies=[ + MetricPolicy( + MetricArgs( + domain=DomainType.REPOSITORY, + layer=LayerType.DEPLOYMENT_ADMIN_REPOSITORY, + ) + ), + RetryPolicy( + RetryArgs( + max_retries=3, + retry_delay=0.1, + backoff_strategy=BackoffStrategy.FIXED, + non_retryable_exceptions=(BackendAIError,), + ) + ), + ] +) + + +class DeploymentAdminRepository: + """Admin (no-scope) reads against the ``endpoints`` table. 
+ + Holds its own ``DeploymentDBSource`` bound to the shared engine — the + DBSource is a stateless query namespace, so every repository in the + deployment package instantiates its own. The split exists so the + calling layer makes its admin intent explicit; SQL-level helpers + (e.g. ``_endpoint_row_to_deployment_data``) stay in ``db_source/`` + and are reused via that single module. + """ + + _db_source: DeploymentDBSource + + def __init__( + self, + db: ExtendedAsyncSAEngine, + ) -> None: + self._db_source = DeploymentDBSource(db) + + @deployment_admin_repository_resilience.apply() + async def search_deployments( + self, + querier: BatchQuerier, + ) -> ModelDeploymentDataSearchResult: + """Search every deployment without a scope filter. + + Callers may still pass arbitrary ``conditions`` through the + querier — the absence of a scope filter on the repository method + itself is what makes this admin-only. + """ + return await self._db_source.search_deployments(querier) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index af2c9bdb29f..560352dafec 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -20,6 +20,10 @@ from ai.backend.common.config import ModelHealthCheck from ai.backend.common.data.endpoint.types import EndpointLifecycle +from ai.backend.common.data.model_deployment.types import ( + DeploymentStrategy, + ModelDeploymentStatus, +) from ai.backend.common.data.permission.types import RBACElementType from ai.backend.common.dto.manager.v2.runtime_variant_preset.types import ( PresetTarget, @@ -59,6 +63,7 @@ DeploymentInfoWithAutoScalingRules, DeploymentLastHistory, DeploymentLifecycleSubStep, + DeploymentNetworkData, DeploymentOptions, DeploymentPolicyData, DeploymentPolicySearchResult, @@ -69,7 +74,11 @@ LegacyRevisionCreateReadBundle, 
ModelDeploymentAccessTokenData, ModelDeploymentAutoScalingRuleData, + ModelDeploymentData, + ModelDeploymentDataSearchResult, + ModelDeploymentMetadataInfo, ModelRevisionData, + ReplicaStateData, RevisionSearchResult, RouteHealthStatus, RouteInfo, @@ -137,7 +146,6 @@ RouteHistoryRow, ) from ai.backend.manager.models.session import SessionRow -from ai.backend.manager.models.storage import StorageSessionManager from ai.backend.manager.models.user import UserRow from ai.backend.manager.models.utils import ExtendedAsyncSAEngine from ai.backend.manager.models.vfolder import VFolderRow @@ -177,6 +185,7 @@ ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo, + UserDeploymentSearchScope, ) from ai.backend.manager.repositories.scheduler.types.session_creation import ( ContainerUserContext, @@ -202,6 +211,78 @@ class EndpointWithRoutesRawData: log = BraceStyleAdapter(logging.getLogger(__name__)) +def _endpoint_row_to_deployment_data(row: EndpointRow) -> ModelDeploymentData: + """Project an ``EndpointRow`` straight to the API-shaped ``ModelDeploymentData``. + + Lives at the repository boundary because the conversion is purely an + adapter between the ORM row and the API data type — neither the model + layer (which must stay free of API-shaped projections) nor the service + layer (which must not touch ORM rows) is the right home for it. + + Eager-load requirements: ``EndpointRow.revisions`` (with + ``DeploymentRevisionRow.image_row``) and ``EndpointRow.deployment_policy``. + + ``current_revision_id`` and ``deploying_revision_id`` are read from the + row columns directly. The ``revision`` spec is resolved by explicit match + against ``current_revision``; if no matching row is loaded (dangling + reference, or caller forgot to eager-load ``revisions``), ``revision`` + is ``None`` while ``current_revision_id`` still surfaces the column + value — callers can act on the ID even when the joined spec is missing. 
+ """ + revision: ModelRevisionData | None = None + if row.current_revision is not None: + match = next((r for r in row.revisions if r.id == row.current_revision), None) + if match is None: + log.error( + "Deployment {} has current_revision_id {} but no matching " + "DeploymentRevisionRow was found among loaded revisions; " + "the revision spec will be reported as null. This usually " + "means EndpointRow.revisions was not eagerly loaded by the " + "caller.", + row.id, + row.current_revision, + ) + else: + revision = match.to_data() + + desired_count = row.desired_replicas if row.desired_replicas is not None else row.replicas + policy_data = row.deployment_policy.to_data() if row.deployment_policy is not None else None + + return ModelDeploymentData( + id=row.id, + metadata=ModelDeploymentMetadataInfo( + name=row.name, + status=ModelDeploymentStatus.from_lifecycle(row.lifecycle_stage), + tags=[row.tag] if row.tag else [], + project_id=row.project, + domain_name=row.domain, + created_at=row.created_at, + updated_at=row.created_at, + ), + network_access=DeploymentNetworkData( + open_to_public=row.open_to_public if row.open_to_public is not None else False, + access_token_ids=None, + url=row.url, + preferred_domain_name=None, + ), + revision_history_ids=[row.current_revision] if row.current_revision else [], + revision=revision, + current_revision_id=row.current_revision, + deploying_revision_id=row.deploying_revision, + scaling_rule_ids=[], + replica_state=ReplicaStateData( + desired_replica_count=desired_count, + replica_ids=[], + ), + default_deployment_strategy=DeploymentStrategy.ROLLING, + created_user_id=row.created_user, + options=row.options, + scaling_state=row.scaling_state, + policy=policy_data, + sub_step=row.sub_step, + ) + + def _project_preset_slots( preset_row: DeploymentRevisionPresetRow | None, slot_entries: list[tuple[str, Decimal]], @@ -217,15 +298,12 @@ class DeploymentDBSource: """Database source for deployment-related operations.""" _db: 
ExtendedAsyncSAEngine - _storage_manager: StorageSessionManager def __init__( self, db: ExtendedAsyncSAEngine, - storage_manager: StorageSessionManager, ) -> None: self._db = db - self._storage_manager = storage_manager @actxmgr async def _begin_readonly_read_committed(self) -> AsyncIterator[SAConnection]: @@ -405,6 +483,37 @@ async def get_endpoint( return row.to_deployment_info() + async def get_deployment_data( + self, + endpoint_id: DeploymentID, + ) -> ModelDeploymentData: + """Fetch a deployment as the API-shaped ``ModelDeploymentData``. + + Bypasses the ``DeploymentInfo`` intermediate so the API path's + revision-id columns flow through unchanged from the DB row. + + Raises: + EndpointNotFound: If the endpoint does not exist. + """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = ( + sa.select(EndpointRow) + .where(EndpointRow.id == endpoint_id) + .options( + selectinload(EndpointRow.revisions).selectinload( + DeploymentRevisionRow.image_row + ), + selectinload(EndpointRow.deployment_policy), + ) + ) + result = await db_sess.execute(query) + row: EndpointRow | None = result.scalar_one_or_none() + + if not row: + raise EndpointNotFound(f"Endpoint {endpoint_id} not found") + + return _endpoint_row_to_deployment_data(row) + async def get_deployments_by_ids( self, deployment_ids: set[DeploymentID], @@ -1153,6 +1262,102 @@ async def search_endpoints( has_previous_page=result.has_previous_page, ) + async def search_deployments( + self, + querier: BatchQuerier, + ) -> ModelDeploymentDataSearchResult: + """Search endpoints and project each row directly to ``ModelDeploymentData``. + + Mirrors ``search_endpoints`` but skips the ``DeploymentInfo`` step + so the API path consumes the row's authoritative projection. 
+ """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = sa.select(EndpointRow).options( + selectinload(EndpointRow.revisions).selectinload(DeploymentRevisionRow.image_row), + selectinload(EndpointRow.deployment_policy), + ) + + result = await execute_batch_querier( + db_sess, + query, + querier, + ) + + items = [_endpoint_row_to_deployment_data(row.EndpointRow) for row in result.rows] + + return ModelDeploymentDataSearchResult( + items=items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + + async def search_user_deployments( + self, + querier: BatchQuerier, + scope: UserDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search deployments owned by a specific user, returning ``ModelDeploymentData``. + + Backs the v2 adapter's ``my_search`` path. Scope filter + (``EndpointRow.created_user == user_id``) is injected via + ``execute_batch_querier``'s ``scope`` argument. + """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = sa.select(EndpointRow).options( + selectinload(EndpointRow.revisions).selectinload(DeploymentRevisionRow.image_row), + selectinload(EndpointRow.deployment_policy), + ) + + result = await execute_batch_querier( + db_sess, + query, + querier, + scope=scope, + ) + + items = [_endpoint_row_to_deployment_data(row.EndpointRow) for row in result.rows] + + return ModelDeploymentDataSearchResult( + items=items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + + async def search_project_deployments( + self, + querier: BatchQuerier, + scope: ProjectDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search deployments in a project, returning ``ModelDeploymentData``. + + Distinct from :meth:`search_deployments_in_project`, which returns + the lighter-weight ``DeploymentSummaryData`` for project admin + list pages. 
Backs the v2 adapter's ``project_search`` path. + """ + async with self._begin_readonly_session_read_committed() as db_sess: + query = sa.select(EndpointRow).options( + selectinload(EndpointRow.revisions).selectinload(DeploymentRevisionRow.image_row), + selectinload(EndpointRow.deployment_policy), + ) + + result = await execute_batch_querier( + db_sess, + query, + querier, + scope=scope, + ) + + items = [_endpoint_row_to_deployment_data(row.EndpointRow) for row in result.rows] + + return ModelDeploymentDataSearchResult( + items=items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + async def search_deployments_in_project( self, querier: BatchQuerier, diff --git a/src/ai/backend/manager/repositories/deployment/repositories.py b/src/ai/backend/manager/repositories/deployment/repositories.py index 49ccf2b4a41..b4076730977 100644 --- a/src/ai/backend/manager/repositories/deployment/repositories.py +++ b/src/ai/backend/manager/repositories/deployment/repositories.py @@ -5,6 +5,7 @@ from ai.backend.manager.repositories.types import RepositoryArgs +from .admin_repository import DeploymentAdminRepository from .repository import DeploymentRepository @@ -13,6 +14,7 @@ class DeploymentRepositories: """Container for deployment-related repositories.""" repository: DeploymentRepository + admin_repository: DeploymentAdminRepository @classmethod def create(cls, args: RepositoryArgs) -> Self: @@ -24,4 +26,5 @@ def create(cls, args: RepositoryArgs) -> Self: args.valkey_live_client, args.valkey_schedule_client, ) - return cls(repository=repository) + admin_repository = DeploymentAdminRepository(args.db) + return cls(repository=repository, admin_repository=admin_repository) diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index 81d53f635e9..7b83ee24894 100644 --- 
a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -65,6 +65,8 @@ LegacyRevisionCreateReadBundle, ModelDeploymentAccessTokenData, ModelDeploymentAutoScalingRuleData, + ModelDeploymentData, + ModelDeploymentDataSearchResult, ModelRevisionData, RevisionSearchResult, RouteHealthStatus, @@ -103,7 +105,12 @@ from .db_source import DeploymentDBSource from .storage_source import DeploymentStorageSource -from .types import ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo +from .types import ( + ProjectDeploymentSearchScope, + RouteData, + RouteServiceDiscoveryInfo, + UserDeploymentSearchScope, +) log = BraceStyleAdapter(logging.getLogger(__name__)) @@ -158,7 +165,7 @@ def __init__( valkey_live: ValkeyLiveClient, valkey_schedule: ValkeyScheduleClient, ) -> None: - self._db_source = DeploymentDBSource(db, storage_manager) + self._db_source = DeploymentDBSource(db) self._storage_source = DeploymentStorageSource(storage_manager) self._valkey_stat = valkey_stat self._valkey_live = valkey_live @@ -343,6 +350,25 @@ async def get_endpoint_info( """ return await self._db_source.get_endpoint(endpoint_id) + @deployment_repository_resilience.apply() + async def get_deployment_data( + self, + endpoint_id: DeploymentID, + ) -> ModelDeploymentData: + """Fetch a deployment as the API-shaped ``ModelDeploymentData``. + + Bypasses ``DeploymentInfo`` so the API path's revision-id columns + flow through unchanged from the DB row. Prefer this over + ``get_endpoint_info`` + a service-side conversion when the caller + produces an API response — there is no ordering ambiguity to + resolve and dangling references surface as ``revision=None`` with + the column ID still populated. + + Raises: + EndpointNotFound: If the endpoint does not exist. 
+ """ + return await self._db_source.get_deployment_data(endpoint_id) + @deployment_repository_resilience.apply() async def destroy_endpoint( self, @@ -1491,6 +1517,24 @@ async def search_endpoints( """ return await self._db_source.search_endpoints(querier) + @deployment_repository_resilience.apply() + async def search_user_deployments( + self, + querier: BatchQuerier, + scope: UserDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search a user's own deployments — backs the v2 ``my_search`` path.""" + return await self._db_source.search_user_deployments(querier, scope) + + @deployment_repository_resilience.apply() + async def search_project_deployments( + self, + querier: BatchQuerier, + scope: ProjectDeploymentSearchScope, + ) -> ModelDeploymentDataSearchResult: + """Search a project's deployments returning full ``ModelDeploymentData``.""" + return await self._db_source.search_project_deployments(querier, scope) + @deployment_repository_resilience.apply() async def search_deployments_in_project( self, diff --git a/src/ai/backend/manager/repositories/deployment/types/__init__.py b/src/ai/backend/manager/repositories/deployment/types/__init__.py index 741b2a06b16..ccea70bcaf8 100644 --- a/src/ai/backend/manager/repositories/deployment/types/__init__.py +++ b/src/ai/backend/manager/repositories/deployment/types/__init__.py @@ -7,6 +7,7 @@ ProjectDeploymentSearchScope, RouteData, RouteServiceDiscoveryInfo, + UserDeploymentSearchScope, ) __all__ = [ @@ -16,4 +17,5 @@ "ProjectDeploymentSearchScope", "RouteData", "RouteServiceDiscoveryInfo", + "UserDeploymentSearchScope", ] diff --git a/src/ai/backend/manager/repositories/deployment/types/endpoint.py b/src/ai/backend/manager/repositories/deployment/types/endpoint.py index 5b29435c725..62491f15f50 100644 --- a/src/ai/backend/manager/repositories/deployment/types/endpoint.py +++ b/src/ai/backend/manager/repositories/deployment/types/endpoint.py @@ -116,3 +116,26 @@ def existence_checks(self) -> 
Sequence[ExistenceCheck[UUID]]: error=ProjectNotFound(str(self.project_id)), ), ] + + +@dataclass(frozen=True) +class UserDeploymentSearchScope(SearchScope): + """Required scope for searching deployments created by a given user. + + Backs the v2 adapter's ``my_search`` path; the adapter resolves the + current user and constructs the scope before invoking the action. + """ + + user_id: UUID + + def to_condition(self) -> QueryCondition: + user_id = self.user_id + + def inner() -> sa.sql.expression.ColumnElement[bool]: + return EndpointRow.created_user == user_id + + return inner + + @property + def existence_checks(self) -> Sequence[ExistenceCheck[UUID]]: + return [] diff --git a/src/ai/backend/manager/services/deployment/actions/search_deployments.py b/src/ai/backend/manager/services/deployment/actions/admin_search_deployments.py similarity index 65% rename from src/ai/backend/manager/services/deployment/actions/search_deployments.py rename to src/ai/backend/manager/services/deployment/actions/admin_search_deployments.py index 3160f1c939c..efbd0d85b5d 100644 --- a/src/ai/backend/manager/services/deployment/actions/search_deployments.py +++ b/src/ai/backend/manager/services/deployment/actions/admin_search_deployments.py @@ -8,8 +8,15 @@ from ai.backend.manager.services.deployment.actions.base import DeploymentBaseAction -@dataclass -class SearchDeploymentsAction(DeploymentBaseAction): +@dataclass(frozen=True) +class AdminSearchDeploymentsAction(DeploymentBaseAction): + """Search every deployment with no scope (superadmin path). + + Routed through ``DeploymentAdminProcessors`` so callers make the + admin intent explicit; scope-restricted variants live on the regular + ``DeploymentService`` and ``DeploymentRepository``. 
+ """ + querier: BatchQuerier @override @@ -22,8 +29,8 @@ def operation_type(cls) -> ActionOperationType: return ActionOperationType.SEARCH -@dataclass -class SearchDeploymentsActionResult(BaseActionResult): +@dataclass(frozen=True) +class AdminSearchDeploymentsActionResult(BaseActionResult): data: list[ModelDeploymentData] total_count: int has_next_page: bool diff --git a/src/ai/backend/manager/services/deployment/actions/search_legacy_deployments.py b/src/ai/backend/manager/services/deployment/actions/search_legacy_deployments.py new file mode 100644 index 00000000000..32b4801cfa0 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/actions/search_legacy_deployments.py @@ -0,0 +1,44 @@ +from dataclasses import dataclass +from typing import override + +from ai.backend.manager.actions.action import BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType +from ai.backend.manager.data.deployment.types import ModelDeploymentData +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.services.deployment.actions.base import DeploymentBaseAction + + +@dataclass(frozen=True) +class SearchLegacyDeploymentsAction(DeploymentBaseAction): + """Legacy no-scope deployment search. + + Backs the v1 REST handler's ``GET /v3/services`` and the v2 adapter's + ``batch_load_by_ids`` DataLoader path; the scoped ``my_search`` / + ``project_search`` paths use their own scoped actions. New admin call + sites should use :class:`AdminSearchDeploymentsAction` (routed through + ``DeploymentAdminProcessors``) instead — this action is preserved so the + legacy contract stays intact.
+ """ + + querier: BatchQuerier + + @override + def entity_id(self) -> str | None: + return None + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.SEARCH + + +@dataclass(frozen=True) +class SearchLegacyDeploymentsActionResult(BaseActionResult): + data: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @override + def entity_id(self) -> str | None: + return None diff --git a/src/ai/backend/manager/services/deployment/actions/search_project_deployments.py b/src/ai/backend/manager/services/deployment/actions/search_project_deployments.py new file mode 100644 index 00000000000..2bfd666f3a2 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/actions/search_project_deployments.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import override + +from ai.backend.common.data.permission.types import RBACElementType, ScopeType +from ai.backend.manager.actions.action import BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType +from ai.backend.manager.data.deployment.types import ModelDeploymentData +from ai.backend.manager.data.permission.types import RBACElementRef +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.repositories.deployment.types import ProjectDeploymentSearchScope +from ai.backend.manager.services.deployment.actions.base import DeploymentScopeAction + + +@dataclass(frozen=True) +class SearchProjectDeploymentsAction(DeploymentScopeAction): + """Search deployments within a project, returning ``ModelDeploymentData``. + + Distinct from :class:`SearchDeploymentsInProjectAction`, which returns + the lighter-weight ``DeploymentSummaryData`` for project admin list + pages. Backs the v2 adapter's ``project_search`` path. 
+ """ + + scope: ProjectDeploymentSearchScope + querier: BatchQuerier + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.SEARCH + + @override + def scope_type(self) -> ScopeType: + return ScopeType.PROJECT + + @override + def scope_id(self) -> str: + return str(self.scope.project_id) + + @override + def target_element(self) -> RBACElementRef: + return RBACElementRef(RBACElementType.PROJECT, str(self.scope.project_id)) + + +@dataclass(frozen=True) +class SearchProjectDeploymentsActionResult(BaseActionResult): + data: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @override + def entity_id(self) -> str | None: + return None diff --git a/src/ai/backend/manager/services/deployment/actions/search_user_deployments.py b/src/ai/backend/manager/services/deployment/actions/search_user_deployments.py new file mode 100644 index 00000000000..4f502466c59 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/actions/search_user_deployments.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import override + +from ai.backend.common.data.permission.types import RBACElementType, ScopeType +from ai.backend.manager.actions.action import BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType +from ai.backend.manager.data.deployment.types import ModelDeploymentData +from ai.backend.manager.data.permission.types import RBACElementRef +from ai.backend.manager.repositories.base import BatchQuerier +from ai.backend.manager.repositories.deployment.types import UserDeploymentSearchScope +from ai.backend.manager.services.deployment.actions.base import DeploymentScopeAction + + +@dataclass(frozen=True) +class SearchUserDeploymentsAction(DeploymentScopeAction): + """Search deployments created by a specific user. 
+ + Internal name uses the ``User`` scope semantics; the v2 adapter exposes + this as the user-facing ``my_search`` operation, resolving the current + user before constructing the scope. + """ + + scope: UserDeploymentSearchScope + querier: BatchQuerier + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.SEARCH + + @override + def scope_type(self) -> ScopeType: + return ScopeType.USER + + @override + def scope_id(self) -> str: + return str(self.scope.user_id) + + @override + def target_element(self) -> RBACElementRef: + return RBACElementRef( + element_type=RBACElementType.USER, + element_id=str(self.scope.user_id), + ) + + +@dataclass(frozen=True) +class SearchUserDeploymentsActionResult(BaseActionResult): + data: list[ModelDeploymentData] + total_count: int + has_next_page: bool + has_previous_page: bool + + @override + def entity_id(self) -> str | None: + return None diff --git a/src/ai/backend/manager/services/deployment/admin_service.py b/src/ai/backend/manager/services/deployment/admin_service.py new file mode 100644 index 00000000000..c0706c4d5b5 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/admin_service.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from ai.backend.manager.repositories.deployment.admin_repository import ( + DeploymentAdminRepository, +) +from ai.backend.manager.services.deployment.actions.admin_search_deployments import ( + AdminSearchDeploymentsAction, + AdminSearchDeploymentsActionResult, +) + +__all__ = ("DeploymentAdminService",) + + +class DeploymentAdminService: + """Admin (no-scope) service operations for deployments. + + Holds the call sites that are not bounded by a project / user / domain + scope (admin search, DataLoader batch lookups). Scoped operations live + on :class:`DeploymentService`. 
+ """ + + _admin_repository: DeploymentAdminRepository + + def __init__(self, admin_repository: DeploymentAdminRepository) -> None: + self._admin_repository = admin_repository + + async def admin_search_deployments( + self, action: AdminSearchDeploymentsAction + ) -> AdminSearchDeploymentsActionResult: + """Search every deployment without a scope filter.""" + result = await self._admin_repository.search_deployments(action.querier) + return AdminSearchDeploymentsActionResult( + data=result.items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) diff --git a/src/ai/backend/manager/services/deployment/processors.py b/src/ai/backend/manager/services/deployment/processors.py index 98f4ab80736..1f82483615f 100644 --- a/src/ai/backend/manager/services/deployment/processors.py +++ b/src/ai/backend/manager/services/deployment/processors.py @@ -30,6 +30,10 @@ SearchAccessTokensAction, SearchAccessTokensActionResult, ) +from ai.backend.manager.services.deployment.actions.admin_search_deployments import ( + AdminSearchDeploymentsAction, + AdminSearchDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.auto_scaling_rule.bulk_delete_auto_scaling_rules import ( BulkDeleteAutoScalingRulesAction, BulkDeleteAutoScalingRulesActionResult, @@ -116,18 +120,26 @@ UpdateRouteTrafficStatusAction, UpdateRouteTrafficStatusActionResult, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, - SearchDeploymentsActionResult, -) from ai.backend.manager.services.deployment.actions.search_deployments_in_project import ( SearchDeploymentsInProjectAction, SearchDeploymentsInProjectActionResult, ) +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, + SearchLegacyDeploymentsActionResult, +) +from ai.backend.manager.services.deployment.actions.search_project_deployments import ( + 
SearchProjectDeploymentsAction, + SearchProjectDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.search_replicas import ( SearchReplicasAction, SearchReplicasActionResult, ) +from ai.backend.manager.services.deployment.actions.search_user_deployments import ( + SearchUserDeploymentsAction, + SearchUserDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.sync_replicas import ( SyncReplicaAction, SyncReplicaActionResult, @@ -138,6 +150,7 @@ ) if TYPE_CHECKING: + from ai.backend.manager.services.deployment.admin_service import DeploymentAdminService from ai.backend.manager.services.deployment.service import DeploymentService @@ -158,7 +171,15 @@ class DeploymentProcessors(AbstractProcessorPackage): destroy_deployment: SingleEntityActionProcessor[ DestroyDeploymentAction, DestroyDeploymentActionResult ] - search_deployments: ActionProcessor[SearchDeploymentsAction, SearchDeploymentsActionResult] + search_legacy_deployments: ActionProcessor[ + SearchLegacyDeploymentsAction, SearchLegacyDeploymentsActionResult + ] + search_user_deployments: ActionProcessor[ + SearchUserDeploymentsAction, SearchUserDeploymentsActionResult + ] + search_project_deployments: ActionProcessor[ + SearchProjectDeploymentsAction, SearchProjectDeploymentsActionResult + ] search_deployments_in_project: ActionProcessor[ SearchDeploymentsInProjectAction, SearchDeploymentsInProjectActionResult ] @@ -248,7 +269,19 @@ def __init__( self.destroy_deployment = SingleEntityActionProcessor( service.destroy_deployment, action_monitors, validators=rbac_single_entity_validators ) - self.search_deployments = ActionProcessor(service.search_deployments, action_monitors) + self.search_legacy_deployments = ActionProcessor( + service.search_legacy_deployments, action_monitors + ) + self.search_user_deployments = ActionProcessor( + service.search_user_deployments, + action_monitors, + validators=[cast(ActionValidator, validators.rbac.scope)], + ) + 
self.search_project_deployments = ActionProcessor( + service.search_project_deployments, + action_monitors, + validators=[cast(ActionValidator, validators.rbac.scope)], + ) self.search_deployments_in_project = ActionProcessor( service.search_deployments_in_project, action_monitors, @@ -324,7 +357,9 @@ def supported_actions(self) -> list[ActionSpec]: UpdateDeploymentAction.spec(), ReplaceDeploymentOptionsAction.spec(), DestroyDeploymentAction.spec(), - SearchDeploymentsAction.spec(), + SearchLegacyDeploymentsAction.spec(), + SearchUserDeploymentsAction.spec(), + SearchProjectDeploymentsAction.spec(), SearchDeploymentsInProjectAction.spec(), GetDeploymentByIdAction.spec(), GetDeploymentPolicyAction.spec(), @@ -358,3 +393,31 @@ def supported_actions(self) -> list[ActionSpec]: BulkDeleteAccessTokensAction.spec(), SearchAccessTokensAction.spec(), ] + + +class DeploymentAdminProcessors(AbstractProcessorPackage): + """Admin (no-scope) processors for deployments. + + Counterpart of :class:`DeploymentProcessors` that owns the no-scope + queries — admin search and the DataLoader batch path. Scoped reads + stay on :class:`DeploymentProcessors`. 
+ """ + + admin_search_deployments: ActionProcessor[ + AdminSearchDeploymentsAction, AdminSearchDeploymentsActionResult + ] + + def __init__( + self, + service: DeploymentAdminService, + action_monitors: list[ActionMonitor], + ) -> None: + self.admin_search_deployments = ActionProcessor( + service.admin_search_deployments, action_monitors + ) + + @override + def supported_actions(self) -> list[ActionSpec]: + return [ + AdminSearchDeploymentsAction.spec(), + ] diff --git a/src/ai/backend/manager/services/deployment/service.py b/src/ai/backend/manager/services/deployment/service.py index 4dfc6580b6e..817b924a0ea 100644 --- a/src/ai/backend/manager/services/deployment/service.py +++ b/src/ai/backend/manager/services/deployment/service.py @@ -169,18 +169,26 @@ UpdateRouteTrafficStatusAction, UpdateRouteTrafficStatusActionResult, ) -from ai.backend.manager.services.deployment.actions.search_deployments import ( - SearchDeploymentsAction, - SearchDeploymentsActionResult, -) from ai.backend.manager.services.deployment.actions.search_deployments_in_project import ( SearchDeploymentsInProjectAction, SearchDeploymentsInProjectActionResult, ) +from ai.backend.manager.services.deployment.actions.search_legacy_deployments import ( + SearchLegacyDeploymentsAction, + SearchLegacyDeploymentsActionResult, +) +from ai.backend.manager.services.deployment.actions.search_project_deployments import ( + SearchProjectDeploymentsAction, + SearchProjectDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.search_replicas import ( SearchReplicasAction, SearchReplicasActionResult, ) +from ai.backend.manager.services.deployment.actions.search_user_deployments import ( + SearchUserDeploymentsAction, + SearchUserDeploymentsActionResult, +) from ai.backend.manager.services.deployment.actions.sync_replicas import ( SyncReplicaAction, SyncReplicaActionResult, @@ -195,34 +203,16 @@ log = BraceStyleAdapter(logging.getLogger(__name__)) -def _map_lifecycle_to_status(lifecycle: 
EndpointLifecycle) -> ModelDeploymentStatus: - """Map EndpointLifecycle to ModelDeploymentStatus for the v2 status surface. - - The lifecycle axis is monotonic (PENDING → DEPLOYING → READY → DESTROYING - → DESTROYED); v2 exposes replica reconciliation as the orthogonal - ``scaling_state`` field on the deployment node. ``SCALING`` is therefore - no longer surfaced through ``ModelDeploymentStatus`` — a legacy - ``lifecycle=SCALING`` row folds into ``READY`` so clients only have to - consult ``scaling_state`` to decide whether a replica reconcile is in - flight. Legacy ``CREATED`` (never-deployed) folds into ``PENDING``. - """ - match lifecycle: - case EndpointLifecycle.PENDING | EndpointLifecycle.CREATED: - return ModelDeploymentStatus.PENDING - case EndpointLifecycle.READY | EndpointLifecycle.SCALING: - return ModelDeploymentStatus.READY - case EndpointLifecycle.DEPLOYING: - return ModelDeploymentStatus.DEPLOYING - case EndpointLifecycle.DESTROYING: - return ModelDeploymentStatus.STOPPING - case EndpointLifecycle.DESTROYED: - return ModelDeploymentStatus.STOPPED - - def _convert_deployment_info_to_data(info: DeploymentInfo) -> ModelDeploymentData: - """Convert DeploymentInfo to ModelDeploymentData. + """Project a ``DeploymentInfo`` to ``ModelDeploymentData`` for legacy callers. + + Used only by ``DeploymentService.search_legacy_deployments`` (v1 REST and the + v2 adapter's scoped paths that piggyback their scope filter onto a no-scope + querier). The v2 service paths now project from ``EndpointRow`` directly via + ``EndpointRow.to_model_deployment_data`` and do not pass through this helper. - Note: Some fields are set to defaults as DeploymentInfo doesn't have all the data. + Note: Some fields are set to placeholder defaults because ``DeploymentInfo`` + does not carry them. 
""" revision: ModelRevisionData | None = None if info.current_revision_id is not None: @@ -248,7 +238,7 @@ def _convert_deployment_info_to_data(info: DeploymentInfo) -> ModelDeploymentDat id=info.id, metadata=ModelDeploymentMetadataInfo( name=info.metadata.name, - status=_map_lifecycle_to_status(info.state.lifecycle), + status=ModelDeploymentStatus.from_lifecycle(info.state.lifecycle), tags=[info.metadata.tag] if info.metadata.tag else [], project_id=info.metadata.project, domain_name=info.metadata.domain, @@ -417,12 +407,8 @@ async def create_deployment( revision=action.creator.model_revision, auto_activate=action.auto_activate, ) - updated_deployment_info = await self._deployment_repository.get_endpoint_info( - deployment_info.id - ) - return CreateDeploymentActionResult( - data=_convert_deployment_info_to_data(updated_deployment_info) - ) + deployment_data = await self._deployment_repository.get_deployment_data(deployment_info.id) + return CreateDeploymentActionResult(data=deployment_data) async def create_legacy_deployment( self, action: CreateLegacyDeploymentAction @@ -462,8 +448,9 @@ async def update_deployment( log.info("Updating deployment with ID: {}", action.updater.pk_value) endpoint_id = DeploymentID(cast(UUID, action.updater.pk_value)) spec = cast(DeploymentUpdaterSpec, action.updater.spec) - deployment_info = await self._deployment_controller.update_deployment(endpoint_id, spec) - return UpdateDeploymentActionResult(data=_convert_deployment_info_to_data(deployment_info)) + await self._deployment_controller.update_deployment(endpoint_id, spec) + deployment_data = await self._deployment_repository.get_deployment_data(endpoint_id) + return UpdateDeploymentActionResult(data=deployment_data) async def replace_deployment_options( self, action: ReplaceDeploymentOptionsAction @@ -504,26 +491,57 @@ async def destroy_deployment( await self._deployment_controller.mark_lifecycle_needed(DeploymentLifecycleType.DESTROYING) return 
DestroyDeploymentActionResult(success=success) - async def search_deployments( - self, action: SearchDeploymentsAction - ) -> SearchDeploymentsActionResult: - """Search deployments with filtering and pagination. - - Args: - action: Action containing BatchQuerier for filtering and pagination - - Returns: - SearchDeploymentsActionResult: Result containing list of deployments and pagination info + async def search_legacy_deployments( + self, action: SearchLegacyDeploymentsAction + ) -> SearchLegacyDeploymentsActionResult: + """Legacy no-scope deployment search. + + Preserves the pre-refactor projection path — + ``DeploymentRepository.search_endpoints`` returns ``DeploymentInfo`` + items, which are then mapped to ``ModelDeploymentData`` via + ``_convert_deployment_info_to_data``. Backs the v1 REST handler's + ``GET /v3/services`` and the v2 adapter's scoped (``my_search`` / + ``project_search``) paths that piggyback their scope filter onto a + no-scope querier. New admin call sites should go through + :class:`DeploymentAdminService` instead. 
""" result = await self._deployment_repository.search_endpoints(action.querier) deployments = [_convert_deployment_info_to_data(info) for info in result.items] - return SearchDeploymentsActionResult( + return SearchLegacyDeploymentsActionResult( data=deployments, total_count=result.total_count, has_next_page=result.has_next_page, has_previous_page=result.has_previous_page, ) + async def search_user_deployments( + self, action: SearchUserDeploymentsAction + ) -> SearchUserDeploymentsActionResult: + """Search the current user's deployments — backs ``my_search``.""" + result = await self._deployment_repository.search_user_deployments( + action.querier, action.scope + ) + return SearchUserDeploymentsActionResult( + data=result.items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + + async def search_project_deployments( + self, action: SearchProjectDeploymentsAction + ) -> SearchProjectDeploymentsActionResult: + """Search deployments inside a project, returning full ``ModelDeploymentData``.""" + result = await self._deployment_repository.search_project_deployments( + action.querier, action.scope + ) + return SearchProjectDeploymentsActionResult( + data=result.items, + total_count=result.total_count, + has_next_page=result.has_next_page, + has_previous_page=result.has_previous_page, + ) + async def search_deployments_in_project( self, action: SearchDeploymentsInProjectAction ) -> SearchDeploymentsInProjectActionResult: @@ -552,8 +570,10 @@ async def get_deployment_by_id( Raises: EndpointNotFound: If the deployment does not exist """ - deployment_info = await self._deployment_repository.get_endpoint_info(action.deployment_id) - return GetDeploymentByIdActionResult(data=_convert_deployment_info_to_data(deployment_info)) + deployment_data = await self._deployment_repository.get_deployment_data( + action.deployment_id + ) + return GetDeploymentByIdActionResult(data=deployment_data) async def 
get_deployment_policy( self, action: GetDeploymentPolicyAction @@ -652,8 +672,11 @@ async def activate_revision( result = await self._deployment_controller.activate_revision( action.deployment_id, action.revision_id ) + deployment_data = await self._deployment_repository.get_deployment_data( + action.deployment_id + ) return ActivateRevisionActionResult( - deployment=_convert_deployment_info_to_data(result.deployment_info), + deployment=deployment_data, previous_revision_id=result.previous_revision_id, activated_revision_id=result.activated_revision_id, deployment_policy=result.deployment_policy, diff --git a/src/ai/backend/manager/services/factory.py b/src/ai/backend/manager/services/factory.py index d9ba5d7e15e..0828f4d289d 100644 --- a/src/ai/backend/manager/services/factory.py +++ b/src/ai/backend/manager/services/factory.py @@ -20,7 +20,11 @@ from ai.backend.manager.services.auth.service import AuthService from ai.backend.manager.services.container_registry.processors import ContainerRegistryProcessors from ai.backend.manager.services.container_registry.service import ContainerRegistryService -from ai.backend.manager.services.deployment.processors import DeploymentProcessors +from ai.backend.manager.services.deployment.admin_service import DeploymentAdminService +from ai.backend.manager.services.deployment.processors import ( + DeploymentAdminProcessors, + DeploymentProcessors, +) from ai.backend.manager.services.deployment.service import DeploymentService from ai.backend.manager.services.deployment_revision_preset.processors import ( DeploymentRevisionPresetProcessors, @@ -389,6 +393,7 @@ def create_services(args: ServiceArgs) -> Services: runtime_variant_preset_repository=repositories.runtime_variant_preset.repository, appproxy_client_pool=args.appproxy_client_pool, ), + deployment_admin=DeploymentAdminService(repositories.deployment.admin_repository), storage_namespace=StorageNamespaceService(repositories.storage_namespace.repository), 
audit_log=AuditLogService(repositories.audit_log.repository), scheduling_history=SchedulingHistoryService(repositories.scheduling_history.repository), @@ -508,6 +513,7 @@ def create_processors( services.artifact_revision, action_monitors, validators ), deployment=DeploymentProcessors(services.deployment, action_monitors, validators), + deployment_admin=DeploymentAdminProcessors(services.deployment_admin, action_monitors), storage_namespace=StorageNamespaceProcessors( services.storage_namespace, action_monitors, validators ), diff --git a/src/ai/backend/manager/services/processors.py b/src/ai/backend/manager/services/processors.py index 14a725c2081..3147e2960fe 100644 --- a/src/ai/backend/manager/services/processors.py +++ b/src/ai/backend/manager/services/processors.py @@ -78,7 +78,11 @@ from ai.backend.manager.services.container_registry.service import ( ContainerRegistryService, ) + from ai.backend.manager.services.deployment.admin_service import ( + DeploymentAdminService, + ) from ai.backend.manager.services.deployment.processors import ( + DeploymentAdminProcessors, DeploymentProcessors, ) from ai.backend.manager.services.deployment.service import ( @@ -404,6 +408,7 @@ class Services: artifact_revision: ArtifactRevisionService artifact_registry: ArtifactRegistryService deployment: DeploymentService + deployment_admin: DeploymentAdminService storage_namespace: StorageNamespaceService audit_log: AuditLogService scheduling_history: SchedulingHistoryService @@ -469,6 +474,7 @@ class Processors(AbstractProcessorPackage): artifact_registry: ArtifactRegistryProcessors artifact_revision: ArtifactRevisionProcessors deployment: DeploymentProcessors + deployment_admin: DeploymentAdminProcessors storage_namespace: StorageNamespaceProcessors audit_log: AuditLogProcessors scheduling_history: SchedulingHistoryProcessors @@ -527,6 +533,7 @@ def supported_actions(self) -> list[ActionSpec]: *self.artifact_revision.supported_actions(), *self.artifact.supported_actions(), 
*self.deployment.supported_actions(), + *self.deployment_admin.supported_actions(), *self.storage_namespace.supported_actions(), *self.audit_log.supported_actions(), *self.scheduling_history.supported_actions(), diff --git a/tests/component/deployment/test_deployment.py b/tests/component/deployment/test_deployment.py index f5b1b4673e9..44ac9240c5d 100644 --- a/tests/component/deployment/test_deployment.py +++ b/tests/component/deployment/test_deployment.py @@ -53,7 +53,6 @@ from ai.backend.common.types import ClusterMode from ai.backend.manager.api.adapters.deployment.adapter import DeploymentAdapter from ai.backend.manager.services.deployment.processors import DeploymentProcessors -from ai.backend.manager.services.deployment.service import _map_lifecycle_to_status from ai.backend.manager.services.processors import Processors if TYPE_CHECKING: @@ -644,7 +643,7 @@ def test_lifecycle_to_status_mapping(self) -> None: } for lifecycle, expected_status in mapping.items(): - actual_status = _map_lifecycle_to_status(lifecycle) + actual_status = ModelDeploymentStatus.from_lifecycle(lifecycle) assert actual_status == expected_status, ( f"EndpointLifecycle.{lifecycle.name} should map to " f"ModelDeploymentStatus.{expected_status.name}, got {actual_status.name}" diff --git a/tests/unit/manager/repositories/deployment/test_endpoint_projection.py b/tests/unit/manager/repositories/deployment/test_endpoint_projection.py new file mode 100644 index 00000000000..12f99a50340 --- /dev/null +++ b/tests/unit/manager/repositories/deployment/test_endpoint_projection.py @@ -0,0 +1,165 @@ +"""Tests for the API-facing ``EndpointRow`` projection at the repository boundary.""" + +from __future__ import annotations + +import uuid +from datetime import UTC, datetime +from typing import Any +from unittest.mock import MagicMock + +from ai.backend.common.data.endpoint.types import EndpointLifecycle, ScalingState +from ai.backend.common.data.model_deployment.types import ModelDeploymentStatus +from 
ai.backend.common.identifier.deployment import DeploymentID +from ai.backend.common.identifier.deployment_revision import DeploymentRevisionID +from ai.backend.common.types import ClusterMode, ResourceSlot +from ai.backend.manager.data.deployment.types import ( + ClusterConfigData, + DeploymentOptions, + ExecutionData, + ModelMountConfigData, + ModelRevisionData, + ModelRuntimeConfigData, + PresetAttributionData, + ResourceConfigData, +) +from ai.backend.manager.repositories.deployment.db_source.db_source import ( + _endpoint_row_to_deployment_data, +) + + +def _stub_endpoint( + *, + current_revision: DeploymentRevisionID | None, + deploying_revision: DeploymentRevisionID | None, + revisions: list[Any], +) -> Any: + """Build a non-DB stub that satisfies the projection helper's reads. + + The helper is a pure projection — instantiating a real ``EndpointRow`` + against the DB schema would force a session and cascade test setup that + has nothing to do with the projection logic, so a ``MagicMock`` carrying + the columns the helper touches is sufficient. + """ + stub = MagicMock() + stub.id = DeploymentID(uuid.uuid4()) + stub.name = "test-deployment" + stub.lifecycle_stage = EndpointLifecycle.DEPLOYING + stub.tag = None + stub.project = uuid.uuid4() + stub.domain = "default" + stub.created_at = datetime(2024, 1, 1, tzinfo=UTC) + stub.open_to_public = False + stub.url = None + stub.current_revision = current_revision + stub.deploying_revision = deploying_revision + stub.revisions = revisions + stub.replicas = 1 + stub.desired_replicas = None + stub.deployment_policy = None + stub.created_user = uuid.uuid4() + stub.options = DeploymentOptions() + stub.scaling_state = ScalingState.STABLE + stub.sub_step = None + return stub + + +def _stub_revision(rev_id: DeploymentRevisionID) -> Any: + """Build a ``DeploymentRevisionRow`` stub whose ``to_data()`` returns a + minimal, well-formed ``ModelRevisionData``. 
+ """ + rev = MagicMock() + rev.id = rev_id + rev.to_data.return_value = ModelRevisionData( + id=rev_id, + deployment_id=DeploymentID(uuid.uuid4()), + revision_number=1, + cluster_config=ClusterConfigData(mode=ClusterMode.SINGLE_NODE, size=1), + resource_config=ResourceConfigData( + resource_group_name="default", + resource_slot=ResourceSlot({}), + ), + model_runtime_config=ModelRuntimeConfigData( + runtime_variant_id=uuid.uuid4(), # type: ignore[arg-type] + ), + model_mount_config=ModelMountConfigData( + vfolder_id=None, + mount_destination="/models", + definition_path="model-definition.yml", + extra_mounts=[], + ), + execution=ExecutionData( + startup_command=None, + bootstrap_script=None, + callback_url=None, + ), + preset=PresetAttributionData(preset_id=None, values=[]), + created_at=datetime(2024, 1, 1, tzinfo=UTC), + image_id=None, + ) + return rev + + +class TestEndpointRowToDeploymentData: + """Pin the API-facing projection to direct DB columns (BA-5979). + + The previous code path matched the current revision via + ``info.model_revisions[0]``, which non-deterministically swapped + ``current_revision_id`` with ``deploying_revision_id`` when Postgres + returned the relationship rows in a different physical order. The new + helper must source IDs from ``EndpointRow`` columns directly so the + revision-row order is irrelevant. + """ + + def test_current_revision_resolved_by_id_match_not_list_order(self) -> None: + current_id = DeploymentRevisionID(uuid.uuid4()) + deploying_id = DeploymentRevisionID(uuid.uuid4()) + current_rev = _stub_revision(current_id) + deploying_rev = _stub_revision(deploying_id) + + # Deploying revision listed first — the natural-failure case for the + # old ``[0]``-blind lookup. 
+ endpoint = _stub_endpoint( + current_revision=current_id, + deploying_revision=deploying_id, + revisions=[deploying_rev, current_rev], + ) + + data = _endpoint_row_to_deployment_data(endpoint) + + assert data.current_revision_id == current_id + assert data.deploying_revision_id == deploying_id + assert data.current_revision_id != data.deploying_revision_id + assert data.revision is not None + assert data.revision.id == current_id + + def test_current_revision_id_survives_dangling_reference(self) -> None: + """If the revision row was deleted but the column still points to it, + surface the column ID and report the spec as ``None`` — do not collapse + ``current_revision_id`` to ``None`` together with the missing spec. + """ + dangling_id = DeploymentRevisionID(uuid.uuid4()) + + endpoint = _stub_endpoint( + current_revision=dangling_id, + deploying_revision=None, + revisions=[], + ) + + data = _endpoint_row_to_deployment_data(endpoint) + + assert data.current_revision_id == dangling_id + assert data.deploying_revision_id is None + assert data.revision is None + + def test_lifecycle_status_mapping(self) -> None: + """Status surface follows the lifecycle column directly.""" + endpoint = _stub_endpoint( + current_revision=None, + deploying_revision=None, + revisions=[], + ) + endpoint.lifecycle_stage = EndpointLifecycle.READY + + data = _endpoint_row_to_deployment_data(endpoint) + + assert data.metadata.status == ModelDeploymentStatus.READY