diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index 87cdc5c8f..edb210451 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -2537,6 +2537,8 @@ type TemporalTaskQueueConfigSetCommand struct { QueueRpsLimitReason string FairnessKeyRpsLimitDefault string FairnessKeyRpsLimitReason string + FairnessKeyWeight []string + FairnessKeyWeightClearAll bool } func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *TemporalTaskQueueConfigCommand) *TemporalTaskQueueConfigSetCommand { @@ -2546,9 +2548,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal s.Command.Use = "set [flags]" s.Command.Short = "Set Task Queue configuration" if hasHighlighting { - s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default" + s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset \nTo unset all fairness weights, use --fairness-key-weight-unset-all" } else { - s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default" + s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit \\\n --queue-rps-limit-reason \\\n --fairness-key-rps-limit-default \\\n --fairness-key-rps-limit-reason \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset \nTo unset all fairness weights, use --fairness-key-weight-unset-all" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVarP(&s.TaskQueue, "task-queue", "t", "", "Task Queue name. Required.") @@ -2562,6 +2564,8 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.") overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default") s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.") + s.Command.Flags().StringArrayVar(&s.FairnessKeyWeight, "fairness-key-weight", nil, "Set or unset fairness key weight overrides in format key=weight or key=default. Use key=weight to set a positive weight value; use key=default to unset. Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default.") + s.Command.Flags().BoolVar(&s.FairnessKeyWeightClearAll, "fairness-key-weight-clear-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/internal/temporalcli/commands.taskqueue_config.go b/internal/temporalcli/commands.taskqueue_config.go index 41a42b37f..ecd49506a 100644 --- a/internal/temporalcli/commands.taskqueue_config.go +++ b/internal/temporalcli/commands.taskqueue_config.go @@ -9,14 +9,15 @@ import ( enums "go.temporal.io/api/enums/v1" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" + "golang.org/x/exp/maps" ) // TaskQueueConfigGetCommand handles getting task queue configuration func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []string) error { // Validate inputs before dialing client - taskQueue := c.TaskQueue + taskQueue := strings.TrimSpace(c.TaskQueue) if taskQueue == "" { - return fmt.Errorf("taskQueue name is required") + return fmt.Errorf("task queue name is required and cannot be empty") } taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value) @@ -56,12 +57,99 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str return printTaskQueueConfig(cctx, resp.Config) } +const ( + // maxFairnessKeyLength matches server-side limit in temporal/common/priorities/priority_util.go + maxFairnessKeyLength = 64 +) + +// printZeroRateLimitWarning prints a warning when a rate limit is set to 0 +func printZeroRateLimitWarning(p *printer.Printer, limitType string) { + p.Printlnf("WARNING: Setting %s to 0 will STOP ALL TRAFFIC on this task queue.", limitType) + p.Println(" This will prevent any tasks from being dispatched until the limit is changed.") +} + +// parseFairnessKeyWeights parses "key=weight" or "key=default" format strings +// Returns separate maps for set and unset operations, or an error if there are duplicate keys, invalid weights, or malformed input +// If inputs is empty, returns nil for both maps +func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, unsetKeys []string, err error) { + if len(inputs) == 0 { + return nil, nil, nil + } + + setWeights = make(map[string]float32) + unsetKeysMap := make(map[string]bool) // Track unset keys in a map to check for duplicates + seen := make(map[string]bool) + + for _, input := range inputs { + parts := strings.SplitN(input, "=", 2) + if len(parts) != 2 { + return nil, nil, fmt.Errorf("invalid format: %q (expected key=weight or key=default)", input) + } + + key := parts[0] + if key == "" { + return nil, nil, fmt.Errorf("empty key in: %q", input) + } + + // Check for duplicate keys across both set and unset + if seen[key] { + return nil, nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key) + } + seen[key] = true + + valueStr := parts[1] + if valueStr == "" { + return nil, nil, fmt.Errorf("empty value for key %q", key) + } + + // Check if this is an unset operation (value is "default") + // Do this before validating key length since we don't care about length when unsetting + if strings.EqualFold(valueStr, "default") { + unsetKeysMap[key] = true + continue + } + + // Validate key length only for set operations (server enforces 64 byte limit) + if len(key) > maxFairnessKeyLength { + return nil, nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength) + } + + // Parse as weight + weight, err := strconv.ParseFloat(valueStr, 32) + if err != nil { + return nil, nil, fmt.Errorf("invalid weight %q for key %q: must be a number or 'default'", valueStr, key) + } + + // Validate weight is positive - server handles clamping to its configured range + if weight <= 0 { + return nil, nil, fmt.Errorf("weight for key %q must be positive", key) + } + + setWeights[key] = float32(weight) + } + + // Convert unset map to slice + if len(unsetKeysMap) > 0 { + unsetKeys = maps.Keys(unsetKeysMap) + } + + // Return nil instead of empty maps/slices + if len(setWeights) == 0 { + setWeights = nil + } + if len(unsetKeys) == 0 { + unsetKeys = nil + } + + return setWeights, unsetKeys, nil +} + // TaskQueueConfigSetCommand handles setting task queue configuration func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []string) error { // Validate inputs before dialing client - taskQueue := c.TaskQueue + taskQueue := strings.TrimSpace(c.TaskQueue) if taskQueue == "" { - return fmt.Errorf("taskQueue name is required") + return fmt.Errorf("task queue name is required and cannot be empty") } taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value) @@ -79,43 +167,57 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str // Helper to parse RPS values for a given flag name. // Accepts "default" or a non-negative float string. - parseRPS := func(flagName string) (*taskqueue.RateLimit, error) { + // Returns (rateLimit, isZero, error) + parseRPS := func(flagName string) (*taskqueue.RateLimit, bool, error) { raw := strings.TrimSpace(c.Command.Flags().Lookup(flagName).Value.String()) if raw == "" { - return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) + return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) } if strings.EqualFold(raw, "default") { // Unset: returning nil RateLimit removes the existing rate limit. - return nil, nil + return nil, false, nil } v, err := strconv.ParseFloat(raw, 32) if err != nil { - return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) + return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName) } if v < 0 { - return nil, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName) + return nil, false, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName) } - return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, nil + isZero := v == 0 + return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, isZero, nil } + // Parse and validate queue rate limit var queueRpsLimitParsed *taskqueue.RateLimit + var queueRateLimitIsZero bool if c.Command.Flags().Changed("queue-rps-limit") { var err error - if queueRpsLimitParsed, err = parseRPS("queue-rps-limit"); err != nil { + queueRpsLimitParsed, queueRateLimitIsZero, err = parseRPS("queue-rps-limit") + if err != nil { return err } - } else if c.Command.Flags().Changed("queue-rps-limit-reason") { - return fmt.Errorf("queue-rps-limit-reason can only be set if queue-rps-limit is updated") + + // Warn about zero rate limit (stops all traffic) + if queueRateLimitIsZero { + printZeroRateLimitWarning(cctx.Printer, "queue rate limit") + } } + // Parse and validate fairness key rate limit default var fairnessKeyRpsLimitDefaultParsed *taskqueue.RateLimit + var fairnessRateLimitIsZero bool if c.Command.Flags().Changed("fairness-key-rps-limit-default") { var err error - if fairnessKeyRpsLimitDefaultParsed, err = parseRPS("fairness-key-rps-limit-default"); err != nil { + fairnessKeyRpsLimitDefaultParsed, fairnessRateLimitIsZero, err = parseRPS("fairness-key-rps-limit-default") + if err != nil { return err } - } else if c.Command.Flags().Changed("fairness-key-rps-limit-default-reason") { - return fmt.Errorf("fairness-key-rps-limit-default-reason can only be set if fairness-key-rps-limit-default is updated") + + // Warn about zero rate limit + if fairnessRateLimitIsZero { + printZeroRateLimitWarning(cctx.Printer, "fairness key rate limit default") + } } cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) @@ -152,10 +254,66 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str } } + // Validate at least one configuration change is requested + hasAnyUpdate := c.Command.Flags().Changed("queue-rps-limit") || + c.Command.Flags().Changed("fairness-key-rps-limit-default") || + len(c.FairnessKeyWeight) > 0 || + c.FairnessKeyWeightClearAll + + if !hasAnyUpdate { + return fmt.Errorf("at least one configuration update must be specified (use --help to see available options)") + } + + // Handle fairness weight overrides + // Validate mutual exclusivity of clear-all with other weight operations + if c.FairnessKeyWeightClearAll { + if len(c.FairnessKeyWeight) > 0 { + return fmt.Errorf("--fairness-key-weight-clear-all cannot be used with --fairness-key-weight") + } + } + + // Parse fairness key weights (handles both set and unset operations) + setWeights, unsetKeys, err := parseFairnessKeyWeights(c.FairnessKeyWeight) + if err != nil { + return err + } + request.SetFairnessWeightOverrides = setWeights + request.UnsetFairnessWeightOverrides = unsetKeys + + // Handle clear all + if c.FairnessKeyWeightClearAll { + // Need to fetch current config to get all keys to unset + descResp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{ + Namespace: namespace, + TaskQueue: &taskqueue.TaskQueue{ + Name: taskQueue, + Kind: enums.TASK_QUEUE_KIND_NORMAL, + }, + TaskQueueType: taskQueueType, + ReportConfig: true, + }) + if err != nil { + return fmt.Errorf("error fetching current config for clear-all: %w", err) + } + var overrides map[string]float32 + if descResp.Config != nil { + overrides = descResp.Config.FairnessWeightOverrides + } + keys := maps.Keys(overrides) + if len(keys) > 0 { + request.UnsetFairnessWeightOverrides = keys + cctx.Printer.Printlnf("Unsetting %d fairness weight override(s)", len(keys)) + } else { + cctx.Printer.Println("No fairness weight overrides found to unset") + // Don't return error, just proceed with no-op update + } + } + // Call the API resp, err := cl.WorkflowService().UpdateTaskQueueConfig(cctx, request) if err != nil { - return fmt.Errorf("error updating task queue config: %w", err) + // Provide more context in error message + return fmt.Errorf("failed to update task queue config for %s/%s: %w", namespace, taskQueue, err) } cctx.Printer.Println("Successfully updated task queue configuration") diff --git a/internal/temporalcli/commands.taskqueue_config_test.go b/internal/temporalcli/commands.taskqueue_config_test.go index 9939844dc..25b1efb4c 100644 --- a/internal/temporalcli/commands.taskqueue_config_test.go +++ b/internal/temporalcli/commands.taskqueue_config_test.go @@ -9,6 +9,7 @@ import ( type taskQueueConfigType struct { QueueRateLimit *rateLimitConfigType `json:"queueRateLimit,omitempty"` FairnessKeysRateLimitDefault *rateLimitConfigType `json:"fairnessKeysRateLimitDefault,omitempty"` + FairnessWeightOverrides map[string]float32 `json:"fairnessWeightOverrides,omitempty"` } type rateLimitConfigType struct { @@ -92,8 +93,8 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Set_And_Get_Both_Limits() { func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { taskQueue := "test-config-queue-" + s.T().Name() testIdentity := "test-identity-" + s.T().Name() - var config taskQueueConfigType - // Set initial configuration + + // Set both rate limits res := s.Execute( "task-queue", "config", "set", "--address", s.Address(), @@ -101,39 +102,24 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { "--task-queue-type", "activity", "--identity", testIdentity, "--queue-rps-limit", "10.0", + "--queue-rps-limit-reason", "test", "--fairness-key-rps-limit-default", "5.0", + "--fairness-key-rps-limit-reason", "test", ) s.NoError(res.Err) - res = s.Execute( - "task-queue", "config", "get", - "--address", s.Address(), - "--task-queue", taskQueue, - "--task-queue-type", "activity", - "-o", "json", - ) - s.NoError(res.Err) - - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) - s.NotNil(config.QueueRateLimit) - s.NotNil(config.QueueRateLimit.RateLimit) - s.Equal(float32(10.0), config.QueueRateLimit.RateLimit.RequestsPerSecond) - s.NotNil(config.FairnessKeysRateLimitDefault) - s.NotNil(config.FairnessKeysRateLimitDefault.RateLimit) - s.Equal(float32(5.0), config.FairnessKeysRateLimitDefault.RateLimit.RequestsPerSecond) - // Unset queue rate limit (set to default) res = s.Execute( "task-queue", "config", "set", "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--identity", testIdentity, "--queue-rps-limit", "default", + "--queue-rps-limit-reason", "unset", ) s.NoError(res.Err) - // Get configuration and verify queue rate limit is unset using JSON output + // Verify queue rate limit is unset but fairness limit remains res = s.Execute( "task-queue", "config", "get", "--address", s.Address(), @@ -143,12 +129,10 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { ) s.NoError(res.Err) - var unsetQrlConfig taskQueueConfigType - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &unsetQrlConfig)) - s.NotNil(unsetQrlConfig.QueueRateLimit) - s.Nil(unsetQrlConfig.QueueRateLimit.RateLimit) - s.NotNil(unsetQrlConfig.FairnessKeysRateLimitDefault) - s.Equal(float32(5.0), unsetQrlConfig.FairnessKeysRateLimitDefault.RateLimit.RequestsPerSecond) + var config taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) + s.Nil(config.QueueRateLimit.RateLimit) + s.Equal(float32(5.0), config.FairnessKeysRateLimitDefault.RateLimit.RequestsPerSecond) // Unset fairness key rate limit res = s.Execute( @@ -156,12 +140,12 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { "--address", s.Address(), "--task-queue", taskQueue, "--task-queue-type", "activity", - "--identity", testIdentity, "--fairness-key-rps-limit-default", "default", + "--fairness-key-rps-limit-reason", "unset", ) s.NoError(res.Err) - // Get configuration and verify both are unset using JSON output + // Verify both are now unset res = s.Execute( "task-queue", "config", "get", "--address", s.Address(), @@ -171,10 +155,9 @@ func (s *SharedServerSuite) TestTaskQueue_Config_Unset_Rate_Limits() { ) s.NoError(res.Err) - var unsetFkrlConfig taskQueueConfigType - s.NoError(json.Unmarshal(res.Stdout.Bytes(), &unsetFkrlConfig)) - s.NotNil(unsetFkrlConfig.FairnessKeysRateLimitDefault) - s.Nil(unsetFkrlConfig.FairnessKeysRateLimitDefault.RateLimit) + var config2 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config2)) + s.Nil(config2.FairnessKeysRateLimitDefault.RateLimit) } func (s *SharedServerSuite) TestTaskQueue_Config_Workflow_Task_Queue_Restrictions() { @@ -325,3 +308,155 @@ namespace = "%s" s.Contains(res.Stdout.String(), "No configuration found for task queue", "CLI flag should override envconfig and query default namespace") } + +func (s *SharedServerSuite) TestTaskQueue_Config_Validation() { + taskQueue := "test-config-queue-" + s.T().Name() + + // No updates specified - should fail + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "at least one configuration update") + + // Set rate limit to zero - should succeed but warn + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--queue-rps-limit", "0", + "--queue-rps-limit-reason", "emergency stop", + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "WARNING") + s.Contains(res.Stdout.String(), "STOP ALL TRAFFIC") + + // Duplicate fairness keys - should fail + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight", "Priority=2.0", + "--fairness-key-weight", "Priority=3.0", + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "duplicate fairness key") + + // Set and unset same key - should fail + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight", "Priority=2.0", + "--fairness-key-weight", "Priority=default", + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "duplicate fairness key") + + // Zero or negative weight - should fail + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight", "Priority=0", + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "must be positive") + + // Invalid format - should fail + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight", "Priority", + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "expected key=weight") +} + +func (s *SharedServerSuite) TestTaskQueue_Config_FairnessWeightOverrides() { + taskQueue := "test-config-queue-" + s.T().Name() + + // Set multiple fairness weight overrides + res := s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight", "HighPriority=2.0", + "--fairness-key-weight", "LowPriority=0.5", + ) + s.NoError(res.Err) + + // Verify weights were set + res = s.Execute( + "task-queue", "config", "get", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "-o", "json", + ) + s.NoError(res.Err) + + var config taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config)) + s.Equal(float32(2.0), config.FairnessWeightOverrides["HighPriority"]) + s.Equal(float32(0.5), config.FairnessWeightOverrides["LowPriority"]) + + // Unset one weight using default + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight", "LowPriority=default", + ) + s.NoError(res.Err) + + // Verify only HighPriority remains + res = s.Execute( + "task-queue", "config", "get", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "-o", "json", + ) + s.NoError(res.Err) + + var config2 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config2)) + s.Equal(float32(2.0), config2.FairnessWeightOverrides["HighPriority"]) + s.NotContains(config2.FairnessWeightOverrides, "LowPriority") + + // Clear all weights + res = s.Execute( + "task-queue", "config", "set", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "--fairness-key-weight-clear-all", + ) + s.NoError(res.Err) + + // Verify all weights are gone + res = s.Execute( + "task-queue", "config", "get", + "--address", s.Address(), + "--task-queue", taskQueue, + "--task-queue-type", "activity", + "-o", "json", + ) + s.NoError(res.Err) + + var config3 taskQueueConfigType + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &config3)) + s.Empty(config3.FairnessWeightOverrides) +} diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index 4f7773c35..5bcd5fcd8 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -3491,7 +3491,9 @@ commands: --queue-rps-limit \ --queue-rps-limit-reason \ --fairness-key-rps-limit-default \ - --fairness-key-rps-limit-reason + --fairness-key-rps-limit-reason \ + --fairness-key-weight-set HighPriority=2.0 \ + --fairness-key-weight-set LowPriority=0.5 ``` This command supports updating: @@ -3501,8 +3503,12 @@ commands: - Fairness key rate limit defaults: Sets default rate limits for fairness keys. If set, each individual fairness key will be limited to this rate, scaled by the weight of the fairness key. + - Fairness key weight overrides: Set custom weights for specific fairness keys. + Weights control the relative share of capacity each key receives. To unset a rate limit, pass in 'default', for example: --queue-rps-limit default + To unset specific fairness weights, use --fairness-key-weight-unset + To unset all fairness weights, use --fairness-key-weight-unset-all options: - name: task-queue type: string @@ -3538,6 +3544,17 @@ commands: - name: fairness-key-rps-limit-reason type: string description: Reason for fairness key rate limit update. + - name: fairness-key-weight + type: string[] + description: | + Set or unset fairness key weight overrides in format key=weight or key=default. + Use key=weight to set a positive weight value; use key=default to unset. + Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default. + - name: fairness-key-weight-clear-all + type: bool + description: | + Unset all fairness key weight overrides. + Cannot be used with --fairness-key-weight. - name: temporal workflow summary: Start, list, and operate on Workflows