Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Prometheus Scaler**: Handle NaN results in the same manner as Inf ([#7475](https://github.com/kedacore/keda/issues/7475))
- **Prometheus Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
- **RabbitMQ Scaler**: Fix AMQP connection leak by recovering channels on the existing connection and closing connections properly ([#6266](https://github.com/kedacore/keda/issues/6266))
- **RabbitMQ Scaler**: Use SASL EXTERNAL for RabbitMQ AMQP TLS without credentials ([#6840](https://github.com/kedacore/keda/issues/6840))
- **Solace Scaler**: Fix URL escaping for Message VPN and Queue names ([#7481](https://github.com/kedacore/keda/pull/7481))
- **Solr Scaler**: Use net/url to safely encode query parameters ([#7467](https://github.com/kedacore/keda/pull/7467))

Expand Down
45 changes: 32 additions & 13 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,9 @@ func parseRabbitMQMetadata(config *scalersconfig.ScalerConfig) (*rabbitMQMetadat
// the given ceClient cert, ceClient key,and CA certificate. If clientKeyPassword is not empty the provided password will be used to
// decrypt the given key. If enableTLS is disabled then amqp connection will be created without tls.
func getConnectionAndChannel(host string, meta *rabbitMQMetadata) (*amqp.Connection, *amqp.Channel, error) {
amqpConfig := amqp.Config{
Properties: amqp.Table{
"connection_name": meta.connectionName,
},
}

if meta.EnableTLS == rmqTLSEnable {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.Cert, meta.Key, meta.KeyPassword, meta.Ca, meta.UnsafeSsl)
if err != nil {
return nil, nil, err
}

amqpConfig.TLSClientConfig = tlsConfig
amqpConfig, err := buildAMQPConfig(meta)
if err != nil {
return nil, nil, err
}

conn, err := amqp.DialConfig(host, amqpConfig)
Expand All @@ -386,6 +376,35 @@ func getConnectionAndChannel(host string, meta *rabbitMQMetadata) (*amqp.Connect
return conn, channel, nil
}

func buildAMQPConfig(meta *rabbitMQMetadata) (amqp.Config, error) {
Comment thread
rickbrouwer marked this conversation as resolved.
config := amqp.Config{
Properties: amqp.Table{
"connection_name": meta.connectionName,
},
}

if meta.EnableTLS == rmqTLSEnable {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.Cert, meta.Key, meta.KeyPassword, meta.Ca, meta.UnsafeSsl)
if err != nil {
return amqp.Config{}, err
}
config.TLSClientConfig = tlsConfig
if meta.Username == "" && meta.Password == "" && meta.Cert != "" && meta.Key != "" {
useExternal := true
if u, err := url.Parse(meta.Host); err == nil && u.User != nil {
if _, hasPassword := u.User.Password(); hasPassword {
useExternal = false
}
}
if useExternal {
config.SASL = []amqp.Authentication{&amqp.ExternalAuth{}}
Comment thread
wozniakjan marked this conversation as resolved.
}
}
Comment thread
rickbrouwer marked this conversation as resolved.
}

return config, nil
}

// Close disposes of RabbitMQ connections
func (s *rabbitMQScaler) Close(context.Context) error {
if s.channel != nil {
Expand Down
123 changes: 123 additions & 0 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ package scalers

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"net/http"
"net/http/httptest"
"strconv"
Expand All @@ -11,6 +18,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -831,6 +839,121 @@ func TestConnectionName(t *testing.T) {
}
}

func TestRabbitMQBuildAMQPConfig(t *testing.T) {
certPEM, keyPEM := generateTestCertAndKey(t)

tests := []struct {
name string
meta *rabbitMQMetadata
wantSASLMechanism string
wantSASLNil bool
}{
{
name: "TLS enabled without credentials uses EXTERNAL SASL",
meta: &rabbitMQMetadata{
EnableTLS: rmqTLSEnable,
Cert: certPEM,
Key: keyPEM,
Username: "",
Password: "",
},
wantSASLMechanism: "EXTERNAL",
},
{
name: "TLS enabled with credentials does not set SASL",
meta: &rabbitMQMetadata{
EnableTLS: rmqTLSEnable,
Username: "user",
Password: "pass",
},
wantSASLNil: true,
},
{
name: "TLS disabled does not set SASL",
meta: &rabbitMQMetadata{
EnableTLS: rmqTLSDisable,
},
wantSASLNil: true,
},
{
name: "TLS enabled with CA only does not set EXTERNAL",
meta: &rabbitMQMetadata{
EnableTLS: rmqTLSEnable,
Ca: certPEM, // CA only, no client cert
},
wantSASLNil: true,
},
{
name: "TLS enabled with credentials in host URI does not set EXTERNAL",
meta: &rabbitMQMetadata{
EnableTLS: rmqTLSEnable,
Host: "amqps://user:pass@rabbit:5671/",
Cert: certPEM,
Key: keyPEM,
},
wantSASLNil: true,
},
{
name: "TLS enabled with username-only userinfo in host URI uses EXTERNAL SASL",
meta: &rabbitMQMetadata{
EnableTLS: rmqTLSEnable,
Host: "amqps://CN=username@rabbit:5671/",
Cert: certPEM,
Key: keyPEM,
},
wantSASLMechanism: "EXTERNAL",
},
{
name: "TLS enabled, no credentials in host URI, uses EXTERNAL",
meta: &rabbitMQMetadata{
EnableTLS: rmqTLSEnable,
Host: "amqps://rabbit.namespace.svc:5671/",
Cert: certPEM,
Key: keyPEM,
},
wantSASLMechanism: "EXTERNAL",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, err := buildAMQPConfig(tt.meta)
assert.NoError(t, err)

if tt.wantSASLNil {
assert.Nil(t, config.SASL)
return
}

assert.Len(t, config.SASL, 1)
assert.Equal(t, tt.wantSASLMechanism, config.SASL[0].Mechanism())
})
}
}

func generateTestCertAndKey(t *testing.T) (string, string) {
t.Helper()
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
require.NoError(t, err)

template := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "test"},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(time.Hour),
}
certDER, err := x509.CreateCertificate(rand.Reader, template, template, &key.PublicKey, key)
require.NoError(t, err)

certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})

keyDER, err := x509.MarshalECPrivateKey(key)
require.NoError(t, err)
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyDER})

return string(certPEM), string(keyPEM)
}

func TestGetSum(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading