diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f1c8d302b5..3a8707971cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,7 +68,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **Temporal Scaler**: Add composite metric support (backlog + running workflow count) ([#7459](https://github.com/kedacore/keda/issues/7459)) #### Experimental diff --git a/pkg/scalers/temporal_scaler.go b/pkg/scalers/temporal_scaler.go index 91e6f273531..50790721849 100644 --- a/pkg/scalers/temporal_scaler.go +++ b/pkg/scalers/temporal_scaler.go @@ -5,9 +5,11 @@ import ( "crypto/tls" "fmt" "log/slog" + "regexp" "time" "github.com/go-logr/logr" + workflowservice "go.temporal.io/api/workflowservice/v1" sdk "go.temporal.io/sdk/client" sdklog "go.temporal.io/sdk/log" "google.golang.org/grpc" @@ -35,17 +37,19 @@ type temporalScaler struct { } type temporalMetadata struct { - Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"` - Namespace string `keda:"name=namespace, order=triggerMetadata;resolvedEnv, default=default"` - ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"` - TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"` - TaskQueue string `keda:"name=taskQueue, order=triggerMetadata;resolvedEnv"` - QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"` - BuildID string `keda:"name=buildId, order=triggerMetadata;resolvedEnv, optional"` - AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=false"` - Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=false"` - APIKey string `keda:"name=apiKey, order=authParams;resolvedEnv, optional"` - MinConnectTimeout int `keda:"name=minConnectTimeout, order=triggerMetadata, default=5"` + Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"` + Namespace string `keda:"name=namespace, order=triggerMetadata;resolvedEnv, default=default"` + ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"` + TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"` + TaskQueue string `keda:"name=taskQueue, order=triggerMetadata;resolvedEnv"` + QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"` + BuildID string `keda:"name=buildId, order=triggerMetadata;resolvedEnv, optional"` + AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=false"` + Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=false"` + IncludeRunningWorkflowCount bool `keda:"name=includeRunningWorkflowCount, order=triggerMetadata, default=false"` + WorkflowTaskQueueForCount string `keda:"name=workflowTaskQueueForCount, order=triggerMetadata;resolvedEnv, optional"` + APIKey string `keda:"name=apiKey, order=authParams;resolvedEnv, optional"` + MinConnectTimeout int `keda:"name=minConnectTimeout, order=triggerMetadata, default=5"` UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"` Cert string `keda:"name=cert, order=authParams, optional"` @@ -73,6 +77,10 @@ func (a *temporalMetadata) Validate() error { return fmt.Errorf("minConnectTimeout must be a positive number") } + if a.WorkflowTaskQueueForCount != "" && !a.IncludeRunningWorkflowCount { + return fmt.Errorf("workflowTaskQueueForCount has no effect unless includeRunningWorkflowCount is true") + } + return nil } @@ -127,14 +135,24 @@ func (s *temporalScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe } func (s *temporalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - queueSize, err := s.getQueueSize(ctx) + backlog, err := s.getQueueSize(ctx) if err != nil { return nil, false, fmt.Errorf("failed to get Temporal queue size: %w", err) } - metric := GenerateMetricInMili(metricName, float64(queueSize)) + metric := GenerateMetricInMili(metricName, float64(backlog)) - return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil + isActive := backlog > s.metadata.ActivationTargetQueueSize + if !isActive && s.metadata.IncludeRunningWorkflowCount { + runningCount, err := s.getRunningWorkflowCount(ctx) + if err != nil { + s.logger.V(1).Info("failed to get running workflow count, skipping for activity check", "error", err) + } else { + isActive = runningCount > 0 + } + } + + return []external_metrics.ExternalMetricValue{metric}, isActive, nil } func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) { @@ -162,6 +180,35 @@ func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) { return getCombinedBacklogCount(resp), nil } +// validTaskQueueName matches task queue names containing only safe characters. +// Temporal task queue names support alphanumerics, hyphens, underscores, dots, forward slashes, and colons. +// Rejecting anything outside this set prevents query injection since the SDK offers no parameterized queries. +var validTaskQueueName = regexp.MustCompile(`^[a-zA-Z0-9\-_./:]+$`) + +// getRunningWorkflowCount returns the approximate number of running workflow executions +// for the task queue (or workflowTaskQueueForCount if set). Used to avoid premature +// scale-down when workers are fast and backlog is often zero. +func (s *temporalScaler) getRunningWorkflowCount(ctx context.Context) (int64, error) { + taskQueue := s.metadata.WorkflowTaskQueueForCount + if taskQueue == "" { + taskQueue = s.metadata.TaskQueue + } + if !validTaskQueueName.MatchString(taskQueue) { + return 0, fmt.Errorf("task queue name %q contains characters not allowed in visibility queries", taskQueue) + } + query := fmt.Sprintf("ExecutionStatus = 'Running' AND TaskQueue = '%s'", taskQueue) + + req := &workflowservice.CountWorkflowExecutionsRequest{ + Namespace: s.metadata.Namespace, + Query: query, + } + resp, err := s.tcl.CountWorkflow(ctx, req) + if err != nil { + return 0, fmt.Errorf("count workflow: %w", err) + } + return resp.GetCount(), nil +} + func getQueueTypes(queueTypes []string) []sdk.TaskQueueType { var taskQueueTypes []sdk.TaskQueueType for _, t := range queueTypes { diff --git a/pkg/scalers/temporal_scaler_test.go b/pkg/scalers/temporal_scaler_test.go index 149322ef27d..cf4918ca4b9 100644 --- a/pkg/scalers/temporal_scaler_test.go +++ b/pkg/scalers/temporal_scaler_test.go @@ -103,14 +103,15 @@ func TestParseTemporalMetadata(t *testing.T) { "namespace": "default", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, }, wantErr: true, }, @@ -121,14 +122,15 @@ func TestParseTemporalMetadata(t *testing.T) { "taskQueue": "testxx", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, }, wantErr: false, }, @@ -141,14 +143,15 @@ func TestParseTemporalMetadata(t *testing.T) { "activationTargetQueueSize": "12", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 12, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 12, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, }, wantErr: false, }, @@ -160,15 +163,16 @@ func TestParseTemporalMetadata(t *testing.T) { "taskQueue": "testxx", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test01", - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test01", + MinConnectTimeout: 5, }, authParams: map[string]string{ "apiKey": "test01", @@ -184,15 +188,16 @@ func TestParseTemporalMetadata(t *testing.T) { "queueTypes": "workflow,activity", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - QueueTypes: []string{"workflow", "activity"}, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + QueueTypes: []string{"workflow", "activity"}, + MinConnectTimeout: 5, }, wantErr: false, }, @@ -209,15 +214,16 @@ func TestParseTemporalMetadata(t *testing.T) { "taskQueueFromEnv": "taskQueue", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test01", - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test01", + MinConnectTimeout: 5, }, authParams: map[string]string{ "apiKey": "test01", @@ -233,15 +239,16 @@ func TestParseTemporalMetadata(t *testing.T) { "apiKey": "test-api-key", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test-api-key", - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test-api-key", + MinConnectTimeout: 5, }, authParams: map[string]string{ "apiKey": "test-api-key", @@ -257,15 +264,16 @@ func TestParseTemporalMetadata(t *testing.T) { "tlsServerName": "my-namespace.tmpr.cloud", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, - TLSServerName: "my-namespace.tmpr.cloud", + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, + TLSServerName: "my-namespace.tmpr.cloud", }, wantErr: false, }, @@ -281,16 +289,17 @@ func TestParseTemporalMetadata(t *testing.T) { "apiKey": "test01", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test01", - MinConnectTimeout: 5, - TLSServerName: "my-namespace.tmpr.cloud", + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test01", + MinConnectTimeout: 5, + TLSServerName: "my-namespace.tmpr.cloud", }, wantErr: false, }, @@ -309,19 +318,63 @@ func TestParseTemporalMetadata(t *testing.T) { "ca": "ca-data", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - Cert: "cert-data", - Key: "key-data", - KeyPassword: "password", - CA: "ca-data", - MinConnectTimeout: 5, - TLSServerName: "my-namespace.tmpr.cloud", + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + Cert: "cert-data", + Key: "key-data", + KeyPassword: "password", + CA: "ca-data", + MinConnectTimeout: 5, + TLSServerName: "my-namespace.tmpr.cloud", + }, + wantErr: false, + }, + { + name: "includeRunningWorkflowCount disabled", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "taskQueue": "testxx", + "includeRunningWorkflowCount": "false", + }, + wantMeta: &temporalMetadata{ + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, + }, + wantErr: false, + }, + { + name: "workflowTaskQueueForCount set", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "taskQueue": "activity-queue", + "workflowTaskQueueForCount": "workflow-queue", + }, + wantMeta: &temporalMetadata{ + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "activity-queue", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + WorkflowTaskQueueForCount: "workflow-queue", + MinConnectTimeout: 5, }, wantErr: false, }, diff --git a/schema/generated/scalers-schema.json b/schema/generated/scalers-schema.json index 6441ad1f147..ab62903306d 100644 --- a/schema/generated/scalers-schema.json +++ b/schema/generated/scalers-schema.json @@ -5690,6 +5690,19 @@ "default": "false", "metadataVariableReadable": true }, + { + "name": "includeRunningWorkflowCount", + "type": "string", + "default": "false", + "metadataVariableReadable": true + }, + { + "name": "workflowTaskQueueForCount", + "type": "string", + "optional": true, + "metadataVariableReadable": true, + "envVariableReadable": true + }, { "name": "apiKey", "type": "string", diff --git a/schema/generated/scalers-schema.yaml b/schema/generated/scalers-schema.yaml index b99c434a54e..a1eccd9bbce 100644 --- a/schema/generated/scalers-schema.yaml +++ b/schema/generated/scalers-schema.yaml @@ -3727,6 +3727,15 @@ scalers: type: string default: "false" metadataVariableReadable: true + - name: includeRunningWorkflowCount + type: string + default: "false" + metadataVariableReadable: true + - name: workflowTaskQueueForCount + type: string + optional: true + metadataVariableReadable: true + envVariableReadable: true - name: apiKey type: string optional: true