Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions internal/temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,8 @@ type TemporalTaskQueueConfigSetCommand struct {
QueueRpsLimitReason string
FairnessKeyRpsLimitDefault string
FairnessKeyRpsLimitReason string
FairnessKeyWeight []string
FairnessKeyWeightClearAll bool
}

func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *TemporalTaskQueueConfigCommand) *TemporalTaskQueueConfigSetCommand {
Expand All @@ -2546,9 +2548,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
s.Command.Use = "set [flags]"
s.Command.Short = "Set Task Queue configuration"
if hasHighlighting {
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
} else {
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.TaskQueue, "task-queue", "t", "", "Task Queue name. Required.")
Expand All @@ -2562,6 +2564,8 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.")
overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default")
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.")
s.Command.Flags().StringArrayVar(&s.FairnessKeyWeight, "fairness-key-weight", nil, "Set or unset fairness key weight overrides in format key=weight or key=default. Use key=weight to set a positive weight value; use key=default to unset. Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default.")
s.Command.Flags().BoolVar(&s.FairnessKeyWeightClearAll, "fairness-key-weight-clear-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
192 changes: 175 additions & 17 deletions internal/temporalcli/commands.taskqueue_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
enums "go.temporal.io/api/enums/v1"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"golang.org/x/exp/maps"
)

// TaskQueueConfigGetCommand handles getting task queue configuration
func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []string) error {
// Validate inputs before dialing client
taskQueue := c.TaskQueue
taskQueue := strings.TrimSpace(c.TaskQueue)
if taskQueue == "" {
return fmt.Errorf("taskQueue name is required")
return fmt.Errorf("task queue name is required and cannot be empty")
}

taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
Expand Down Expand Up @@ -56,12 +57,99 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str
return printTaskQueueConfig(cctx, resp.Config)
}

const (
// maxFairnessKeyLength matches server-side limit in temporal/common/priorities/priority_util.go
maxFairnessKeyLength = 64
)

// printZeroRateLimitWarning prints a warning when a rate limit is set to 0
func printZeroRateLimitWarning(p *printer.Printer, limitType string) {
p.Printlnf("WARNING: Setting %s to 0 will STOP ALL TRAFFIC on this task queue.", limitType)
p.Println(" This will prevent any tasks from being dispatched until the limit is changed.")
}

// parseFairnessKeyWeights parses "key=weight" or "key=default" format strings
// Returns separate maps for set and unset operations, or an error if there are duplicate keys, invalid weights, or malformed input
// If inputs is empty, returns nil for both maps
func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, unsetKeys []string, err error) {
if len(inputs) == 0 {
return nil, nil, nil
}

setWeights = make(map[string]float32)
unsetKeysMap := make(map[string]bool) // Track unset keys in a map to check for duplicates
seen := make(map[string]bool)

for _, input := range inputs {
parts := strings.SplitN(input, "=", 2)
if len(parts) != 2 {
return nil, nil, fmt.Errorf("invalid format: %q (expected key=weight or key=default)", input)
}

key := parts[0]
if key == "" {
return nil, nil, fmt.Errorf("empty key in: %q", input)
}

// Check for duplicate keys across both set and unset
if seen[key] {
return nil, nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key)
}
seen[key] = true

valueStr := parts[1]
if valueStr == "" {
return nil, nil, fmt.Errorf("empty value for key %q", key)
}

// Check if this is an unset operation (value is "default")
// Do this before validating key length since we don't care about length when unsetting
if strings.EqualFold(valueStr, "default") {
unsetKeysMap[key] = true
continue
}

// Validate key length only for set operations (server enforces 64 byte limit)
if len(key) > maxFairnessKeyLength {
return nil, nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength)
}

// Parse as weight
weight, err := strconv.ParseFloat(valueStr, 32)
if err != nil {
return nil, nil, fmt.Errorf("invalid weight %q for key %q: must be a number or 'default'", valueStr, key)
}

// Validate weight is positive - server handles clamping to its configured range
if weight <= 0 {
return nil, nil, fmt.Errorf("weight for key %q must be positive", key)
}

setWeights[key] = float32(weight)
}

// Convert unset map to slice
if len(unsetKeysMap) > 0 {
unsetKeys = maps.Keys(unsetKeysMap)
}

// Return nil instead of empty maps/slices
if len(setWeights) == 0 {
setWeights = nil
}
if len(unsetKeys) == 0 {
unsetKeys = nil
}

return setWeights, unsetKeys, nil
}

// TaskQueueConfigSetCommand handles setting task queue configuration
func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []string) error {
// Validate inputs before dialing client
taskQueue := c.TaskQueue
taskQueue := strings.TrimSpace(c.TaskQueue)
if taskQueue == "" {
return fmt.Errorf("taskQueue name is required")
return fmt.Errorf("task queue name is required and cannot be empty")
}

taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
Expand All @@ -79,43 +167,57 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str

// Helper to parse RPS values for a given flag name.
// Accepts "default" or a non-negative float string.
parseRPS := func(flagName string) (*taskqueue.RateLimit, error) {
// Returns (rateLimit, isZero, error)
parseRPS := func(flagName string) (*taskqueue.RateLimit, bool, error) {
raw := strings.TrimSpace(c.Command.Flags().Lookup(flagName).Value.String())
if raw == "" {
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
}
if strings.EqualFold(raw, "default") {
// Unset: returning nil RateLimit removes the existing rate limit.
return nil, nil
return nil, false, nil
}
v, err := strconv.ParseFloat(raw, 32)
if err != nil {
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
}
if v < 0 {
return nil, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
}
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, nil
isZero := v == 0
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, isZero, nil
}

// Parse and validate queue rate limit
var queueRpsLimitParsed *taskqueue.RateLimit
var queueRateLimitIsZero bool
if c.Command.Flags().Changed("queue-rps-limit") {
var err error
if queueRpsLimitParsed, err = parseRPS("queue-rps-limit"); err != nil {
queueRpsLimitParsed, queueRateLimitIsZero, err = parseRPS("queue-rps-limit")
if err != nil {
return err
}
} else if c.Command.Flags().Changed("queue-rps-limit-reason") {
return fmt.Errorf("queue-rps-limit-reason can only be set if queue-rps-limit is updated")

// Warn about zero rate limit (stops all traffic)
if queueRateLimitIsZero {
printZeroRateLimitWarning(cctx.Printer, "queue rate limit")
}
}

// Parse and validate fairness key rate limit default
var fairnessKeyRpsLimitDefaultParsed *taskqueue.RateLimit
var fairnessRateLimitIsZero bool
if c.Command.Flags().Changed("fairness-key-rps-limit-default") {
var err error
if fairnessKeyRpsLimitDefaultParsed, err = parseRPS("fairness-key-rps-limit-default"); err != nil {
fairnessKeyRpsLimitDefaultParsed, fairnessRateLimitIsZero, err = parseRPS("fairness-key-rps-limit-default")
if err != nil {
return err
}
} else if c.Command.Flags().Changed("fairness-key-rps-limit-default-reason") {
return fmt.Errorf("fairness-key-rps-limit-default-reason can only be set if fairness-key-rps-limit-default is updated")

// Warn about zero rate limit
if fairnessRateLimitIsZero {
printZeroRateLimitWarning(cctx.Printer, "fairness key rate limit default")
}
}

cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
Expand Down Expand Up @@ -152,10 +254,66 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str
}
}

// Validate at least one configuration change is requested
hasAnyUpdate := c.Command.Flags().Changed("queue-rps-limit") ||
c.Command.Flags().Changed("fairness-key-rps-limit-default") ||
len(c.FairnessKeyWeight) > 0 ||
c.FairnessKeyWeightClearAll

if !hasAnyUpdate {
return fmt.Errorf("at least one configuration update must be specified (use --help to see available options)")
}

// Handle fairness weight overrides
// Validate mutual exclusivity of clear-all with other weight operations
if c.FairnessKeyWeightClearAll {
if len(c.FairnessKeyWeight) > 0 {
return fmt.Errorf("--fairness-key-weight-clear-all cannot be used with --fairness-key-weight")
}
}

// Parse fairness key weights (handles both set and unset operations)
setWeights, unsetKeys, err := parseFairnessKeyWeights(c.FairnessKeyWeight)
if err != nil {
return err
}
request.SetFairnessWeightOverrides = setWeights
request.UnsetFairnessWeightOverrides = unsetKeys

// Handle clear all
if c.FairnessKeyWeightClearAll {
// Need to fetch current config to get all keys to unset
Comment thread
dnr marked this conversation as resolved.
descResp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueue.TaskQueue{
Name: taskQueue,
Kind: enums.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
ReportConfig: true,
})
if err != nil {
return fmt.Errorf("error fetching current config for clear-all: %w", err)
}
var overrides map[string]float32
if descResp.Config != nil {
overrides = descResp.Config.FairnessWeightOverrides
}
keys := maps.Keys(overrides)
if len(keys) > 0 {
request.UnsetFairnessWeightOverrides = keys
cctx.Printer.Printlnf("Unsetting %d fairness weight override(s)", len(keys))
} else {
cctx.Printer.Println("No fairness weight overrides found to unset")
// Don't return error, just proceed with no-op update
}
}

// Call the API
resp, err := cl.WorkflowService().UpdateTaskQueueConfig(cctx, request)
if err != nil {
return fmt.Errorf("error updating task queue config: %w", err)
// Provide more context in error message
return fmt.Errorf("failed to update task queue config for %s/%s: %w", namespace, taskQueue, err)
}

cctx.Printer.Println("Successfully updated task queue configuration")
Expand Down
Loading
Loading