Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11535.feature.md
Comment thread
seedspirit marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Inject capacity sentinel into kernel live_stat for metrics without a Prometheus capacity series
Comment thread
seedspirit marked this conversation as resolved.
6 changes: 3 additions & 3 deletions src/ai/backend/common/clients/prometheus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
FailedToGetMetric,
PrometheusConnectionError,
)
from ai.backend.common.metrics.types import KernelLiveStatValues
from ai.backend.common.types import KernelId

from .preset import MetricPreset
from .types import MetricValue

DEFAULT_TIMEOUT_SECONDS: float = 30.0

Expand Down Expand Up @@ -82,7 +82,7 @@ async def fetch_container_metric(
async def fetch_container_live_stats(
self,
kernel_ids: Sequence[KernelId],
) -> dict[KernelId, list[MetricValue]]:
) -> KernelLiveStatValues:
queries = self._fixed_query_builder.get_container_live_stat_queries(kernel_ids)
gauge_response = await self._query_instant(queries.gauge)
diff_response = await self._query_instant(queries.diff)
Expand All @@ -91,7 +91,7 @@ async def fetch_container_live_stats(
diff = KernelMetricValuesByKernel.from_prometheus_response(diff_response)
rate = KernelMetricValuesByKernel.from_prometheus_response(rate_response)
merged = gauge.merged_with(diff).merged_with(rate)
return merged.values_by_kernel
return KernelLiveStatValues.with_capacity_sentinels(merged.values_by_kernel)

async def execute_preset(
self,
Expand Down
64 changes: 63 additions & 1 deletion src/ai/backend/common/metrics/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from typing import Final
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Final, Self

from ai.backend.common.clients.prometheus.types import MetricValue, ValueType
from ai.backend.common.types import KernelId

UNDEFINED: Final[str] = "undefined"

Expand All @@ -9,3 +14,60 @@
CONTAINER_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "container_metric_name"
DEVICE_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "device_metric_name"
PROCESS_UTILIZATION_METRIC_LABEL_NAME: Final[str] = "process_metric_name"

# Stand-in capacity for metrics whose Prometheus capacity series does not exist.
# Kept as a decimal string rather than an int because it is assigned directly
# to `MetricValue.value` when the synthetic CAPACITY sample is built.
CAPACITY_SENTINEL: Final[str] = "9223372036854775807"  # 2**63 - 1

# Metric names whose CAPACITY sample is forced to `CAPACITY_SENTINEL` by
# `KernelLiveStatValues.with_capacity_sentinels` whenever the kernel reports a
# CURRENT sample for them. Metrics outside this set are never touched.
CAPACITY_SENTINEL_METRICS: Final[frozenset[str]] = frozenset({
    "cpu_used",
    "net_rx",
    "net_tx",
    "io_read",
    "io_write",
})


@dataclass(frozen=True)
class KernelLiveStatValues:
    """Per-kernel live-stat samples, after capacity-sentinel normalisation.

    Build instances through `with_capacity_sentinels`; the plain constructor
    stores the mapping as given and performs no normalisation.
    """

    # Samples grouped by the kernel that reported them.
    values_by_kernel: Mapping[KernelId, list[MetricValue]]

    @classmethod
    def with_capacity_sentinels(
        cls,
        values_by_kernel: Mapping[KernelId, list[MetricValue]],
    ) -> Self:
        """For metrics in `CAPACITY_SENTINEL_METRICS` that are live (have a
        CURRENT sample), force the CAPACITY sample to `CAPACITY_SENTINEL`.

        These metrics have no meaningful capacity (cumulative counters / rates),
        so any CAPACITY value present in the Prometheus response is a stale
        current-as-fallback artifact and must be overwritten rather than
        respected.

        The input mapping and its lists are never mutated; every kernel gets a
        freshly built list, even when nothing had to be rewritten.
        """
        return cls(
            values_by_kernel={
                kid: cls._with_sentinels(samples) for kid, samples in values_by_kernel.items()
            }
        )

    @staticmethod
    def _with_sentinels(samples: list[MetricValue]) -> list[MetricValue]:
        """Return a copy of one kernel's samples with sentinel capacities applied."""
        # Only metrics that are both whitelisted and actually reporting a
        # CURRENT sample qualify; this avoids inventing phantom capacities.
        targets = {
            s.metric_name
            for s in samples
            if s.value_type is ValueType.CURRENT and s.metric_name in CAPACITY_SENTINEL_METRICS
        }
        # Drop any pre-existing capacity for the targeted metrics, keeping
        # every other sample in its original relative order.
        rewritten = [
            s
            for s in samples
            if not (s.value_type is ValueType.CAPACITY and s.metric_name in targets)
        ]
        # Sorted so the appended samples come out in a stable order; iterating
        # the set directly would vary between processes with hash randomisation.
        rewritten.extend(
            MetricValue(
                metric_name=metric_name,
                value_type=ValueType.CAPACITY,
                value=CAPACITY_SENTINEL,
            )
            for metric_name in sorted(targets)
        )
        return rewritten
6 changes: 4 additions & 2 deletions src/ai/backend/manager/repositories/metric/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ async def query_container_live_stats(
if not kernel_ids:
return KernelLiveStatBatchResult.empty(kernel_ids)
try:
values_by_kernel = await self._prometheus_client.fetch_container_live_stats(kernel_ids)
live_values = await self._prometheus_client.fetch_container_live_stats(kernel_ids)
except (PrometheusConnectionError, FailedToGetMetric):
log.warning("Failed to query metrics for kernel live stats, returning empty results")
return KernelLiveStatBatchResult.empty(kernel_ids)
return KernelLiveStatBatchResult.from_metric_values(kernel_ids, values_by_kernel)
return KernelLiveStatBatchResult.from_metric_values(
kernel_ids, live_values.values_by_kernel
)
97 changes: 97 additions & 0 deletions tests/unit/common/metrics/test_capacity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from uuid import UUID

import pytest

from ai.backend.common.clients.prometheus.types import MetricValue, ValueType
from ai.backend.common.metrics.types import (
CAPACITY_SENTINEL,
KernelLiveStatValues,
)
from ai.backend.common.types import KernelId


def _capacities_of(result: KernelLiveStatValues, kernel_id: KernelId) -> dict[str, str]:
    """Collect the CAPACITY samples of `kernel_id` as `{metric_name: value}`."""
    capacities: dict[str, str] = {}
    for sample in result.values_by_kernel[kernel_id]:
        if sample.value_type is ValueType.CAPACITY:
            # A later sample for the same metric replaces an earlier one.
            capacities[sample.metric_name] = sample.value
    return capacities


class TestWithCapacitySentinels:
    """Tests for `KernelLiveStatValues.with_capacity_sentinels`."""

    @pytest.fixture()
    def kernel_id(self) -> KernelId:
        return KernelId(UUID("12345678-1234-5678-1234-567812345678"))

    @pytest.fixture()
    def other_kernel_id(self) -> KernelId:
        return KernelId(UUID("87654321-4321-8765-4321-876543218765"))

    @pytest.mark.parametrize(
        "metric_name",
        ["cpu_used", "net_rx", "net_tx", "io_read", "io_write"],
    )
    async def test_appends_sentinel_capacity_for_whitelisted_live_metric(
        self, kernel_id: KernelId, metric_name: str
    ) -> None:
        """Whitelisted metrics with a CURRENT sample but no CAPACITY sample
        receive a synthetic capacity carrying `CAPACITY_SENTINEL`.
        """
        result = KernelLiveStatValues.with_capacity_sentinels({
            kernel_id: [MetricValue(metric_name, ValueType.CURRENT, "42")],
        })
        assert _capacities_of(result, kernel_id) == {metric_name: CAPACITY_SENTINEL}

    @pytest.mark.parametrize(
        "metric_name",
        ["cpu_used", "net_rx", "net_tx", "io_read", "io_write"],
    )
    async def test_existing_capacity_is_overwritten(
        self, kernel_id: KernelId, metric_name: str
    ) -> None:
        """Whitelisted metrics never have a meaningful capacity, so any
        capacity already present (e.g. a current-as-fallback artifact) must
        be overwritten by the sentinel.
        """
        result = KernelLiveStatValues.with_capacity_sentinels({
            kernel_id: [
                MetricValue(metric_name, ValueType.CURRENT, "10"),
                MetricValue(metric_name, ValueType.CAPACITY, "999"),
            ],
        })
        assert _capacities_of(result, kernel_id) == {metric_name: CAPACITY_SENTINEL}

    async def test_skips_metric_without_current_sample(self, kernel_id: KernelId) -> None:
        """No phantom capacity is added when the metric has no CURRENT sample."""
        result = KernelLiveStatValues.with_capacity_sentinels({kernel_id: []})
        assert _capacities_of(result, kernel_id) == {}

    async def test_capacity_without_current_sample_is_left_untouched(
        self, kernel_id: KernelId
    ) -> None:
        """A whitelisted metric that only has a CAPACITY sample is not live,
        so its capacity is neither replaced by the sentinel nor dropped.

        The empty-input case alone cannot catch an implementation that keys
        on the metric name instead of on the presence of a CURRENT sample.
        """
        result = KernelLiveStatValues.with_capacity_sentinels({
            kernel_id: [MetricValue("net_rx", ValueType.CAPACITY, "999")],
        })
        assert _capacities_of(result, kernel_id) == {"net_rx": "999"}

    @pytest.mark.parametrize("metric_name", ["mem", "io_scratch_size", "cpu_util"])
    async def test_metric_outside_whitelist_is_untouched(
        self, kernel_id: KernelId, metric_name: str
    ) -> None:
        """Metrics that have a real Prometheus capacity series are left alone."""
        result = KernelLiveStatValues.with_capacity_sentinels({
            kernel_id: [MetricValue(metric_name, ValueType.CURRENT, "1")],
        })
        assert _capacities_of(result, kernel_id) == {}

    async def test_isolates_per_kernel(
        self, kernel_id: KernelId, other_kernel_id: KernelId
    ) -> None:
        """Sentinel injection on one kernel does not leak into another."""
        result = KernelLiveStatValues.with_capacity_sentinels({
            kernel_id: [MetricValue("net_rx", ValueType.CURRENT, "1")],
            other_kernel_id: [MetricValue("mem", ValueType.CURRENT, "2")],
        })
        assert _capacities_of(result, kernel_id) == {"net_rx": CAPACITY_SENTINEL}
        assert _capacities_of(result, other_kernel_id) == {}

    async def test_input_is_not_mutated(self, kernel_id: KernelId) -> None:
        """The caller's input list is left untouched.

        The input carries an existing capacity so the overwrite path, which
        removes and re-adds a sample, is the one being checked.
        """
        original = [
            MetricValue("net_rx", ValueType.CURRENT, "1"),
            MetricValue("net_rx", ValueType.CAPACITY, "999"),
        ]
        snapshot = list(original)
        KernelLiveStatValues.with_capacity_sentinels({kernel_id: original})
        assert original == snapshot
Loading