Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions internal/temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (v *SharedWorkflowStartOptions) BuildFlags(f *pflag.FlagSet) {
f.StringVar(&v.StaticDetails, "static-details", "", "Static Workflow details for human consumption in UIs. Uses Temporal Markdown formatting, may be multiple lines. EXPERIMENTAL.")
f.IntVar(&v.PriorityKey, "priority-key", 0, "Priority key (1-5, lower numbers = higher priority). Tasks in a queue should be processed in close-to-priority-order. Default is 3 when not specified.")
f.StringVar(&v.FairnessKey, "fairness-key", "", "Fairness key (max 64 bytes) for proportional task dispatch. Tasks with same key share capacity based on their weight.")
f.Float32Var(&v.FairnessWeight, "fairness-weight", 0, "Weight [0.001-1000] for this fairness key. Keys are dispatched proportionally to their weights.")
f.Float32Var(&v.FairnessWeight, "fairness-weight", 0, "Weight (minimum 0.001) for this fairness key. Keys are dispatched proportionally to their weights.")
}

type WorkflowStartOptions struct {
Expand Down Expand Up @@ -2227,6 +2227,9 @@ type TemporalTaskQueueConfigSetCommand struct {
QueueRpsLimitReason string
FairnessKeyRpsLimitDefault string
FairnessKeyRpsLimitReason string
FairnessKeyWeightSet []string
FairnessKeyWeightUnset []string
FairnessKeyWeightUnsetAll bool
}

func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *TemporalTaskQueueConfigCommand) *TemporalTaskQueueConfigSetCommand {
Expand All @@ -2236,9 +2239,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
s.Command.Use = "set [flags]"
s.Command.Short = "Set Task Queue configuration"
if hasHighlighting {
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
} else {
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.TaskQueue, "task-queue", "t", "", "Task Queue name. Required.")
Expand All @@ -2252,6 +2255,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.")
overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default")
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.")
s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightSet, "fairness-key-weight-set", nil, "Set fairness key weight overrides in format key=weight. Can be specified multiple times. Example: --fairness-key-weight-set HighPriority=2.0 --fairness-key-weight-set LowPriority=0.5.")
s.Command.Flags().StringArrayVar(&s.FairnessKeyWeightUnset, "fairness-key-weight-unset", nil, "Unset specific fairness key weight overrides. Can be specified multiple times. Example: --fairness-key-weight-unset HighPriority --fairness-key-weight-unset LowPriority.")
s.Command.Flags().BoolVar(&s.FairnessKeyWeightUnsetAll, "fairness-key-weight-unset-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
227 changes: 211 additions & 16 deletions internal/temporalcli/commands.taskqueue_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
// TaskQueueConfigGetCommand handles getting task queue configuration
func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []string) error {
// Validate inputs before dialing client
taskQueue := c.TaskQueue
taskQueue := strings.TrimSpace(c.TaskQueue)
if taskQueue == "" {
return fmt.Errorf("taskQueue name is required")
return fmt.Errorf("task queue name is required and cannot be empty")
}

taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
Expand Down Expand Up @@ -56,12 +56,101 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str
return printTaskQueueConfig(cctx, resp.Config)
}

const (
// maxFairnessKeyLength matches server-side limit in temporal/common/priorities/priority_util.go
maxFairnessKeyLength = 64

// minFairnessWeight matches server-side limit in temporal/service/matching/fairness_util.go
// Server clamps weights to this minimum when applying them
minFairnessWeight = 0.001
)

// parseFairnessKeyWeights parses "key=weight" format strings into a map
// Returns an error if there are duplicate keys, invalid weights, or malformed input
func parseFairnessKeyWeights(inputs []string) (map[string]float32, error) {
weights := make(map[string]float32)
seen := make(map[string]bool)

for _, input := range inputs {
parts := strings.SplitN(input, "=", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("invalid format: %q (expected key=weight)", input)
}

key := strings.TrimSpace(parts[0])
Comment thread
majiru marked this conversation as resolved.
Outdated
if key == "" {
return nil, fmt.Errorf("empty key in: %q", input)
}

// Check for duplicate keys
if seen[key] {
return nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key)
}
seen[key] = true

// Validate key length (server enforces 64 byte limit)
if len(key) > maxFairnessKeyLength {
return nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength)
}

weightStr := strings.TrimSpace(parts[1])
if weightStr == "" {
return nil, fmt.Errorf("empty weight value for key %q", key)
}

weight, err := strconv.ParseFloat(weightStr, 32)
if err != nil {
return nil, fmt.Errorf("invalid weight %q for key %q: must be a number", weightStr, key)
}

// Validate weight bounds - matches server-side validation
// Server only validates weight > 0 and clamps to minWeight at runtime
if weight < minFairnessWeight {
return nil, fmt.Errorf("weight %.3f for key %q is below minimum %.3f", weight, key, minFairnessWeight)
}

weights[key] = float32(weight)
}
return weights, nil
}

// validateFairnessKeyNames validates that key names are non-empty and within length limits
func validateFairnessKeyNames(keys []string) error {
seen := make(map[string]bool)
for _, key := range keys {
trimmed := strings.TrimSpace(key)
if trimmed == "" {
return fmt.Errorf("empty fairness key name")
}
if seen[trimmed] {
return fmt.Errorf("duplicate fairness key %q specified multiple times", trimmed)
}
seen[trimmed] = true

if len(trimmed) > maxFairnessKeyLength {
return fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", trimmed, maxFairnessKeyLength)
}
}
return nil
}

// findConflictingKeys returns keys that appear in both set and unset lists
func findConflictingKeys(setWeights map[string]float32, unsetKeys []string) []string {
conflicts := []string{}
Comment thread
majiru marked this conversation as resolved.
Outdated
for _, key := range unsetKeys {
if _, exists := setWeights[key]; exists {
conflicts = append(conflicts, key)
}
}
return conflicts
}

// TaskQueueConfigSetCommand handles setting task queue configuration
func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []string) error {
// Validate inputs before dialing client
taskQueue := c.TaskQueue
taskQueue := strings.TrimSpace(c.TaskQueue)
if taskQueue == "" {
return fmt.Errorf("taskQueue name is required")
return fmt.Errorf("task queue name is required and cannot be empty")
}

taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
Expand All @@ -79,43 +168,73 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str

// Helper to parse RPS values for a given flag name.
// Accepts "default" or a non-negative float string.
parseRPS := func(flagName string) (*taskqueue.RateLimit, error) {
// Returns (rateLimit, isZero, error)
parseRPS := func(flagName string) (*taskqueue.RateLimit, bool, error) {
raw := strings.TrimSpace(c.Command.Flags().Lookup(flagName).Value.String())
if raw == "" {
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
}
if strings.EqualFold(raw, "default") {
// Unset: returning nil RateLimit removes the existing rate limit.
return nil, nil
return nil, false, nil
}
v, err := strconv.ParseFloat(raw, 32)
if err != nil {
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
}
if v < 0 {
return nil, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
return nil, false, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
}
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, nil
isZero := v == 0
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, isZero, nil
}

// Parse and validate queue rate limit
var queueRpsLimitParsed *taskqueue.RateLimit
var queueRateLimitIsZero bool
if c.Command.Flags().Changed("queue-rps-limit") {
var err error
if queueRpsLimitParsed, err = parseRPS("queue-rps-limit"); err != nil {
queueRpsLimitParsed, queueRateLimitIsZero, err = parseRPS("queue-rps-limit")
if err != nil {
return err
}

// Validate reason is provided
if c.QueueRpsLimitReason == "" {
return fmt.Errorf("--queue-rps-limit-reason is required when setting or unsetting queue rate limit")
}
Comment thread
majiru marked this conversation as resolved.
Outdated

// Warn about zero rate limit (stops all traffic)
if queueRateLimitIsZero {
cctx.Printer.Println("WARNING: Setting queue rate limit to 0 will STOP ALL TRAFFIC on this task queue.")
cctx.Printer.Println(" This will prevent any tasks from being dispatched until the limit is changed.")
}
} else if c.Command.Flags().Changed("queue-rps-limit-reason") {
return fmt.Errorf("queue-rps-limit-reason can only be set if queue-rps-limit is updated")
return fmt.Errorf("--queue-rps-limit-reason can only be set if --queue-rps-limit is specified")
}

// Parse and validate fairness key rate limit default
var fairnessKeyRpsLimitDefaultParsed *taskqueue.RateLimit
var fairnessRateLimitIsZero bool
if c.Command.Flags().Changed("fairness-key-rps-limit-default") {
var err error
if fairnessKeyRpsLimitDefaultParsed, err = parseRPS("fairness-key-rps-limit-default"); err != nil {
fairnessKeyRpsLimitDefaultParsed, fairnessRateLimitIsZero, err = parseRPS("fairness-key-rps-limit-default")
if err != nil {
return err
}
} else if c.Command.Flags().Changed("fairness-key-rps-limit-default-reason") {
return fmt.Errorf("fairness-key-rps-limit-default-reason can only be set if fairness-key-rps-limit-default is updated")

// Validate reason is provided
if c.FairnessKeyRpsLimitReason == "" {
return fmt.Errorf("--fairness-key-rps-limit-reason is required when setting or unsetting fairness key rate limit")
}

// Warn about zero rate limit
if fairnessRateLimitIsZero {
cctx.Printer.Println("WARNING: Setting fairness key rate limit default to 0 will STOP ALL TRAFFIC for fairness keys.")
cctx.Printer.Println(" This will prevent tasks with fairness keys from being dispatched until the limit is changed.")
Comment thread
majiru marked this conversation as resolved.
Outdated
}
} else if c.Command.Flags().Changed("fairness-key-rps-limit-reason") {
return fmt.Errorf("--fairness-key-rps-limit-reason can only be set if --fairness-key-rps-limit-default is specified")
}

cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
Expand Down Expand Up @@ -152,10 +271,86 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str
}
}

// Validate at least one configuration change is requested
hasAnyUpdate := c.Command.Flags().Changed("queue-rps-limit") ||
c.Command.Flags().Changed("fairness-key-rps-limit-default") ||
len(c.FairnessKeyWeightSet) > 0 ||
len(c.FairnessKeyWeightUnset) > 0 ||
c.FairnessKeyWeightUnsetAll

if !hasAnyUpdate {
return fmt.Errorf("at least one configuration update must be specified (use --help to see available options)")
}

// Handle fairness weight overrides
// Validate mutual exclusivity of unset-all with other weight operations
if c.FairnessKeyWeightUnsetAll {
if len(c.FairnessKeyWeightSet) > 0 || len(c.FairnessKeyWeightUnset) > 0 {
return fmt.Errorf("--fairness-key-weight-unset-all cannot be used with --fairness-key-weight-set or --fairness-key-weight-unset")
}
}

// Parse and validate set operations
var setWeights map[string]float32
if len(c.FairnessKeyWeightSet) > 0 {
Comment thread
majiru marked this conversation as resolved.
Outdated
var err error
setWeights, err = parseFairnessKeyWeights(c.FairnessKeyWeightSet)
if err != nil {
return err
}
request.SetFairnessWeightOverrides = setWeights
}

// Validate unset operations
if len(c.FairnessKeyWeightUnset) > 0 {
Comment thread
majiru marked this conversation as resolved.
Outdated
if err := validateFairnessKeyNames(c.FairnessKeyWeightUnset); err != nil {
return fmt.Errorf("invalid fairness key in unset list: %w", err)
}

// Check for conflicts between set and unset
if setWeights != nil {
conflicts := findConflictingKeys(setWeights, c.FairnessKeyWeightUnset)
if len(conflicts) > 0 {
return fmt.Errorf("fairness keys appear in both set and unset operations: %v", conflicts)
}
}

request.UnsetFairnessWeightOverrides = c.FairnessKeyWeightUnset
}

// Handle unset all
if c.FairnessKeyWeightUnsetAll {
// Need to fetch current config to get all keys to unset
Comment thread
dnr marked this conversation as resolved.
descResp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueue.TaskQueue{
Name: taskQueue,
Kind: enums.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
ReportConfig: true,
})
if err != nil {
return fmt.Errorf("error fetching current config for unset-all: %w", err)
}
if descResp.Config != nil && descResp.Config.FairnessWeightOverrides != nil && len(descResp.Config.FairnessWeightOverrides) > 0 {
keys := make([]string, 0, len(descResp.Config.FairnessWeightOverrides))
for key := range descResp.Config.FairnessWeightOverrides {
keys = append(keys, key)
}
request.UnsetFairnessWeightOverrides = keys
Comment thread
majiru marked this conversation as resolved.
Outdated
cctx.Printer.Println(fmt.Sprintf("Unsetting %d fairness weight override(s)", len(keys)))
} else {
cctx.Printer.Println("No fairness weight overrides found to unset")
// Don't return error, just proceed with no-op update
}
}

// Call the API
resp, err := cl.WorkflowService().UpdateTaskQueueConfig(cctx, request)
if err != nil {
return fmt.Errorf("error updating task queue config: %w", err)
// Provide more context in error message
return fmt.Errorf("failed to update task queue config for %s/%s: %w", namespace, taskQueue, err)
}

cctx.Printer.Println("Successfully updated task queue configuration")
Expand Down
Loading
Loading