Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Introduce new OpenSearch Scaler ([#7456](https://github.com/kedacore/keda/issues/7456))

#### Experimental

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ require (
github.com/onsi/ginkgo/v2 v2.28.1
github.com/onsi/gomega v1.39.1
github.com/open-policy-agent/cert-controller v0.16.0
github.com/opensearch-project/opensearch-go/v4 v4.6.0
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go v1.55.7 h1:UJrkFq7es5CShfBwlWAC8DA077vp8PyVbQd3lqLiztE=
github.com/aws/aws-sdk-go v1.55.7/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ=
github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY=
github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
Expand Down Expand Up @@ -746,6 +746,8 @@ github.com/open-policy-agent/cert-controller v0.16.0 h1:/w+wAM9dylWGwhPnvfCgepyB
github.com/open-policy-agent/cert-controller v0.16.0/go.mod h1:w5qBWYbc8HwyHI9VYAZ6YjWOcZtQ39A30I9W4X7pVVk=
github.com/open-policy-agent/frameworks/constraint v0.0.0-20241101234656-e78c8abd754a h1:gQtOJ50XFyL2Xh3lDD9zP4KQ2PY4mZKQ9hDcWc81Sp8=
github.com/open-policy-agent/frameworks/constraint v0.0.0-20241101234656-e78c8abd754a/go.mod h1:tI7nc6H6os2UYZRvSm9Y7bq4oMoXqhwA0WfnqKpoAgc=
github.com/opensearch-project/opensearch-go/v4 v4.6.0 h1:Ac8aLtDSmLEyOmv0r1qhQLw3b4vcUhE42NE9k+Z4cRc=
github.com/opensearch-project/opensearch-go/v4 v4.6.0/go.mod h1:3iZtb4SNt3IzaxavKq0dURh1AmtVgYW71E4XqmYnIiQ=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/otiai10/copy v1.7.0 h1:hVoPiN+t+7d2nzzwMiDHPSOogsWAStewq3TwU05+clE=
github.com/otiai10/copy v1.7.0/go.mod h1:rmRl6QPdJj6EiUqXQ/4Nn2lLXoNQjFCQbbNrxgc/t3U=
Expand Down Expand Up @@ -911,6 +913,8 @@ github.com/valyala/fastjson v1.6.7 h1:ZE4tRy0CIkh+qDc5McjatheGX2czdn8slQjomexVpB
github.com/valyala/fastjson v1.6.7/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/wI2L/jsondiff v0.7.0 h1:1lH1G37GhBPqCfp/lrs91rf/2j3DktX6qYAKZkLuCQQ=
github.com/wI2L/jsondiff v0.7.0/go.mod h1:KAEIojdQq66oJiHhDyQez2x+sRit0vIzC9KeK0yizxM=
github.com/wagslane/go-password-validator v0.3.0 h1:vfxOPzGHkz5S146HDpavl0cw1DSVP061Ry2PX0/ON6I=
github.com/wagslane/go-password-validator v0.3.0/go.mod h1:TI1XJ6T5fRdRnHqHt14pvy1tNVnrwe7m3/f1f2fDphQ=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
Expand Down
345 changes: 345 additions & 0 deletions pkg/scalers/opensearch_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
package scalers

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/tidwall/gjson"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
"github.com/kedacore/keda/v2/pkg/util"
)

type opensearchScaler struct {
metricType v2.MetricTargetType
metadata opensearchMetadata
osAPIClient *opensearchapi.Client
logger logr.Logger
}

type opensearchMetadata struct {
Addresses []string `keda:"name=addresses, order=authParams;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
EnableTLS bool `keda:"name=enableTLS, order=triggerMetadata;authParams, default=false"`
Username string `keda:"name=username, order=authParams;triggerMetadata, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata, optional"`
CACert string `keda:"name=caCert, order=authParams;triggerMetadata, optional"`
ClientCert string `keda:"name=clientCert, order=authParams;triggerMetadata, optional"`
ClientKey string `keda:"name=clientKey, order=authParams;triggerMetadata, optional"`
Index []string `keda:"name=index, order=authParams;triggerMetadata, separator=;"`
SearchTemplateName string `keda:"name=searchTemplateName, order=authParams;triggerMetadata, optional"`
Query string `keda:"name=query, order=authParams;triggerMetadata, optional"`
Parameters []string `keda:"name=parameters, order=triggerMetadata, optional, separator=;"`
ValueLocation string `keda:"name=valueLocation, order=authParams;triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=authParams;triggerMetadata"`
ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata, default=0"`
IgnoreNullValues bool `keda:"name=ignoreNullValues, order=triggerMetadata, default=false"`
MetricName string `keda:"name=metricName, order=triggerMetadata, optional"`

TriggerIndex int
}

func (m *opensearchMetadata) Validate() error {
if m.EnableTLS && (m.ClientCert == "" || m.ClientKey == "") {
return fmt.Errorf("both clientCert and clientKey must be provided when enableTLS is true")
}
if !m.EnableTLS && (m.Username == "" || m.Password == "") {
return fmt.Errorf("both username and password must be provided for basic auth")
}
if m.SearchTemplateName == "" && m.Query == "" {
return fmt.Errorf("either searchTemplateName or query must be provided")
}
if m.SearchTemplateName != "" && m.Query != "" {
return fmt.Errorf("cannot provide both searchTemplateName and query")
}
Comment thread
rickbrouwer marked this conversation as resolved.

return nil
}

func NewOpensearchScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "opensearch_scaler")

meta, err := parseOpensearchMetadata(config)
if err != nil {
return nil, fmt.Errorf("failed to parse opensearch metadata: %w", err)
}

opensearchAPIClient, err := newOpensearchAPIClient(meta, logger)
if err != nil {
return nil, fmt.Errorf("failed to create opensearch client: %w", err)
}

return &opensearchScaler{
metricType: metricType,
metadata: meta,
osAPIClient: opensearchAPIClient,
logger: logger,
}, nil
}

func newOpensearchAPIClient(meta opensearchMetadata, logger logr.Logger) (*opensearchapi.Client, error) {
if meta.EnableTLS {
return newOpensearchAPIClientWithTLS(meta, logger)
}
return newOpensearchAPIClientWithBasicAuth(meta, logger)
}

func newOpensearchAPIClientWithTLS(meta opensearchMetadata, logger logr.Logger) (*opensearchapi.Client, error) {
tlsConfig, err := util.NewTLSConfig(meta.ClientCert, meta.ClientKey, meta.CACert, meta.UnsafeSsl)
if err != nil {
return nil, fmt.Errorf("failed to create TLS config: %w", err)
}

return newOpensearchAPIClientFromConfig(opensearch.Config{
Addresses: meta.Addresses,
Transport: util.CreateHTTPTransportWithTLSConfig(tlsConfig),
}, logger)
}

func newOpensearchAPIClientWithBasicAuth(meta opensearchMetadata, logger logr.Logger) (*opensearchapi.Client, error) {
tlsConfig, err := util.NewTLSConfig("", "", meta.CACert, meta.UnsafeSsl)
if err != nil {
return nil, fmt.Errorf("failed to create TLS config: %w", err)
}

return newOpensearchAPIClientFromConfig(opensearch.Config{
Addresses: meta.Addresses,
Username: meta.Username,
Password: meta.Password,
Transport: util.CreateHTTPTransportWithTLSConfig(tlsConfig),
}, logger)
}

func newOpensearchAPIClientFromConfig(cfg opensearch.Config, logger logr.Logger) (*opensearchapi.Client, error) {
client, err := opensearchapi.NewClient(opensearchapi.Config{Client: cfg})
if err != nil {
logger.Error(err, fmt.Sprintf("failed to create opensearch client: %s", err))
return nil, err
}

_, err = client.Ping(context.Background(), nil)
if err != nil {
logger.Error(err, fmt.Sprintf("failed to ping the opensearch engine: %s", err))
return nil, err
}

return client, nil
}

func parseOpensearchMetadata(config *scalersconfig.ScalerConfig) (opensearchMetadata, error) {
meta := opensearchMetadata{}
err := config.TypedConfig(&meta)

if err != nil {
return meta, err
}

if meta.SearchTemplateName != "" {
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, util.NormalizeString(fmt.Sprintf("opensearch-%s", meta.SearchTemplateName)))
} else {
meta.MetricName = GenerateMetricNameWithIndex(config.TriggerIndex, "opensearch-query")
}

Comment thread
rickbrouwer marked this conversation as resolved.
meta.TriggerIndex = config.TriggerIndex

return meta, nil
}

func (s *opensearchScaler) Close(_ context.Context) error {
return nil
}

func (s *opensearchScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: s.metadata.MetricName,
},
Target: GetMetricTargetMili(s.metricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}
return []v2.MetricSpec{metricSpec}
}

func (s *opensearchScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queryResult, err := s.getQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error inspecting opensearch: %w", err)
}

metric := GenerateMetricInMili(metricName, queryResult)

return []external_metrics.ExternalMetricValue{metric}, queryResult > s.metadata.ActivationTargetValue, nil
}

func (s *opensearchScaler) getQueryResult(ctx context.Context) (float64, error) {
var responseBody []byte
var err error

if s.metadata.SearchTemplateName != "" {
responseBody, err = s.searchTemplate(ctx)
if err != nil {
return 0, err
}
} else {
responseBody, err = s.search(ctx)
if err != nil {
return 0, err
}
}

value, err := s.getValueFromSearchResultByValueLocation(responseBody)
if err != nil {
return 0, err
}

return value, nil
}

func (s *opensearchScaler) getValueFromSearchResultByValueLocation(responseBody []byte) (float64, error) {
r := gjson.GetBytes(responseBody, s.metadata.ValueLocation)
errorMsg := "valueLocation must point to value of type number but got: '%s'"

if r.Type == gjson.Null {
if s.metadata.IgnoreNullValues {
return 0, nil // Return 0 when the value is null and we're ignoring null values
}
return 0, fmt.Errorf(errorMsg, "Null")
}

if r.Type == gjson.String {
q, err := strconv.ParseFloat(r.String(), 64)
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
}
return q, nil
}

if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
}
return r.Num, nil
}

func (s *opensearchScaler) search(ctx context.Context) ([]byte, error) {
if !json.Valid([]byte(s.metadata.Query)) {
return nil, fmt.Errorf("invalid query JSON")
}

searchResponse, err := s.osAPIClient.Search(ctx, &opensearchapi.SearchReq{
Indices: s.metadata.Index,
Body: bytes.NewReader([]byte(s.metadata.Query)),
})
if err != nil {
return nil, s.responseError(searchResponse.Inspect().Response, err)
}
Comment thread
rickbrouwer marked this conversation as resolved.

responseBody, err := s.readResponseBody(searchResponse.Inspect().Response)
if err != nil {
return nil, err
}

return responseBody, nil
}

func (s *opensearchScaler) searchTemplate(ctx context.Context) ([]byte, error) {
query, err := s.buildQueryFromMetadata()
if err != nil {
return nil, err
}
body, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("failed to encode search template request: %v", err)
}

searchTemplResponse, err := s.osAPIClient.SearchTemplate(ctx, opensearchapi.SearchTemplateReq{
Indices: s.metadata.Index,
Body: bytes.NewReader(body),
})
if err != nil {
return nil, s.responseError(searchTemplResponse.Inspect().Response, err)
}
Comment thread
rickbrouwer marked this conversation as resolved.

responseBody, err := s.readResponseBody(searchTemplResponse.Inspect().Response)
if err != nil {
return nil, err
}

return responseBody, nil
}

func (s *opensearchScaler) buildQueryFromMetadata() (map[string]interface{}, error) {
parameters := map[string]interface{}{}
for _, p := range s.metadata.Parameters {
if p != "" {
kv := strings.SplitN(p, ":", 2)
if len(kv) != 2 {
return nil, fmt.Errorf("invalid parameter format %q, expected key:value", p)
}
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
if key == "" || value == "" {
return nil, fmt.Errorf("invalid parameter format %q, expected key:value", p)
}
parameters[key] = value
}
}
query := map[string]interface{}{
"id": s.metadata.SearchTemplateName,
}
if len(parameters) > 0 {
query["params"] = parameters
}
return query, nil
}

func (s *opensearchScaler) readResponseBody(resp *opensearch.Response) ([]byte, error) {
if resp == nil || resp.Body == nil {
return nil, fmt.Errorf("empty search response")
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read search response: %v", err)
}

return body, nil
}

func (s *opensearchScaler) responseError(resp *opensearch.Response, fallback error) error {
if resp != nil {
if err := s.checkHTTPStatus(resp.StatusCode); err != nil {
return err
}
}
return fallback
}

// checkHTTPStatus returns a clear error for authentication and authorization failures
func (s *opensearchScaler) checkHTTPStatus(statusCode int) error {
if statusCode == 401 {
return fmt.Errorf("opensearch authentication failed (HTTP 401): check username and password")
}
if statusCode == 403 {
return fmt.Errorf("opensearch authorization failed (HTTP 403): user has insufficient permissions")
}
return nil
}
Loading
Loading