1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -87,6 +87,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Check updated status for Fallback condition instead of ScaledObject ([#7488](https://github.com/kedacore/keda/issues/7488))
- **General**: Fix int64 overflow in milli-quantity conversion for very large metric values ([#7441](https://github.com/kedacore/keda/issues/7441))
- **General**: Fix ScaledObject admission webhook to return validation error from `verifyReplicaCount`, preventing invalid ScaledObjects from being created ([#5954](https://github.com/kedacore/keda/issues/5954))
- **General**: Jitter the first tick of each scale loop by `hash(UID) % min(pollingInterval, 1m)` to prevent thundering-herd polling against external metric sources when many ScaledObjects are created in a short window or re-spawned after an operator restart ([#7676](https://github.com/kedacore/keda/pull/7676))
- **Azure Data Explorer Scaler**: Remove clientSecretFromEnv support ([#7554](https://github.com/kedacore/keda/pull/7554))
- **Cron Scaler**: Fix metric name generation so cron expressions with comma-separated values no longer produce invalid metric names ([#7448](https://github.com/kedacore/keda/issues/7448))
- **Forgejo Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
46 changes: 46 additions & 0 deletions pkg/scaling/scale_handler.go
@@ -19,6 +19,7 @@ package scaling
import (
	"context"
	"fmt"
	"hash/fnv"
	"reflect"
	"slices"
	"strconv"
@@ -57,6 +58,33 @@ import (
"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
)

const maxJitterWindow = time.Minute

// jitterOffset returns a deterministic offset in [0, min(interval, maxJitterWindow))
// derived from the given UID. It is used to stagger the first tick of each
// scale loop so that ScalableObjects created in the same reconcile
// batch do not all poll on the same boundary.
//
// The window is capped at maxJitterWindow so objects with long polling
// intervals (e.g. 10 min) are not delayed more than a minute on first tick.
//
// The offset depends only on the UID and the fixed jitter window, with
// no randomness or time dependence, so the phase is stable across
// operator restarts: when the operator comes back up and re-spawns
// scale loops for existing objects, each loop lands at the same phase
// it had before.
func jitterOffset(uid types.UID, interval time.Duration) time.Duration {
	if interval <= 0 || uid == "" {
		return 0
	}
	window := interval
	if window > maxJitterWindow {
		window = maxJitterWindow
	}
	h := fnv.New64a()
	_, _ = h.Write([]byte(uid))
	return time.Duration(h.Sum64() % uint64(window))
}

var (
	log = logf.Log.WithName("scale_handler")
)
@@ -182,6 +210,24 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
	pollingInterval := withTriggers.GetPollingInterval()
	logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval)

	// Jitter the first tick to spread scale loops spawned in the same
	// reconcile batch across the polling interval. Without this, a batch
	// of ScalableObjects created in a short window (e.g. bulk apply, or
	// operator restart re-spawning existing loops) polls on the same
	// ~30-second boundary forever, creating a thundering herd against
	// external metric sources. Keyed off the object UID so the phase is
	// stable across operator restarts.
	if offset := jitterOffset(withTriggers.UID, pollingInterval); offset > 0 {
@JorTurFer (Member) commented on Apr 26, 2026:
I like the improvement! The only concern I have is that some specific edge cases can use really long polling intervals. Does it make sense to set a max window? Something like

min(pollingInterval, 1 min)

If the polling interval is short, it will work, but for long polling intervals, the request will be delayed by at most 1 min
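
For illustration, a self-contained sketch of the capped offset the comment asks about, which the diff above adopts via `maxJitterWindow`. The `main` wrapper and sample UID are illustrative, not part of the PR:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

const maxJitterWindow = time.Minute

// jitterOffset mirrors the helper added in this PR: a deterministic
// offset in [0, min(interval, maxJitterWindow)) derived from the UID.
func jitterOffset(uid string, interval time.Duration) time.Duration {
	if interval <= 0 || uid == "" {
		return 0
	}
	window := interval
	if window > maxJitterWindow {
		window = maxJitterWindow
	}
	h := fnv.New64a()
	_, _ = h.Write([]byte(uid))
	return time.Duration(h.Sum64() % uint64(window))
}

func main() {
	uid := "f3c2e9b0-1a7d-4c21-b0e4-dd1c7f5e9a00" // sample UID, not from a real cluster
	fmt.Println(jitterOffset(uid, 30*time.Second)) // short interval: offset in [0s, 30s)
	fmt.Println(jitterOffset(uid, 10*time.Minute)) // long interval: capped to [0s, 1m)
}
```

FNV-1a is cheap, dependency-free, and deterministic, which is what keeps the phase stable across restarts.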

		jitter := time.NewTimer(offset)
		select {
		case <-jitter.C:
		case <-ctx.Done():
			jitter.Stop()
			logger.V(1).Info("Context canceled before first poll")
			return
		}
	}

	_, isScaledObject := scalableObject.(*kedav1alpha1.ScaledObject)
	next := time.Now()

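The rest of the loop is collapsed in the diff above. For orientation, a minimal standalone sketch of the overall pattern this PR produces (jittered first tick, then fixed-interval polling); the `pollLoop` name, `poll` callback, and loop body are illustrative assumptions, not KEDA's exact code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop sketches the pattern: delay the first tick by a deterministic
// per-object offset, then poll on the regular interval.
func pollLoop(ctx context.Context, offset, interval time.Duration, poll func()) {
	first := time.NewTimer(offset)
	select {
	case <-first.C:
	case <-ctx.Done():
		first.Stop()
		return
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		poll()
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	pollLoop(ctx, 400*time.Millisecond, time.Second, func() {
		fmt.Println("poll at", time.Now().Format(time.StampMilli))
	})
}
```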
74 changes: 74 additions & 0 deletions pkg/scaling/scale_handler_test.go
@@ -1330,3 +1330,77 @@ func TestHandleResult_DeltaDoesNotOverwriteConcurrentChanges(t *testing.T) {
	assert.True(t, patchedObj.Status.TriggersActivity["trigger-a"].IsActive, "concurrent update to trigger-a must be preserved")
	assert.True(t, patchedObj.Status.TriggersActivity["trigger-b"].IsActive, "trigger-b updated by push scaler")
}

func TestJitterOffset_InRange(t *testing.T) {
	interval := 30 * time.Second
	for i := 0; i < 1000; i++ {
		uid := types.UID(fmt.Sprintf("fake-uid-%d-%d-%d-%d", i, i*7, i*13, i*17))
		got := jitterOffset(uid, interval)
		if got < 0 || got >= interval {
			t.Fatalf("offset %s for uid %q out of range [0, %s)", got, uid, interval)
		}
	}
}

func TestJitterOffset_Deterministic(t *testing.T) {
	interval := 30 * time.Second
	uid := types.UID("f3c2e9b0-1a7d-4c21-b0e4-dd1c7f5e9a00")
	first := jitterOffset(uid, interval)
	for i := 0; i < 100; i++ {
		if got := jitterOffset(uid, interval); got != first {
			t.Fatalf("jitterOffset not deterministic: iteration %d got %s, want %s", i, got, first)
		}
	}
}

func TestJitterOffset_ZeroInterval(t *testing.T) {
	if got := jitterOffset("some-uid", 0); got != 0 {
		t.Fatalf("zero interval should return 0, got %s", got)
	}
	if got := jitterOffset("some-uid", -time.Second); got != 0 {
		t.Fatalf("negative interval should return 0, got %s", got)
	}
}

func TestJitterOffset_EmptyUID(t *testing.T) {
	if got := jitterOffset("", 30*time.Second); got != 0 {
		t.Fatalf("empty UID should return 0, got %s", got)
	}
}

func TestJitterOffset_CappedAtMaxWindow(t *testing.T) {
	uid := types.UID("f3c2e9b0-1a7d-4c21-b0e4-dd1c7f5e9a00")
	got := jitterOffset(uid, 10*time.Minute)
	if got < 0 || got >= maxJitterWindow {
		t.Fatalf("offset %s for long interval out of range [0, %s)", got, maxJitterWindow)
	}
}

func TestJitterOffset_Distribution(t *testing.T) {
	const (
		count    = 10000
		buckets  = 10
		interval = 30 * time.Second
		// With 10000 samples across 10 buckets, the expected count per bucket is 1000.
		// Allow wide tolerance to avoid flakiness.
		minPerBucket = 700
		maxPerBucket = 1300
	)
	counts := make([]int, buckets)
	bucketWidth := interval / time.Duration(buckets)
	for i := 0; i < count; i++ {
		uid := types.UID(fmt.Sprintf("scaledobject-uid-%d", i))
		offset := jitterOffset(uid, interval)
		b := int(offset / bucketWidth)
		if b >= buckets {
			b = buckets - 1
		}
		counts[b]++
	}
	for i, c := range counts {
		if c < minPerBucket || c > maxPerBucket {
			t.Errorf("bucket %d has count %d, outside tolerance [%d, %d] -- distribution may be degenerate. Full counts: %v",
				i, c, minPerBucket, maxPerBucket, counts)
		}
	}
}
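
Assuming a standard KEDA checkout, these tests can be run in isolation from the repository root with `go test ./pkg/scaling -run TestJitterOffset`.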