From 3d8526d1169da2214b920777813125a92421127f Mon Sep 17 00:00:00 2001 From: Sanil2108 Date: Fri, 24 Apr 2026 15:07:10 +0530 Subject: [PATCH] feat(temporal): add composite metric (backlog + running workflow count) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for including running workflow count alongside task queue backlog as the scaling metric, so workers don't get prematurely scaled down when the backlog is frequently drained but workflows are still executing. New metadata fields: - includeRunningWorkflowCount (default: false) — when true, scale by backlog + number of running workflow executions. - workflowTaskQueueForCount (optional) — scope the running-count query by workflow task queue, useful when scaling activity workers whose task queue differs from the workflow task queue. Running-count query uses CountWorkflow with visibility filter ExecutionStatus = 'Running' [ AND TaskQueue = '' ]. On CountWorkflow failure the scaler falls back to backlog-only to avoid scale-to-zero on transient errors. Validate() rejects workflowTaskQueueForCount without includeRunningWorkflowCount so the parameter can't be silently ignored. Unit tests cover new metadata parsing, the composite path, and the validation rule. Ref: https://github.com/kedacore/keda/issues/7459 Signed-off-by: Sanil2108 --- CHANGELOG.md | 2 +- pkg/scalers/temporal_scaler.go | 75 +++++++-- pkg/scalers/temporal_scaler_test.go | 237 ++++++++++++++++----------- schema/generated/scalers-schema.json | 13 ++ schema/generated/scalers-schema.yaml | 9 + 5 files changed, 229 insertions(+), 107 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f1c8d302b5..3a8707971cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,7 +68,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **Temporal Scaler**: Add composite metric support (backlog + running workflow count) ([#7459](https://github.com/kedacore/keda/issues/7459)) #### Experimental diff --git a/pkg/scalers/temporal_scaler.go b/pkg/scalers/temporal_scaler.go index 91e6f273531..50790721849 100644 --- a/pkg/scalers/temporal_scaler.go +++ b/pkg/scalers/temporal_scaler.go @@ -5,9 +5,11 @@ import ( "crypto/tls" "fmt" "log/slog" + "regexp" "time" "github.com/go-logr/logr" + workflowservice "go.temporal.io/api/workflowservice/v1" sdk "go.temporal.io/sdk/client" sdklog "go.temporal.io/sdk/log" "google.golang.org/grpc" @@ -35,17 +37,19 @@ type temporalScaler struct { } type temporalMetadata struct { - Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"` - Namespace string `keda:"name=namespace, order=triggerMetadata;resolvedEnv, default=default"` - ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"` - TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"` - TaskQueue string `keda:"name=taskQueue, order=triggerMetadata;resolvedEnv"` - QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"` - BuildID string `keda:"name=buildId, order=triggerMetadata;resolvedEnv, optional"` - AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=false"` - Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=false"` - APIKey string `keda:"name=apiKey, order=authParams;resolvedEnv, optional"` - MinConnectTimeout int `keda:"name=minConnectTimeout, order=triggerMetadata, default=5"` + Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"` + Namespace string `keda:"name=namespace, order=triggerMetadata;resolvedEnv, default=default"` + ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"` + TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"` + TaskQueue string `keda:"name=taskQueue, order=triggerMetadata;resolvedEnv"` + QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"` + BuildID string `keda:"name=buildId, order=triggerMetadata;resolvedEnv, optional"` + AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=false"` + Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=false"` + IncludeRunningWorkflowCount bool `keda:"name=includeRunningWorkflowCount, order=triggerMetadata, default=false"` + WorkflowTaskQueueForCount string `keda:"name=workflowTaskQueueForCount, order=triggerMetadata;resolvedEnv, optional"` + APIKey string `keda:"name=apiKey, order=authParams;resolvedEnv, optional"` + MinConnectTimeout int `keda:"name=minConnectTimeout, order=triggerMetadata, default=5"` UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"` Cert string `keda:"name=cert, order=authParams, optional"` @@ -73,6 +77,10 @@ func (a *temporalMetadata) Validate() error { return fmt.Errorf("minConnectTimeout must be a positive number") } + if a.WorkflowTaskQueueForCount != "" && !a.IncludeRunningWorkflowCount { + return fmt.Errorf("workflowTaskQueueForCount has no effect unless includeRunningWorkflowCount is true") + } + return nil } @@ -127,14 +135,24 @@ func (s *temporalScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe } func (s *temporalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - queueSize, err := s.getQueueSize(ctx) + backlog, err := s.getQueueSize(ctx) if err != nil { return nil, false, fmt.Errorf("failed to get Temporal queue size: %w", err) } - metric := GenerateMetricInMili(metricName, float64(queueSize)) + metric := GenerateMetricInMili(metricName, float64(backlog)) - return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil + isActive := backlog > s.metadata.ActivationTargetQueueSize + if !isActive && s.metadata.IncludeRunningWorkflowCount { + runningCount, err := s.getRunningWorkflowCount(ctx) + if err != nil { + s.logger.V(1).Info("failed to get running workflow count, skipping for activity check", "error", err) + } else { + isActive = runningCount > 0 + } + } + + return []external_metrics.ExternalMetricValue{metric}, isActive, nil } func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) { @@ -162,6 +180,35 @@ func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) { return getCombinedBacklogCount(resp), nil } +// validTaskQueueName matches task queue names containing only safe characters. +// Temporal task queue names support alphanumerics, hyphens, underscores, dots, forward slashes, and colons. +// Rejecting anything outside this set prevents query injection since the SDK offers no parameterized queries. +var validTaskQueueName = regexp.MustCompile(`^[a-zA-Z0-9\-_./:]+$`) + +// getRunningWorkflowCount returns the approximate number of running workflow executions +// for the task queue (or workflowTaskQueueForCount if set). Used to avoid premature +// scale-down when workers are fast and backlog is often zero. +func (s *temporalScaler) getRunningWorkflowCount(ctx context.Context) (int64, error) { + taskQueue := s.metadata.WorkflowTaskQueueForCount + if taskQueue == "" { + taskQueue = s.metadata.TaskQueue + } + if !validTaskQueueName.MatchString(taskQueue) { + return 0, fmt.Errorf("task queue name %q contains characters not allowed in visibility queries", taskQueue) + } + query := fmt.Sprintf("ExecutionStatus = 'Running' AND TaskQueue = '%s'", taskQueue) + + req := &workflowservice.CountWorkflowExecutionsRequest{ + Namespace: s.metadata.Namespace, + Query: query, + } + resp, err := s.tcl.CountWorkflow(ctx, req) + if err != nil { + return 0, fmt.Errorf("count workflow: %w", err) + } + return resp.GetCount(), nil +} + func getQueueTypes(queueTypes []string) []sdk.TaskQueueType { var taskQueueTypes []sdk.TaskQueueType for _, t := range queueTypes { diff --git a/pkg/scalers/temporal_scaler_test.go b/pkg/scalers/temporal_scaler_test.go index 149322ef27d..cf4918ca4b9 100644 --- a/pkg/scalers/temporal_scaler_test.go +++ b/pkg/scalers/temporal_scaler_test.go @@ -103,14 +103,15 @@ func TestParseTemporalMetadata(t *testing.T) { "namespace": "default", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, }, wantErr: true, }, @@ -121,14 +122,15 @@ func TestParseTemporalMetadata(t *testing.T) { "taskQueue": "testxx", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, }, wantErr: false, }, @@ -141,14 +143,15 @@ func TestParseTemporalMetadata(t *testing.T) { "activationTargetQueueSize": "12", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 12, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 12, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, }, wantErr: false, }, @@ -160,15 +163,16 @@ func TestParseTemporalMetadata(t *testing.T) { "taskQueue": "testxx", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test01", - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test01", + MinConnectTimeout: 5, }, authParams: map[string]string{ "apiKey": "test01", @@ -184,15 +188,16 @@ func TestParseTemporalMetadata(t *testing.T) { "queueTypes": "workflow,activity", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - QueueTypes: []string{"workflow", "activity"}, - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + QueueTypes: []string{"workflow", "activity"}, + MinConnectTimeout: 5, }, wantErr: false, }, @@ -209,15 +214,16 @@ func TestParseTemporalMetadata(t *testing.T) { "taskQueueFromEnv": "taskQueue", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test01", - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test01", + MinConnectTimeout: 5, }, authParams: map[string]string{ "apiKey": "test01", @@ -233,15 +239,16 @@ func TestParseTemporalMetadata(t *testing.T) { "apiKey": "test-api-key", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test-api-key", - MinConnectTimeout: 5, + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test-api-key", + MinConnectTimeout: 5, }, authParams: map[string]string{ "apiKey": "test-api-key", @@ -257,15 +264,16 @@ func TestParseTemporalMetadata(t *testing.T) { "tlsServerName": "my-namespace.tmpr.cloud", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - MinConnectTimeout: 5, - TLSServerName: "my-namespace.tmpr.cloud", + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, + TLSServerName: "my-namespace.tmpr.cloud", }, wantErr: false, }, @@ -281,16 +289,17 @@ func TestParseTemporalMetadata(t *testing.T) { "apiKey": "test01", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - APIKey: "test01", - MinConnectTimeout: 5, - TLSServerName: "my-namespace.tmpr.cloud", + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + APIKey: "test01", + MinConnectTimeout: 5, + TLSServerName: "my-namespace.tmpr.cloud", }, wantErr: false, }, @@ -309,19 +318,63 @@ func TestParseTemporalMetadata(t *testing.T) { "ca": "ca-data", }, wantMeta: &temporalMetadata{ - Endpoint: "test:7233", - Namespace: "default", - TaskQueue: "testxx", - TargetQueueSize: 5, - ActivationTargetQueueSize: 0, - AllActive: false, - Unversioned: false, - Cert: "cert-data", - Key: "key-data", - KeyPassword: "password", - CA: "ca-data", - MinConnectTimeout: 5, - TLSServerName: "my-namespace.tmpr.cloud", + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + Cert: "cert-data", + Key: "key-data", + KeyPassword: "password", + CA: "ca-data", + MinConnectTimeout: 5, + TLSServerName: "my-namespace.tmpr.cloud", + }, + wantErr: false, + }, + { + name: "includeRunningWorkflowCount disabled", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "taskQueue": "testxx", + "includeRunningWorkflowCount": "false", + }, + wantMeta: &temporalMetadata{ + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "testxx", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + MinConnectTimeout: 5, + }, + wantErr: false, + }, + { + name: "workflowTaskQueueForCount set", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "taskQueue": "activity-queue", + "workflowTaskQueueForCount": "workflow-queue", + }, + wantMeta: &temporalMetadata{ + Endpoint: "test:7233", + Namespace: "default", + TaskQueue: "activity-queue", + TargetQueueSize: 5, + ActivationTargetQueueSize: 0, + AllActive: false, + Unversioned: false, + IncludeRunningWorkflowCount: false, + WorkflowTaskQueueForCount: "workflow-queue", + MinConnectTimeout: 5, }, wantErr: false, }, diff --git a/schema/generated/scalers-schema.json b/schema/generated/scalers-schema.json index 6441ad1f147..ab62903306d 100644 --- a/schema/generated/scalers-schema.json +++ b/schema/generated/scalers-schema.json @@ -5690,6 +5690,19 @@ "default": "false", "metadataVariableReadable": true }, + { + "name": "includeRunningWorkflowCount", + "type": "string", + "default": "false", + "metadataVariableReadable": true + }, + { + "name": "workflowTaskQueueForCount", + "type": "string", + "optional": true, + "metadataVariableReadable": true, + "envVariableReadable": true + }, { "name": "apiKey", "type": "string", diff --git a/schema/generated/scalers-schema.yaml b/schema/generated/scalers-schema.yaml index b99c434a54e..a1eccd9bbce 100644 --- a/schema/generated/scalers-schema.yaml +++ b/schema/generated/scalers-schema.yaml @@ -3727,6 +3727,15 @@ scalers: type: string default: "false" metadataVariableReadable: true + - name: includeRunningWorkflowCount + type: string + default: "false" + metadataVariableReadable: true + - name: workflowTaskQueueForCount + type: string + optional: true + metadataVariableReadable: true + envVariableReadable: true - name: apiKey type: string optional: true