diff --git a/CHANGELOG.md b/CHANGELOG.md index 06a59e6ae9d..1682abb12e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### Fixes +- **General**: Add cache field indexes to `verifyScaledObjects` so ScaledObject admission validates duplicate scaleTargetRef/HPA in O(1) instead of O(N), eliminating the webhook OOM under high-scale creation bursts ([#7681](https://github.com/kedacore/keda/pull/7681)) - **General**: Check updated status for Fallback condition instead of ScaledObject ([#7488](https://github.com/kedacore/keda/issues/7488)) - **General**: Fix int64 overflow in milli-quantity conversion for very large metric values ([#7441](https://github.com/kedacore/keda/issues/7441)) - **General**: Fix ScaledObject admission webhook to return validation error from `verifyReplicaCount`, preventing invalid ScaledObjects from being created ([#5954](https://github.com/kedacore/keda/issues/5954)) @@ -96,6 +97,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Github Runner Scaler**: Improve URL construction and error handling ([#7495](https://github.com/kedacore/keda/pull/7495)) - **Github Runner Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469)) - **Loki Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469)) +- **Loki Scaler**: `serverAddress` now appends `/loki/api/v1/query` to the end of existing path instead of overriding ([#7648](https://github.com/kedacore/keda/pull/7648)) - **Metrics API Scaler**: Fix `aggregateFromKubeServiceEndpoints` using empty label selector that matched all EndpointSlices in the namespace instead of only the target service's ([#7641](https://github.com/kedacore/keda/issues/7641)) - **NATS JetStream Scaler**: URL-encode user input in monitoring URL construction ([#7483](https://github.com/kedacore/keda/pull/7483)) - **Prometheus Scaler**: Handle NaN results in the same manner as Inf ([#7475](https://github.com/kedacore/keda/issues/7475)) diff --git a/apis/keda/v1alpha1/scaledobject_webhook.go b/apis/keda/v1alpha1/scaledobject_webhook.go index 4d1975656b9..90bd3426888 100644 --- a/apis/keda/v1alpha1/scaledobject_webhook.go +++ b/apis/keda/v1alpha1/scaledobject_webhook.go @@ -52,7 +52,45 @@ var restMapper meta.RESTMapper var memoryString = "memory" var cpuString = "cpu" +// Field index names used by verifyScaledObjects to avoid O(N) full-namespace +// list scans on every SO admission. Without these indexes each admission must +// DeepCopy every ScaledObject in the namespace to find conflicts; at 60k SOs +// each admission allocates ~900 MiB, which is why the webhook OOMs under +// creation bursts. The indexes narrow candidates to 0–1 objects. +const ( + scaleTargetRefNameIdx = "spec.scaleTargetRef.name" + hpaNameIdx = "spec.hpaName" + // hpaScaleTargetNameIdx indexes HPA objects by spec.scaleTargetRef.name so + // verifyHpas can issue an O(1) lookup instead of listing every HPA in the + // namespace. Index names are scoped per-GVK so reusing the same path string + // as scaleTargetRefNameIdx is safe. + hpaScaleTargetNameIdx = "spec.scaleTargetRef.name" +) + func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager, cacheMissFallback bool) error { + // Register field indexes before wiring the webhook so that + // verifyScaledObjects can issue O(1) indexed lookups instead of + // O(N) full-namespace list scans. See the constants above. + ctx := context.Background() + if err := mgr.GetFieldIndexer().IndexField(ctx, &ScaledObject{}, scaleTargetRefNameIdx, + func(obj client.Object) []string { + return []string{obj.(*ScaledObject).Spec.ScaleTargetRef.Name} + }); err != nil { + return fmt.Errorf("failed to register index %q: %w", scaleTargetRefNameIdx, err) + } + if err := mgr.GetFieldIndexer().IndexField(ctx, &ScaledObject{}, hpaNameIdx, + func(obj client.Object) []string { + return []string{getHpaName(*obj.(*ScaledObject))} + }); err != nil { + return fmt.Errorf("failed to register index %q: %w", hpaNameIdx, err) + } + if err := mgr.GetFieldIndexer().IndexField(ctx, &autoscalingv2.HorizontalPodAutoscaler{}, hpaScaleTargetNameIdx, + func(obj client.Object) []string { + return []string{obj.(*autoscalingv2.HorizontalPodAutoscaler).Spec.ScaleTargetRef.Name} + }); err != nil { + return fmt.Errorf("failed to register HPA index %q: %w", hpaScaleTargetNameIdx, err) + } + err := setupKubernetesClients(mgr, cacheMissFallback) if err != nil { return fmt.Errorf("failed to setup kubernetes clients: %w", err) @@ -235,10 +273,12 @@ func verifyTriggers(incomingObject interface{}, action string, _ bool) error { func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error { hpaList := &autoscalingv2.HorizontalPodAutoscalerList{} - opt := &client.ListOptions{ - Namespace: incomingSo.Namespace, - } - err := kc.List(context.Background(), hpaList, opt) + // Use the hpaScaleTargetNameIdx field index to narrow candidates to HPAs + // that target the same workload name, avoiding an O(N) full-namespace scan. + err := kc.List(context.Background(), hpaList, + client.InNamespace(incomingSo.Namespace), + client.MatchingFields{hpaScaleTargetNameIdx: incomingSo.Spec.ScaleTargetRef.Name}, + ) if err != nil { return err } @@ -254,8 +294,7 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error { if hpa.Annotations[ValidationsHpaOwnershipAnnotation] == "false" { continue } - val, _ := json.MarshalIndent(hpa, "", " ") - scaledobjectlog.V(1).Info(fmt.Sprintf("checking hpa %s: %v", hpa.Name, string(val))) + scaledobjectlog.V(1).Info("checking hpa", "name", hpa.Name) hpaGvkr, err := ParseGVKR(restMapper, hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind) if err != nil { @@ -298,49 +337,64 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error { } func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error { - soList := &ScaledObjectList{} - opt := &client.ListOptions{ - Namespace: incomingSo.Namespace, - } - err := kc.List(context.Background(), soList, opt) - if err != nil { - return err - } + ctx := context.Background() + // Check 1: no other SO in this namespace already manages the same workload. + // Use the scaleTargetRef.name index so only SOs targeting the same resource + // name are fetched — typically 0 or 1 objects instead of the entire namespace. incomingSoGckr, err := ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind) if err != nil { - scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject", "apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind) + scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject", + "apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind) return err } - - incomingSoHpaName := getHpaName(*incomingSo) - for _, so := range soList.Items { + targetCandidates := &ScaledObjectList{} + if err := kc.List(ctx, targetCandidates, + client.InNamespace(incomingSo.Namespace), + client.MatchingFields{scaleTargetRefNameIdx: incomingSo.Spec.ScaleTargetRef.Name}, + ); err != nil { + return err + } + for _, so := range targetCandidates.Items { if so.Name == incomingSo.Name { continue } - val, _ := json.MarshalIndent(so, "", " ") - scaledobjectlog.V(1).Info(fmt.Sprintf("checking scaledobject %s: %v", so.Name, string(val))) - + scaledobjectlog.V(1).Info("checking scaledobject for duplicate scaleTarget", + "name", so.Name, "namespace", so.Namespace) soGckr, err := ParseGVKR(restMapper, so.Spec.ScaleTargetRef.APIVersion, so.Spec.ScaleTargetRef.Kind) if err != nil { - scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from ScaledObject", "soName", so.Name, "apiVersion", so.Spec.ScaleTargetRef.APIVersion, "kind", so.Spec.ScaleTargetRef.Kind) + scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from ScaledObject", + "soName", so.Name, "apiVersion", so.Spec.ScaleTargetRef.APIVersion, "kind", so.Spec.ScaleTargetRef.Kind) return err } - - if soGckr.GVKString() == incomingSoGckr.GVKString() && - so.Spec.ScaleTargetRef.Name == incomingSo.Spec.ScaleTargetRef.Name { - err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the ScaledObject '%s'", so.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), so.Name) + if soGckr.GVKString() == incomingSoGckr.GVKString() { + err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the ScaledObject '%s'", + so.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), so.Name) scaledobjectlog.Error(err, "validation error") metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object") return err } + } - if getHpaName(so) == incomingSoHpaName { - err = fmt.Errorf("the HPA '%s' is already managed by the ScaledObject '%s'", so.Spec.Advanced.HorizontalPodAutoscalerConfig.Name, so.Name) - scaledobjectlog.Error(err, "validation error") - metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object-hpa") - return err + // Check 2: no other SO already owns the same HPA name. + // Use the hpaName index for the same reason. + incomingSoHpaName := getHpaName(*incomingSo) + hpaCandidates := &ScaledObjectList{} + if err := kc.List(ctx, hpaCandidates, + client.InNamespace(incomingSo.Namespace), + client.MatchingFields{hpaNameIdx: incomingSoHpaName}, + ); err != nil { + return err + } + for _, so := range hpaCandidates.Items { + if so.Name == incomingSo.Name { + continue } + err = fmt.Errorf("the HPA '%s' is already managed by the ScaledObject '%s'", + incomingSoHpaName, so.Name) + scaledobjectlog.Error(err, "validation error") + metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object-hpa") + return err } // verify ScalingModifiers structure if defined in ScaledObject diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index afa03f6d77d..208a97d3934 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -288,10 +288,8 @@ func main() { return } - err = kedautil.ConfigureMaxProcs(setupLog) - if err != nil { - setupLog.Error(err, "failed to set max procs") - return + if err := kedautil.ConfigureMaxProcs(setupLog); err != nil { + setupLog.Info("failed to set max procs, using default GOMAXPROCS", "error", err) } kedaProvider, err := cmd.makeProvider(ctx) diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 8274a6de3bf..437022e6f96 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -124,10 +124,8 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) ctx := ctrl.SetupSignalHandler() - err := kedautil.ConfigureMaxProcs(setupLog) - if err != nil { - setupLog.Error(err, "failed to set max procs") - os.Exit(1) + if err := kedautil.ConfigureMaxProcs(setupLog); err != nil { + setupLog.Info("failed to set max procs, using default GOMAXPROCS", "error", err) } namespaces, err := kedautil.GetWatchNamespaces() diff --git a/cmd/webhooks/main.go b/cmd/webhooks/main.go index 2dc3cb62f94..ab18a2d3d6d 100644 --- a/cmd/webhooks/main.go +++ b/cmd/webhooks/main.go @@ -78,12 +78,9 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - err := kedautil.ConfigureMaxProcs(setupLog) - if err != nil { - setupLog.Error(err, "failed to set max procs") - os.Exit(1) + if err := kedautil.ConfigureMaxProcs(setupLog); err != nil { + setupLog.Info("failed to set max procs, using default GOMAXPROCS", "error", err) } - ctx := ctrl.SetupSignalHandler() cfg := ctrl.GetConfigOrDie() diff --git a/pkg/scalers/apache_kafka_scaler_test.go b/pkg/scalers/apache_kafka_scaler_test.go index 4348d8d82c4..3e6f8f706b5 100644 --- a/pkg/scalers/apache_kafka_scaler_test.go +++ b/pkg/scalers/apache_kafka_scaler_test.go @@ -12,6 +12,8 @@ import ( "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" ) +const stringTrue = "true" + type parseApacheKafkaMetadataTestData struct { metadata map[string]string isError bool diff --git a/pkg/scalers/authentication/authentication_types.go b/pkg/scalers/authentication/authentication_types.go index 1d0b875ae6c..ace57dedbfc 100644 --- a/pkg/scalers/authentication/authentication_types.go +++ b/pkg/scalers/authentication/authentication_types.go @@ -1,9 +1,13 @@ package authentication import ( + "crypto/tls" "fmt" "net/url" + "strings" "time" + + kedautil "github.com/kedacore/keda/v2/pkg/util" ) // Type describes the authentication type used in a scaler @@ -81,7 +85,7 @@ type CertAuth struct { // OAuth is an oAuth2 authentication type type OAuth struct { OauthTokenURI string `keda:"name=oauthTokenURI, order=authParams"` - Scopes []string `keda:"name=scopes, order=authParams"` + Scopes []string `keda:"name=scopes;scope, order=authParams"` ClientID string `keda:"name=clientID, order=authParams"` ClientSecret string `keda:"name=clientSecret, order=authParams"` EndpointParams url.Values `keda:"name=endpointParams, order=authParams"` @@ -119,6 +123,9 @@ func (c *Config) Disabled() bool { // Enabled returns true if given auth mode is enabled func (c *Config) Enabled(mode Type) bool { + if c == nil { + return false + } for _, m := range c.Modes { if m == mode { return true @@ -140,25 +147,27 @@ func (c *Config) GetBearerToken() string { return fmt.Sprintf("Bearer %s", c.BearerToken) } -// Validate validates the Config and returns an error if it is invalid +// Validate validates the Config and returns an error if it is invalid. +// It also calls Normalize() to clean up parsed values. func (c *Config) Validate() error { if c.Disabled() { return nil } + c.Normalize() if c.EnabledBearerAuth() && c.BearerToken == "" { - return fmt.Errorf("bearer token is required when bearer auth is enabled") + return fmt.Errorf("bearer token=%v is required when bearer auth is enabled", empty(c.BearerToken)) } if c.EnabledBasicAuth() && c.Username == "" { - return fmt.Errorf("username is required when basic auth is enabled") + return fmt.Errorf("username=%v is required when basic auth is enabled", empty(c.Username)) } if c.EnabledTLS() && (c.Cert == "" || c.Key == "") { - return fmt.Errorf("cert and key are required when tls auth is enabled") + return fmt.Errorf("cert=%v and key=%v are required when tls auth is enabled", empty(c.Cert), empty(c.Key)) } - if c.EnabledOAuth() && (c.OauthTokenURI == "" || c.ClientID == "" || c.ClientSecret == "") { - return fmt.Errorf("oauthTokenURI, clientID and clientSecret are required when oauth is enabled") + if c.EnabledOAuth() && (c.OauthTokenURI == "" || c.ClientID == "") { + return fmt.Errorf("oauthTokenURI=%v and clientID=%v are required when oauth is enabled", empty(c.OauthTokenURI), empty(c.ClientID)) } if c.EnabledCustomAuth() && (c.CustomAuthHeader == "" || c.CustomAuthValue == "") { - return fmt.Errorf("customAuthHeader and customAuthValue are required when custom auth is enabled") + return fmt.Errorf("customAuthHeader=%v and customAuthValue=%v are required when custom auth is enabled", empty(c.CustomAuthHeader), empty(c.CustomAuthValue)) } if c.EnabledAPIKeyAuth() && c.APIKey == "" { return fmt.Errorf("apiKey is required when apiKey auth is enabled") @@ -166,6 +175,30 @@ func (c *Config) Validate() error { return nil } +// Normalize removes whitespace-only entries from Scopes. +func (c *Config) Normalize() { + var scopes []string + for _, s := range c.Scopes { + if t := strings.TrimSpace(s); t != "" { + scopes = append(scopes, t) + } + } + c.Scopes = scopes +} + +// NewTLSConfig creates a tls.Config from the Config's cert/key/CA fields. +func (c *Config) NewTLSConfig(unsafeSsl bool) (*tls.Config, error) { + return kedautil.NewTLSConfig(c.Cert, c.Key, c.CA, unsafeSsl) +} + +// empty is a helper function for more readable errors when auth params are misconfigured +func empty(a string) string { + if a == "" { + return "" + } + return "" +} + // ToAuthMeta converts the Config to deprecated AuthMeta func (c *Config) ToAuthMeta() *AuthMeta { if c.Disabled() { diff --git a/pkg/scalers/loki_scaler.go b/pkg/scalers/loki_scaler.go index c565eb9b11b..a77641f1d45 100644 --- a/pkg/scalers/loki_scaler.go +++ b/pkg/scalers/loki_scaler.go @@ -7,7 +7,9 @@ import ( "io" "net/http" "net/url" + "path" "strconv" + "strings" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" @@ -115,11 +117,10 @@ func (s *lokiScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { } func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) { - u, err := url.ParseRequestURI(s.metadata.ServerAddress) + u, err := getServerAddress(&s.metadata) if err != nil { return -1, err } - u.Path = "/loki/api/v1/query" u.RawQuery = url.Values{"query": []string{s.metadata.Query}}.Encode() req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) @@ -218,3 +219,26 @@ func (s *lokiScaler) GetMetricsAndActivity(ctx context.Context, metricName strin metric := GenerateMetricInMili(metricName, val) return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil } + +func getServerAddress(metadata *lokiMetadata) (url.URL, error) { + const lokiPath = "/loki/api/v1/query" + u, err := url.ParseRequestURI(metadata.ServerAddress) + if err != nil { + return url.URL{}, err + } + + u.Path = strings.TrimRight(u.Path, "/") + + if strings.HasSuffix(u.Path, lokiPath) { + return *u, nil + } + + if strings.HasPrefix(lokiPath, u.Path+"/") { + remaining := strings.TrimPrefix(lokiPath, u.Path) + u.Path = path.Join(u.Path, remaining) + } else { + u.Path = path.Join(u.Path, lokiPath) + } + + return *u, nil +} diff --git a/pkg/scalers/loki_scaler_test.go b/pkg/scalers/loki_scaler_test.go index a089f186dc0..2852aee4a07 100644 --- a/pkg/scalers/loki_scaler_test.go +++ b/pkg/scalers/loki_scaler_test.go @@ -250,3 +250,87 @@ func TestLokiScalerTenantHeader(t *testing.T) { _, err := scaler.ExecuteLokiQuery(context.TODO()) assert.NoError(t, err) } + +type getServerAddressTestData struct { + name string + serverAddress string + expectedPath string + isError bool +} + +var testGetServerAddressData = []getServerAddressTestData{ + { + name: "no path in URL", + serverAddress: "http://localhost:3100", + expectedPath: "/loki/api/v1/query", + isError: false, + }, + { + name: "URL with trailing slash", + serverAddress: "http://localhost:3100/", + expectedPath: "/loki/api/v1/query", + isError: false, + }, + { + name: "URL with partial loki path", + serverAddress: "http://localhost:3100/loki", + expectedPath: "/loki/api/v1/query", + isError: false, + }, + { + name: "URL with longer partial loki path", + serverAddress: "http://localhost:3100/loki/api/v1", + expectedPath: "/loki/api/v1/query", + isError: false, + }, + { + name: "URL with full loki path already present", + serverAddress: "http://localhost:3100/loki/api/v1/query", + expectedPath: "/loki/api/v1/query", + isError: false, + }, + { + name: "URL with full loki path and trailing slash", + serverAddress: "http://localhost:3100/loki/api/v1/query/", + expectedPath: "/loki/api/v1/query", + isError: false, + }, + { + name: "URL with custom prefix", + serverAddress: "http://localhost:3100/custom", + expectedPath: "/custom/loki/api/v1/query", + isError: false, + }, + { + name: "URL with custom prefix and full loki path already present", + serverAddress: "http://localhost:3100/custom/loki/api/v1/query", + expectedPath: "/custom/loki/api/v1/query", + isError: false, + }, + { + name: "URL with partial loki path not on segment boundary", + serverAddress: "http://localhost:3100/lo", + expectedPath: "/lo/loki/api/v1/query", + isError: false, + }, + { + name: "invalid URL", + serverAddress: "not-a-url", + isError: true, + }, +} + +func TestGetServerAddress(t *testing.T) { + for _, testData := range testGetServerAddressData { + t.Run(testData.name, func(t *testing.T) { + meta := &lokiMetadata{ServerAddress: testData.serverAddress} + u, err := getServerAddress(meta) + if testData.isError { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, testData.expectedPath, u.Path) + }) + } +} diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index 25f1fa22ffd..6b8d587cd8e 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -7,14 +7,16 @@ import ( "io" "net/http" "strings" - "time" "github.com/go-logr/logr" + "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" v2 "k8s.io/api/autoscaling/v2" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/metrics/pkg/apis/external_metrics" + "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/scalers/authentication" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" kedautil "github.com/kedacore/keda/v2/pkg/util" @@ -24,7 +26,6 @@ const ( pulsarMetricType = "External" defaultMsgBacklogThreshold = 10 enable = "enable" - stringTrue = "true" pulsarAuthModeHeader = "X-Pulsar-Auth-Method-Name" ) @@ -35,22 +36,15 @@ type pulsarScaler struct { } type pulsarMetadata struct { - AdminURL string `keda:"name=adminURL, order=triggerMetadata;resolvedEnv"` - Topic string `keda:"name=topic, order=triggerMetadata;resolvedEnv"` - Subscription string `keda:"name=subscription, order=triggerMetadata;resolvedEnv"` - MsgBacklogThreshold int64 `keda:"name=msgBacklogThreshold, order=triggerMetadata, default=10"` - ActivationMsgBacklogThreshold int64 `keda:"name=activationMsgBacklogThreshold, order=triggerMetadata, default=0"` - IsPartitionedTopic bool `keda:"name=isPartitionedTopic, order=triggerMetadata, default=false"` - TLS string `keda:"name=tls, order=triggerMetadata, optional"` - AuthModes string `keda:"name=authModes, order=triggerMetadata, optional"` - - // OAuth fields - OauthTokenURI string `keda:"name=oauthTokenURI, order=triggerMetadata, optional"` - Scope string `keda:"name=scope, order=triggerMetadata, optional"` - ClientID string `keda:"name=clientID, order=triggerMetadata, optional"` - EndpointParams string `keda:"name=EndpointParams, order=triggerMetadata, optional"` - - pulsarAuth *authentication.AuthMeta //nolint:staticcheck // This has to be solved when auth is refactored + AdminURL string `keda:"name=adminURL, order=triggerMetadata;resolvedEnv"` + Topic string `keda:"name=topic, order=triggerMetadata;resolvedEnv"` + Subscription string `keda:"name=subscription, order=triggerMetadata;resolvedEnv"` + MsgBacklogThreshold int64 `keda:"name=msgBacklogThreshold, order=triggerMetadata, default=10"` + ActivationMsgBacklogThreshold int64 `keda:"name=activationMsgBacklogThreshold, order=triggerMetadata, default=0"` + IsPartitionedTopic bool `keda:"name=isPartitionedTopic, order=triggerMetadata, default=false"` + TLS string `keda:"name=tls, order=triggerMetadata, optional"` + PulsarAuth *authentication.Config `keda:"optional"` + statsURL string metricName string triggerIndex int @@ -119,56 +113,32 @@ func (m *pulsarMetadata) buildMetricName() { // handleBackwardsCompatibility handles backwards compatibility for TLS configuration func (m *pulsarMetadata) handleBackwardsCompatibility(config *scalersconfig.ScalerConfig) { // For backwards compatibility, we need to map "tls: enable" to auth modes - if m.TLS == enable && (config.AuthParams["cert"] != "" || config.AuthParams["key"] != "") { + if config.TriggerMetadata["tls"] == enable && (config.AuthParams["cert"] != "" || config.AuthParams["key"] != "") { if authModes, authModesOk := config.TriggerMetadata[authentication.AuthModesKey]; authModesOk { config.TriggerMetadata[authentication.AuthModesKey] = fmt.Sprintf("%s,%s", authModes, authentication.TLSAuthType) } else { config.TriggerMetadata[authentication.AuthModesKey] = string(authentication.TLSAuthType) } } -} -// setupAuthentication configures authentication for the pulsar scaler -func (m *pulsarMetadata) setupAuthentication(config *scalersconfig.ScalerConfig) error { - auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) - if err != nil { - return fmt.Errorf("error parsing authentication: %w", err) - } - - if auth != nil && auth.EnableOAuth { - if err := m.configureOAuth(auth); err != nil { - return err - } - } - - m.pulsarAuth = auth - return nil -} - -// configureOAuth configures OAuth settings -func (m *pulsarMetadata) configureOAuth(auth *authentication.AuthMeta) error { //nolint:staticcheck // This has to be solved when auth is refactored - if auth.OauthTokenURI == "" { - auth.OauthTokenURI = m.OauthTokenURI - } - if auth.Scopes == nil { - auth.Scopes = authentication.ParseScope(m.Scope) - } - if auth.ClientID == "" { - auth.ClientID = m.ClientID + // Pulsar historically allowed OAuth fields in triggerMetadata; + // copy them to authParams (where TypedConfig expects them), with authParams priority. + oauthKeyMapping := map[string]string{ + "oauthTokenURI": "oauthTokenURI", + "scope": "scope", + "clientID": "clientID", + "EndpointParams": "endpointParams", } - // client_secret is not required for mtls OAuth(RFC8705) - // set secret to random string to work around the Go OAuth lib - if auth.ClientSecret == "" { - auth.ClientSecret = time.Now().String() - } - if auth.EndpointParams == nil { - v, err := authentication.ParseEndpointParams(m.EndpointParams) - if err != nil { - return fmt.Errorf("error parsing EndpointParams: %s", m.EndpointParams) + for metaKey, authKey := range oauthKeyMapping { + if v, ok := config.TriggerMetadata[metaKey]; ok && v != "" { + if config.AuthParams == nil { + config.AuthParams = make(map[string]string) + } + if config.AuthParams[authKey] == "" { + config.AuthParams[authKey] = v + } } - auth.EndpointParams = v } - return nil } // NewPulsarScaler creates a new PulsarScaler @@ -181,16 +151,24 @@ func NewPulsarScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { client := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) - if pulsarMetadata.pulsarAuth != nil { - if pulsarMetadata.pulsarAuth.CA != "" || pulsarMetadata.pulsarAuth.EnableTLS { - config, err := authentication.NewTLSConfig(pulsarMetadata.pulsarAuth, false) + if pulsarMetadata.PulsarAuth.EnabledOAuth() && pulsarMetadata.PulsarAuth.ClientSecret == "" && !pulsarMetadata.PulsarAuth.EnabledTLS() { + msg := "OAuth is configured without clientSecret and without mTLS (RFC 8705). Add a clientSecret or enable mTLS to ensure secure authentication." + logger.Info(msg) + if config.Recorder != nil { + config.Recorder.Event(config.ScaledObject, corev1.EventTypeWarning, eventreason.KEDAScalersInfo, msg) + } + } + + if !pulsarMetadata.PulsarAuth.Disabled() { + if pulsarMetadata.PulsarAuth.CA != "" || pulsarMetadata.PulsarAuth.EnabledTLS() { + tlsConfig, err := pulsarMetadata.PulsarAuth.NewTLSConfig(false) if err != nil { return nil, err } - client.Transport = kedautil.CreateHTTPTransportWithTLSConfig(config) + client.Transport = kedautil.CreateHTTPTransportWithTLSConfig(tlsConfig) } - if pulsarMetadata.pulsarAuth.EnableBearerAuth || pulsarMetadata.pulsarAuth.EnableBasicAuth { + if pulsarMetadata.PulsarAuth.EnabledBearerAuth() || pulsarMetadata.PulsarAuth.EnabledBasicAuth() { // The pulsar broker redirects HTTP calls to other brokers and expects the Authorization header client.CheckRedirect = func(req *http.Request, via []*http.Request) error { if len(via) != 0 && via[0] != nil && via[0].Response != nil && via[0].Response.StatusCode == http.StatusTemporaryRedirect { @@ -210,6 +188,7 @@ func NewPulsarScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { func parsePulsarMetadata(config *scalersconfig.ScalerConfig) (*pulsarMetadata, error) { meta := &pulsarMetadata{triggerIndex: config.TriggerIndex} + meta.handleBackwardsCompatibility(config) if err := config.TypedConfig(meta); err != nil { return nil, fmt.Errorf("error parsing pulsar metadata: %w", err) @@ -217,11 +196,6 @@ func parsePulsarMetadata(config *scalersconfig.ScalerConfig) (*pulsarMetadata, e meta.buildStatsURL() meta.buildMetricName() - meta.handleBackwardsCompatibility(config) - - if err := meta.setupAuthentication(config); err != nil { - return nil, err - } return meta, nil } @@ -235,15 +209,22 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) { } client := s.httpClient - if s.metadata.pulsarAuth != nil && s.metadata.pulsarAuth.EnableOAuth { - config := clientcredentials.Config{ - ClientID: s.metadata.pulsarAuth.ClientID, - ClientSecret: s.metadata.pulsarAuth.ClientSecret, - TokenURL: s.metadata.pulsarAuth.OauthTokenURI, - Scopes: s.metadata.pulsarAuth.Scopes, - EndpointParams: s.metadata.pulsarAuth.EndpointParams, + if s.metadata.PulsarAuth.EnabledOAuth() { + clientSecret := s.metadata.PulsarAuth.ClientSecret + if clientSecret == "" { + // Go's OAuth lib requires a non-empty secret, but mTLS OAuth (RFC 8705) + // doesn't use one. Use a dummy value to satisfy the library. + clientSecret = "unused" + } + oauthConfig := clientcredentials.Config{ + ClientID: s.metadata.PulsarAuth.ClientID, + ClientSecret: clientSecret, + TokenURL: s.metadata.PulsarAuth.OauthTokenURI, + Scopes: s.metadata.PulsarAuth.Scopes, + EndpointParams: s.metadata.PulsarAuth.EndpointParams, } - client = config.Client(context.Background()) + oauthCtx := context.WithValue(ctx, oauth2.HTTPClient, s.httpClient) + client = oauthConfig.Client(oauthCtx) } addAuthHeaders(req, s.metadata) @@ -331,17 +312,17 @@ func (s *pulsarScaler) Close(context.Context) error { // addAuthHeaders add the relevant headers used by Pulsar to authenticate and authorize http requests func addAuthHeaders(req *http.Request, metadata *pulsarMetadata) { - if metadata.pulsarAuth == nil { + if metadata.PulsarAuth.Disabled() { return } switch { - case metadata.pulsarAuth.EnableBearerAuth: - req.Header.Add("Authorization", authentication.GetBearerToken(metadata.pulsarAuth)) + case metadata.PulsarAuth.EnabledBearerAuth(): + req.Header.Add("Authorization", metadata.PulsarAuth.GetBearerToken()) req.Header.Add(pulsarAuthModeHeader, "token") - case metadata.pulsarAuth.EnableBasicAuth: - req.SetBasicAuth(metadata.pulsarAuth.Username, metadata.pulsarAuth.Password) + case metadata.PulsarAuth.EnabledBasicAuth(): + req.SetBasicAuth(metadata.PulsarAuth.Username, metadata.PulsarAuth.Password) req.Header.Add(pulsarAuthModeHeader, "basic") - case metadata.pulsarAuth.EnableTLS: + case metadata.PulsarAuth.EnabledTLS(): // When BearerAuth or BasicAuth are also configured, let them take precedence for the purposes of // the authMode header. req.Header.Add(pulsarAuthModeHeader, "tls") diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index 7fab663ff49..65460d9ecb0 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -5,12 +5,13 @@ import ( "fmt" "net/http" "net/http/httptest" - "reflect" + "net/url" "strconv" "strings" "testing" "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/equality" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" ) @@ -42,7 +43,7 @@ type parsePulsarAuthParamsTestData struct { clientID string clientSecret string endpointParams string - expectedEndpointParams map[string][]string + expectedEndpointParams url.Values } type pulsarMetricIdentifier struct { @@ -108,8 +109,10 @@ var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil}, // Passes, invalid scopes provided {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "", nil}, - // Passes, with audience provided in endpointParams - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "audience=abc", map[string][]string{"audience": {"abc"}}}, + // Passes, with audience provided in endpointParams via authParams + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123", "endpointParams": "audience=abc"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123", "audience=abc", url.Values{"audience": {"abc"}}}, + // Passes, with EndpointParams provided in triggerMetadata (backwards compat) + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https6", "clientID": "id6", "EndpointParams": "audience=xyz"}, map[string]string{"ca": "cadata"}, false, false, "", "", "cadata", "", "", "", false, "https6", "", "id6", "", "audience=xyz", url.Values{"audience": {"xyz"}}}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ @@ -217,36 +220,36 @@ func TestPulsarAuthParams(t *testing.T) { t.Error("Expected error but got success") } - if meta == nil || meta.pulsarAuth == nil { - t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData) + if meta == nil || meta.PulsarAuth == nil { + t.Log("meta.PulsarAuth is nil, skipping rest of validation of", testData) continue } - if meta.pulsarAuth.EnableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.pulsarAuth.EnableTLS) + if meta.PulsarAuth.EnabledTLS() != testData.enableTLS { + t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.PulsarAuth.EnabledTLS()) } - if meta.pulsarAuth.CA != testData.ca { - t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.pulsarAuth.CA) + if meta.PulsarAuth.CA != testData.ca { + t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.PulsarAuth.CA) } - if meta.pulsarAuth.Cert != testData.cert { - t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.pulsarAuth.Cert) + if meta.PulsarAuth.Cert != testData.cert { + t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.PulsarAuth.Cert) } - if meta.pulsarAuth.Key != testData.key { - t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.pulsarAuth.Key) + if meta.PulsarAuth.Key != testData.key { + t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.PulsarAuth.Key) } - if meta.pulsarAuth.EnableBearerAuth != (testData.bearerToken != "") { + if meta.PulsarAuth.EnabledBearerAuth() != (testData.bearerToken != "") { t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken) } - if meta.pulsarAuth.BearerToken != testData.bearerToken { - t.Errorf("Expected bearer token to be set to %s but got %s\n", testData.bearerToken, meta.pulsarAuth.BearerToken) + if meta.PulsarAuth.BearerToken != testData.bearerToken { + t.Errorf("Expected bearer token to be set to %s but got %s\n", testData.bearerToken, meta.PulsarAuth.BearerToken) } - if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") { + if meta.PulsarAuth.EnabledBasicAuth() != (testData.username != "" || testData.password != "") { if testData.username != "" { t.Errorf("Expected EnableBasicAuth to be true when username is %s\n", testData.username) } @@ -255,33 +258,33 @@ func TestPulsarAuthParams(t *testing.T) { } } - if meta.pulsarAuth.Username != testData.username { - t.Errorf("Expected username to be set to %s but got %s\n", testData.username, meta.pulsarAuth.Username) + if meta.PulsarAuth.Username != testData.username { + t.Errorf("Expected username to be set to %s but got %s\n", testData.username, meta.PulsarAuth.Username) } - if meta.pulsarAuth.Password != testData.password { - t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.pulsarAuth.Password) + if meta.PulsarAuth.Password != testData.password { + t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.PulsarAuth.Password) } } } func TestPulsarOAuthParams(t *testing.T) { - for _, testData := range parsePulsarMetadataTestAuthTLSDataset { + for i, testData := range parsePulsarMetadataTestAuthTLSDataset { meta, err := parsePulsarMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}) if err != nil && !testData.isError { - t.Error("Expected success but got error", testData.authParams, err) + t.Error("Expected success but got error", i, testData.authParams, err) } if testData.isError && err == nil { t.Error("Expected error but got success") } - if meta == nil || meta.pulsarAuth == nil { - t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData) + if meta == nil || meta.PulsarAuth == nil { + t.Log("meta.PulsarAuth is nil, skipping rest of validation of", testData) continue } - if meta.pulsarAuth.EnableOAuth != (testData.clientID != "" || testData.clientSecret != "") { + if meta.PulsarAuth.EnabledOAuth() != (testData.clientID != "" || testData.clientSecret != "") { if testData.clientID != "" { t.Errorf("Expected EnableOAuth to be true when clientID is %s\n", testData.clientID) } @@ -290,31 +293,27 @@ func TestPulsarOAuthParams(t *testing.T) { } } - if meta.pulsarAuth.OauthTokenURI != testData.oauthTokenURI { - t.Errorf("Expected oauthTokenURI to be set to %s but got %s\n", testData.oauthTokenURI, meta.pulsarAuth.OauthTokenURI) + if meta.PulsarAuth.OauthTokenURI != testData.oauthTokenURI { + t.Errorf("Expected oauthTokenURI to be set to %s but got %s\n", testData.oauthTokenURI, meta.PulsarAuth.OauthTokenURI) } - if testData.scope != "" && !compareScope(meta.pulsarAuth.Scopes, testData.scope) { - t.Errorf("Expected scopes %s but got %s\n", testData.scope, meta.pulsarAuth.Scopes) + if testData.scope != "" && !compareScope(meta.PulsarAuth.Scopes, testData.scope) { + t.Errorf("Expected scopes %s but got %v\n", testData.scope, meta.PulsarAuth.Scopes) } - if testData.scope == "" && meta.pulsarAuth.Scopes != nil { - t.Errorf("Expected scopes to be null but got %s\n", meta.pulsarAuth.Scopes) - } - - if meta.pulsarAuth.ClientID != testData.clientID { - t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.pulsarAuth.ClientID) + if testData.scope == "" && len(meta.PulsarAuth.Scopes) != 0 { + t.Errorf("Expected scopes to be empty but got %v\n", meta.PulsarAuth.Scopes) } - if meta.pulsarAuth.EnableOAuth && meta.pulsarAuth.ClientSecret == "" { - t.Errorf("Expected clientSecret not to be empty.\n") + if meta.PulsarAuth.ClientID != testData.clientID { + t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.PulsarAuth.ClientID) } - if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 { - t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret) + if testData.clientSecret != "" && strings.Compare(meta.PulsarAuth.ClientSecret, testData.clientSecret) != 0 { + t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.PulsarAuth.ClientSecret) } - if reflect.DeepEqual(testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams) { - t.Errorf("Expected endpointParams %s but got %s\n", testData.expectedEndpointParams, meta.pulsarAuth.EndpointParams) + if !equality.Semantic.DeepEqual(testData.expectedEndpointParams, meta.PulsarAuth.EndpointParams) { + t.Errorf("Expected endpointParams %v but got %v\n", testData.expectedEndpointParams, meta.PulsarAuth.EndpointParams) } } } diff --git a/schema/generated/scalers-schema.json b/schema/generated/scalers-schema.json index da35bace415..6441ad1f147 100644 --- a/schema/generated/scalers-schema.json +++ b/schema/generated/scalers-schema.json @@ -3898,36 +3898,6 @@ "type": "string", "optional": true, "metadataVariableReadable": true - }, - { - "name": "authModes", - "type": "string", - "optional": true, - "metadataVariableReadable": true - }, - { - "name": "oauthTokenURI", - "type": "string", - "optional": true, - "metadataVariableReadable": true - }, - { - "name": "scope", - "type": "string", - "optional": true, - "metadataVariableReadable": true - }, - { - "name": "clientID", - "type": "string", - "optional": true, - "metadataVariableReadable": true - }, - { - "name": "EndpointParams", - "type": "string", - "optional": true, - "metadataVariableReadable": true } ] }, diff --git a/schema/generated/scalers-schema.yaml b/schema/generated/scalers-schema.yaml index 27dbf81d998..b99c434a54e 100644 --- a/schema/generated/scalers-schema.yaml +++ b/schema/generated/scalers-schema.yaml @@ -2547,26 +2547,6 @@ scalers: type: string optional: true metadataVariableReadable: true - - name: authModes - type: string - optional: true - metadataVariableReadable: true - - name: oauthTokenURI - type: string - optional: true - metadataVariableReadable: true - - name: scope - type: string - optional: true - metadataVariableReadable: true - - name: clientID - type: string - optional: true - metadataVariableReadable: true - - name: EndpointParams - type: string - optional: true - metadataVariableReadable: true - type: rabbitmq parameters: - name: queueName