Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11535.enhance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Inject capacity sentinel into kernel live_stat for metrics without a Prometheus capacity series
6 changes: 3 additions & 3 deletions src/ai/backend/common/clients/prometheus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
FailedToGetMetric,
PrometheusConnectionError,
)
from ai.backend.common.metrics.types import KernelLiveStatValues
from ai.backend.common.types import KernelId

from .preset import MetricPreset
from .types import MetricValue

DEFAULT_TIMEOUT_SECONDS: float = 30.0

Expand Down Expand Up @@ -82,7 +82,7 @@ async def fetch_container_metric(
async def fetch_container_live_stats(
self,
kernel_ids: Sequence[KernelId],
) -> dict[KernelId, list[MetricValue]]:
) -> KernelLiveStatValues:
queries = self._fixed_query_builder.get_container_live_stat_queries(kernel_ids)
gauge_response = await self._query_instant(queries.gauge)
diff_response = await self._query_instant(queries.diff)
Expand All @@ -91,7 +91,7 @@ async def fetch_container_live_stats(
diff = KernelMetricValuesByKernel.from_prometheus_response(diff_response)
rate = KernelMetricValuesByKernel.from_prometheus_response(rate_response)
merged = gauge.merged_with(diff).merged_with(rate)
return merged.values_by_kernel
return KernelLiveStatValues.with_capacity_sentinels(merged.values_by_kernel)

async def execute_preset(
self,
Expand Down
55 changes: 54 additions & 1 deletion src/ai/backend/common/metrics/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from typing import Final
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Final, Self

from ai.backend.common.clients.prometheus.types import MetricValue, ValueType
from ai.backend.common.types import KernelId

UNDEFINED: Final[str] = "undefined"

Expand All @@ -9,3 +14,51 @@
CONTAINER_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "container_metric_name"
DEVICE_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "device_metric_name"
PROCESS_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "process_metric_name"

# Stand-in capacity for metrics whose Prometheus capacity series does not exist.
# Stored as a decimal string; the number is the largest signed 64-bit integer.
CAPACITY_SENTINEL: Final[str] = "9223372036854775807" # 2**63 - 1

# Names of the metrics that may receive a synthetic `CAPACITY_SENTINEL`
# capacity sample; metrics outside this set never get one.
CAPACITY_SENTINEL_METRICS: Final[frozenset[str]] = frozenset({
    "cpu_used",
    "net_rx",
    "net_tx",
    "io_read",
    "io_write",
})


@dataclass(frozen=True)
class KernelLiveStatValues:
    """Live-stat metric samples grouped by kernel.

    Use `with_capacity_sentinels` to build an instance whose sample lists
    carry a synthetic CAPACITY sample for every eligible metric.
    """

    # Metric samples of each kernel, keyed by kernel ID.
    values_by_kernel: Mapping[KernelId, list[MetricValue]]

    @classmethod
    def with_capacity_sentinels(
        cls,
        values_by_kernel: Mapping[KernelId, list[MetricValue]],
    ) -> Self:
        """Build an instance with sentinel capacities filled in.

        For metrics in `CAPACITY_SENTINEL_METRICS` that are live (have a
        CURRENT sample) but lack a CAPACITY sample, append a synthetic
        CAPACITY sample carrying `CAPACITY_SENTINEL`. Existing capacity
        values are preserved, and metrics outside the set are left alone.

        The input mapping and its lists are not mutated: every list is
        shallow-copied before anything is appended. Synthetic samples are
        appended after the original samples of a kernel, ordered by metric
        name, so the result does not depend on set iteration order.

        Args:
            values_by_kernel: Metric samples of each kernel, keyed by kernel ID.

        Returns:
            A new instance wrapping the copied (and possibly extended) lists.
        """
        # Copy each list so that appending below never touches the caller's data.
        new_values: dict[KernelId, list[MetricValue]] = {
            kid: list(vs) for kid, vs in values_by_kernel.items()
        }
        for vs in new_values.values():
            reported_currents: set[str] = set()
            reported_capacities: set[str] = set()
            for v in vs:
                if v.value_type is ValueType.CURRENT:
                    reported_currents.add(v.metric_name)
                elif v.value_type is ValueType.CAPACITY:
                    reported_capacities.add(v.metric_name)
            sentinel_targets = (reported_currents & CAPACITY_SENTINEL_METRICS) - reported_capacities
            # Iterating the set directly would yield an order that changes
            # between interpreter runs (string hashing is randomized); sort
            # so the appended samples always come out in the same order.
            vs.extend(
                MetricValue(
                    metric_name=metric_name,
                    value_type=ValueType.CAPACITY,
                    value=CAPACITY_SENTINEL,
                )
                for metric_name in sorted(sentinel_targets)
            )
        return cls(values_by_kernel=new_values)
6 changes: 4 additions & 2 deletions src/ai/backend/manager/repositories/metric/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ async def query_container_live_stats(
if not kernel_ids:
return KernelLiveStatBatchResult.empty(kernel_ids)
try:
values_by_kernel = await self._prometheus_client.fetch_container_live_stats(kernel_ids)
live_values = await self._prometheus_client.fetch_container_live_stats(kernel_ids)
except (PrometheusConnectionError, FailedToGetMetric):
log.warning("Failed to query metrics for kernel live stats, returning empty results")
return KernelLiveStatBatchResult.empty(kernel_ids)
return KernelLiveStatBatchResult.from_metric_values(kernel_ids, values_by_kernel)
return KernelLiveStatBatchResult.from_metric_values(
kernel_ids, live_values.values_by_kernel
)
88 changes: 88 additions & 0 deletions tests/unit/common/metrics/test_capacity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from uuid import UUID

import pytest

from ai.backend.common.clients.prometheus.types import MetricValue, ValueType
from ai.backend.common.metrics.types import (
CAPACITY_SENTINEL,
KernelLiveStatValues,
)
from ai.backend.common.types import KernelId


def _capacities_of(result: KernelLiveStatValues, kernel_id: KernelId) -> dict[str, str]:
    """Collect the CAPACITY samples of `kernel_id` as `{metric_name: value}`."""
    capacities: dict[str, str] = {}
    for sample in result.values_by_kernel[kernel_id]:
        # Only capacity samples are of interest; skip every other value type.
        if sample.value_type is not ValueType.CAPACITY:
            continue
        capacities[sample.metric_name] = sample.value
    return capacities


class TestWithCapacitySentinels:
    """Behavioral checks for `KernelLiveStatValues.with_capacity_sentinels`."""

    @pytest.fixture()
    def kernel_id(self) -> KernelId:
        raw_id = UUID("12345678-1234-5678-1234-567812345678")
        return KernelId(raw_id)

    @pytest.fixture()
    def other_kernel_id(self) -> KernelId:
        raw_id = UUID("87654321-4321-8765-4321-876543218765")
        return KernelId(raw_id)

    @pytest.mark.parametrize(
        "metric_name",
        ["cpu_used", "net_rx", "net_tx", "io_read", "io_write"],
    )
    async def test_appends_sentinel_capacity_for_whitelisted_live_metric(
        self, kernel_id: KernelId, metric_name: str
    ) -> None:
        """A whitelisted metric that only has a CURRENT sample gains a
        synthetic capacity whose value is `CAPACITY_SENTINEL`.
        """
        live_sample = MetricValue(metric_name, ValueType.CURRENT, "42")
        result = KernelLiveStatValues.with_capacity_sentinels({kernel_id: [live_sample]})
        expected = {metric_name: CAPACITY_SENTINEL}
        assert _capacities_of(result, kernel_id) == expected

    async def test_existing_capacity_is_preserved(self, kernel_id: KernelId) -> None:
        """The sentinel never replaces a capacity sample that is already present."""
        samples = [
            MetricValue("net_rx", ValueType.CURRENT, "10"),
            MetricValue("net_rx", ValueType.CAPACITY, "999"),
        ]
        result = KernelLiveStatValues.with_capacity_sentinels({kernel_id: samples})
        assert _capacities_of(result, kernel_id) == {"net_rx": "999"}

    async def test_skips_metric_without_current_sample(self, kernel_id: KernelId) -> None:
        """With no CURRENT sample present, no capacity is invented."""
        no_samples: list[MetricValue] = []
        result = KernelLiveStatValues.with_capacity_sentinels({kernel_id: no_samples})
        assert _capacities_of(result, kernel_id) == {}

    @pytest.mark.parametrize("metric_name", ["mem", "io_scratch_size", "cpu_util"])
    async def test_metric_outside_whitelist_is_untouched(
        self, kernel_id: KernelId, metric_name: str
    ) -> None:
        """A metric that is not whitelisted gets no synthetic capacity."""
        live_sample = MetricValue(metric_name, ValueType.CURRENT, "1")
        result = KernelLiveStatValues.with_capacity_sentinels({kernel_id: [live_sample]})
        assert _capacities_of(result, kernel_id) == {}

    async def test_isolates_per_kernel(
        self, kernel_id: KernelId, other_kernel_id: KernelId
    ) -> None:
        """Each kernel is processed on its own; one kernel's sentinel does
        not show up on another kernel.
        """
        samples_by_kernel = {
            kernel_id: [MetricValue("net_rx", ValueType.CURRENT, "1")],
            other_kernel_id: [MetricValue("mem", ValueType.CURRENT, "2")],
        }
        result = KernelLiveStatValues.with_capacity_sentinels(samples_by_kernel)
        assert _capacities_of(result, kernel_id) == {"net_rx": CAPACITY_SENTINEL}
        assert _capacities_of(result, other_kernel_id) == {}

    async def test_input_is_not_mutated(self, kernel_id: KernelId) -> None:
        """The list handed in by the caller keeps its original content."""
        original = [MetricValue("net_rx", ValueType.CURRENT, "1")]
        KernelLiveStatValues.with_capacity_sentinels({kernel_id: original})
        expected = [MetricValue("net_rx", ValueType.CURRENT, "1")]
        assert original == expected
Loading