Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/admin/metric_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func (c *metricClient) finishMetricsRecording(
*serviceerror.QueryFailed,
*serviceerror.NamespaceNotFound,
*serviceerror.WorkflowNotReady,
*serviceerror.WorkflowExecutionAlreadyStarted:
*serviceerror.WorkflowExecutionAlreadyStarted,
*serviceerror.ResourceExhausted:
// noop - not interest and too many logs
default:
c.throttledLogger.Info("admin client encountered error", tag.Error(err), tag.ServiceErrorType(err))
Expand Down
3 changes: 2 additions & 1 deletion client/frontend/metric_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (c *metricClient) finishMetricsRecording(
*serviceerror.QueryFailed,
*serviceerror.NamespaceNotFound,
*serviceerror.WorkflowNotReady,
*serviceerror.WorkflowExecutionAlreadyStarted:
*serviceerror.WorkflowExecutionAlreadyStarted,
*serviceerror.ResourceExhausted:
// noop - not interest and too many logs
default:
c.throttledLogger.Info("frontend client encountered error", tag.Error(err), tag.ServiceErrorType(err))
Expand Down
3 changes: 2 additions & 1 deletion client/matching/metric_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ func (c *metricClient) finishMetricsRecording(
*serviceerror.QueryFailed,
*serviceerror.NamespaceNotFound,
*serviceerror.NewerBuildExists,
*serviceerror.WorkflowExecutionAlreadyStarted:
*serviceerror.WorkflowExecutionAlreadyStarted,
*serviceerror.ResourceExhausted:
// noop - not interest and too many logs
default:
c.throttledLogger.Info("matching client encountered error", tag.Error(err), tag.ServiceErrorType(err))
Expand Down
25 changes: 16 additions & 9 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20
Setting this to 0 prevents the search attribute from being set when a problem is detected, and unset when the problem is resolved.`,
)

PollWaitForNamespaceRateLimitToken = NewNamespaceBoolSetting(
"system.pollWaitForNamespaceRateLimitToken",
false,
`PollWaitForNamespaceRateLimitToken controls whether poll requests wait for
a namespace RPS rate limit token to become available instead of immediately rejecting
with ResourceExhausted. When enabled, poll requests block until a token is available
or the request context deadline is reached. The concurrent request rate limiter fires
before this limiter and will still reject requests that exceed the concurrent limit.`,
)

// keys for size limit

BlobSizeLimitError = NewNamespaceIntSetting(
Expand Down Expand Up @@ -697,15 +707,6 @@ exceeded, not when it is only reached.`,
instances in the cluster, for a given namespace, per-API method. If this is set to 0 (the default), then it is
ignored. The name 'frontend.globalNamespaceCount' is kept for consistency with the per-instance limit name,
'frontend.namespaceCount'.`,
)
FrontendPollWaitForNamespaceRateLimitToken = NewNamespaceBoolSetting(
"frontend.pollWaitForNamespaceRateLimitToken",
false,
`FrontendPollWaitForNamespaceRateLimitToken controls whether poll requests wait for
a namespace RPS rate limit token to become available instead of immediately rejecting
with ResourceExhausted. When enabled, poll requests block until a token is available
or the request context deadline is reached. The concurrent request rate limiter fires
before this limiter and will still reject requests that exceed the concurrent limit.`,
)
FrontendMaxNamespaceVisibilityRPSPerInstance = NewNamespaceIntSetting(
"frontend.namespaceRPS.visibility",
Expand Down Expand Up @@ -1116,6 +1117,12 @@ Default is 0, means, namespace will be deleted immediately.`,
1200,
`MatchingRPS is request rate per second for each matching host`,
)
MatchingNamespaceRPS = NewNamespaceIntSetting(
"matching.namespaceRPS",
0,
`MatchingNamespaceRPS is namespace rate limit per second for each matching host.
If value less or equal to 0, will fall back to MatchingRPS`,
)
MatchingPersistenceMaxQPS = NewGlobalIntSetting(
"matching.persistenceMaxQPS",
3000,
Expand Down
9 changes: 8 additions & 1 deletion service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,14 @@ func NamespaceRateLimitInterceptorProvider(
)
},
)
return interceptor.NewNamespaceRateLimitInterceptor(namespaceRegistry, namespaceRateLimiter, map[string]int{}, configs.PollTaskAPISet, serviceConfig.PollWaitForNamespaceRateLimitToken, metricsHandler)
return interceptor.NewNamespaceRateLimitInterceptor(
namespaceRegistry,
namespaceRateLimiter,
map[string]int{}, // no token overrides
configs.PollTaskAPISet,
serviceConfig.PollWaitForNamespaceRateLimitToken,
metricsHandler,
)
}

func NamespaceCountLimitInterceptorProvider(
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func NewConfig(
MaxNamespaceBurstRatioPerInstance: dynamicconfig.FrontendMaxNamespaceBurstRatioPerInstance.Get(dc),
MaxConcurrentLongRunningRequestsPerInstance: dynamicconfig.FrontendMaxConcurrentLongRunningRequestsPerInstance.Get(dc),
MaxGlobalConcurrentLongRunningRequests: dynamicconfig.FrontendGlobalMaxConcurrentLongRunningRequests.Get(dc),
PollWaitForNamespaceRateLimitToken: dynamicconfig.FrontendPollWaitForNamespaceRateLimitToken.Get(dc),
PollWaitForNamespaceRateLimitToken: dynamicconfig.PollWaitForNamespaceRateLimitToken.Get(dc),
MaxNamespaceVisibilityRPSPerInstance: dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance.Get(dc),
MaxNamespaceVisibilityBurstRatioPerInstance: dynamicconfig.FrontendMaxNamespaceVisibilityBurstRatioPerInstance.Get(dc),
MaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance: dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance.Get(dc),
Expand Down
4 changes: 4 additions & 0 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ type (
PersistenceQPSBurstRatio dynamicconfig.FloatPropertyFn
SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueFilter
RPS dynamicconfig.IntPropertyFn
NamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
OperatorRPSRatio dynamicconfig.FloatPropertyFn
PollWaitForNamespaceRateLimitToken dynamicconfig.BoolPropertyFnWithNamespaceFilter
AlignMembershipChange dynamicconfig.DurationPropertyFn
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -274,7 +276,9 @@ func NewConfig(
MaxTaskQueuesInDeployment: dynamicconfig.MatchingMaxTaskQueuesInDeployment.Get(dc),
MaxVersionsInTaskQueue: dynamicconfig.MatchingMaxVersionsInTaskQueue.Get(dc),
RPS: dynamicconfig.MatchingRPS.Get(dc),
NamespaceRPS: dynamicconfig.MatchingNamespaceRPS.Get(dc),
OperatorRPSRatio: dynamicconfig.OperatorRPSRatio.Get(dc),
PollWaitForNamespaceRateLimitToken: dynamicconfig.PollWaitForNamespaceRateLimitToken.Get(dc),
RangeSize: 100000,
NewMatcherSub: dynamicconfig.MatchingUseNewMatcher.Subscribe(dc),
EnableFairnessSub: dynamicconfig.MatchingEnableFairness.Subscribe(dc),
Expand Down
67 changes: 40 additions & 27 deletions service/matching/configs/quotas.go
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For helper methods in the quotas package, please check the changes in #9884

Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ import (
"go.temporal.io/server/common/quotas"
)

const (
// OperatorPriority is used to give precedence to calls coming from web UI or tctl
OperatorPriority = 0
)

var (
APIToPriority = map[string]int{
"/temporal.server.api.matchingservice.v1.MatchingService/AddActivityTask": 1,
Expand Down Expand Up @@ -56,36 +51,54 @@ var (
}

APIPrioritiesOrdered = []int{0, 1, 2}

PollTaskAPISet = map[string]struct{}{
"/temporal.server.api.matchingservice.v1.MatchingService/PollActivityTaskQueue": {},
"/temporal.server.api.matchingservice.v1.MatchingService/PollWorkflowTaskQueue": {},
"/temporal.server.api.matchingservice.v1.MatchingService/PollNexusTaskQueue": {},
}
)

func NewPriorityRateLimiter(
rateFn quotas.RateFn,
operatorRPSRatio dynamicconfig.FloatPropertyFn,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range APIPrioritiesOrdered {
if priority == OperatorPriority {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio)))
} else {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn))
}
}
return quotas.NewPriorityRateLimiter(func(req quotas.Request) int {
if req.CallerType == headers.CallerTypeOperator {
return OperatorPriority
}
if priority, ok := APIToPriority[req.API]; ok {
return priority
}
return APIPrioritiesOrdered[len(APIPrioritiesOrdered)-1]
}, rateLimiters)
return quotas.NewPriorityRateLimiterHelper(
quotas.NewDefaultIncomingRateBurst(rateFn),
operatorRPSRatio,
RequestToPriority,
APIPrioritiesOrdered,
)
}

func operatorRateFn(
rateFn quotas.RateFn,
func NewNamespaceRateLimiter(
namespaceRateFn quotas.NamespaceRateFn,
operatorRPSRatio dynamicconfig.FloatPropertyFn,
) quotas.RateFn {
return func() float64 {
return operatorRPSRatio() * rateFn()
) quotas.RequestRateLimiter {
return quotas.NewNamespaceRequestRateLimiter(
func(req quotas.Request) quotas.RequestRateLimiter {
return quotas.NewPriorityRateLimiterHelper(
quotas.NewNamespaceRateBurst(
req.Caller,
namespaceRateFn,
// TODO: We can consider adding a separate burst ratio dynamic config
// on namespace level rate limiter if needed.
quotas.DefaultIncomingNamespaceBurstRatioFn,
),
operatorRPSRatio,
RequestToPriority,
APIPrioritiesOrdered,
)
},
)
}

func RequestToPriority(req quotas.Request) int {
if req.CallerType == headers.CallerTypeOperator {
return quotas.OperatorPriority
}
if priority, ok := APIToPriority[req.API]; ok {
return priority
}
return APIPrioritiesOrdered[len(APIPrioritiesOrdered)-1]
}
28 changes: 28 additions & 0 deletions service/matching/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var Module = fx.Options(
fx.Provide(RetryableInterceptorProvider),
fx.Provide(ErrorHandlerProvider),
fx.Provide(TelemetryInterceptorProvider),
fx.Provide(NamespaceRateLimitInterceptorProvider),
fx.Provide(RateLimitInterceptorProvider),
fx.Provide(VisibilityManagerProvider),
fx.Provide(WorkersRegistryProvider),
Expand Down Expand Up @@ -99,6 +100,33 @@ func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLogge
return func() float64 { return float64(serviceConfig.ThrottledLogRPS()) }
}

func NamespaceRateLimitInterceptorProvider(
serviceConfig *Config,
namespaceRegistry namespace.Registry,
metricsHandler metrics.Handler,
) interceptor.NamespaceRateLimitInterceptor {

namespaceRateFn := func(namespaceName string) float64 {
if namespaceRPS := serviceConfig.NamespaceRPS(namespaceName); namespaceRPS > 0 {
return float64(namespaceRPS)
}
// This fallback to host level rps limit when NamespaceRPS is not configured (i.e. 0)
return float64(serviceConfig.RPS())
}

return interceptor.NewNamespaceRateLimitInterceptor(
namespaceRegistry,
configs.NewNamespaceRateLimiter(
namespaceRateFn,
serviceConfig.OperatorRPSRatio,
),
map[string]int{}, // no token overrides
configs.PollTaskAPISet, // set of APIs that will wait for token instead of immediate rejection
serviceConfig.PollWaitForNamespaceRateLimitToken,
Comment on lines +124 to +125
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to discuss if matching need this behavior as well. This is what we are doing on frontend. I can bypass them if needed.

metricsHandler,
)
}

func RateLimitInterceptorProvider(
serviceConfig *Config,
) *interceptor.RateLimitInterceptor {
Expand Down
Loading