Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Make APIService cert injections optional ([#7559](https://github.com/kedacore/keda/pull/7559))
- **Elasticsearch Scaler**: Add HTTP status check for Elasticsearch errors ([#7480](https://github.com/kedacore/keda/pull/7480))
- **Kubernetes Workload Scaler**: Add `groupByNode` parameter ([#7628](https://github.com/kedacore/keda/issues/7628))
- **General**: Add histogram mirrors `keda_scaler_metrics_duration_seconds` and `keda_internal_scale_loop_duration_seconds` for the existing latency gauges, on both Prometheus and OpenTelemetry exporters ([#7675](https://github.com/kedacore/keda/issues/7675))

### Fixes

Expand Down
45 changes: 42 additions & 3 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ var (
otelInternalLoopLatencyValDeprecated []OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

// Histogram mirrors of the latency gauges — see issue #7675.
otScalerMetricsDuration api.Float64Histogram
otInternalLoopDuration api.Float64Histogram

otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVals []OtelMetricFloat64Val

Expand Down Expand Up @@ -180,6 +184,29 @@ func initMeters() {
otLog.Error(err, msg)
}

// Synchronous histogram mirrors. Bucket boundaries match the Prometheus
// histograms in prommetrics.go so the two backends report comparable
// distributions. See #7675.
durationBuckets := []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60}
otScalerMetricsDuration, err = meter.Float64Histogram(
"keda.scaler.metrics.duration.seconds",
api.WithDescription("Distribution of latency, in seconds, for retrieving each scaler's current metric"),
api.WithUnit("s"),
api.WithExplicitBucketBoundaries(durationBuckets...),
)
if err != nil {
otLog.Error(err, msg)
}
otInternalLoopDuration, err = meter.Float64Histogram(
"keda.internal.scale.loop.duration.seconds",
api.WithDescription("Distribution of internal scaling-loop deviation in seconds (expected vs actual execution time)"),
api.WithUnit("s"),
api.WithExplicitBucketBoundaries(durationBuckets...),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Indicates whether a scaler is active (1), or not (0)"),
Expand Down Expand Up @@ -278,17 +305,25 @@ func ScalerMetricsLatencyCallbackDeprecated(_ context.Context, obsrv api.Float64
return nil
}

// RecordScalerLatency records the latency of retrieving the current metric
// for a scaler. It writes the deprecated seconds gauge, the deprecated
// millisecond gauge, and the histogram mirror introduced in #7675.
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
	// Build the attribute set once and share it across all three instruments
	// instead of recomputing it per instrument.
	measurementOption := getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)

	otelScalerMetricsLatency := OtelMetricFloat64Val{}
	otelScalerMetricsLatency.val = value.Seconds()
	otelScalerMetricsLatency.measurementOption = measurementOption
	otelScalerMetricsLatencyVals = append(otelScalerMetricsLatencyVals, otelScalerMetricsLatency)

	otelScalerMetricsLatencyValD := OtelMetricFloat64Val{}
	otelScalerMetricsLatencyValD.val = float64(value.Milliseconds())
	otelScalerMetricsLatencyValD.measurementOption = measurementOption
	otelScalerMetricsLatencyValDeprecated = append(otelScalerMetricsLatencyValDeprecated, otelScalerMetricsLatencyValD)

	// The histogram may be nil if initMeters failed to create it; skip rather
	// than panic in that case.
	if otScalerMetricsDuration != nil {
		otScalerMetricsDuration.Record(context.Background(), value.Seconds(), measurementOption)
	}
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand Down Expand Up @@ -328,6 +363,10 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
otelInternalLoopLatencyD.val = float64(value.Milliseconds())
otelInternalLoopLatencyD.measurementOption = opt
otelInternalLoopLatencyValDeprecated = append(otelInternalLoopLatencyValDeprecated, otelInternalLoopLatencyD)

if otInternalLoopDuration != nil {
otInternalLoopDuration.Record(context.Background(), value.Seconds(), opt)
}
}

func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand Down
46 changes: 40 additions & 6 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,20 @@ var (
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaler",
Name: "metrics_latency_seconds",
Help: "The latency of retrieving current metric from each scaler, in seconds.",
Help: "The latency of retrieving current metric from each scaler, in seconds. DEPRECATED: prefer the histogram `keda_scaler_metrics_duration_seconds`.",
},
metricLabels,
)
// scalerMetricsDuration mirrors scalerMetricsLatency as a histogram. Histograms
// preserve distribution information (p50/p95/p99) which gauges cannot. The gauge
// is kept in parallel for backwards compatibility — see issue #7675.
scalerMetricsDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaler",
Name: "metrics_duration_seconds",
Help: "Distribution of latency, in seconds, for retrieving each scaler's current metric.",
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60},
},
metricLabels,
)
Expand Down Expand Up @@ -127,7 +140,18 @@ var (
Namespace: DefaultPromMetricsNamespace,
Subsystem: "internal_scale_loop",
Name: "latency_seconds",
Help: "Total deviation (in seconds) between the expected execution time and the actual execution time for the scaling loop.",
Help: "Total deviation (in seconds) between the expected execution time and the actual execution time for the scaling loop. DEPRECATED: prefer the histogram `keda_internal_scale_loop_duration_seconds`.",
},
[]string{"namespace", "type", "resource"},
)
// internalLoopDuration mirrors internalLoopLatency as a histogram (issue #7675).
internalLoopDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "internal_scale_loop",
Name: "duration_seconds",
Help: "Distribution of internal scaling-loop deviation in seconds (expected vs actual execution time).",
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60},
},
[]string{"namespace", "type", "resource"},
)
Expand Down Expand Up @@ -160,7 +184,9 @@ type PromMetrics struct {
func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(scalerMetricsValue)
metrics.Registry.MustRegister(scalerMetricsLatency)
metrics.Registry.MustRegister(scalerMetricsDuration)
metrics.Registry.MustRegister(internalLoopLatency)
metrics.Registry.MustRegister(internalLoopDuration)
metrics.Registry.MustRegister(scalerActive)
metrics.Registry.MustRegister(scalerErrors)
metrics.Registry.MustRegister(scaledObjectErrors)
Expand Down Expand Up @@ -194,16 +220,24 @@ func (p *PromMetrics) DeleteScalerMetrics(namespace string, scaledResource strin
scalerActive.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "scaledObject": scaledResource, "type": getResourceType(isScaledObject)})
scalerErrors.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "scaledObject": scaledResource, "type": getResourceType(isScaledObject)})
scalerMetricsLatency.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "scaledObject": scaledResource, "type": getResourceType(isScaledObject)})
scalerMetricsDuration.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "scaledObject": scaledResource, "type": getResourceType(isScaledObject)})
}

// RecordScalerLatency records the latency of retrieving the current metric
// for a scaler. It writes both the deprecated gauge
// (`keda_scaler_metrics_latency_seconds`) and the histogram mirror
// (`keda_scaler_metrics_duration_seconds`) — see #7675.
func (p *PromMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value time.Duration) {
	// Compute the label set and the seconds value once; both metric families
	// share them.
	labels := getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
	seconds := value.Seconds()
	scalerMetricsLatency.With(labels).Set(seconds)
	scalerMetricsDuration.With(labels).Observe(seconds)
}

// RecordScalableObjectLatency records the deviation between the expected and
// actual execution time of the scaling loop for a scalable object. It writes
// both the deprecated gauge and the histogram mirror (see #7675).
func (p *PromMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value time.Duration) {
	// Resolve the resource type and seconds value once; both metric families
	// share them.
	resourceType := getResourceType(isScaledObject)
	seconds := value.Seconds()
	internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(seconds)
	internalLoopDuration.WithLabelValues(namespace, resourceType, name).Observe(seconds)
}

// RecordScalerActive create a measurement of the activity of the scaler
Expand Down
72 changes: 72 additions & 0 deletions pkg/metricscollector/prommetrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2026 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0
*/

package metricscollector

import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestRecordScalerLatencyEmitsHistogram verifies that calling RecordScalerLatency
// produces observations in keda_scaler_metrics_duration_seconds in addition to
// the legacy gauge keda_scaler_metrics_latency_seconds. See issue #7675.
// TestRecordScalerLatencyEmitsHistogram verifies that calling RecordScalerLatency
// produces observations in keda_scaler_metrics_duration_seconds in addition to
// the legacy gauge keda_scaler_metrics_latency_seconds. See issue #7675.
func TestRecordScalerLatencyEmitsHistogram(t *testing.T) {
	scalerMetricsDuration.Reset()
	scalerMetricsLatency.Reset()

	pm := &PromMetrics{}
	for _, d := range []time.Duration{250 * time.Millisecond, 1500 * time.Millisecond} {
		pm.RecordScalerLatency("ns", "so1", "kafka-scaler", 0, "lag", true, d)
	}

	seriesCount := testutil.CollectAndCount(scalerMetricsDuration, "keda_scaler_metrics_duration_seconds")
	if seriesCount != 1 {
		t.Fatalf("histogram series count = %d, want %d", seriesCount, 1)
	}
	// CollectAndCount on a histogram counts time series, not observations.
	// Use ToFloat64 on the gauge to confirm it was also written (it stores last-set value).
	last := testutil.ToFloat64(scalerMetricsLatency)
	if last != 1.5 {
		t.Fatalf("gauge keda_scaler_metrics_latency_seconds = %v, want 1.5 (last value)", last)
	}
}

// TestRecordScalableObjectLatencyEmitsHistogram covers the internal-loop variant.
// TestRecordScalableObjectLatencyEmitsHistogram covers the internal-loop variant.
func TestRecordScalableObjectLatencyEmitsHistogram(t *testing.T) {
	internalLoopDuration.Reset()
	internalLoopLatency.Reset()

	pm := &PromMetrics{}
	for _, d := range []time.Duration{30 * time.Millisecond, 70 * time.Millisecond} {
		pm.RecordScalableObjectLatency("ns", "so2", true, d)
	}

	seriesCount := testutil.CollectAndCount(internalLoopDuration, "keda_internal_scale_loop_duration_seconds")
	if seriesCount != 1 {
		t.Fatalf("histogram series count = %d, want %d", seriesCount, 1)
	}
	last := testutil.ToFloat64(internalLoopLatency)
	if last != 0.07 {
		t.Fatalf("gauge keda_internal_scale_loop_latency_seconds = %v, want 0.07 (last value)", last)
	}
}

// TestDeleteScalerMetricsDropsHistogram ensures DeleteScalerMetrics also cleans
// the new histogram series so we don't keep stale labels around forever.
// TestDeleteScalerMetricsDropsHistogram ensures DeleteScalerMetrics also cleans
// the new histogram series so we don't keep stale labels around forever.
func TestDeleteScalerMetricsDropsHistogram(t *testing.T) {
	scalerMetricsDuration.Reset()

	pm := &PromMetrics{}
	pm.RecordScalerLatency("ns", "so3", "redis-scaler", 0, "len", true, 10*time.Millisecond)

	before := testutil.CollectAndCount(scalerMetricsDuration, "keda_scaler_metrics_duration_seconds")
	if before != 1 {
		t.Fatalf("expected 1 series before delete, got %d", before)
	}

	pm.DeleteScalerMetrics("ns", "so3", true)

	after := testutil.CollectAndCount(scalerMetricsDuration, "keda_scaler_metrics_duration_seconds")
	if after != 0 {
		t.Fatalf("expected 0 series after delete, got %d", after)
	}
}
Loading