Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11360.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add window-based max/avg container live stats queries via PromQL to populate legacy `stats.max` / `stats.avg` fields
13 changes: 6 additions & 7 deletions src/ai/backend/common/clients/prometheus/client.py
Original file line number Diff line number Diff line change
async def fetch_container_live_stats(
    self,
    kernel_ids: Sequence[KernelId],
) -> dict[KernelId, list[MetricValue]]:
    """Fetch merged live container stats for *kernel_ids*.

    Runs every live-stat preset query returned by the fixed query builder
    (gauge / diff / rate / max / avg — see ``ContainerLiveStatQueries.to_list``)
    sequentially and folds each response into a single per-kernel mapping.

    :param kernel_ids: kernels whose container metrics should be queried.
    :return: mapping of kernel ID to the merged list of metric values.
    """
    queries = self._fixed_query_builder.get_container_live_stat_queries(kernel_ids)
    # Start from an empty accumulator; merged_with() returns a new object.
    merged = KernelMetricValuesByKernel(values_by_kernel={})
    for preset in queries.to_list():
        response = await self._query_instant(preset)
        merged = merged.merged_with(
            KernelMetricValuesByKernel.from_prometheus_response(response)
        )
    return merged.values_by_kernel

async def execute_preset(
Expand Down
153 changes: 132 additions & 21 deletions src/ai/backend/common/clients/prometheus/fixed_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
from ai.backend.common.clients.prometheus.metric_types import (
DIFF_METRICS,
RATE_METRICS,
STATS_AVG_GAUGE_METRIC_PATTERNS,
STATS_AVG_GAUGE_METRICS,
STATS_AVG_OVER_RATE_METRICS,
STATS_MAX_GAUGE_METRIC_PATTERNS,
STATS_MAX_GAUGE_METRICS,
STATS_MAX_OVER_RATE_METRICS,
ContainerLiveStatQueries,
ContainerMetricOptionalLabel,
MetricType,
Expand Down Expand Up @@ -47,10 +53,49 @@ class LabelValuesQuery:
metric_match: str


@dataclass(frozen=True)
class _LiveStatQuerySpec:
    """Inputs for building one live-stat query preset (gauge / diff / rate)."""

    # Pre-rendered PromQL template for the metric type.
    template: str
    # Restrict to these container metric names; None means no name filter.
    metric_name_filter: frozenset[str] | None = None
    # Restrict to samples with this value type; None means no filter.
    value_type_filter: ValueType | None = None


@dataclass(frozen=True)
class _StatsBucket:
    """Window-stats bucket spec (gauge metrics + rate metrics for a single stat)."""

    # Which window aggregate this bucket produces (e.g. MAX or AVG).
    value_type: ValueType
    # Exact gauge metric names aggregated directly with <stat>_over_time.
    gauge_metrics: frozenset[str]
    # Counter-style metric names that are rate()'d before the window aggregate.
    rate_metrics: frozenset[str]
    # Raw regex patterns (NOT escaped) for plugin/accelerator gauge metrics.
    gauge_metric_patterns: frozenset[str] = frozenset()


def _regex_union(values: Sequence[str]) -> str:
    """Join *values* into a regex alternation, escaping each literal."""
    return "|".join(map(re.escape, values))


def _metric_name_regex(
    metric_names: frozenset[str],
    metric_patterns: frozenset[str] = frozenset(),
) -> str:
    """Build an alternation regex: escaped exact names first, then raw patterns.

    Patterns are deliberately NOT escaped — they are regexes themselves.
    Both groups are sorted so the rendered query is deterministic.
    """
    parts: list[str] = [re.escape(name) for name in sorted(metric_names)]
    parts.extend(sorted(metric_patterns))
    return "|".join(parts)


# Bucket feeding the legacy `stats.max` fields: per-window maxima.
_MAX_STATS_BUCKET: Final[_StatsBucket] = _StatsBucket(
    value_type=ValueType.MAX,
    gauge_metrics=STATS_MAX_GAUGE_METRICS,
    rate_metrics=STATS_MAX_OVER_RATE_METRICS,
    gauge_metric_patterns=STATS_MAX_GAUGE_METRIC_PATTERNS,
)
# Bucket feeding the legacy `stats.avg` fields: per-window averages.
_AVG_STATS_BUCKET: Final[_StatsBucket] = _StatsBucket(
    value_type=ValueType.AVG,
    gauge_metrics=STATS_AVG_GAUGE_METRICS,
    rate_metrics=STATS_AVG_OVER_RATE_METRICS,
    gauge_metric_patterns=STATS_AVG_GAUGE_METRIC_PATTERNS,
)


class FixedQueryBuilder:
_timewindow: str

def get_container_live_stat_queries(
    self,
    kernel_ids: Sequence[KernelId],
) -> ContainerLiveStatQueries:
    """Build the full live-stat preset bundle for *kernel_ids*.

    Gauge/diff/rate presets use the per-metric-type templates; the max/avg
    presets render window-stats PromQL from the module-level stat buckets.

    :param kernel_ids: kernels whose container metrics should be queried.
    """
    kernel_id_regex = _regex_union([str(kid) for kid in kernel_ids])
    # Sorted group-by keeps rendered queries deterministic across runs.
    group_by = ",".join(sorted(_LIVE_STAT_GROUP_BY))
    return ContainerLiveStatQueries(
        gauge=self._get_live_stat_query(
            kernel_ids,
            _LiveStatQuerySpec(template=self._get_template(MetricType.GAUGE)),
        ),
        diff=self._get_live_stat_query(
            kernel_ids,
            _LiveStatQuerySpec(
                template=self._get_template(MetricType.DIFF),
                metric_name_filter=DIFF_METRICS,
                value_type_filter=ValueType.CURRENT,
            ),
        ),
        rate=self._get_live_stat_query(
            kernel_ids,
            _LiveStatQuerySpec(
                template=self._get_template(MetricType.RATE),
                metric_name_filter=RATE_METRICS,
                value_type_filter=ValueType.CURRENT,
            ),
        ),
        max=self._build_stats_preset(_MAX_STATS_BUCKET, kernel_id_regex, group_by),
        avg=self._build_stats_preset(_AVG_STATS_BUCKET, kernel_id_regex, group_by),
    )

def _build_stats_preset(
    self,
    bucket: _StatsBucket,
    kernel_id_regex: str,
    group_by: str,
) -> MetricPreset:
    """Wrap the rendered window-stats PromQL for *bucket* in a MetricPreset."""
    rendered = self._render_stats_query(
        bucket,
        kernel_id_regex=kernel_id_regex,
        group_by=group_by,
    )
    return MetricPreset(template=rendered)

def _get_live_stat_query(
    self,
    kernel_ids: Sequence[KernelId],
    spec: _LiveStatQuerySpec,
) -> MetricPreset:
    """Build a single live-stat preset from *spec* scoped to *kernel_ids*.

    :param kernel_ids: kernels to match via a `kernel_id` regex label.
    :param spec: template plus optional metric-name / value-type filters.
    """
    labels: dict[str, LabelMatcher] = {
        "kernel_id": LabelMatcher.regex(_regex_union([str(kid) for kid in kernel_ids]))
    }
    if spec.metric_name_filter is not None:
        # Sorted for a deterministic alternation order in the rendered query.
        labels["container_metric_name"] = LabelMatcher.regex(
            _regex_union(sorted(spec.metric_name_filter))
        )
    if spec.value_type_filter is not None:
        labels["value_type"] = LabelMatcher.exact(spec.value_type_filter.value)

    return MetricPreset(
        template=spec.template,
        group_by=_LIVE_STAT_GROUP_BY,
        labels=labels,
        window=self._timewindow,
    )

def _render_stats_query(
    self,
    bucket: _StatsBucket,
    *,
    kernel_id_regex: str,
    group_by: str,
) -> str:
    """Render window max/avg PromQL for *bucket*, clauses joined with `or`.

    Emits up to two clauses — one over gauge metrics, one over rate()'d
    counter metrics — each wrapped in label_replace so the resulting
    `value_type` carries the legacy `stats.<stat>` label.
    """
    over_time_fn = f"{bucket.value_type.value}_over_time"
    legacy_label = bucket.value_type.to_live_stat_label()

    def _wrap(inner: str) -> str:
        # Apply the window aggregate as a subquery, then rewrite value_type.
        return (
            f"label_replace({over_time_fn}({inner}[{self._timewindow}:]),"
            f'"value_type","{legacy_label}","value_type",".*")'
        )

    clauses: list[str] = []
    if bucket.gauge_metrics or bucket.gauge_metric_patterns:
        gauge_selector = self._live_stat_current_labels(
            kernel_id_regex=kernel_id_regex,
            metric_name_regex=_metric_name_regex(
                bucket.gauge_metrics, bucket.gauge_metric_patterns
            ),
        )
        clauses.append(
            _wrap(
                f"(sum by ({group_by})("
                f"{CONTAINER_UTILIZATION_METRIC_NAME}{{{gauge_selector}}}))"
            )
        )
    if bucket.rate_metrics:
        rate_selector = self._live_stat_current_labels(
            kernel_id_regex=kernel_id_regex,
            metric_name_regex=_regex_union(sorted(bucket.rate_metrics)),
        )
        clauses.append(
            _wrap(
                f"(sum by ({group_by})(rate("
                f"{CONTAINER_UTILIZATION_METRIC_NAME}{{{rate_selector}}}"
                f"[{self._timewindow}])))"
            )
        )
    return " or ".join(clauses)

def _live_stat_current_labels(
    self,
    *,
    kernel_id_regex: str,
    metric_name_regex: str,
) -> str:
    """Render the label selector matching `current` samples for the given
    kernel-id and metric-name regexes."""
    matchers = [
        f'kernel_id=~"{kernel_id_regex}"',
        f'container_metric_name=~"{metric_name_regex}"',
        f'value_type="{ValueType.CURRENT.value}"',
    ]
    return ",".join(matchers)

def _get_template(self, metric_type: MetricType) -> str:
match metric_type:
case MetricType.GAUGE:
Expand Down
23 changes: 20 additions & 3 deletions src/ai/backend/common/clients/prometheus/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,36 @@ class MetricType(StrEnum):

@dataclass(frozen=True)
class ContainerLiveStatQueries:
    """Gauge / diff / rate / max / avg query preset bundle for container live stats."""

    gauge: MetricPreset
    diff: MetricPreset
    rate: MetricPreset
    max: MetricPreset
    avg: MetricPreset

    def to_list(self) -> list[MetricPreset]:
        """Return every preset in a fixed, iteration-friendly order."""
        return [self.gauge, self.diff, self.rate, self.max, self.avg]


# Live-stat metric names handled by the DIFF query template.
DIFF_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})
# Live-stat metric names handled by the RATE query template.
RATE_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"})

# Window stats: built-ins are exact, accelerator/plugin metrics use patterns.
# Gauge metrics whose per-window maximum feeds legacy `stats.max`.
STATS_MAX_GAUGE_METRICS: Final[frozenset[str]] = frozenset({
    "mem",
    "io_scratch_size",
})
# Raw regexes (not escaped) matching plugin gauge metrics for `stats.max`.
STATS_MAX_GAUGE_METRIC_PATTERNS: Final[frozenset[str]] = frozenset({
    r"[A-Za-z0-9][A-Za-z0-9_-]*_(mem|util|power)",
})
# No built-in gauge metrics are averaged; plugins match by the pattern below.
STATS_AVG_GAUGE_METRICS: Final[frozenset[str]] = frozenset()
# Raw regexes (not escaped) matching plugin gauge metrics for `stats.avg`.
STATS_AVG_GAUGE_METRIC_PATTERNS: Final[frozenset[str]] = frozenset({
    r"[A-Za-z0-9][A-Za-z0-9_-]*_util",
})
# Counter metrics that are rate()'d before taking the window max / avg.
STATS_MAX_OVER_RATE_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})
STATS_AVG_OVER_RATE_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})


@dataclass
class ContainerMetricResponseInfo:
Expand Down Expand Up @@ -184,7 +201,7 @@ def from_prometheus_response(cls, response: PrometheusResponse) -> Self:
container_metric_name = cast(str, info.container_metric_name)
value_type_str = cast(str, info.value_type)
try:
value_type = ValueType(value_type_str)
value_type = ValueType.from_legacy_live_stat_label(value_type_str)
kernel_id = KernelId(UUID(kernel_id_str))
except ValueError:
continue
Expand Down
16 changes: 16 additions & 0 deletions src/ai/backend/common/clients/prometheus/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ class ValueType(StrEnum):
CURRENT = "current"
CAPACITY = "capacity"
PCT = "pct"
MAX = "max"
AVG = "avg"
RATE = "rate"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question: this PR seems to add only `max` and `avg`.
Why is `rate` included here as well?


@classmethod
def from_legacy_live_stat_label(cls, value: str) -> "ValueType":
    """Parse a value-type label, accepting legacy `stats.<name>` aliases."""
    prefix = "stats."
    if value.startswith(prefix):
        return cls(value[len(prefix):])
    return cls(value)

def to_live_stat_label(self) -> str:
    """Render this value type as its legacy live-stat label.

    MAX / AVG / RATE gain a `stats.` prefix; all other members render
    as their plain enum value.
    """
    if self in (ValueType.MAX, ValueType.AVG, ValueType.RATE):
        return f"stats.{self.value}"
    return self.value


@dataclass(frozen=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ai.backend.common.clients.prometheus.metric_types import (
ContainerMetricOptionalLabel,
ContainerMetricResponseInfo,
KernelMetricValuesByKernel,
MetricType,
ValueType,
)
Expand All @@ -30,6 +31,7 @@
InvalidAPIParameters,
PrometheusConnectionError,
)
from ai.backend.common.types import KernelId
from ai.backend.manager.repositories.metric.repository import MetricRepository
from ai.backend.manager.services.metric.actions.container import (
ContainerMetricAction,
Expand Down Expand Up @@ -804,6 +806,73 @@ async def test_build_query_renders_expected_promql(self, case: BuiltinQueryTestC
assert rendered_query == case.expected_query


class TestLiveStatQueryProvider:
    """Characterization tests for container live stat PromQL."""

    def test_stats_queries_render_legacy_labels_from_typed_value_types(self) -> None:
        # Fixed kernel id so the expected kernel_id regex is stable.
        kernel_id = KernelId(UUID("12345678-1234-5678-1234-567812345678"))
        fixed_query_builder = FixedQueryBuilder("5m")

        queries = fixed_query_builder.get_container_live_stat_queries([kernel_id])

        # Gauge clause first, then the rate()-based clause, joined by " or ";
        # label_replace rewrites value_type to the legacy "stats.max" label.
        assert queries.max.render() == (
            "label_replace(max_over_time((sum by (container_metric_name,kernel_id,value_type)("
            "backendai_container_utilization"
            '{kernel_id=~"12345678\\-1234\\-5678\\-1234\\-567812345678",'
            'container_metric_name=~"io_scratch_size|mem|'
            '[A-Za-z0-9][A-Za-z0-9_-]*_(mem|util|power)",'
            'value_type="current"}))[5m:]),'
            '"value_type","stats.max","value_type",".*")'
            " or "
            "label_replace(max_over_time((sum by (container_metric_name,kernel_id,value_type)(rate("
            "backendai_container_utilization"
            '{kernel_id=~"12345678\\-1234\\-5678\\-1234\\-567812345678",'
            'container_metric_name=~"cpu_util",value_type="current"}'
            "[5m])))[5m:]),"
            '"value_type","stats.max","value_type",".*")'
        )
        # Same structure for avg: pattern-only gauge clause plus cpu_util rate.
        assert queries.avg.render() == (
            "label_replace(avg_over_time((sum by (container_metric_name,kernel_id,value_type)("
            "backendai_container_utilization"
            '{kernel_id=~"12345678\\-1234\\-5678\\-1234\\-567812345678",'
            'container_metric_name=~"[A-Za-z0-9][A-Za-z0-9_-]*_util",'
            'value_type="current"}))[5m:]),'
            '"value_type","stats.avg","value_type",".*")'
            " or "
            "label_replace(avg_over_time((sum by (container_metric_name,kernel_id,value_type)(rate("
            "backendai_container_utilization"
            '{kernel_id=~"12345678\\-1234\\-5678\\-1234\\-567812345678",'
            'container_metric_name=~"cpu_util",value_type="current"}'
            "[5m])))[5m:]),"
            '"value_type","stats.avg","value_type",".*")'
        )


class TestKernelMetricValuesByKernel:
    """Parsing tests for KernelMetricValuesByKernel."""

    def test_from_prometheus_response_maps_legacy_stat_label_to_value_type(self) -> None:
        # A response carrying the legacy "stats.max" label should be parsed
        # back into the typed ValueType.MAX.
        target_kernel = KernelId(UUID("12345678-1234-5678-1234-567812345678"))
        prom_response = PrometheusResponse(
            status="success",
            data=PrometheusQueryData(
                result_type="vector",
                result=[
                    MetricResponse(
                        metric=MetricResponseInfo(
                            kernel_id=str(target_kernel),
                            container_metric_name="mem",
                            value_type="stats.max",
                        ),
                        values=[(1704067200.0, "1024")],
                    )
                ],
            ),
        )

        parsed = KernelMetricValuesByKernel.from_prometheus_response(prom_response)

        metric_values = parsed.values_by_kernel[target_kernel]
        assert metric_values[0].value_type == ValueType.MAX


class TestMetricResponseInfoParsing:
"""Unit tests for MetricResponseInfo parsing behavior."""

Expand Down
Loading