Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- **General**: Add cache field indexes to `verifyScaledObjects` so ScaledObject admission validates duplicate scaleTargetRef/HPA in O(1) instead of O(N), eliminating the webhook OOM under high-scale creation bursts ([#7681](https://github.com/kedacore/keda/pull/7681))
- **General**: Check updated status for Fallback condition instead of ScaledObject ([#7488](https://github.com/kedacore/keda/issues/7488))
- **General**: Fix int64 overflow in milli-quantity conversion for very large metric values ([#7441](https://github.com/kedacore/keda/issues/7441))
- **General**: Fix ScaledObject admission webhook to return validation error from `verifyReplicaCount`, preventing invalid ScaledObjects from being created ([#5954](https://github.com/kedacore/keda/issues/5954))
Expand All @@ -96,6 +97,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Github Runner Scaler**: Improve URL construction and error handling ([#7495](https://github.com/kedacore/keda/pull/7495))
- **Github Runner Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
- **Loki Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
- **Loki Scaler**: `serverAddress` now appends `/loki/api/v1/query` to the end of the existing path instead of overriding it ([#7648](https://github.com/kedacore/keda/pull/7648))
- **Metrics API Scaler**: Fix `aggregateFromKubeServiceEndpoints` using empty label selector that matched all EndpointSlices in the namespace instead of only the target service's ([#7641](https://github.com/kedacore/keda/issues/7641))
- **NATS JetStream Scaler**: URL-encode user input in monitoring URL construction ([#7483](https://github.com/kedacore/keda/pull/7483))
- **Prometheus Scaler**: Handle NaN results in the same manner as Inf ([#7475](https://github.com/kedacore/keda/issues/7475))
Expand Down
116 changes: 85 additions & 31 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,45 @@ var restMapper meta.RESTMapper
var memoryString = "memory"
var cpuString = "cpu"

// Field index names used by verifyScaledObjects to avoid O(N) full-namespace
// list scans on every SO admission. Without these indexes each admission must
// DeepCopy every ScaledObject in the namespace to find conflicts; at 60k SOs
// each admission allocates ~900 MiB, which is why the webhook OOMs under
// creation bursts. The indexes narrow candidates to 0–1 objects.
const (
	// scaleTargetRefNameIdx indexes ScaledObjects by spec.scaleTargetRef.name,
	// the workload name they scale.
	scaleTargetRefNameIdx = "spec.scaleTargetRef.name"
	// hpaNameIdx indexes ScaledObjects by the HPA name they own (getHpaName).
	hpaNameIdx = "spec.hpaName"
	// hpaScaleTargetNameIdx indexes HPA objects by spec.scaleTargetRef.name so
	// verifyHpas can issue an O(1) lookup instead of listing every HPA in the
	// namespace. Index names are scoped per-GVK so reusing the same path string
	// as scaleTargetRefNameIdx is safe.
	hpaScaleTargetNameIdx = "spec.scaleTargetRef.name"
)

func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager, cacheMissFallback bool) error {
// Register field indexes before wiring the webhook so that
// verifyScaledObjects can issue O(1) indexed lookups instead of
// O(N) full-namespace list scans. See the constants above.
ctx := context.Background()
if err := mgr.GetFieldIndexer().IndexField(ctx, &ScaledObject{}, scaleTargetRefNameIdx,
func(obj client.Object) []string {
return []string{obj.(*ScaledObject).Spec.ScaleTargetRef.Name}
}); err != nil {
return fmt.Errorf("failed to register index %q: %w", scaleTargetRefNameIdx, err)
}
if err := mgr.GetFieldIndexer().IndexField(ctx, &ScaledObject{}, hpaNameIdx,
func(obj client.Object) []string {
return []string{getHpaName(*obj.(*ScaledObject))}
}); err != nil {
return fmt.Errorf("failed to register index %q: %w", hpaNameIdx, err)
}
if err := mgr.GetFieldIndexer().IndexField(ctx, &autoscalingv2.HorizontalPodAutoscaler{}, hpaScaleTargetNameIdx,
func(obj client.Object) []string {
return []string{obj.(*autoscalingv2.HorizontalPodAutoscaler).Spec.ScaleTargetRef.Name}
}); err != nil {
return fmt.Errorf("failed to register HPA index %q: %w", hpaScaleTargetNameIdx, err)
}

err := setupKubernetesClients(mgr, cacheMissFallback)
if err != nil {
return fmt.Errorf("failed to setup kubernetes clients: %w", err)
Expand Down Expand Up @@ -235,10 +273,12 @@ func verifyTriggers(incomingObject interface{}, action string, _ bool) error {

func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error {
hpaList := &autoscalingv2.HorizontalPodAutoscalerList{}
opt := &client.ListOptions{
Namespace: incomingSo.Namespace,
}
err := kc.List(context.Background(), hpaList, opt)
// Use the hpaScaleTargetNameIdx field index to narrow candidates to HPAs
// that target the same workload name, avoiding an O(N) full-namespace scan.
err := kc.List(context.Background(), hpaList,
client.InNamespace(incomingSo.Namespace),
client.MatchingFields{hpaScaleTargetNameIdx: incomingSo.Spec.ScaleTargetRef.Name},
)
if err != nil {
return err
}
Expand All @@ -254,8 +294,7 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error {
if hpa.Annotations[ValidationsHpaOwnershipAnnotation] == "false" {
continue
}
val, _ := json.MarshalIndent(hpa, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("checking hpa %s: %v", hpa.Name, string(val)))
scaledobjectlog.V(1).Info("checking hpa", "name", hpa.Name)

hpaGvkr, err := ParseGVKR(restMapper, hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind)
if err != nil {
Expand Down Expand Up @@ -298,49 +337,64 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error {
}

func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error {
soList := &ScaledObjectList{}
opt := &client.ListOptions{
Namespace: incomingSo.Namespace,
}
err := kc.List(context.Background(), soList, opt)
if err != nil {
return err
}
ctx := context.Background()

// Check 1: no other SO in this namespace already manages the same workload.
// Use the scaleTargetRef.name index so only SOs targeting the same resource
// name are fetched — typically 0 or 1 objects instead of the entire namespace.
incomingSoGckr, err := ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind)
if err != nil {
scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject", "apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind)
scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject",
"apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind)
return err
}

incomingSoHpaName := getHpaName(*incomingSo)
for _, so := range soList.Items {
targetCandidates := &ScaledObjectList{}
if err := kc.List(ctx, targetCandidates,
client.InNamespace(incomingSo.Namespace),
client.MatchingFields{scaleTargetRefNameIdx: incomingSo.Spec.ScaleTargetRef.Name},
); err != nil {
return err
}
for _, so := range targetCandidates.Items {
if so.Name == incomingSo.Name {
continue
}
val, _ := json.MarshalIndent(so, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("checking scaledobject %s: %v", so.Name, string(val)))

scaledobjectlog.V(1).Info("checking scaledobject for duplicate scaleTarget",
"name", so.Name, "namespace", so.Namespace)
soGckr, err := ParseGVKR(restMapper, so.Spec.ScaleTargetRef.APIVersion, so.Spec.ScaleTargetRef.Kind)
if err != nil {
scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from ScaledObject", "soName", so.Name, "apiVersion", so.Spec.ScaleTargetRef.APIVersion, "kind", so.Spec.ScaleTargetRef.Kind)
scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from ScaledObject",
"soName", so.Name, "apiVersion", so.Spec.ScaleTargetRef.APIVersion, "kind", so.Spec.ScaleTargetRef.Kind)
return err
}

if soGckr.GVKString() == incomingSoGckr.GVKString() &&
so.Spec.ScaleTargetRef.Name == incomingSo.Spec.ScaleTargetRef.Name {
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the ScaledObject '%s'", so.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), so.Name)
if soGckr.GVKString() == incomingSoGckr.GVKString() {
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the ScaledObject '%s'",
so.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), so.Name)
scaledobjectlog.Error(err, "validation error")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object")
return err
}
}

if getHpaName(so) == incomingSoHpaName {
err = fmt.Errorf("the HPA '%s' is already managed by the ScaledObject '%s'", so.Spec.Advanced.HorizontalPodAutoscalerConfig.Name, so.Name)
scaledobjectlog.Error(err, "validation error")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object-hpa")
return err
// Check 2: no other SO already owns the same HPA name.
// Use the hpaName index for the same reason.
incomingSoHpaName := getHpaName(*incomingSo)
hpaCandidates := &ScaledObjectList{}
if err := kc.List(ctx, hpaCandidates,
client.InNamespace(incomingSo.Namespace),
client.MatchingFields{hpaNameIdx: incomingSoHpaName},
); err != nil {
return err
}
for _, so := range hpaCandidates.Items {
if so.Name == incomingSo.Name {
continue
}
err = fmt.Errorf("the HPA '%s' is already managed by the ScaledObject '%s'",
incomingSoHpaName, so.Name)
scaledobjectlog.Error(err, "validation error")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object-hpa")
return err
}

// verify ScalingModifiers structure if defined in ScaledObject
Expand Down
6 changes: 2 additions & 4 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,8 @@ func main() {
return
}

err = kedautil.ConfigureMaxProcs(setupLog)
if err != nil {
setupLog.Error(err, "failed to set max procs")
return
if err := kedautil.ConfigureMaxProcs(setupLog); err != nil {
setupLog.Info("failed to set max procs, using default GOMAXPROCS", "error", err)
}

kedaProvider, err := cmd.makeProvider(ctx)
Expand Down
6 changes: 2 additions & 4 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ func main() {
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
ctx := ctrl.SetupSignalHandler()

err := kedautil.ConfigureMaxProcs(setupLog)
if err != nil {
setupLog.Error(err, "failed to set max procs")
os.Exit(1)
if err := kedautil.ConfigureMaxProcs(setupLog); err != nil {
setupLog.Info("failed to set max procs, using default GOMAXPROCS", "error", err)
}

namespaces, err := kedautil.GetWatchNamespaces()
Expand Down
7 changes: 2 additions & 5 deletions cmd/webhooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,9 @@ func main() {

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

err := kedautil.ConfigureMaxProcs(setupLog)
if err != nil {
setupLog.Error(err, "failed to set max procs")
os.Exit(1)
if err := kedautil.ConfigureMaxProcs(setupLog); err != nil {
setupLog.Info("failed to set max procs, using default GOMAXPROCS", "error", err)
}

ctx := ctrl.SetupSignalHandler()

cfg := ctrl.GetConfigOrDie()
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/apache_kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

// stringTrue is the canonical "true" metadata value shared across test cases.
const stringTrue = "true"

type parseApacheKafkaMetadataTestData struct {
metadata map[string]string
isError bool
Expand Down
49 changes: 41 additions & 8 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package authentication

import (
"crypto/tls"
"fmt"
"net/url"
"strings"
"time"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// Type describes the authentication type used in a scaler
Expand Down Expand Up @@ -81,7 +85,7 @@ type CertAuth struct {
// OAuth is an oAuth2 authentication type
type OAuth struct {
OauthTokenURI string `keda:"name=oauthTokenURI, order=authParams"`
Scopes []string `keda:"name=scopes, order=authParams"`
Scopes []string `keda:"name=scopes;scope, order=authParams"`
ClientID string `keda:"name=clientID, order=authParams"`
ClientSecret string `keda:"name=clientSecret, order=authParams"`
EndpointParams url.Values `keda:"name=endpointParams, order=authParams"`
Expand Down Expand Up @@ -119,6 +123,9 @@ func (c *Config) Disabled() bool {

// Enabled returns true if given auth mode is enabled
func (c *Config) Enabled(mode Type) bool {
if c == nil {
return false
}
for _, m := range c.Modes {
if m == mode {
return true
Expand All @@ -140,32 +147,58 @@ func (c *Config) GetBearerToken() string {
return fmt.Sprintf("Bearer %s", c.BearerToken)
}

// Validate validates the Config and returns an error if it is invalid
// Validate validates the Config and returns an error if it is invalid.
// It also calls Normalize() to clean up parsed values.
func (c *Config) Validate() error {
if c.Disabled() {
return nil
}
c.Normalize()
if c.EnabledBearerAuth() && c.BearerToken == "" {
return fmt.Errorf("bearer token is required when bearer auth is enabled")
return fmt.Errorf("bearer token=%v is required when bearer auth is enabled", empty(c.BearerToken))
}
if c.EnabledBasicAuth() && c.Username == "" {
return fmt.Errorf("username is required when basic auth is enabled")
return fmt.Errorf("username=%v is required when basic auth is enabled", empty(c.Username))
}
if c.EnabledTLS() && (c.Cert == "" || c.Key == "") {
return fmt.Errorf("cert and key are required when tls auth is enabled")
return fmt.Errorf("cert=%v and key=%v are required when tls auth is enabled", empty(c.Cert), empty(c.Key))
}
if c.EnabledOAuth() && (c.OauthTokenURI == "" || c.ClientID == "" || c.ClientSecret == "") {
return fmt.Errorf("oauthTokenURI, clientID and clientSecret are required when oauth is enabled")
if c.EnabledOAuth() && (c.OauthTokenURI == "" || c.ClientID == "") {
return fmt.Errorf("oauthTokenURI=%v and clientID=%v are required when oauth is enabled", empty(c.OauthTokenURI), empty(c.ClientID))
}
if c.EnabledCustomAuth() && (c.CustomAuthHeader == "" || c.CustomAuthValue == "") {
return fmt.Errorf("customAuthHeader and customAuthValue are required when custom auth is enabled")
return fmt.Errorf("customAuthHeader=%v and customAuthValue=%v are required when custom auth is enabled", empty(c.CustomAuthHeader), empty(c.CustomAuthValue))
}
if c.EnabledAPIKeyAuth() && c.APIKey == "" {
return fmt.Errorf("apiKey is required when apiKey auth is enabled")
}
return nil
}

// Normalize cleans up parsed authentication values so later checks and
// consumers see canonical data: it trims surrounding whitespace from each
// OAuth scope and drops entries that are empty after trimming.
//
// When every entry is filtered out, Scopes is left nil rather than an empty
// slice, preserving the zero value (and its JSON encoding as null).
// Safe to call on a nil receiver, consistent with Enabled's nil check.
func (c *Config) Normalize() {
	if c == nil {
		return
	}
	var scopes []string
	for _, s := range c.Scopes {
		if t := strings.TrimSpace(s); t != "" {
			scopes = append(scopes, t)
		}
	}
	c.Scopes = scopes
}

// NewTLSConfig builds a *tls.Config from the Config's client certificate,
// key, and CA fields by delegating to kedautil.NewTLSConfig.
// NOTE(review): unsafeSsl presumably relaxes server certificate
// verification when true — confirm against kedautil.NewTLSConfig.
func (c *Config) NewTLSConfig(unsafeSsl bool) (*tls.Config, error) {
	return kedautil.NewTLSConfig(c.Cert, c.Key, c.CA, unsafeSsl)
}

// empty renders an auth parameter's presence for error messages without
// leaking its value: "<empty>" when the string is blank, "<present>" otherwise.
func empty(a string) string {
	if a != "" {
		return "<present>"
	}
	return "<empty>"
}

// ToAuthMeta converts the Config to deprecated AuthMeta
func (c *Config) ToAuthMeta() *AuthMeta {
if c.Disabled() {
Expand Down
28 changes: 26 additions & 2 deletions pkg/scalers/loki_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand Down Expand Up @@ -115,11 +117,10 @@ func (s *lokiScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
}

func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) {
u, err := url.ParseRequestURI(s.metadata.ServerAddress)
u, err := getServerAddress(&s.metadata)
if err != nil {
return -1, err
}
u.Path = "/loki/api/v1/query"
u.RawQuery = url.Values{"query": []string{s.metadata.Query}}.Encode()

req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
Expand Down Expand Up @@ -218,3 +219,26 @@ func (s *lokiScaler) GetMetricsAndActivity(ctx context.Context, metricName strin
metric := GenerateMetricInMili(metricName, val)
return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil
}

// getServerAddress builds the Loki query URL from the configured
// serverAddress. The query API path is appended to any path already present
// on the address; when the configured path already ends with the API path,
// or is a leading portion of it, segments are not duplicated.
func getServerAddress(metadata *lokiMetadata) (url.URL, error) {
	const lokiPath = "/loki/api/v1/query"

	parsed, err := url.ParseRequestURI(metadata.ServerAddress)
	if err != nil {
		return url.URL{}, err
	}

	// Normalize trailing slashes so the suffix/prefix checks below are
	// segment-accurate.
	base := strings.TrimRight(parsed.Path, "/")
	parsed.Path = base

	switch {
	case strings.HasSuffix(base, lokiPath):
		// Already points at the query endpoint; leave the path as-is.
	case strings.HasPrefix(lokiPath, base+"/"):
		// The configured path is a leading portion of the API path
		// (e.g. "/loki"); complete it without repeating the overlap.
		parsed.Path = path.Join(base, strings.TrimPrefix(lokiPath, base))
	default:
		parsed.Path = path.Join(base, lokiPath)
	}

	return *parsed, nil
}
Loading