Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Introduce new OpenSearch Scaler ([#7456](https://github.com/kedacore/keda/issues/7456))

#### Experimental

Expand Down Expand Up @@ -96,9 +96,11 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **GCP Scaler**: Validate Pub/Sub resource name in BuildMQLQuery ([#7468](https://github.com/kedacore/keda/pull/7468))
- **Github Runner Scaler**: Improve URL construction and error handling ([#7495](https://github.com/kedacore/keda/pull/7495))
- **Github Runner Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
- **InfluxDB Scaler**: Make `authToken` optional to support unauthenticated InfluxDB instances ([#7616](https://github.com/kedacore/keda/issues/7616))
- **Loki Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
- **Loki Scaler**: `serverAddress` now appends `/loki/api/v1/query` to the end of existing path instead of overriding ([#7648](https://github.com/kedacore/keda/pull/7648))
- **Metrics API Scaler**: Fix `aggregateFromKubeServiceEndpoints` using empty label selector that matched all EndpointSlices in the namespace instead of only the target service's ([#7641](https://github.com/kedacore/keda/issues/7641))
- **NATS JetStream Scaler**: Return an error from `getMaxMsgLag` when the configured consumer is missing instead of falling back to the stream's last sequence, preventing incorrect scale-up to `maxReplicaCount` ([#7657](https://github.com/kedacore/keda/issues/7657))
- **NATS JetStream Scaler**: URL-encode user input in monitoring URL construction ([#7483](https://github.com/kedacore/keda/pull/7483))
- **Prometheus Scaler**: Handle NaN results in the same manner as Inf ([#7475](https://github.com/kedacore/keda/issues/7475))
- **Prometheus Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0
github.com/hashicorp/vault/api v1.23.0
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/jackc/pgx/v5 v5.9.1
github.com/jackc/pgx/v5 v5.9.2
github.com/joho/godotenv v1.5.1
github.com/jstemmer/go-junit-report/v2 v2.1.0
github.com/microsoft/ApplicationInsights-Go v0.4.4
Expand All @@ -90,6 +90,7 @@ require (
github.com/onsi/ginkgo/v2 v2.28.1
github.com/onsi/gomega v1.39.1
github.com/open-policy-agent/cert-controller v0.16.0
github.com/opensearch-project/opensearch-go/v4 v4.6.0
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
Expand Down Expand Up @@ -294,7 +295,7 @@ require (
github.com/mattn/go-runewidth v0.0.19 // indirect
github.com/microsoft/go-mssqldb v1.9.6
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/moby/spdystream v0.5.0 // indirect
github.com/moby/spdystream v0.5.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
Expand Down
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go v1.55.7 h1:UJrkFq7es5CShfBwlWAC8DA077vp8PyVbQd3lqLiztE=
github.com/aws/aws-sdk-go v1.55.7/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ=
github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY=
github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
Expand Down Expand Up @@ -586,8 +586,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw=
github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
Expand Down Expand Up @@ -698,8 +698,8 @@ github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:F
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU=
github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
github.com/moby/spdystream v0.5.1 h1:9sNYeYZUcci9R6/w7KDaFWEWeV4LStVG78Mpyq/Zm/Y=
github.com/moby/spdystream v0.5.1/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down Expand Up @@ -746,6 +746,8 @@ github.com/open-policy-agent/cert-controller v0.16.0 h1:/w+wAM9dylWGwhPnvfCgepyB
github.com/open-policy-agent/cert-controller v0.16.0/go.mod h1:w5qBWYbc8HwyHI9VYAZ6YjWOcZtQ39A30I9W4X7pVVk=
github.com/open-policy-agent/frameworks/constraint v0.0.0-20241101234656-e78c8abd754a h1:gQtOJ50XFyL2Xh3lDD9zP4KQ2PY4mZKQ9hDcWc81Sp8=
github.com/open-policy-agent/frameworks/constraint v0.0.0-20241101234656-e78c8abd754a/go.mod h1:tI7nc6H6os2UYZRvSm9Y7bq4oMoXqhwA0WfnqKpoAgc=
github.com/opensearch-project/opensearch-go/v4 v4.6.0 h1:Ac8aLtDSmLEyOmv0r1qhQLw3b4vcUhE42NE9k+Z4cRc=
github.com/opensearch-project/opensearch-go/v4 v4.6.0/go.mod h1:3iZtb4SNt3IzaxavKq0dURh1AmtVgYW71E4XqmYnIiQ=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/otiai10/copy v1.7.0 h1:hVoPiN+t+7d2nzzwMiDHPSOogsWAStewq3TwU05+clE=
github.com/otiai10/copy v1.7.0/go.mod h1:rmRl6QPdJj6EiUqXQ/4Nn2lLXoNQjFCQbbNrxgc/t3U=
Expand Down Expand Up @@ -911,6 +913,8 @@ github.com/valyala/fastjson v1.6.7 h1:ZE4tRy0CIkh+qDc5McjatheGX2czdn8slQjomexVpB
github.com/valyala/fastjson v1.6.7/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/wI2L/jsondiff v0.7.0 h1:1lH1G37GhBPqCfp/lrs91rf/2j3DktX6qYAKZkLuCQQ=
github.com/wI2L/jsondiff v0.7.0/go.mod h1:KAEIojdQq66oJiHhDyQez2x+sRit0vIzC9KeK0yizxM=
github.com/wagslane/go-password-validator v0.3.0 h1:vfxOPzGHkz5S146HDpavl0cw1DSVP061Ry2PX0/ON6I=
github.com/wagslane/go-password-validator v0.3.0/go.mod h1:TI1XJ6T5fRdRnHqHt14pvy1tNVnrwe7m3/f1f2fDphQ=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
Expand Down
4 changes: 0 additions & 4 deletions pkg/scalers/influxdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func (i *influxDBMetadata) Validate() error {
i.AuthToken = i.AuthTokenOld
}

if i.AuthToken == "" {
return fmt.Errorf("authToken is required")
}

if i.InfluxVersion == "3" {
if i.Database == "" {
return fmt.Errorf("database is required if influxVersion is 3")
Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/influxdb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "thresholdValue": "10", "authToken": "myToken", "unsafeSsl": "false"}, true, map[string]string{}},
// 7 no threshold value passed
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "authToken": "myToken", "unsafeSsl": "false"}, true, map[string]string{}},
// 8 no auth token passed
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, true, map[string]string{}},
// 8 no auth token passed (optional, for unauthenticated instances)
{map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, false, map[string]string{}},
// 9 authToken, organizationName, and serverURL are defined in authParams
{map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}},
// 10 no unsafeSsl value passed
Expand Down
11 changes: 7 additions & 4 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,15 +419,15 @@ func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURLByNode(node strin
return nodeURL.String(), nil
}

func (s *natsJetStreamScaler) getMaxMsgLag() int64 {
func (s *natsJetStreamScaler) getMaxMsgLag() (int64, error) {
consumerName := s.metadata.Consumer

for _, consumer := range s.stream.Consumers {
if consumer.Name == consumerName {
return int64(consumer.NumPending + consumer.NumAckPending)
return int64(consumer.NumPending + consumer.NumAckPending), nil
}
}
return s.stream.State.LastSequence
return 0, fmt.Errorf("consumer %q not found in stream %q", consumerName, s.metadata.Stream)
}

func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
Expand Down Expand Up @@ -455,7 +455,10 @@ func (s *natsJetStreamScaler) GetMetricsAndActivity(ctx context.Context, metricN
return []external_metrics.ExternalMetricValue{}, false, errors.New("stream not found")
}

totalLag := s.getMaxMsgLag()
totalLag, err := s.getMaxMsgLag()
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, err
}
s.logger.V(1).Info("NATS JetStream Scaler: Providing metrics based on totalLag, threshold", "totalLag", totalLag, "lagThreshold", s.metadata.LagThreshold)

metric := GenerateMetricInMili(metricName, float64(totalLag))
Expand Down
52 changes: 49 additions & 3 deletions pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{
}, true, false,
},
{
"Not Active - Bad consumer name uses stream last sequence",
"Fail - stream exists but configured consumer missing",
&natsJetStreamMetricIdentifier{
&parseNATSJetStreamMetadataTestData{
testNATSJetStreamGoodMetadata, map[string]string{}, false,
Expand All @@ -174,11 +174,11 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{
Accounts: []accountDetail{{
Name: "$G",
Streams: []*streamDetail{{
Name: "mystream", State: streamState{LastSequence: 1},
Name: "mystream", State: streamState{LastSequence: 1000},
Consumers: []consumerDetail{{Name: "pull_consumer_bad", NumPending: 100}},
}},
}},
}, false, false,
}, false, true,
},
{
"Fail - Non-matching stream name",
Expand Down Expand Up @@ -470,6 +470,52 @@ func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) {
}
}

func TestNATSJetStreamGetMaxMsgLag(t *testing.T) {
meta, err := parseNATSJetStreamMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}

t.Run("consumer present returns pending+ackPending", func(t *testing.T) {
scaler := natsJetStreamScaler{
metadata: meta,
stream: &streamDetail{
Name: "mystream",
State: streamState{LastSequence: 9999},
Consumers: []consumerDetail{{
Name: "pull_consumer",
NumPending: 7,
NumAckPending: 3,
}},
},
}

lag, err := scaler.getMaxMsgLag()
assert.NoError(t, err)
assert.Equal(t, int64(10), lag)
})

t.Run("stream exists but consumer missing returns error and zero lag", func(t *testing.T) {
scaler := natsJetStreamScaler{
metadata: meta,
stream: &streamDetail{
Name: "mystream",
State: streamState{LastSequence: 9999},
Consumers: []consumerDetail{{
Name: "some_other_consumer",
NumPending: 100,
}},
},
}

lag, err := scaler.getMaxMsgLag()
assert.Error(t, err)
assert.Equal(t, int64(0), lag, "must not fall back to stream LastSequence")
assert.Contains(t, err.Error(), "pull_consumer")
assert.Contains(t, err.Error(), "mystream")
})
}

func TestNATSJetStreamClose(t *testing.T) {
mockJetStreamScaler, err := NewNATSJetStreamScaler(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0})
if err != nil {
Expand Down
Loading