Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Fix ScaledObject admission webhook to return validation error from `verifyReplicaCount`, preventing invalid ScaledObjects from being created ([#5954](https://github.com/kedacore/keda/issues/5954))
- **Azure Data Explorer Scaler**: Remove clientSecretFromEnv support ([#7554](https://github.com/kedacore/keda/pull/7554))
- **Cron Scaler**: Fix metric name generation so cron expressions with comma-separated values no longer produce invalid metric names ([#7448](https://github.com/kedacore/keda/issues/7448))
- **External Scaler**: gRPC Pool uses TLS context in the key ([#7687](https://github.com/kedacore/keda/issues/7687))
- **Forgejo Scaler**: Limit HTTP error response logging ([#7469](https://github.com/kedacore/keda/pull/7469))
- **Forgejo Scaler**: Return correct activity to enable scale-to-zero ([#7527](https://github.com/kedacore/keda/issues/7527))
- **GCP Cloud Tasks Scaler**: Implement escapeFilterValue for metric filtering ([#7482](https://github.com/kedacore/keda/pull/7482))
Expand Down
24 changes: 23 additions & 1 deletion pkg/scalers/external_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ var connectionPool sync.Map

const grpcConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`

type externalScalerConnectionPoolKey struct {
ScalerAddress string
EnableTLS bool
UnsafeSsl bool
CaCert string
TLSClientCert string
TLSClientKey string
}

// NewExternalScaler creates a new external scaler - calls the GRPC interface
// to create a new scaler
func NewExternalScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
Expand Down Expand Up @@ -289,6 +298,19 @@ func handleIsActiveStream(ctx context.Context, scaledObjectRef *pb.ScaledObjectR

var connectionPoolMutex sync.Mutex

func getConnectionPoolKey(metadata externalScalerMetadata) (uint64, error) {
key := externalScalerConnectionPoolKey{
ScalerAddress: metadata.ScalerAddress,
EnableTLS: metadata.EnableTLS,
UnsafeSsl: metadata.UnsafeSsl,
CaCert: metadata.CaCert,
TLSClientCert: metadata.TLSClientCert,
TLSClientKey: metadata.TLSClientKey,
}

return hashstructure.Hash(key, nil)
}

// getClientForConnectionPool returns a grpcClient and a done() Func. The done() function must be called once the client is no longer
// in use to clean up the shared grpc.ClientConn
func getClientForConnectionPool(metadata externalScalerMetadata) (pb.ExternalScalerClient, error) {
Expand All @@ -315,7 +337,7 @@ func getClientForConnectionPool(metadata externalScalerMetadata) (pb.ExternalSca

// create a unique key per-metadata. If scaledObjects share the same connection properties
// in the metadata, they will share the same grpc.ClientConn
key, err := hashstructure.Hash(metadata.ScalerAddress, nil)
key, err := getConnectionPoolKey(metadata)
if err != nil {
return nil, err
}
Expand Down
115 changes: 115 additions & 0 deletions pkg/scalers/external_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,121 @@ func TestExternalScalerParseMetadata(t *testing.T) {
}
}

func TestGetConnectionPoolKey(t *testing.T) {
tests := []struct {
name string
left externalScalerMetadata
right externalScalerMetadata
wantSame bool
}{
{
name: "same metadata yields same key",
left: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
UnsafeSsl: false,
CaCert: serverRootCA,
TLSClientCert: clientCert,
TLSClientKey: "client-key-a",
},
right: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
UnsafeSsl: false,
CaCert: serverRootCA,
TLSClientCert: clientCert,
TLSClientKey: "client-key-a",
},
wantSame: true,
},
{
name: "different tls mode yields different key",
left: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
},
right: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
},
wantSame: false,
},
{
name: "different ca cert yields different key",
left: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
CaCert: serverRootCA,
},
right: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
CaCert: "different-ca",
},
wantSame: false,
},
{
name: "different client cert yields different key",
left: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
TLSClientCert: clientCert,
},
right: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
TLSClientCert: "different-client-cert",
},
wantSame: false,
},
{
name: "different client key yields different key",
left: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
TLSClientKey: "client-key-a",
},
right: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
TLSClientKey: "client-key-b",
},
wantSame: false,
},
{
name: "different unsafe ssl yields different key",
left: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
UnsafeSsl: false,
},
right: externalScalerMetadata{
ScalerAddress: "grpc.example:9090",
EnableTLS: true,
UnsafeSsl: true,
},
wantSame: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
leftKey, err := getConnectionPoolKey(tt.left)
if err != nil {
t.Fatalf("left key error: %v", err)
}

rightKey, err := getConnectionPoolKey(tt.right)
if err != nil {
t.Fatalf("right key error: %v", err)
}

if (leftKey == rightKey) != tt.wantSame {
t.Fatalf("unexpected key equality: left=%d right=%d wantSame=%t", leftKey, rightKey, tt.wantSame)
}
})
}
}

func TestExternalPushScaler_Run(t *testing.T) {
const serverCount = 5
const iterationCount = 500
Expand Down
Loading