From 1cdc531f3a89bd506d878ce79b644ea9a0d3c5f3 Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Mon, 20 Apr 2026 11:21:17 -0500 Subject: [PATCH 1/8] first pass --- internal/temporalcli/commands.gen.go | 10 +- .../temporalcli/commands.taskqueue_config.go | 70 +++++++++++ .../commands.taskqueue_config_test.go | 112 ++++++++++++++++++ internal/temporalcli/commands.yaml | 23 +++- 4 files changed, 212 insertions(+), 3 deletions(-) diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index ad01d3083..bfe4451f0 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -2227,6 +2227,9 @@ type TemporalTaskQueueConfigSetCommand struct { QueueRpsLimitReason string FairnessKeyRpsLimitDefault string FairnessKeyRpsLimitReason string + FairnessKeyWeightSet []string + FairnessKeyWeightUnset []string + FairnessKeyWeightUnsetAll bool } func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *TemporalTaskQueueConfigCommand) *TemporalTaskQueueConfigSetCommand { @@ -2236,9 +2239,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal s.Command.Use = "set [flags]" s.Command.Short = "Set Task Queue configuration" if hasHighlighting { - s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default" + s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset \nTo unset all fairness weights, use --fairness-key-weight-unset-all" } else { - s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default" + s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset \nTo unset all fairness weights, use --fairness-key-weight-unset-all" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVarP(&s.TaskQueue, "task-queue", "t", "", "Task Queue name. Required.") @@ -2252,6 +2255,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.") overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default") s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.") + s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightSet, "fairness-key-weight-set", nil, "Set fairness key weight overrides in format key=weight. Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5.") + s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightUnset, "fairness-key-weight-unset", nil, "Unset specific fairness key weight overrides. Can be specified multiple times. Example: --fairness-key-weight-unset HighPriority --fairness-key-weight-unset LowPriority.") + s.Command.Flags().BoolVar(&s.FairnessKeyWeightUnsetAll, "fairness-key-weight-unset-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index 41a42b37f..1ce7f370d 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -56,6 +56,30 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str return printTaskQueueConfig(cctx, resp.Config) } +// parseFairnessKeyWeights parses "key=weight" format strings into a map +func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { + weights := make(map[string]float32) + for _, input := range inputs { + parts := strings.SplitN(input, "=", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid format: %s (expected key=weight)", input) + } + key := strings.TrimSpace(parts[0]) + if key == "" { + return nil, fmt.Errorf("empty key in: %s", input) + } + weight, err := strconv.ParseFloat(strings.TrimSpace(parts[1]), 32) + if err != nil { + return nil, fmt.Errorf("invalid weight in %s: %w", input, err) + } + if weight < 0 { + return nil, fmt.Errorf("weight must be non-negative in: %s", input) + } + weights[key] = float32(weight) + } + return weights, nil +} + // TaskQueueConfigSetCommand handles setting task queue configuration func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []string) error { // Validate inputs before dialing client @@ -152,6 +176,52 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str } } + // Handle fairness weight overrides + // Validate mutual exclusivity + if c.FairnessKeyWeightUnsetAll { + if len(c.FairnessKeyWeightSet) > 0 || len(c.FairnessKeyWeightUnset) > 0 { + return fmt.Errorf("--fairness-key-weight-unset-all cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset") + } + } + + // Handle set operations + if len(c.FairnessKeyWeightSet) > 0 { + weights, err := parseFairnessKeyWeights(c.FairnessKeyWeightSet) + if err != nil { + return err + } + request.SetFairnessWeightOverrides = weights + } + + // Handle unset operations + if len(c.FairnessKeyWeightUnset) > 0 { + request.UnsetFairnessWeightOverrides = c.FairnessKeyWeightUnset + } + + // Handle unset all + if c.FairnessKeyWeightUnsetAll { + // Need to fetch current config to get all keys to unset + descResp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{ + Namespace: namespace, + TaskQueue: &taskqueue.TaskQueue{ + Name: taskQueue, + Kind: enums.TASK_QUEUE_KIND_NORMAL, + }, + TaskQueueType: taskQueueType, + ReportConfig: true, + }) + if err != nil { + return fmt.Errorf("error fetching current config for unset-all: %w", err) + } + if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil { + keys := make([]string, 0, len(descResp.Config.FairnessWeightOverrides)) + for key := range descResp.Config.FairnessWeightOverrides { + keys = append(keys, key) + } + request.UnsetFairnessWeightOverrides = keys + } + } + // Call the API resp, err := cl.WorkflowService().UpdateTaskQueueConfig(cctx, request) if err != nil { diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 9939844dc..046875f3a 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -9,6 +9,7 @@ import ( type taskQueueConfigType struct { QueueRateLimit *rateLimitConfigType `json:"queueRateLimit,omitempty"` FairnessKeysRateLimitDefault *rateLimitConfigType `json:"fairnessKeysRateLimitDefault,omitempty"` + FairnessWeightOverrides map[string]float32 `json:"fairnessWeightOverrides,omitempty"` } type rateLimitConfigType struct { @@ -325,3 +326,114 @@ namespace = "%s" s.Contains(res.Stdout.String(), "No configuration found for task queue", "CLI flag should override envconfig and query default namespace") } + +func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Set fairness weight overrides + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "HighPriority=2.0", + "--fairness-key-weight-set", "LowPriority=0.5", + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Successfully updated task queue configuration") + + // Get the configuration and verify weights were set + res = s.Execute( + "task-queue", "config", "get", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "-o", "json", + ) + s.NoError(res.Err) + + var config taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) + s.NotNil(config.FairnessWeightOverrides) + s.Equal(float32(2.0), config.FairnessWeightOverrides["HighPriority"]) + s.Equal(float32(0.5), config.FairnessWeightOverrides["LowPriority"]) + + // Unset one weight + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-unset", "LowPriority", + ) + s.NoError(res.Err) + + // Verify only HighPriority remains + res = s.Execute( + "task-queue", "config", "get", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "-o", "json", + ) + s.NoError(res.Err) + + var config2 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config2)) + s.NotNil(config2.FairnessWeightOverrides) + s.Equal(float32(2.0), config2.FairnessWeightOverrides["HighPriority"]) + s.NotContains(config2.FairnessWeightOverrides, "LowPriority") + + // Add more weights + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "MediumPriority=1.5", + "--fairness-key-weight-set", "LowPriority=0.3", + ) + s.NoError(res.Err) + + // Verify all three weights exist + res = s.Execute( + "task-queue", "config", "get", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "-o", "json", + ) + s.NoError(res.Err) + + var config3 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config3)) + s.NotNil(config3.FairnessWeightOverrides) + s.Len(config3.FairnessWeightOverrides, 3) + s.Equal(float32(2.0), config3.FairnessWeightOverrides["HighPriority"]) + s.Equal(float32(1.5), config3.FairnessWeightOverrides["MediumPriority"]) + s.Equal(float32(0.3), config3.FairnessWeightOverrides["LowPriority"]) + + // Unset all weights + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-unset-all", + ) + s.NoError(res.Err) + + // Verify all weights are gone + res = s.Execute( + "task-queue", "config", "get", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "-o", "json", + ) + s.NoError(res.Err) + + var config4 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config4)) + s.Empty(config4.FairnessWeightOverrides) +} diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index 5f4294f8f..e644d2cba 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -3231,7 +3231,9 @@ commands: --queue-rps-limit \ --queue-rps-limit-reason \ --fairness-key-rps-limit-default \ - --fairness-key-rps-limit-reason + --fairness-key-rps-limit-reason \ + --fairness-key-weight-set HighPriority=2.0 \ + --fairness-key-weight-set LowPriority=0.5 ``` This command supports updating: @@ -3241,8 +3243,12 @@ commands: - Fairness key rate limit defaults: Sets default rate limits for fairness keys. If set, each individual fairness key will be limited to this rate, scaled by the weight of the fairness key. + - Fairness key weight overrides: Set custom weights for specific fairness keys. + Weights control the relative share of capacity each key receives. To unset a rate limit, pass in 'default', for example: --queue-rps-limit default + To unset specific fairness weights, use --fairness-key-weight-unset + To unset all fairness weights, use --fairness-key-weight-unset-all options: - name: task-queue type: string @@ -3278,6 +3284,21 @@ commands: - name: fairness-key-rps-limit-reason type: string description: Reason for fairness key rate limit update. + - name: fairness-key-weight-set + type: string[] + description: | + Set fairness key weight overrides in format key=weight. + Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5. + - name: fairness-key-weight-unset + type: string[] + description: | + Unset specific fairness key weight overrides. + Can be specified multiple times. Example: --fairness-key-weight-unset HighPriority --fairness-key-weight-unset LowPriority. + - name: fairness-key-weight-unset-all + type: bool + description: | + Unset all fairness key weight overrides. + Cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset. - name: temporal workflow summary: Start, list, and operate on Workflows From 424df227e8a8035fa2e57b630c56c855c732530a Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Mon, 20 Apr 2026 11:43:09 -0500 Subject: [PATCH 2/8] another pass --- internal/temporalcli/commands.gen.go | 2 +- .../temporalcli/commands.taskqueue_config.go | 181 +++++++++++++++--- .../commands.taskqueue_config_test.go | 141 ++++++++++++++ internal/temporalcli/commands.yaml | 2 +- 4 files changed, 296 insertions(+), 30 deletions(-) diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index bfe4451f0..794d06d0a 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -233,7 +233,7 @@ func (v *SharedWorkflowStartOptions) BuildFlags(f *pflag.FlagSet) { f.StringVar(&v.StaticDetails, "static-details", "", "Static Workflow details for human consumption in UIs. Uses Temporal Markdown formatting, may be multiple lines. EXPERIMENTAL.") f.IntVar(&v.PriorityKey, "priority-key", 0, "Priority key (1-5, lower numbers = higher priority). Tasks in a queue should be processed in close-to-priority-order. Default is 3 when not specified.") f.StringVar(&v.FairnessKey, "fairness-key", "", "Fairness key (max 64 bytes) for proportional task dispatch. Tasks with same key share capacity based on their weight.") - f.Float32Var(&v.FairnessWeight, "fairness-weight", 0, "Weight [0.001-1000] for this fairness key. Keys are dispatched proportionally to their weights.") + f.Float32Var(&v.FairnessWeight, "fairness-weight", 0, "Weight (minimum 0.001) for this fairness key. Keys are dispatched proportionally to their weights.") } type WorkflowStartOptions struct { diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index 1ce7f370d..9d90075cc 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -14,9 +14,9 @@ import ( // TaskQueueConfigGetCommand handles getting task queue configuration func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []string) error { // Validate inputs before dialing client - taskQueue := c.TaskQueue + taskQueue := strings.TrimSpace(c.TaskQueue) if taskQueue == "" { - return fmt.Errorf("taskQueue name is required") + return fmt.Errorf("task queue name is required and cannot be empty") } taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value) @@ -56,36 +56,101 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str return printTaskQueueConfig(cctx, resp.Config) } +const ( + // maxFairnessKeyLength matches server-side limit in temporal/common/priorities/priority_util.go + maxFairnessKeyLength = 64 + + // minFairnessWeight matches server-side limit in temporal/service/matching/fairness_util.go + // Server clamps weights to this minimum when applying them + minFairnessWeight = 0.001 +) + // parseFairnessKeyWeights parses "key=weight" format strings into a map +// Returns an error if there are duplicate keys, invalid weights, or malformed input func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { weights := make(map[string]float32) + seen := make(map[string]bool) + for _, input := range inputs { parts := strings.SplitN(input, "=", 2) if len(parts) != 2 { - return nil, fmt.Errorf("invalid format: %s (expected key=weight)", input) + return nil, fmt.Errorf("invalid format: %q (expected key=weight)", input) } + key := strings.TrimSpace(parts[0]) if key == "" { - return nil, fmt.Errorf("empty key in: %s", input) + return nil, fmt.Errorf("empty key in: %q", input) + } + + // Check for duplicate keys + if seen[key] { + return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key) + } + seen[key] = true + + // Validate key length (server enforces 64 byte limit) + if len(key) > maxFairnessKeyLength { + return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) + } + + weightStr := strings.TrimSpace(parts[1]) + if weightStr == "" { + return nil, fmt.Errorf("empty weight value for key %q", key) } - weight, err := strconv.ParseFloat(strings.TrimSpace(parts[1]), 32) + + weight, err := strconv.ParseFloat(weightStr, 32) if err != nil { - return nil, fmt.Errorf("invalid weight in %s: %w", input, err) + return nil, fmt.Errorf("invalid weight %q for key %q: must be a number", weightStr, key) } - if weight < 0 { - return nil, fmt.Errorf("weight must be non-negative in: %s", input) + + // Validate weight bounds - matches server-side validation + // Server only validates weight > 0 and clamps to minWeight at runtime + if weight < minFairnessWeight { + return nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight) } + weights[key] = float32(weight) } return weights, nil } +// validateFairnessKeyNames validates that key names are non-empty and within length limits +func validateFairnessKeyNames(keys []string) error { + seen := make(map[string]bool) + for _, key := range keys { + trimmed := strings.TrimSpace(key) + if trimmed == "" { + return fmt.Errorf("empty fairness key name") + } + if seen[trimmed] { + return fmt.Errorf("duplicate fairness key %q specified multiple times", trimmed) + } + seen[trimmed] = true + + if len(trimmed) > maxFairnessKeyLength { + return fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", trimmed, maxFairnessKeyLength) + } + } + return nil +} + +// findConflictingKeys returns keys that appear in both set and unset lists +func findConflictingKeys(setWeights map[string]float32, unsetKeys []string) []string { + conflicts := []string{} + for _, key := range unsetKeys { + if _, exists := setWeights[key]; exists { + conflicts = append(conflicts, key) + } + } + return conflicts +} + // TaskQueueConfigSetCommand handles setting task queue configuration func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []string) error { // Validate inputs before dialing client - taskQueue := c.TaskQueue + taskQueue := strings.TrimSpace(c.TaskQueue) if taskQueue == "" { - return fmt.Errorf("taskQueue name is required") + return fmt.Errorf("task queue name is required and cannot be empty") } taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value) @@ -103,43 +168,73 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str // Helper to parse RPS values for a given flag name. // Accepts "default" or a non-negative float string. - parseRPS := func(flagName string) (*taskqueue.RateLimit, error) { + // Returns (rateLimit, isZero, error) + parseRPS := func(flagName string) (*taskqueue.RateLimit, bool, error) { raw := strings.TrimSpace(c.Command.Flags().Lookup(flagName).Value.String()) if raw == "" { - return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) + return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) } if strings.EqualFold(raw, "default") { // Unset: returning nil RateLimit removes the existing rate limit. - return nil, nil + return nil, false, nil } v, err := strconv.ParseFloat(raw, 32) if err != nil { - return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) + return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) } if v < 0 { - return nil, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName) + return nil, false, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName) } - return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, nil + isZero := v == 0 + return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, isZero, nil } + // Parse and validate queue rate limit var queueRpsLimitParsed *taskqueue.RateLimit + var queueRateLimitIsZero bool if c.Command.Flags().Changed("queue-rps-limit") { var err error - if queueRpsLimitParsed, err = parseRPS("queue-rps-limit"); err != nil { + queueRpsLimitParsed, queueRateLimitIsZero, err = parseRPS("queue-rps-limit") + if err != nil { return err } + + // Validate reason is provided + if c.QueueRpsLimitReason == "" { + return fmt.Errorf("--queue-rps-limit-reason is required when setting or unsetting queue rate limit") + } + + // Warn about zero rate limit (stops all traffic) + if queueRateLimitIsZero { + cctx.Printer.Println("WARNING: Setting queue rate limit to 0 will STOP ALL TRAFFIC on this task queue.") + cctx.Printer.Println(" This will prevent any tasks from being dispatched until the limit is changed.") + } } else if c.Command.Flags().Changed("queue-rps-limit-reason") { - return fmt.Errorf("queue-rps-limit-reason can only be set if queue-rps-limit is updated") + return fmt.Errorf("--queue-rps-limit-reason can only be set if --queue-rps-limit is specified") } + // Parse and validate fairness key rate limit default var fairnessKeyRpsLimitDefaultParsed *taskqueue.RateLimit + var fairnessRateLimitIsZero bool if c.Command.Flags().Changed("fairness-key-rps-limit-default") { var err error - if fairnessKeyRpsLimitDefaultParsed, err = parseRPS("fairness-key-rps-limit-default"); err != nil { + fairnessKeyRpsLimitDefaultParsed, fairnessRateLimitIsZero, err = parseRPS("fairness-key-rps-limit-default") + if err != nil { return err } - } else if c.Command.Flags().Changed("fairness-key-rps-limit-default-reason") { - return fmt.Errorf("fairness-key-rps-limit-default-reason can only be set if fairness-key-rps-limit-default is updated") + + // Validate reason is provided + if c.FairnessKeyRpsLimitReason == "" { + return fmt.Errorf("--fairness-key-rps-limit-reason is required when setting or unsetting fairness key rate limit") + } + + // Warn about zero rate limit + if fairnessRateLimitIsZero { + cctx.Printer.Println("WARNING: Setting fairness key rate limit default to 0 will STOP ALL TRAFFIC for fairness keys.") + cctx.Printer.Println(" This will prevent tasks with fairness keys from being dispatched until the limit is changed.") + } + } else if c.Command.Flags().Changed("fairness-key-rps-limit-reason") { + return fmt.Errorf("--fairness-key-rps-limit-reason can only be set if --fairness-key-rps-limit-default is specified") } cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) @@ -176,25 +271,50 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str } } + // Validate at least one configuration change is requested + hasAnyUpdate := c.Command.Flags().Changed("queue-rps-limit") || + c.Command.Flags().Changed("fairness-key-rps-limit-default") || + len(c.FairnessKeyWeightSet) > 0 || + len(c.FairnessKeyWeightUnset) > 0 || + c.FairnessKeyWeightUnsetAll + + if !hasAnyUpdate { + return fmt.Errorf("at least one configuration update must be specified (use --help to see available options)") + } + // Handle fairness weight overrides - // Validate mutual exclusivity + // Validate mutual exclusivity of unset-all with other weight operations if c.FairnessKeyWeightUnsetAll { if len(c.FairnessKeyWeightSet) > 0 || len(c.FairnessKeyWeightUnset) > 0 { return fmt.Errorf("--fairness-key-weight-unset-all cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset") } } - // Handle set operations + // Parse and validate set operations + var setWeights map[string]float32 if len(c.FairnessKeyWeightSet) > 0 { - weights, err := parseFairnessKeyWeights(c.FairnessKeyWeightSet) + var err error + setWeights, err = parseFairnessKeyWeights(c.FairnessKeyWeightSet) if err != nil { return err } - request.SetFairnessWeightOverrides = weights + request.SetFairnessWeightOverrides = setWeights } - // Handle unset operations + // Validate unset operations if len(c.FairnessKeyWeightUnset) > 0 { + if err := validateFairnessKeyNames(c.FairnessKeyWeightUnset); err != nil { + return fmt.Errorf("invalid fairness key in unset list: %w", err) + } + + // Check for conflicts between set and unset + if setWeights != nil { + conflicts := findConflictingKeys(setWeights, c.FairnessKeyWeightUnset) + if len(conflicts) > 0 { + return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts) + } + } + request.UnsetFairnessWeightOverrides = c.FairnessKeyWeightUnset } @@ -213,19 +333,24 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str if err != nil { return fmt.Errorf("error fetching current config for unset-all: %w", err) } - if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil { + if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil && len(descResp.Config.FairnessWeightOverrides) > 0 { keys := make([]string, 0, len(descResp.Config.FairnessWeightOverrides)) for key := range descResp.Config.FairnessWeightOverrides { keys = append(keys, key) } request.UnsetFairnessWeightOverrides = keys + cctx.Printer.Println(fmt.Sprintf("Unsetting %d fairness weight override(s)", len(keys))) + } else { + cctx.Printer.Println("No fairness weight overrides found to unset") + // Don't return error, just proceed with no-op update } } // Call the API resp, err := cl.WorkflowService().UpdateTaskQueueConfig(cctx, request) if err != nil { - return fmt.Errorf("error updating task queue config: %w", err) + // Provide more context in error message + return fmt.Errorf("failed to update task queue config for %s/%s: %w", namespace, taskQueue, err) } cctx.Printer.Println("Successfully updated task queue configuration") diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 046875f3a..06ea2b6ed 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -327,6 +327,147 @@ namespace = "%s" "CLI flag should override envconfig and query default namespace") } +func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_DuplicateKeys() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Try to set duplicate keys - should fail + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "HighPriority=2.0", + "--fairness-key-weight-set", "HighPriority=3.0", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "duplicate fairness key") +} + +func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_ConflictingSetUnset() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Try to set and unset same key - should fail + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "Priority=2.0", + "--fairness-key-weight-unset", "Priority", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "both set and unset") +} + +func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_InvalidWeight() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Try to set weight below minimum - should fail + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "Priority=0.0005", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "below minimum") + + // Try negative weight - should fail parsing + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "Priority=-1.0", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "below minimum") +} + +func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_InvalidFormat() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Missing equals sign + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "Priority", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "expected key=weight") + + // Empty key + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "=1.0", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "empty key") + + // Empty weight + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "Priority=", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "empty weight") +} + +func (s *SharedServerSuite) TestTaskQueue_Config_RequireAtLeastOneUpdate() { + taskQueue := "test-config-queue-" + s.T().Name() + + // No updates specified - should fail + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "at least one configuration update") +} + +func (s *SharedServerSuite) TestTaskQueue_Config_RateLimitReasonRequired() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Set rate limit without reason - should fail + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--queue-rps-limit", "10.0", + ) + s.Error(res.Err) + s.Contains(res.Stderr.String(), "reason is required") +} + +func (s *SharedServerSuite) TestTaskQueue_Config_ZeroRateLimitWarning() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Set rate limit to zero - should succeed but warn + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--queue-rps-limit", "0", + "--queue-rps-limit-reason", "emergency stop", + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "WARNING") + s.Contains(res.Stdout.String(), "STOP ALL TRAFFIC") +} + func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { taskQueue := "test-config-queue-" + s.T().Name() diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index e644d2cba..f330b17c9 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -4603,7 +4603,7 @@ option-sets: - name: fairness-weight type: float description: | - Weight [0.001-1000] for this fairness key. + Weight (minimum 0.001) for this fairness key. Keys are dispatched proportionally to their weights. - name: workflow-start From b9ca823da7ac1645c966e1b93cadd52aabde55bf Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Mon, 20 Apr 2026 11:47:06 -0500 Subject: [PATCH 3/8] another pass of asking claude to cleanup --- .../commands.taskqueue_config_test.go | 210 +++++------------- 1 file changed, 50 insertions(+), 160 deletions(-) diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 06ea2b6ed..0d012e863 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -93,8 +93,8 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Set_And_Get_Both_Limits() { func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { taskQueue := "test-config-queue-" + s.T().Name() testIdentity := "test-identity-" + s.T().Name() - var config taskQueueConfigType - // Set initial configuration + + // Set both rate limits res := s.Execute( "task-queue", "config", "set", "--address", s.Address(), @@ -102,39 +102,24 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { "--task-queue-type", "activity", "--identity", testIdentity, "--queue-rps-limit", "10.0", + "--queue-rps-limit-reason", "test", "--fairness-key-rps-limit-default", "5.0", + "--fairness-key-rps-limit-reason", "test", ) s.NoError(res.Err) - res = s.Execute( - "task-queue", "config", "get", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "-o", "json", - ) - s.NoError(res.Err) - - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) - s.NotNil(config.QueueRateLimit) - s.NotNil(config.QueueRateLimit.RateLimit) - s.Equal(float32(10.0), config.QueueRateLimit.RateLimit.RequestsPerSecond) - s.NotNil(config.FairnessKeysRateLimitDefault) - s.NotNil(config.FairnessKeysRateLimitDefault.RateLimit) - s.Equal(float32(5.0), config.FairnessKeysRateLimitDefault.RateLimit.RequestsPerSecond) - // Unset queue rate limit (set to default) res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--identity", testIdentity, "--queue-rps-limit", "default", + "--queue-rps-limit-reason", "unset", ) s.NoError(res.Err) - // Get configuration and verify queue rate limit is unset using JSON output + // Verify queue rate limit is unset but fairness limit remains res = s.Execute( "task-queue", "config", "get", "--address", s.Address(), @@ -144,12 +129,10 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { ) s.NoError(res.Err) - var unsetQrlConfig taskQueueConfigType - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &unsetQrlConfig)) - s.NotNil(unsetQrlConfig.QueueRateLimit) - s.Nil(unsetQrlConfig.QueueRateLimit.RateLimit) - s.NotNil(unsetQrlConfig.FairnessKeysRateLimitDefault) - s.Equal(float32(5.0), unsetQrlConfig.FairnessKeysRateLimitDefault.RateLimit.RequestsPerSecond) + var config taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) + s.Nil(config.QueueRateLimit.RateLimit) + s.Equal(float32(5.0), config.FairnessKeysRateLimitDefault.RateLimit.RequestsPerSecond) // Unset fairness key rate limit res = s.Execute( @@ -157,12 +140,12 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--identity", testIdentity, "--fairness-key-rps-limit-default", "default", + "--fairness-key-rps-limit-reason", "unset", ) s.NoError(res.Err) - // Get configuration and verify both are unset using JSON output + // Verify both are now unset res = s.Execute( "task-queue", "config", "get", "--address", s.Address(), @@ -172,10 +155,8 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { ) s.NoError(res.Err) - var unsetFkrlConfig taskQueueConfigType - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &unsetFkrlConfig)) - s.NotNil(unsetFkrlConfig.FairnessKeysRateLimitDefault) - s.Nil(unsetFkrlConfig.FairnessKeysRateLimitDefault.RateLimit) + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) + s.Nil(config.FairnessKeysRateLimitDefault.RateLimit) } func (s *SharedServerSuite) TestTaskQueue_Config_Workflow_Task_Queue_Restrictions() { @@ -327,151 +308,94 @@ namespace = "%s" "CLI flag should override envconfig and query default namespace") } -func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_DuplicateKeys() { - taskQueue := "test-config-queue-" + s.T().Name() - - // Try to set duplicate keys - should fail - res := s.Execute( - "task-queue", "config", "set", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "--fairness-key-weight-set", "HighPriority=2.0", - "--fairness-key-weight-set", "HighPriority=3.0", - ) - s.Error(res.Err) - s.Contains(res.Stderr.String(), "duplicate fairness key") -} - -func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_ConflictingSetUnset() { - taskQueue := "test-config-queue-" + s.T().Name() - - // Try to set and unset same key - should fail - res := s.Execute( - "task-queue", "config", "set", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=2.0", - "--fairness-key-weight-unset", "Priority", - ) - s.Error(res.Err) - s.Contains(res.Stderr.String(), "both set and unset") -} - -func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_InvalidWeight() { +func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { taskQueue := "test-config-queue-" + s.T().Name() - // Try to set weight below minimum - should fail + // No updates specified - should fail res := s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=0.0005", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "below minimum") + s.Contains(res.Stderr.String(), "at least one configuration update") - // Try negative weight - should fail parsing + // Set rate limit without reason - should fail res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=-1.0", + "--queue-rps-limit", "10.0", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "below minimum") -} - -func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides_InvalidFormat() { - taskQueue := "test-config-queue-" + s.T().Name() + s.Contains(res.Stderr.String(), "reason is required") - // Missing equals sign - res := s.Execute( + // Set rate limit to zero - should succeed but warn + res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority", + "--queue-rps-limit", "0", + "--queue-rps-limit-reason", "emergency stop", ) - s.Error(res.Err) - s.Contains(res.Stderr.String(), "expected key=weight") + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "WARNING") + s.Contains(res.Stdout.String(), "STOP ALL TRAFFIC") - // Empty key + // Duplicate fairness keys - should fail res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "=1.0", + "--fairness-key-weight-set", "Priority=2.0", + "--fairness-key-weight-set", "Priority=3.0", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "empty key") + s.Contains(res.Stderr.String(), "duplicate fairness key") - // Empty weight + // Set and unset same key - should fail res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=", + "--fairness-key-weight-set", "Priority=2.0", + "--fairness-key-weight-unset", "Priority", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "empty weight") -} - -func (s *SharedServerSuite) TestTaskQueue_Config_RequireAtLeastOneUpdate() { - taskQueue := "test-config-queue-" + s.T().Name() + s.Contains(res.Stderr.String(), "both set and unset") - // No updates specified - should fail - res := s.Execute( + // Weight below minimum - should fail + res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", + "--fairness-key-weight-set", "Priority=0.0005", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "at least one configuration update") -} - -func (s *SharedServerSuite) TestTaskQueue_Config_RateLimitReasonRequired() { - taskQueue := "test-config-queue-" + s.T().Name() + s.Contains(res.Stderr.String(), "below minimum") - // Set rate limit without reason - should fail - res := s.Execute( + // Invalid format - should fail + res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--queue-rps-limit", "10.0", + "--fairness-key-weight-set", "Priority", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "reason is required") -} - -func (s *SharedServerSuite) TestTaskQueue_Config_ZeroRateLimitWarning() { - taskQueue := "test-config-queue-" + s.T().Name() - - // Set rate limit to zero - should succeed but warn - res := s.Execute( - "task-queue", "config", "set", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "--queue-rps-limit", "0", - "--queue-rps-limit-reason", "emergency stop", - ) - s.NoError(res.Err) - s.Contains(res.Stdout.String(), "WARNING") - s.Contains(res.Stdout.String(), "STOP ALL TRAFFIC") + s.Contains(res.Stderr.String(), "expected key=weight") } func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { taskQueue := "test-config-queue-" + s.T().Name() - // Set fairness weight overrides + // Set multiple fairness weight overrides res := s.Execute( "task-queue", "config", "set", "--address", s.Address(), @@ -481,9 +405,8 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { "--fairness-key-weight-set", "LowPriority=0.5", ) s.NoError(res.Err) - s.Contains(res.Stdout.String(), "Successfully updated task queue configuration") - // Get the configuration and verify weights were set + // Verify weights were set res = s.Execute( "task-queue", "config", "get", "--address", s.Address(), @@ -495,7 +418,6 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { var config taskQueueConfigType s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) - s.NotNil(config.FairnessWeightOverrides) s.Equal(float32(2.0), config.FairnessWeightOverrides["HighPriority"]) s.Equal(float32(0.5), config.FairnessWeightOverrides["LowPriority"]) @@ -519,40 +441,9 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { ) s.NoError(res.Err) - var config2 taskQueueConfigType - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config2)) - s.NotNil(config2.FairnessWeightOverrides) - s.Equal(float32(2.0), config2.FairnessWeightOverrides["HighPriority"]) - s.NotContains(config2.FairnessWeightOverrides, "LowPriority") - - // Add more weights - res = s.Execute( - "task-queue", "config", "set", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "--fairness-key-weight-set", "MediumPriority=1.5", - "--fairness-key-weight-set", "LowPriority=0.3", - ) - s.NoError(res.Err) - - // Verify all three weights exist - res = s.Execute( - "task-queue", "config", "get", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "-o", "json", - ) - s.NoError(res.Err) - - var config3 taskQueueConfigType - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config3)) - s.NotNil(config3.FairnessWeightOverrides) - s.Len(config3.FairnessWeightOverrides, 3) - s.Equal(float32(2.0), config3.FairnessWeightOverrides["HighPriority"]) - s.Equal(float32(1.5), config3.FairnessWeightOverrides["MediumPriority"]) - s.Equal(float32(0.3), config3.FairnessWeightOverrides["LowPriority"]) + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) + s.Equal(float32(2.0), config.FairnessWeightOverrides["HighPriority"]) + s.NotContains(config.FairnessWeightOverrides, "LowPriority") // Unset all weights res = s.Execute( @@ -574,7 +465,6 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { ) s.NoError(res.Err) - var config4 taskQueueConfigType - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config4)) - s.Empty(config4.FairnessWeightOverrides) + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) + s.Empty(config.FairnessWeightOverrides) } From ec39533b35e51e6592703cb04ad0a296a3ada18c Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Tue, 21 Apr 2026 11:45:44 -0500 Subject: [PATCH 4/8] unit test fixes --- .../temporalcli/commands.taskqueue_config.go | 10 +++++-- .../commands.taskqueue_config_test.go | 29 ++++++++++--------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index 9d90075cc..1196ee80d 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -307,15 +307,21 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str return fmt.Errorf("invalid fairness key in unset list: %w", err) } + // Trim keys before passing to server + trimmedUnsetKeys := make([]string, len(c.FairnessKeyWeightUnset)) + for i, key := range c.FairnessKeyWeightUnset { + trimmedUnsetKeys[i] = strings.TrimSpace(key) + } + // Check for conflicts between set and unset if setWeights != nil { - conflicts := findConflictingKeys(setWeights, c.FairnessKeyWeightUnset) + conflicts := findConflictingKeys(setWeights, trimmedUnsetKeys) if len(conflicts) > 0 { return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts) } } - request.UnsetFairnessWeightOverrides = c.FairnessKeyWeightUnset + request.UnsetFairnessWeightOverrides = trimmedUnsetKeys } // Handle unset all diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 0d012e863..0b8c2cd14 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -155,8 +155,9 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { ) s.NoError(res.Err) - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) - s.Nil(config.FairnessKeysRateLimitDefault.RateLimit) + var config2 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config2)) + s.Nil(config2.FairnessKeysRateLimitDefault.RateLimit) } func (s *SharedServerSuite) TestTaskQueue_Config_Workflow_Task_Queue_Restrictions() { @@ -319,7 +320,7 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--task-queue-type", "activity", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "at least one configuration update") + s.Contains(res.Err.Error(), "at least one configuration update") // Set rate limit without reason - should fail res = s.Execute( @@ -330,7 +331,7 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--queue-rps-limit", "10.0", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "reason is required") + s.Contains(res.Err.Error(), "reason is required") // Set rate limit to zero - should succeed but warn res = s.Execute( @@ -355,7 +356,7 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--fairness-key-weight-set", "Priority=3.0", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "duplicate fairness key") + s.Contains(res.Err.Error(), "duplicate fairness key") // Set and unset same key - should fail res = s.Execute( @@ -367,7 +368,7 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--fairness-key-weight-unset", "Priority", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "both set and unset") + s.Contains(res.Err.Error(), "both set and unset") // Weight below minimum - should fail res = s.Execute( @@ -378,7 +379,7 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--fairness-key-weight-set", "Priority=0.0005", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "below minimum") + s.Contains(res.Err.Error(), "below minimum") // Invalid format - should fail res = s.Execute( @@ -389,7 +390,7 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--fairness-key-weight-set", "Priority", ) s.Error(res.Err) - s.Contains(res.Stderr.String(), "expected key=weight") + s.Contains(res.Err.Error(), "expected key=weight") } func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { @@ -441,9 +442,10 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { ) s.NoError(res.Err) - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) - s.Equal(float32(2.0), config.FairnessWeightOverrides["HighPriority"]) - s.NotContains(config.FairnessWeightOverrides, "LowPriority") + var config2 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config2)) + s.Equal(float32(2.0), config2.FairnessWeightOverrides["HighPriority"]) + s.NotContains(config2.FairnessWeightOverrides, "LowPriority") // Unset all weights res = s.Execute( @@ -465,6 +467,7 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { ) s.NoError(res.Err) - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) - s.Empty(config.FairnessWeightOverrides) + var config3 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config3)) + s.Empty(config3.FairnessWeightOverrides) } From bf19cea8fac94b51a4f7142057f4c1fd8725ca6c Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Tue, 21 Apr 2026 13:15:59 -0500 Subject: [PATCH 5/8] Address comments --- internal/temporalcli/commands.gen.go | 4 +- .../temporalcli/commands.taskqueue_config.go | 105 +++++++++--------- .../commands.taskqueue_config_test.go | 22 ++-- internal/temporalcli/commands.yaml | 4 +- 4 files changed, 65 insertions(+), 70 deletions(-) diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index 794d06d0a..8dfd8b3d9 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -233,7 +233,7 @@ func (v *SharedWorkflowStartOptions) BuildFlags(f *pflag.FlagSet) { f.StringVar(&v.StaticDetails, "static-details", "", "Static Workflow details for human consumption in UIs. Uses Temporal Markdown formatting, may be multiple lines. EXPERIMENTAL.") f.IntVar(&v.PriorityKey, "priority-key", 0, "Priority key (1-5, lower numbers = higher priority). Tasks in a queue should be processed in close-to-priority-order. Default is 3 when not specified.") f.StringVar(&v.FairnessKey, "fairness-key", "", "Fairness key (max 64 bytes) for proportional task dispatch. Tasks with same key share capacity based on their weight.") - f.Float32Var(&v.FairnessWeight, "fairness-weight", 0, "Weight (minimum 0.001) for this fairness key. Keys are dispatched proportionally to their weights.") + f.Float32Var(&v.FairnessWeight, "fairness-weight", 0, "Weight [0.001-1000] for this fairness key. Keys are dispatched proportionally to their weights.") } type WorkflowStartOptions struct { @@ -2255,7 +2255,7 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.") overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default") s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.") - s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightSet, "fairness-key-weight-set", nil, "Set fairness key weight overrides in format key=weight. Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5.") + s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightSet, "fairness-key-weight-set", nil, "Set fairness key weight overrides in format key=weight (range: 0.001-1000). Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5.") s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightUnset, "fairness-key-weight-unset", nil, "Unset specific fairness key weight overrides. Can be specified multiple times. Example: --fairness-key-weight-unset HighPriority --fairness-key-weight-unset LowPriority.") s.Command.Flags().BoolVar(&s.FairnessKeyWeightUnsetAll, "fairness-key-weight-unset-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset.") s.Command.Run = func(c *cobra.Command, args []string) { diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index 1196ee80d..3af683a9a 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -63,11 +63,19 @@ const ( // minFairnessWeight matches server-side limit in temporal/service/matching/fairness_util.go // Server clamps weights to this minimum when applying them minFairnessWeight = 0.001 + + // maxFairnessWeight is the maximum weight value allowed + // Server clamps weights above this value down to this maximum + maxFairnessWeight = 1000.0 ) // parseFairnessKeyWeights parses "key=weight" format strings into a map -// Returns an error if there are duplicate keys, invalid weights, or malformed input +// Returns nil if inputs is empty, or an error if there are duplicate keys, invalid weights, or malformed input func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { + if len(inputs) == 0 { + return nil, nil + } + weights := make(map[string]float32) seen := make(map[string]bool) @@ -104,34 +112,46 @@ func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { } // Validate weight bounds - matches server-side validation - // Server only validates weight > 0 and clamps to minWeight at runtime + // Server clamps weights to [minWeight, maxWeight] range at runtime if weight < minFairnessWeight { return nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight) } + if weight > maxFairnessWeight { + return nil, fmt.Errorf("weight %.3f for key %q exceeds maximum %.3f", weight, key, maxFairnessWeight) + } weights[key] = float32(weight) } return weights, nil } -// validateFairnessKeyNames validates that key names are non-empty and within length limits -func validateFairnessKeyNames(keys []string) error { +// prepareUnsetKeys validates and trims fairness key names for unsetting +// Returns nil if keys is empty, trimmed keys if valid, or an error if validation fails +func prepareUnsetKeys(keys []string) ([]string, error) { + if len(keys) == 0 { + return nil, nil + } + + trimmed := make([]string, len(keys)) seen := make(map[string]bool) - for _, key := range keys { - trimmed := strings.TrimSpace(key) - if trimmed == "" { - return fmt.Errorf("empty fairness key name") + + for i, key := range keys { + trimmedKey := strings.TrimSpace(key) + if trimmedKey == "" { + return nil, fmt.Errorf("empty fairness key name") } - if seen[trimmed] { - return fmt.Errorf("duplicate fairness key %q specified multiple times", trimmed) + if seen[trimmedKey] { + return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", trimmedKey) } - seen[trimmed] = true + seen[trimmedKey] = true - if len(trimmed) > maxFairnessKeyLength { - return fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", trimmed, maxFairnessKeyLength) + if len(trimmedKey) > maxFairnessKeyLength { + return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", trimmedKey, maxFairnessKeyLength) } + + trimmed[i] = trimmedKey } - return nil + return trimmed, nil } // findConflictingKeys returns keys that appear in both set and unset lists @@ -199,18 +219,11 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str return err } - // Validate reason is provided - if c.QueueRpsLimitReason == "" { - return fmt.Errorf("--queue-rps-limit-reason is required when setting or unsetting queue rate limit") - } - // Warn about zero rate limit (stops all traffic) if queueRateLimitIsZero { cctx.Printer.Println("WARNING: Setting queue rate limit to 0 will STOP ALL TRAFFIC on this task queue.") cctx.Printer.Println(" This will prevent any tasks from being dispatched until the limit is changed.") } - } else if c.Command.Flags().Changed("queue-rps-limit-reason") { - return fmt.Errorf("--queue-rps-limit-reason can only be set if --queue-rps-limit is specified") } // Parse and validate fairness key rate limit default @@ -223,18 +236,11 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str return err } - // Validate reason is provided - if c.FairnessKeyRpsLimitReason == "" { - return fmt.Errorf("--fairness-key-rps-limit-reason is required when setting or unsetting fairness key rate limit") - } - // Warn about zero rate limit if fairnessRateLimitIsZero { cctx.Printer.Println("WARNING: Setting fairness key rate limit default to 0 will STOP ALL TRAFFIC for fairness keys.") cctx.Printer.Println(" This will prevent tasks with fairness keys from being dispatched until the limit is changed.") } - } else if c.Command.Flags().Changed("fairness-key-rps-limit-reason") { - return fmt.Errorf("--fairness-key-rps-limit-reason can only be set if --fairness-key-rps-limit-default is specified") } cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) @@ -291,39 +297,28 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str } // Parse and validate set operations - var setWeights map[string]float32 - if len(c.FairnessKeyWeightSet) > 0 { - var err error - setWeights, err = parseFairnessKeyWeights(c.FairnessKeyWeightSet) - if err != nil { - return err - } - request.SetFairnessWeightOverrides = setWeights + setWeights, err := parseFairnessKeyWeights(c.FairnessKeyWeightSet) + if err != nil { + return err } + request.SetFairnessWeightOverrides = setWeights - // Validate unset operations - if len(c.FairnessKeyWeightUnset) > 0 { - if err := validateFairnessKeyNames(c.FairnessKeyWeightUnset); err != nil { - return fmt.Errorf("invalid fairness key in unset list: %w", err) - } - - // Trim keys before passing to server - trimmedUnsetKeys := make([]string, len(c.FairnessKeyWeightUnset)) - for i, key := range c.FairnessKeyWeightUnset { - trimmedUnsetKeys[i] = strings.TrimSpace(key) - } + // Validate and prepare unset operations + trimmedUnsetKeys, err := prepareUnsetKeys(c.FairnessKeyWeightUnset) + if err != nil { + return fmt.Errorf("invalid fairness key in unset list: %w", err) + } - // Check for conflicts between set and unset - if setWeights != nil { - conflicts := findConflictingKeys(setWeights, trimmedUnsetKeys) - if len(conflicts) > 0 { - return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts) - } + // Check for conflicts between set and unset + if setWeights != nil && trimmedUnsetKeys != nil { + conflicts := findConflictingKeys(setWeights, trimmedUnsetKeys) + if len(conflicts) > 0 { + return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts) } - - request.UnsetFairnessWeightOverrides = trimmedUnsetKeys } + request.UnsetFairnessWeightOverrides = trimmedUnsetKeys + // Handle unset all if c.FairnessKeyWeightUnsetAll { // Need to fetch current config to get all keys to unset diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 0b8c2cd14..8653b207a 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -322,17 +322,6 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { s.Error(res.Err) s.Contains(res.Err.Error(), "at least one configuration update") - // Set rate limit without reason - should fail - res = s.Execute( - "task-queue", "config", "set", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "--queue-rps-limit", "10.0", - ) - s.Error(res.Err) - s.Contains(res.Err.Error(), "reason is required") - // Set rate limit to zero - should succeed but warn res = s.Execute( "task-queue", "config", "set", @@ -381,6 +370,17 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { s.Error(res.Err) s.Contains(res.Err.Error(), "below minimum") + // Weight above maximum - should fail + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-set", "Priority=1001", + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "exceeds maximum") + // Invalid format - should fail res = s.Execute( "task-queue", "config", "set", diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index f330b17c9..fdbc01206 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -3287,7 +3287,7 @@ commands: - name: fairness-key-weight-set type: string[] description: | - Set fairness key weight overrides in format key=weight. + Set fairness key weight overrides in format key=weight (range: 0.001-1000). Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5. - name: fairness-key-weight-unset type: string[] @@ -4603,7 +4603,7 @@ option-sets: - name: fairness-weight type: float description: | - Weight (minimum 0.001) for this fairness key. + Weight [0.001-1000] for this fairness key. Keys are dispatched proportionally to their weights. - name: workflow-start From d74ea182f8fc57cdb443dcb417809f8a00f70a0a Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Thu, 23 Apr 2026 10:32:38 -0500 Subject: [PATCH 6/8] address comments --- .../temporalcli/commands.taskqueue_config.go | 64 +++++++------------ 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index 3af683a9a..e2eed7c75 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -9,6 +9,7 @@ import ( enums "go.temporal.io/api/enums/v1" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" + "golang.org/x/exp/maps" ) // TaskQueueConfigGetCommand handles getting task queue configuration @@ -61,12 +62,8 @@ const ( maxFairnessKeyLength = 64 // minFairnessWeight matches server-side limit in temporal/service/matching/fairness_util.go - // Server clamps weights to this minimum when applying them + // Client only enforces positive weight - server handles clamping to its configured range minFairnessWeight = 0.001 - - // maxFairnessWeight is the maximum weight value allowed - // Server clamps weights above this value down to this maximum - maxFairnessWeight = 1000.0 ) // parseFairnessKeyWeights parses "key=weight" format strings into a map @@ -85,7 +82,7 @@ func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { return nil, fmt.Errorf("invalid format: %q (expected key=weight)", input) } - key := strings.TrimSpace(parts[0]) + key := parts[0] if key == "" { return nil, fmt.Errorf("empty key in: %q", input) } @@ -101,7 +98,7 @@ func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) } - weightStr := strings.TrimSpace(parts[1]) + weightStr := parts[1] if weightStr == "" { return nil, fmt.Errorf("empty weight value for key %q", key) } @@ -111,52 +108,44 @@ func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { return nil, fmt.Errorf("invalid weight %q for key %q: must be a number", weightStr, key) } - // Validate weight bounds - matches server-side validation - // Server clamps weights to [minWeight, maxWeight] range at runtime + // Validate weight is positive - server handles clamping to its configured range if weight < minFairnessWeight { return nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight) } - if weight > maxFairnessWeight { - return nil, fmt.Errorf("weight %.3f for key %q exceeds maximum %.3f", weight, key, maxFairnessWeight) - } weights[key] = float32(weight) } return weights, nil } -// prepareUnsetKeys validates and trims fairness key names for unsetting -// Returns nil if keys is empty, trimmed keys if valid, or an error if validation fails +// prepareUnsetKeys validates fairness key names for unsetting +// Returns nil if keys is empty, or an error if validation fails func prepareUnsetKeys(keys []string) ([]string, error) { if len(keys) == 0 { return nil, nil } - trimmed := make([]string, len(keys)) seen := make(map[string]bool) - for i, key := range keys { - trimmedKey := strings.TrimSpace(key) - if trimmedKey == "" { + for _, key := range keys { + if key == "" { return nil, fmt.Errorf("empty fairness key name") } - if seen[trimmedKey] { - return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", trimmedKey) + if seen[key] { + return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key) } - seen[trimmedKey] = true + seen[key] = true - if len(trimmedKey) > maxFairnessKeyLength { - return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", trimmedKey, maxFairnessKeyLength) + if len(key) > maxFairnessKeyLength { + return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) } - - trimmed[i] = trimmedKey } - return trimmed, nil + return keys, nil } // findConflictingKeys returns keys that appear in both set and unset lists func findConflictingKeys(setWeights map[string]float32, unsetKeys []string) []string { - conflicts := []string{} + var conflicts []string for _, key := range unsetKeys { if _, exists := setWeights[key]; exists { conflicts = append(conflicts, key) @@ -238,8 +227,8 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str // Warn about zero rate limit if fairnessRateLimitIsZero { - cctx.Printer.Println("WARNING: Setting fairness key rate limit default to 0 will STOP ALL TRAFFIC for fairness keys.") - cctx.Printer.Println(" This will prevent tasks with fairness keys from being dispatched until the limit is changed.") + cctx.Printer.Println("WARNING: Setting fairness key rate limit default to 0 will STOP ALL TRAFFIC on this task queue.") + cctx.Printer.Println(" This will prevent any tasks from being dispatched until the limit is changed.") } } @@ -304,20 +293,18 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str request.SetFairnessWeightOverrides = setWeights // Validate and prepare unset operations - trimmedUnsetKeys, err := prepareUnsetKeys(c.FairnessKeyWeightUnset) + unsetKeys, err := prepareUnsetKeys(c.FairnessKeyWeightUnset) if err != nil { return fmt.Errorf("invalid fairness key in unset list: %w", err) } // Check for conflicts between set and unset - if setWeights != nil && trimmedUnsetKeys != nil { - conflicts := findConflictingKeys(setWeights, trimmedUnsetKeys) - if len(conflicts) > 0 { - return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts) - } + conflicts := findConflictingKeys(setWeights, unsetKeys) + if len(conflicts) > 0 { + return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts) } - request.UnsetFairnessWeightOverrides = trimmedUnsetKeys + request.UnsetFairnessWeightOverrides = unsetKeys // Handle unset all if c.FairnessKeyWeightUnsetAll { @@ -335,10 +322,7 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str return fmt.Errorf("error fetching current config for unset-all: %w", err) } if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil && len(descResp.Config.FairnessWeightOverrides) > 0 { - keys := make([]string, 0, len(descResp.Config.FairnessWeightOverrides)) - for key := range descResp.Config.FairnessWeightOverrides { - keys = append(keys, key) - } + keys := maps.Keys(descResp.Config.FairnessWeightOverrides) request.UnsetFairnessWeightOverrides = keys cctx.Printer.Println(fmt.Sprintf("Unsetting %d fairness weight override(s)", len(keys))) } else { From 620f295cb7de74c2b252220f29803b6374971460 Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Thu, 23 Apr 2026 10:48:34 -0500 Subject: [PATCH 7/8] use Alex's suggested interface instead --- internal/temporalcli/commands.gen.go | 10 +- .../temporalcli/commands.taskqueue_config.go | 124 +++++++----------- .../commands.taskqueue_config_test.go | 37 ++---- internal/temporalcli/commands.yaml | 16 +-- 4 files changed, 73 insertions(+), 114 deletions(-) diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index 8dfd8b3d9..65837fda4 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -2227,9 +2227,8 @@ type TemporalTaskQueueConfigSetCommand struct { QueueRpsLimitReason string FairnessKeyRpsLimitDefault string FairnessKeyRpsLimitReason string - FairnessKeyWeightSet []string - FairnessKeyWeightUnset []string - FairnessKeyWeightUnsetAll bool + FairnessKeyWeight []string + FairnessKeyWeightClearAll bool } func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *TemporalTaskQueueConfigCommand) *TemporalTaskQueueConfigSetCommand { @@ -2255,9 +2254,8 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.") overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default") s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.") - s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightSet, "fairness-key-weight-set", nil, "Set fairness key weight overrides in format key=weight (range: 0.001-1000). Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5.") - s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightUnset, "fairness-key-weight-unset", nil, "Unset specific fairness key weight overrides. Can be specified multiple times. Example: --fairness-key-weight-unset HighPriority --fairness-key-weight-unset LowPriority.") - s.Command.Flags().BoolVar(&s.FairnessKeyWeightUnsetAll, "fairness-key-weight-unset-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset.") + s.Command.Flags().StringArrayVar(&s.FairnessKeyWeight, "fairness-key-weight", nil, "Set or unset fairness key weight overrides in format key=weight or key=default. Use key=weight to set a weight (range: 0.001-1000); use key=default to unset. Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default.") + s.Command.Flags().BoolVar(&s.FairnessKeyWeightClearAll, "fairness-key-weight-clear-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index e2eed7c75..803db5044 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -66,92 +66,82 @@ const ( minFairnessWeight = 0.001 ) -// parseFairnessKeyWeights parses "key=weight" format strings into a map -// Returns nil if inputs is empty, or an error if there are duplicate keys, invalid weights, or malformed input -func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) { +// parseFairnessKeyWeights parses "key=weight" or "key=default" format strings +// Returns separate maps for set and unset operations, or an error if there are duplicate keys, invalid weights, or malformed input +// If inputs is empty, returns nil for both maps +func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, unsetKeys []string, err error) { if len(inputs) == 0 { - return nil, nil + return nil, nil, nil } - weights := make(map[string]float32) + setWeights = make(map[string]float32) + var unsetKeysMap = make(map[string]bool) // Track unset keys in a map to check for duplicates seen := make(map[string]bool) for _, input := range inputs { parts := strings.SplitN(input, "=", 2) if len(parts) != 2 { - return nil, fmt.Errorf("invalid format: %q (expected key=weight)", input) + return nil, nil, fmt.Errorf("invalid format: %q (expected key=weight or key=default)", input) } key := parts[0] if key == "" { - return nil, fmt.Errorf("empty key in: %q", input) + return nil, nil, fmt.Errorf("empty key in: %q", input) } - // Check for duplicate keys + // Check for duplicate keys across both set and unset if seen[key] { - return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key) + return nil, nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key) } seen[key] = true // Validate key length (server enforces 64 byte limit) if len(key) > maxFairnessKeyLength { - return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) + return nil, nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) } - weightStr := parts[1] - if weightStr == "" { - return nil, fmt.Errorf("empty weight value for key %q", key) + valueStr := parts[1] + if valueStr == "" { + return nil, nil, fmt.Errorf("empty value for key %q", key) } - weight, err := strconv.ParseFloat(weightStr, 32) + // Check if this is an unset operation (value is "default") + if strings.EqualFold(valueStr, "default") { + unsetKeysMap[key] = true + continue + } + + // Parse as weight + weight, err := strconv.ParseFloat(valueStr, 32) if err != nil { - return nil, fmt.Errorf("invalid weight %q for key %q: must be a number", weightStr, key) + return nil, nil, fmt.Errorf("invalid weight %q for key %q: must be a number or 'default'", valueStr, key) } // Validate weight is positive - server handles clamping to its configured range if weight < minFairnessWeight { - return nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight) + return nil, nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight) } - weights[key] = float32(weight) - } - return weights, nil -} - -// prepareUnsetKeys validates fairness key names for unsetting -// Returns nil if keys is empty, or an error if validation fails -func prepareUnsetKeys(keys []string) ([]string, error) { - if len(keys) == 0 { - return nil, nil + setWeights[key] = float32(weight) } - seen := make(map[string]bool) - - for _, key := range keys { - if key == "" { - return nil, fmt.Errorf("empty fairness key name") - } - if seen[key] { - return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key) - } - seen[key] = true - - if len(key) > maxFairnessKeyLength { - return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) + // Convert unset map to slice + if len(unsetKeysMap) > 0 { + unsetKeys = make([]string, 0, len(unsetKeysMap)) + for key := range unsetKeysMap { + unsetKeys = append(unsetKeys, key) } } - return keys, nil -} -// findConflictingKeys returns keys that appear in both set and unset lists -func findConflictingKeys(setWeights map[string]float32, unsetKeys []string) []string { - var conflicts []string - for _, key := range unsetKeys { - if _, exists := setWeights[key]; exists { - conflicts = append(conflicts, key) - } + // Return nil instead of empty maps/slices + if len(setWeights) == 0 { + setWeights = nil + } + if len(unsetKeys) == 0 { + unsetKeys = nil } - return conflicts + + return setWeights, unsetKeys, nil } // TaskQueueConfigSetCommand handles setting task queue configuration @@ -269,45 +259,31 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str // Validate at least one configuration change is requested hasAnyUpdate := c.Command.Flags().Changed("queue-rps-limit") || c.Command.Flags().Changed("fairness-key-rps-limit-default") || - len(c.FairnessKeyWeightSet) > 0 || - len(c.FairnessKeyWeightUnset) > 0 || - c.FairnessKeyWeightUnsetAll + len(c.FairnessKeyWeight) > 0 || + c.FairnessKeyWeightClearAll if !hasAnyUpdate { return fmt.Errorf("at least one configuration update must be specified (use --help to see available options)") } // Handle fairness weight overrides - // Validate mutual exclusivity of unset-all with other weight operations - if c.FairnessKeyWeightUnsetAll { - if len(c.FairnessKeyWeightSet) > 0 || len(c.FairnessKeyWeightUnset) > 0 { - return fmt.Errorf("--fairness-key-weight-unset-all cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset") + // Validate mutual exclusivity of clear-all with other weight operations + if c.FairnessKeyWeightClearAll { + if len(c.FairnessKeyWeight) > 0 { + return fmt.Errorf("--fairness-key-weight-clear-all cannot be used with --fairness-key-weight") } } - // Parse and validate set operations - setWeights, err := parseFairnessKeyWeights(c.FairnessKeyWeightSet) + // Parse fairness key weights (handles both set and unset operations) + setWeights, unsetKeys, err := parseFairnessKeyWeights(c.FairnessKeyWeight) if err != nil { return err } request.SetFairnessWeightOverrides = setWeights - - // Validate and prepare unset operations - unsetKeys, err := prepareUnsetKeys(c.FairnessKeyWeightUnset) - if err != nil { - return fmt.Errorf("invalid fairness key in unset list: %w", err) - } - - // Check for conflicts between set and unset - conflicts := findConflictingKeys(setWeights, unsetKeys) - if len(conflicts) > 0 { - return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts) - } - request.UnsetFairnessWeightOverrides = unsetKeys - // Handle unset all - if c.FairnessKeyWeightUnsetAll { + // Handle clear all + if c.FairnessKeyWeightClearAll { // Need to fetch current config to get all keys to unset descResp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{ Namespace: namespace, @@ -319,7 +295,7 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str ReportConfig: true, }) if err != nil { - return fmt.Errorf("error fetching current config for unset-all: %w", err) + return fmt.Errorf("error fetching current config for clear-all: %w", err) } if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil && len(descResp.Config.FairnessWeightOverrides) > 0 { keys := maps.Keys(descResp.Config.FairnessWeightOverrides) diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 8653b207a..73e5db97c 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -341,8 +341,8 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=2.0", - "--fairness-key-weight-set", "Priority=3.0", + "--fairness-key-weight", "Priority=2.0", + "--fairness-key-weight", "Priority=3.0", ) s.Error(res.Err) s.Contains(res.Err.Error(), "duplicate fairness key") @@ -353,11 +353,11 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=2.0", - "--fairness-key-weight-unset", "Priority", + "--fairness-key-weight", "Priority=2.0", + "--fairness-key-weight", "Priority=default", ) s.Error(res.Err) - s.Contains(res.Err.Error(), "both set and unset") + s.Contains(res.Err.Error(), "duplicate fairness key") // Weight below minimum - should fail res = s.Execute( @@ -365,29 +365,18 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=0.0005", + "--fairness-key-weight", "Priority=0.0005", ) s.Error(res.Err) s.Contains(res.Err.Error(), "below minimum") - // Weight above maximum - should fail - res = s.Execute( - "task-queue", "config", "set", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority=1001", - ) - s.Error(res.Err) - s.Contains(res.Err.Error(), "exceeds maximum") - // Invalid format - should fail res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "Priority", + "--fairness-key-weight", "Priority", ) s.Error(res.Err) s.Contains(res.Err.Error(), "expected key=weight") @@ -402,8 +391,8 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-set", "HighPriority=2.0", - "--fairness-key-weight-set", "LowPriority=0.5", + "--fairness-key-weight", "HighPriority=2.0", + "--fairness-key-weight", "LowPriority=0.5", ) s.NoError(res.Err) @@ -422,13 +411,13 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { s.Equal(float32(2.0), config.FairnessWeightOverrides["HighPriority"]) s.Equal(float32(0.5), config.FairnessWeightOverrides["LowPriority"]) - // Unset one weight + // Unset one weight using default res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-unset", "LowPriority", + "--fairness-key-weight", "LowPriority=default", ) s.NoError(res.Err) @@ -447,13 +436,13 @@ func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { s.Equal(float32(2.0), config2.FairnessWeightOverrides["HighPriority"]) s.NotContains(config2.FairnessWeightOverrides, "LowPriority") - // Unset all weights + // Clear all weights res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight-unset-all", + "--fairness-key-weight-clear-all", ) s.NoError(res.Err) diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index fdbc01206..724c92b26 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -3284,21 +3284,17 @@ commands: - name: fairness-key-rps-limit-reason type: string description: Reason for fairness key rate limit update. - - name: fairness-key-weight-set + - name: fairness-key-weight type: string[] description: | - Set fairness key weight overrides in format key=weight (range: 0.001-1000). - Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5. - - name: fairness-key-weight-unset - type: string[] - description: | - Unset specific fairness key weight overrides. - Can be specified multiple times. Example: --fairness-key-weight-unset HighPriority --fairness-key-weight-unset LowPriority. - - name: fairness-key-weight-unset-all + Set or unset fairness key weight overrides in format key=weight or key=default. + Use key=weight to set a weight (range: 0.001-1000); use key=default to unset. + Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default. + - name: fairness-key-weight-clear-all type: bool description: | Unset all fairness key weight overrides. - Cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset. + Cannot be used with --fairness-key-weight. - name: temporal workflow summary: Start, list, and operate on Workflows From bc554172eb674d473432e4090dc9404b0d394249 Mon Sep 17 00:00:00 2001 From: Jacob Moody Date: Thu, 23 Apr 2026 11:16:05 -0500 Subject: [PATCH 8/8] more cleanup --- internal/temporalcli/commands.gen.go | 2 +- .../temporalcli/commands.taskqueue_config.go | 48 ++++++++++--------- .../commands.taskqueue_config_test.go | 6 +-- internal/temporalcli/commands.yaml | 2 +- 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index 65837fda4..a7b36f1ff 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -2254,7 +2254,7 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.") overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default") s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.") - s.Command.Flags().StringArrayVar(&s.FairnessKeyWeight, "fairness-key-weight", nil, "Set or unset fairness key weight overrides in format key=weight or key=default. Use key=weight to set a weight (range: 0.001-1000); use key=default to unset. Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default.") + s.Command.Flags().StringArrayVar(&s.FairnessKeyWeight, "fairness-key-weight", nil, "Set or unset fairness key weight overrides in format key=weight or key=default. Use key=weight to set a positive weight value; use key=default to unset. Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default.") s.Command.Flags().BoolVar(&s.FairnessKeyWeightClearAll, "fairness-key-weight-clear-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index 803db5044..ecd49506a 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -60,12 +60,14 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str const ( // maxFairnessKeyLength matches server-side limit in temporal/common/priorities/priority_util.go maxFairnessKeyLength = 64 - - // minFairnessWeight matches server-side limit in temporal/service/matching/fairness_util.go - // Client only enforces positive weight - server handles clamping to its configured range - minFairnessWeight = 0.001 ) +// printZeroRateLimitWarning prints a warning when a rate limit is set to 0 +func printZeroRateLimitWarning(p *printer.Printer, limitType string) { + p.Printlnf("WARNING: Setting %s to 0 will STOP ALL TRAFFIC on this task queue.", limitType) + p.Println(" This will prevent any tasks from being dispatched until the limit is changed.") +} + // parseFairnessKeyWeights parses "key=weight" or "key=default" format strings // Returns separate maps for set and unset operations, or an error if there are duplicate keys, invalid weights, or malformed input // If inputs is empty, returns nil for both maps @@ -75,7 +77,7 @@ func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, un } setWeights = make(map[string]float32) - var unsetKeysMap = make(map[string]bool) // Track unset keys in a map to check for duplicates + unsetKeysMap := make(map[string]bool) // Track unset keys in a map to check for duplicates seen := make(map[string]bool) for _, input := range inputs { @@ -95,22 +97,23 @@ func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, un } seen[key] = true - // Validate key length (server enforces 64 byte limit) - if len(key) > maxFairnessKeyLength { - return nil, nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) - } - valueStr := parts[1] if valueStr == "" { return nil, nil, fmt.Errorf("empty value for key %q", key) } // Check if this is an unset operation (value is "default") + // Do this before validating key length since we don't care about length when unsetting if strings.EqualFold(valueStr, "default") { unsetKeysMap[key] = true continue } + // Validate key length only for set operations (server enforces 64 byte limit) + if len(key) > maxFairnessKeyLength { + return nil, nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) + } + // Parse as weight weight, err := strconv.ParseFloat(valueStr, 32) if err != nil { @@ -118,8 +121,8 @@ func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, un } // Validate weight is positive - server handles clamping to its configured range - if weight < minFairnessWeight { - return nil, nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight) + if weight <= 0 { + return nil, nil, fmt.Errorf("weight for key %q must be positive", key) } setWeights[key] = float32(weight) @@ -127,10 +130,7 @@ func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, un // Convert unset map to slice if len(unsetKeysMap) > 0 { - unsetKeys = make([]string, 0, len(unsetKeysMap)) - for key := range unsetKeysMap { - unsetKeys = append(unsetKeys, key) - } + unsetKeys = maps.Keys(unsetKeysMap) } // Return nil instead of empty maps/slices @@ -200,8 +200,7 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str // Warn about zero rate limit (stops all traffic) if queueRateLimitIsZero { - cctx.Printer.Println("WARNING: Setting queue rate limit to 0 will STOP ALL TRAFFIC on this task queue.") - cctx.Printer.Println(" This will prevent any tasks from being dispatched until the limit is changed.") + printZeroRateLimitWarning(cctx.Printer, "queue rate limit") } } @@ -217,8 +216,7 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str // Warn about zero rate limit if fairnessRateLimitIsZero { - cctx.Printer.Println("WARNING: Setting fairness key rate limit default to 0 will STOP ALL TRAFFIC on this task queue.") - cctx.Printer.Println(" This will prevent any tasks from being dispatched until the limit is changed.") + printZeroRateLimitWarning(cctx.Printer, "fairness key rate limit default") } } @@ -297,10 +295,14 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str if err != nil { return fmt.Errorf("error fetching current config for clear-all: %w", err) } - if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil && len(descResp.Config.FairnessWeightOverrides) > 0 { - keys := maps.Keys(descResp.Config.FairnessWeightOverrides) + var overrides map[string]float32 + if descResp.Config != nil { + overrides = descResp.Config.FairnessWeightOverrides + } + keys := maps.Keys(overrides) + if len(keys) > 0 { request.UnsetFairnessWeightOverrides = keys - cctx.Printer.Println(fmt.Sprintf("Unsetting %d fairness weight override(s)", len(keys))) + cctx.Printer.Printlnf("Unsetting %d fairness weight override(s)", len(keys)) } else { cctx.Printer.Println("No fairness weight overrides found to unset") // Don't return error, just proceed with no-op update diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 73e5db97c..25b1efb4c 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -359,16 +359,16 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { s.Error(res.Err) s.Contains(res.Err.Error(), "duplicate fairness key") - // Weight below minimum - should fail + // Zero or negative weight - should fail res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--fairness-key-weight", "Priority=0.0005", + "--fairness-key-weight", "Priority=0", ) s.Error(res.Err) - s.Contains(res.Err.Error(), "below minimum") + s.Contains(res.Err.Error(), "must be positive") // Invalid format - should fail res = s.Execute( diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index 724c92b26..31dfecfc7 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -3288,7 +3288,7 @@ commands: type: string[] description: | Set or unset fairness key weight overrides in format key=weight or key=default. - Use key=weight to set a weight (range: 0.001-1000); use key=default to unset. + Use key=weight to set a positive weight value; use key=default to unset. Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default. - name: fairness-key-weight-clear-all type: bool