diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8ea7e99e481..b22eaab3e01 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -87,6 +87,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
 - **General**: Check updated status for Fallback condition instead of ScaledObject ([#7488](https://github.com/kedacore/keda/issues/7488))
 - **General**: Fix int64 overflow in milli-quantity conversion for very large metric values ([#7441](https://github.com/kedacore/keda/issues/7441))
 - **General**: Fix ScaledObject admission webhook to return validation error from `verifyReplicaCount`, preventing invalid ScaledObjects from being created ([#5954](https://github.com/kedacore/keda/issues/5954))
+- **General**: Jitter the first tick of each scale loop by `hash(UID) % pollingInterval` to prevent thundering-herd polling against external metric sources when many ScaledObjects are created in a short window or re-spawned after an operator restart ([#7676](https://github.com/kedacore/keda/pull/7676))
 - **Azure Data Explorer Scaler**: Remove clientSecretFromEnv support ([#7554](https://github.com/kedacore/keda/pull/7554))
 - **Cron Scaler**: Fix metric name generation so cron expressions with comma-separated values no longer produce invalid metric names ([#7448](https://github.com/kedacore/keda/issues/7448))
 - **Forgejo Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index ee2f2f6d731..89424a4ba58 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -19,6 +19,7 @@ package scaling
 import (
 	"context"
 	"fmt"
+	"hash/fnv"
 	"reflect"
 	"slices"
 	"strconv"
@@ -57,6 +58,33 @@ import (
 	"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
 )
 
+const maxJitterWindow = time.Minute
+
+// jitterOffset returns a deterministic offset in [0, min(interval, maxJitterWindow))
+// derived from the given UID. It is used to stagger the first tick of each
+// scale loop so that ScalableObjects created in the same reconcile
+// batch do not all poll on the same boundary.
+//
+// The window is capped at maxJitterWindow so objects with long polling
+// intervals (e.g. 10 min) are not delayed more than a minute on first tick.
+//
+// The offset is a function of the UID only, so the phase is stable
+// across operator restarts: when the operator comes back up and
+// re-spawns scale loops for existing objects, each loop lands at the
+// same phase it had before.
+func jitterOffset(uid types.UID, interval time.Duration) time.Duration {
+	if interval <= 0 || uid == "" {
+		return 0
+	}
+	window := interval
+	if window > maxJitterWindow {
+		window = maxJitterWindow
+	}
+	h := fnv.New64a()
+	_, _ = h.Write([]byte(uid))
+	return time.Duration(h.Sum64() % uint64(window))
+}
+
 var (
 	log = logf.Log.WithName("scale_handler")
 )
@@ -182,6 +210,24 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
 	pollingInterval := withTriggers.GetPollingInterval()
 	logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval)
 
+	// Jitter the first tick to spread scale loops spawned in the same
+	// reconcile batch across the polling interval. Without this, a batch
+	// of ScalableObjects created in a short window (e.g. bulk apply, or
+	// operator restart re-spawning existing loops) polls on the same
+	// ~30-second boundary forever, creating a thundering herd against
+	// external metric sources. The offset is keyed off the object UID so
+	// the phase is stable across operator restarts.
+	if offset := jitterOffset(withTriggers.UID, pollingInterval); offset > 0 {
+		jitter := time.NewTimer(offset)
+		select {
+		case <-jitter.C:
+		case <-ctx.Done():
+			jitter.Stop()
+			logger.V(1).Info("Context canceled before first poll")
+			return
+		}
+	}
+
 	_, isScaledObject := scalableObject.(*kedav1alpha1.ScaledObject)
 	next := time.Now()
 
diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go
index 07849927fac..e0061924976 100644
--- a/pkg/scaling/scale_handler_test.go
+++ b/pkg/scaling/scale_handler_test.go
@@ -1330,3 +1330,77 @@ func TestHandleResult_DeltaDoesNotOverwriteConcurrentChanges(t *testing.T) {
 	assert.True(t, patchedObj.Status.TriggersActivity["trigger-a"].IsActive, "concurrent update to trigger-a must be preserved")
 	assert.True(t, patchedObj.Status.TriggersActivity["trigger-b"].IsActive, "trigger-b updated by push scaler")
 }
+
+func TestJitterOffset_InRange(t *testing.T) {
+	interval := 30 * time.Second
+	for i := 0; i < 1000; i++ {
+		uid := types.UID(fmt.Sprintf("fake-uid-%d-%d-%d-%d", i, i*7, i*13, i*17))
+		got := jitterOffset(uid, interval)
+		if got < 0 || got >= interval {
+			t.Fatalf("offset %s for uid %q out of range [0, %s)", got, uid, interval)
+		}
+	}
+}
+
+func TestJitterOffset_Deterministic(t *testing.T) {
+	interval := 30 * time.Second
+	uid := types.UID("f3c2e9b0-1a7d-4c21-b0e4-dd1c7f5e9a00")
+	first := jitterOffset(uid, interval)
+	for i := 0; i < 100; i++ {
+		if got := jitterOffset(uid, interval); got != first {
+			t.Fatalf("jitterOffset not deterministic: iteration %d got %s, want %s", i, got, first)
+		}
+	}
+}
+
+func TestJitterOffset_ZeroInterval(t *testing.T) {
+	if got := jitterOffset("some-uid", 0); got != 0 {
+		t.Fatalf("zero interval should return 0, got %s", got)
+	}
+	if got := jitterOffset("some-uid", -time.Second); got != 0 {
+		t.Fatalf("negative interval should return 0, got %s", got)
+	}
+}
+
+func TestJitterOffset_EmptyUID(t *testing.T) {
+	if got := jitterOffset("", 30*time.Second); got != 0 {
+		t.Fatalf("empty UID should return 0, got %s", got)
+	}
+}
+
+func TestJitterOffset_CappedAtMaxWindow(t *testing.T) {
+	uid := types.UID("f3c2e9b0-1a7d-4c21-b0e4-dd1c7f5e9a00")
+	got := jitterOffset(uid, 10*time.Minute)
+	if got < 0 || got >= maxJitterWindow {
+		t.Fatalf("offset %s for long interval out of range [0, %s)", got, maxJitterWindow)
+	}
+}
+
+func TestJitterOffset_Distribution(t *testing.T) {
+	const (
+		count    = 10000
+		buckets  = 10
+		interval = 30 * time.Second
+		// With 10000 samples across 10 buckets, the expected count per bucket is 1000.
+		// Allow wide tolerance to avoid flakiness.
+		minPerBucket = 700
+		maxPerBucket = 1300
+	)
+	counts := make([]int, buckets)
+	bucketWidth := interval / time.Duration(buckets)
+	for i := 0; i < count; i++ {
+		uid := types.UID(fmt.Sprintf("scaledobject-uid-%d", i))
+		offset := jitterOffset(uid, interval)
+		b := int(offset / bucketWidth)
+		if b >= buckets {
+			b = buckets - 1
+		}
+		counts[b]++
+	}
+	for i, c := range counts {
+		if c < minPerBucket || c > maxPerBucket {
+			t.Errorf("bucket %d has count %d, outside tolerance [%d, %d] -- distribution may be degenerate. Full counts: %v",
+				i, c, minPerBucket, maxPerBucket, counts)
+		}
+	}
+}