Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2227,6 +2227,9 @@ type TemporalTaskQueueConfigSetCommand struct {
QueueRpsLimitReason string
FairnessKeyRpsLimitDefault string
FairnessKeyRpsLimitReason string
FairnessKeyWeightSet []string
FairnessKeyWeightUnset []string
FairnessKeyWeightUnsetAll bool
}

func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *TemporalTaskQueueConfigCommand) *TemporalTaskQueueConfigSetCommand {
Expand All @@ -2236,9 +2239,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
s.Command.Use = "set [flags]"
s.Command.Short = "Set Task Queue configuration"
if hasHighlighting {
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
} else {
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.TaskQueue, "task-queue", "t", "", "Task Queue name. Required.")
Expand All @@ -2252,6 +2255,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.")
overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default")
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.")
s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightSet, "fairness-key-weight-set", nil, "Set fairness key weight overrides in format key=weight (range: 0.001-1000). Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5.")
s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightUnset, "fairness-key-weight-unset", nil, "Unset specific fairness key weight overrides. Can be specified multiple times. Example: --fairness-key-weight-unset HighPriority --fairness-key-weight-unset LowPriority.")
s.Command.Flags().BoolVar(&s.FairnessKeyWeightUnsetAll, "fairness-key-weight-unset-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
230 changes: 213 additions & 17 deletions internal/temporalcli/commands.taskqueue_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
// TaskQueueConfigGetCommand handles getting task queue configuration
func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []string) error {
// Validate inputs before dialing client
taskQueue := c.TaskQueue
taskQueue := strings.TrimSpace(c.TaskQueue)
if taskQueue == "" {
return fmt.Errorf("taskQueue name is required")
return fmt.Errorf("task queue name is required and cannot be empty")
}

taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
Expand Down Expand Up @@ -56,12 +56,121 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str
return printTaskQueueConfig(cctx, resp.Config)
}

const (
// maxFairnessKeyLength matches server-side limit in temporal/common/priorities/priority_util.go
maxFairnessKeyLength = 64

// minFairnessWeight matches server-side limit in temporal/service/matching/fairness_util.go
// Server clamps weights to this minimum when applying them
minFairnessWeight = 0.001

// maxFairnessWeight is the maximum weight value allowed
// Server clamps weights above this value down to this maximum
maxFairnessWeight = 1000.0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not enforce these at all. clients do not need to know the effective range that the server is currently configured with, only that the the weight must be positive.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I've removed these enforcements. Do you feel the same way about the fairness key limit?

)

// parseFairnessKeyWeights parses "key=weight" format strings into a map
// Returns nil if inputs is empty, or an error if there are duplicate keys, invalid weights, or malformed input
func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) {
if len(inputs) == 0 {
return nil, nil
}

weights := make(map[string]float32)
seen := make(map[string]bool)

for _, input := range inputs {
parts := strings.SplitN(input, "=", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("invalid format: %q (expected key=weight)", input)
}

key := strings.TrimSpace(parts[0])
Comment thread
majiru marked this conversation as resolved.
Outdated
if key == "" {
return nil, fmt.Errorf("empty key in: %q", input)
}

// Check for duplicate keys
if seen[key] {
return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key)
}
seen[key] = true

// Validate key length (server enforces 64 byte limit)
if len(key) > maxFairnessKeyLength {
return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength)
}

weightStr := strings.TrimSpace(parts[1])
if weightStr == "" {
return nil, fmt.Errorf("empty weight value for key %q", key)
}

weight, err := strconv.ParseFloat(weightStr, 32)
if err != nil {
return nil, fmt.Errorf("invalid weight %q for key %q: must be a number", weightStr, key)
}

// Validate weight bounds - matches server-side validation
// Server clamps weights to [minWeight, maxWeight] range at runtime
if weight < minFairnessWeight {
return nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight)
}
if weight > maxFairnessWeight {
return nil, fmt.Errorf("weight %.3f for key %q exceeds maximum %.3f", weight, key, maxFairnessWeight)
}

weights[key] = float32(weight)
}
return weights, nil
}

// prepareUnsetKeys validates and trims fairness key names for unsetting
// Returns nil if keys is empty, trimmed keys if valid, or an error if validation fails
func prepareUnsetKeys(keys []string) ([]string, error) {
if len(keys) == 0 {
return nil, nil
}

trimmed := make([]string, len(keys))
seen := make(map[string]bool)

for i, key := range keys {
trimmedKey := strings.TrimSpace(key)
if trimmedKey == "" {
return nil, fmt.Errorf("empty fairness key name")
}
if seen[trimmedKey] {
return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", trimmedKey)
}
seen[trimmedKey] = true

if len(trimmedKey) > maxFairnessKeyLength {
return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", trimmedKey, maxFairnessKeyLength)
}

trimmed[i] = trimmedKey
}
return trimmed, nil
}

// findConflictingKeys returns keys that appear in both set and unset lists
func findConflictingKeys(setWeights map[string]float32, unsetKeys []string) []string {
conflicts := []string{}
Comment thread
majiru marked this conversation as resolved.
Outdated
for _, key := range unsetKeys {
if _, exists := setWeights[key]; exists {
conflicts = append(conflicts, key)
}
}
return conflicts
}

// TaskQueueConfigSetCommand handles setting task queue configuration
func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []string) error {
// Validate inputs before dialing client
taskQueue := c.TaskQueue
taskQueue := strings.TrimSpace(c.TaskQueue)
if taskQueue == "" {
return fmt.Errorf("taskQueue name is required")
return fmt.Errorf("task queue name is required and cannot be empty")
}

taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
Expand All @@ -79,43 +188,59 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str

// Helper to parse RPS values for a given flag name.
// Accepts "default" or a non-negative float string.
parseRPS := func(flagName string) (*taskqueue.RateLimit, error) {
// Returns (rateLimit, isZero, error)
parseRPS := func(flagName string) (*taskqueue.RateLimit, bool, error) {
raw := strings.TrimSpace(c.Command.Flags().Lookup(flagName).Value.String())
if raw == "" {
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
}
if strings.EqualFold(raw, "default") {
// Unset: returning nil RateLimit removes the existing rate limit.
return nil, nil
return nil, false, nil
}
v, err := strconv.ParseFloat(raw, 32)
if err != nil {
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
}
if v < 0 {
return nil, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
}
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, nil
isZero := v == 0
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, isZero, nil
}

// Parse and validate queue rate limit
var queueRpsLimitParsed *taskqueue.RateLimit
var queueRateLimitIsZero bool
if c.Command.Flags().Changed("queue-rps-limit") {
var err error
if queueRpsLimitParsed, err = parseRPS("queue-rps-limit"); err != nil {
queueRpsLimitParsed, queueRateLimitIsZero, err = parseRPS("queue-rps-limit")
if err != nil {
return err
}
} else if c.Command.Flags().Changed("queue-rps-limit-reason") {
return fmt.Errorf("queue-rps-limit-reason can only be set if queue-rps-limit is updated")

// Warn about zero rate limit (stops all traffic)
if queueRateLimitIsZero {
cctx.Printer.Println("WARNING: Setting queue rate limit to 0 will STOP ALL TRAFFIC on this task queue.")
cctx.Printer.Println(" This will prevent any tasks from being dispatched until the limit is changed.")
}
}

// Parse and validate fairness key rate limit default
var fairnessKeyRpsLimitDefaultParsed *taskqueue.RateLimit
var fairnessRateLimitIsZero bool
if c.Command.Flags().Changed("fairness-key-rps-limit-default") {
var err error
if fairnessKeyRpsLimitDefaultParsed, err = parseRPS("fairness-key-rps-limit-default"); err != nil {
fairnessKeyRpsLimitDefaultParsed, fairnessRateLimitIsZero, err = parseRPS("fairness-key-rps-limit-default")
if err != nil {
return err
}
} else if c.Command.Flags().Changed("fairness-key-rps-limit-default-reason") {
return fmt.Errorf("fairness-key-rps-limit-default-reason can only be set if fairness-key-rps-limit-default is updated")

// Warn about zero rate limit
if fairnessRateLimitIsZero {
cctx.Printer.Println("WARNING: Setting fairness key rate limit default to 0 will STOP ALL TRAFFIC for fairness keys.")
cctx.Printer.Println(" This will prevent tasks with fairness keys from being dispatched until the limit is changed.")
Comment thread
majiru marked this conversation as resolved.
Outdated
}
}

cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
Expand Down Expand Up @@ -152,10 +277,81 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str
}
}

// Validate at least one configuration change is requested
hasAnyUpdate := c.Command.Flags().Changed("queue-rps-limit") ||
c.Command.Flags().Changed("fairness-key-rps-limit-default") ||
len(c.FairnessKeyWeightSet) > 0 ||
len(c.FairnessKeyWeightUnset) > 0 ||
c.FairnessKeyWeightUnsetAll

if !hasAnyUpdate {
return fmt.Errorf("at least one configuration update must be specified (use --help to see available options)")
}

// Handle fairness weight overrides
// Validate mutual exclusivity of unset-all with other weight operations
if c.FairnessKeyWeightUnsetAll {
if len(c.FairnessKeyWeightSet) > 0 || len(c.FairnessKeyWeightUnset) > 0 {
return fmt.Errorf("--fairness-key-weight-unset-all cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset")
}
}

// Parse and validate set operations
setWeights, err := parseFairnessKeyWeights(c.FairnessKeyWeightSet)
if err != nil {
return err
}
request.SetFairnessWeightOverrides = setWeights

// Validate and prepare unset operations
trimmedUnsetKeys, err := prepareUnsetKeys(c.FairnessKeyWeightUnset)
if err != nil {
return fmt.Errorf("invalid fairness key in unset list: %w", err)
}

// Check for conflicts between set and unset
if setWeights != nil && trimmedUnsetKeys != nil {
conflicts := findConflictingKeys(setWeights, trimmedUnsetKeys)
if len(conflicts) > 0 {
return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts)
}
}
Comment thread
majiru marked this conversation as resolved.
Outdated

request.UnsetFairnessWeightOverrides = trimmedUnsetKeys

// Handle unset all
if c.FairnessKeyWeightUnsetAll {
// Need to fetch current config to get all keys to unset
Comment thread
dnr marked this conversation as resolved.
descResp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueue.TaskQueue{
Name: taskQueue,
Kind: enums.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
ReportConfig: true,
})
if err != nil {
return fmt.Errorf("error fetching current config for unset-all: %w", err)
}
if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil && len(descResp.Config.FairnessWeightOverrides) > 0 {
keys := make([]string, 0, len(descResp.Config.FairnessWeightOverrides))
for key := range descResp.Config.FairnessWeightOverrides {
keys = append(keys, key)
}
request.UnsetFairnessWeightOverrides = keys
Comment thread
majiru marked this conversation as resolved.
Outdated
cctx.Printer.Println(fmt.Sprintf("Unsetting %d fairness weight override(s)", len(keys)))
} else {
cctx.Printer.Println("No fairness weight overrides found to unset")
// Don't return error, just proceed with no-op update
}
}

// Call the API
resp, err := cl.WorkflowService().UpdateTaskQueueConfig(cctx, request)
if err != nil {
return fmt.Errorf("error updating task queue config: %w", err)
// Provide more context in error message
return fmt.Errorf("failed to update task queue config for %s/%s: %w", namespace, taskQueue, err)
}

cctx.Printer.Println("Successfully updated task queue configuration")
Expand Down
Loading
Loading