Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/11360.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Provide a manager-side parallel supply for legacy `live_stat` `stats.max` / `stats.avg` / `stats.rate` fields, computed from Prometheus on demand instead of from the agent's in-memory `MovingStatistics` accumulator. Survives agent / manager / host restart, stays consistent across sessions, and uses a sliding window (default 5m) instead of unbounded lifetime accumulation.
13 changes: 6 additions & 7 deletions src/ai/backend/common/clients/prometheus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ async def fetch_container_live_stats(
kernel_ids: Sequence[KernelId],
) -> dict[KernelId, list[MetricValue]]:
queries = self._fixed_query_builder.get_container_live_stat_queries(kernel_ids)
gauge_response = await self._query_instant(queries.gauge)
diff_response = await self._query_instant(queries.diff)
rate_response = await self._query_instant(queries.rate)
gauge = KernelMetricValuesByKernel.from_prometheus_response(gauge_response)
diff = KernelMetricValuesByKernel.from_prometheus_response(diff_response)
rate = KernelMetricValuesByKernel.from_prometheus_response(rate_response)
merged = gauge.merged_with(diff).merged_with(rate)
merged = KernelMetricValuesByKernel(values_by_kernel={})
for preset in queries.to_list():
response = await self._query_instant(preset)
merged = merged.merged_with(
KernelMetricValuesByKernel.from_prometheus_response(response)
)
return merged.values_by_kernel

async def execute_preset(
Expand Down
198 changes: 167 additions & 31 deletions src/ai/backend/common/clients/prometheus/fixed_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
from ai.backend.common.clients.prometheus.metric_types import (
DIFF_METRICS,
RATE_METRICS,
STATS_AVG_GAUGE_METRIC_PATTERNS,
STATS_AVG_GAUGE_METRICS,
STATS_AVG_OVER_RATE_METRICS,
STATS_MAX_GAUGE_METRIC_PATTERNS,
STATS_MAX_GAUGE_METRICS,
STATS_MAX_OVER_RATE_METRICS,
STATS_RATE_COUNTER_METRICS,
STATS_RATE_GAUGE_METRICS,
ContainerLiveStatQueries,
ContainerMetricOptionalLabel,
MetricType,
)
from ai.backend.common.clients.prometheus.preset import LabelMatcher, MetricPreset
from ai.backend.common.clients.prometheus.querier import ContainerMetricQuerier
from ai.backend.common.clients.prometheus.types import ValueType
from ai.backend.common.exception import UnreachableError
from ai.backend.common.metrics.types import (
CONTAINER_UTILIZATION_METRIC_LABEL_NAME,
CONTAINER_UTILIZATION_METRIC_NAME,
Expand Down Expand Up @@ -47,8 +54,61 @@ class LabelValuesQuery:
metric_match: str


@dataclass(frozen=True)
class _LiveStatQuerySpec:
    """Spec for one live-stat instant query: a PromQL template plus optional filters."""

    # Query template rendered into the MetricPreset (gauge/diff/rate variants).
    template: str
    # When set, restrict to these exact container metric names; None = no filter.
    metric_name_filter: frozenset[str] | None = None
    # When set, restrict to a single value_type label; None = no filter.
    value_type_filter: ValueType | None = None


@dataclass(frozen=True)
class _StatsBucket:
    """Window-stats bucket spec (gauge metrics + rate metrics for a single stat)."""

    # Which stats value_type this bucket produces (e.g. MAX or AVG).
    value_type: ValueType
    # Exact metric names sampled directly as gauges over the window.
    gauge_metrics: frozenset[str]
    # Exact metric names that must be wrapped in rate() before windowing.
    rate_metrics: frozenset[str]
    # Raw regex fragments for plugin/accelerator gauge metrics (joined unescaped).
    gauge_metric_patterns: frozenset[str] = frozenset()


def _regex_union(values: Sequence[str]) -> str:
return "|".join(re.escape(value) for value in values)
return "|".join(re.escape(value).replace(r"\-", "-") for value in values)


def _metric_name_regex(
metric_names: frozenset[str],
metric_patterns: frozenset[str] = frozenset(),
) -> str:
exact_parts = [re.escape(value) for value in sorted(metric_names)]
return "|".join([*exact_parts, *sorted(metric_patterns)])


# Live-stat query presets: the gauge query is unfiltered, while the diff and
# rate queries are restricted to their dedicated metric sets and to the
# "current" value series.
_GAUGE_LIVE_STAT_SPEC: Final[_LiveStatQuerySpec] = _LiveStatQuerySpec(
    template=_GAUGE_TEMPLATE,
)
_DIFF_LIVE_STAT_SPEC: Final[_LiveStatQuerySpec] = _LiveStatQuerySpec(
    template=_DIFF_TEMPLATE,
    metric_name_filter=DIFF_METRICS,
    value_type_filter=ValueType.CURRENT,
)
_RATE_LIVE_STAT_SPEC: Final[_LiveStatQuerySpec] = _LiveStatQuerySpec(
    template=_RATE_TEMPLATE,
    metric_name_filter=RATE_METRICS,
    value_type_filter=ValueType.CURRENT,
)

# Window-stats buckets backing the legacy stats.max / stats.avg fields.
_MAX_STATS_BUCKET: Final[_StatsBucket] = _StatsBucket(
    value_type=ValueType.MAX,
    gauge_metrics=STATS_MAX_GAUGE_METRICS,
    rate_metrics=STATS_MAX_OVER_RATE_METRICS,
    gauge_metric_patterns=STATS_MAX_GAUGE_METRIC_PATTERNS,
)
_AVG_STATS_BUCKET: Final[_StatsBucket] = _StatsBucket(
    value_type=ValueType.AVG,
    gauge_metrics=STATS_AVG_GAUGE_METRICS,
    rate_metrics=STATS_AVG_OVER_RATE_METRICS,
    gauge_metric_patterns=STATS_AVG_GAUGE_METRIC_PATTERNS,
)


class FixedQueryBuilder:
Expand Down Expand Up @@ -101,49 +161,127 @@ def get_container_live_stat_queries(
kernel_ids: Sequence[KernelId],
) -> ContainerLiveStatQueries:
return ContainerLiveStatQueries(
gauge=self._get_container_live_stat_query(
kernel_ids,
metric_type=MetricType.GAUGE,
),
diff=self._get_container_live_stat_query(
kernel_ids,
metric_type=MetricType.DIFF,
metric_name_filter=DIFF_METRICS,
value_type_filter=ValueType.CURRENT,
),
rate=self._get_container_live_stat_query(
kernel_ids,
metric_type=MetricType.RATE,
metric_name_filter=RATE_METRICS,
value_type_filter=ValueType.CURRENT,
),
gauge=self._build_filtered_preset(kernel_ids, _GAUGE_LIVE_STAT_SPEC),
diff=self._build_filtered_preset(kernel_ids, _DIFF_LIVE_STAT_SPEC),
rate=self._build_filtered_preset(kernel_ids, _RATE_LIVE_STAT_SPEC),
max=self._build_window_stats_preset(kernel_ids, _MAX_STATS_BUCKET),
avg=self._build_window_stats_preset(kernel_ids, _AVG_STATS_BUCKET),
rate_stats=self._build_rate_stats_preset(kernel_ids),
)

def _get_container_live_stat_query(
def _build_rate_stats_preset(
    self,
    kernel_ids: Sequence[KernelId],
) -> MetricPreset:
    """Build a single instant-query preset producing the legacy ``stats.rate`` series.

    Two metric shapes are combined with PromQL ``or``:
    * gauge-shaped metrics whose published value is already a per-second
      rate — these are only summed and relabelled, and
    * cumulative counters — these are wrapped in ``rate(...[window])`` first.

    :param kernel_ids: kernels whose series are selected (joined into one regex).
    """
    kernel_id_regex = _regex_union([str(kid) for kid in kernel_ids])
    group_by = ",".join(sorted(_LIVE_STAT_GROUP_BY))
    parts: list[str] = []
    if STATS_RATE_GAUGE_METRICS:
        # Already per-second values: sum + relabel only, no rate() wrap.
        gauge_regex = _regex_union(sorted(STATS_RATE_GAUGE_METRICS))
        selector = self._utilization_selector(kernel_id_regex, gauge_regex)
        parts.append(self._labelled_sum(selector, group_by, ValueType.RATE))
    if STATS_RATE_COUNTER_METRICS:
        # Cumulative counters: derive the per-second rate over the window first.
        counter_regex = _regex_union(sorted(STATS_RATE_COUNTER_METRICS))
        base = self._utilization_selector(kernel_id_regex, counter_regex)
        selector = f"rate({base}[{self._timewindow}])"
        parts.append(self._labelled_sum(selector, group_by, ValueType.RATE))
    return MetricPreset(template=" or ".join(parts))

def _labelled_sum(self, selector: str, group_by: str, stat_label: ValueType) -> str:
    """Sum *selector* by *group_by* and force its value_type label to *stat_label*."""
    aggregated = f"sum by ({group_by})({selector})"
    relabel_args = f'"value_type","{stat_label}","value_type",".*"'
    return f"label_replace({aggregated},{relabel_args})"

def _build_window_stats_preset(
    self,
    kernel_ids: Sequence[KernelId],
    bucket: _StatsBucket,
) -> MetricPreset:
    """Build the preset computing windowed stats (max/avg) for *bucket* over *kernel_ids*.

    The actual PromQL is assembled by ``_render_stats_query``; this method only
    prepares the kernel-id regex and the group-by label list.
    """
    kernel_id_regex = _regex_union([str(kid) for kid in kernel_ids])
    group_by = ",".join(sorted(_LIVE_STAT_GROUP_BY))
    return MetricPreset(
        template=self._render_stats_query(
            bucket,
            kernel_id_regex=kernel_id_regex,
            group_by=group_by,
        )
    )

def _build_filtered_preset(
self,
kernel_ids: Sequence[KernelId],
spec: _LiveStatQuerySpec,
) -> MetricPreset:
labels: dict[str, LabelMatcher] = {
"kernel_id": LabelMatcher.regex(_regex_union([str(kid) for kid in kernel_ids]))
}
if metric_name_filter is not None:
if spec.metric_name_filter is not None:
labels["container_metric_name"] = LabelMatcher.regex(
_regex_union(sorted(metric_name_filter))
_regex_union(sorted(spec.metric_name_filter))
)
if value_type_filter is not None:
labels["value_type"] = LabelMatcher.exact(value_type_filter.value)
if spec.value_type_filter is not None:
labels["value_type"] = LabelMatcher.exact(spec.value_type_filter.value)

return MetricPreset(
template=self._get_template(metric_type),
labels=labels,
template=spec.template,
group_by=_LIVE_STAT_GROUP_BY,
labels=labels,
window=self._timewindow,
)

def _render_stats_query(
    self,
    bucket: _StatsBucket,
    *,
    kernel_id_regex: str,
    group_by: str,
) -> str:
    """Render the PromQL for one window-stats bucket.

    Gauge metrics are aggregated directly with ``<stat>_over_time``; rate
    metrics are first converted via ``rate(...[window])``.  The resulting
    subqueries are joined with ``or``; an empty bucket yields an empty string.
    """
    # e.g. "max_over_time" / "avg_over_time", driven by the bucket's value type.
    stat_fn = f"{bucket.value_type}_over_time"
    parts: list[str] = []
    if bucket.gauge_metrics or bucket.gauge_metric_patterns:
        gauge_regex = _metric_name_regex(bucket.gauge_metrics, bucket.gauge_metric_patterns)
        selector = self._utilization_selector(kernel_id_regex, gauge_regex)
        parts.append(self._window_stat_subquery(stat_fn, selector, group_by, bucket.value_type))
    if bucket.rate_metrics:
        rate_regex = _regex_union(sorted(bucket.rate_metrics))
        base = self._utilization_selector(kernel_id_regex, rate_regex)
        # Counter-style metrics: take the per-second rate before windowing.
        selector = f"rate({base}[{self._timewindow}])"
        parts.append(self._window_stat_subquery(stat_fn, selector, group_by, bucket.value_type))
    return " or ".join(parts)

def _utilization_selector(self, kernel_id_regex: str, metric_name_regex: str) -> str:
    """Return a selector over the container utilization metric for the given kernels/metrics."""
    matchers = self._live_stat_current_labels(
        kernel_id_regex=kernel_id_regex,
        metric_name_regex=metric_name_regex,
    )
    return "".join((CONTAINER_UTILIZATION_METRIC_NAME, "{", matchers, "}"))

def _window_stat_subquery(
    self,
    stat_fn: str,
    selector: str,
    group_by: str,
    stat_label: ValueType,
) -> str:
    """Aggregate *selector* by *group_by*, apply *stat_fn* over the configured
    time window (as a PromQL subquery), and relabel value_type to *stat_label*."""
    summed = f"sum by ({group_by})({selector})"
    windowed = f"{stat_fn}(({summed})[{self._timewindow}:])"
    relabel_args = f'"value_type","{stat_label}","value_type",".*"'
    return f"label_replace({windowed},{relabel_args})"

def _live_stat_current_labels(
    self,
    *,
    kernel_id_regex: str,
    metric_name_regex: str,
) -> str:
    """Build label matchers selecting the "current" series for the given kernels/metrics."""
    matchers = [
        f'kernel_id=~"{kernel_id_regex}"',
        f'container_metric_name=~"{metric_name_regex}"',
        f'value_type="{ValueType.CURRENT}"',
    ]
    return ",".join(matchers)

def _get_template(self, metric_type: MetricType) -> str:
match metric_type:
case MetricType.GAUGE:
Expand All @@ -152,5 +290,3 @@ def _get_template(self, metric_type: MetricType) -> str:
return _RATE_TEMPLATE
case MetricType.DIFF:
return _DIFF_TEMPLATE
case _:
raise UnreachableError(f"Unknown metric type: {metric_type}")
32 changes: 30 additions & 2 deletions src/ai/backend/common/clients/prometheus/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,47 @@ class MetricType(StrEnum):

@dataclass(frozen=True)
class ContainerLiveStatQueries:
"""Gauge / diff / rate query preset bundle for container live stats."""
"""Gauge / diff / rate / max / avg / rate_stats query preset bundle for container live stats."""

gauge: MetricPreset
diff: MetricPreset
rate: MetricPreset
max: MetricPreset
avg: MetricPreset
rate_stats: MetricPreset

def to_list(self) -> list[MetricPreset]:
return [self.gauge, self.diff, self.rate]
return [self.gauge, self.diff, self.rate, self.max, self.avg, self.rate_stats]


DIFF_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})
RATE_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"})

# Window stats: built-ins are exact, accelerator/plugin metrics use patterns.
# NOTE(review): pattern entries are raw regex fragments (joined into the PromQL
# matcher without escaping) — keep them valid regular expressions.
STATS_MAX_GAUGE_METRICS: Final[frozenset[str]] = frozenset({
    "mem",
    "io_scratch_size",
})
STATS_MAX_GAUGE_METRIC_PATTERNS: Final[frozenset[str]] = frozenset({
    r"[A-Za-z0-9][A-Za-z0-9_-]*_(mem|util|power|temperature)",
})
STATS_AVG_GAUGE_METRICS: Final[frozenset[str]] = frozenset()
STATS_AVG_GAUGE_METRIC_PATTERNS: Final[frozenset[str]] = frozenset({
    r"[A-Za-z0-9][A-Za-z0-9_-]*_(util|power|temperature)",
})
# Metrics whose max/avg are computed over a rate()-derived per-second series
# rather than the raw samples.
STATS_MAX_OVER_RATE_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})
STATS_AVG_OVER_RATE_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})

# stats.rate emission targets the legacy stats.rate live_stat label.
# Two metric shapes flow in:
# * "gauge" set: agent's current_hook already publishes per-second rate as
#   the metric's `current` value, so we only need to sum across replicas
#   and relabel to stats.rate (no PromQL rate() wrap).
# * "counter" set: the published series is a cumulative byte counter, so
#   we apply rate(...[window]) to get bytes/sec before relabel.
STATS_RATE_GAUGE_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"})
STATS_RATE_COUNTER_METRICS: Final[frozenset[str]] = frozenset({"io_read", "io_write"})


@dataclass
class ContainerMetricResponseInfo:
Expand Down
3 changes: 3 additions & 0 deletions src/ai/backend/common/clients/prometheus/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ class ValueType(StrEnum):
CURRENT = "current"
CAPACITY = "capacity"
PCT = "pct"
MAX = "max"
AVG = "avg"
RATE = "rate"


@dataclass(frozen=True)
Expand Down
4 changes: 2 additions & 2 deletions src/ai/backend/manager/repositories/metric/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def query_container_live_stats(
return KernelLiveStatBatchResult.empty(kernel_ids)
try:
values_by_kernel = await self._prometheus_client.fetch_container_live_stats(kernel_ids)
except (PrometheusConnectionError, FailedToGetMetric):
log.warning("Failed to query metrics for kernel live stats, returning empty results")
except (PrometheusConnectionError, FailedToGetMetric) as e:
log.warning("Failed to query metrics for kernel live stats: {!r}", e)
return KernelLiveStatBatchResult.empty(kernel_ids)
return KernelLiveStatBatchResult.from_metric_values(kernel_ids, values_by_kernel)
Loading
Loading