diff --git a/client/admin/metric_client.go b/client/admin/metric_client.go index 73c7b5e6975..a53eb707667 100644 --- a/client/admin/metric_client.go +++ b/client/admin/metric_client.go @@ -57,7 +57,8 @@ func (c *metricClient) finishMetricsRecording( *serviceerror.QueryFailed, *serviceerror.NamespaceNotFound, *serviceerror.WorkflowNotReady, - *serviceerror.WorkflowExecutionAlreadyStarted: + *serviceerror.WorkflowExecutionAlreadyStarted, + *serviceerror.ResourceExhausted: // noop - not interest and too many logs default: c.throttledLogger.Info("admin client encountered error", tag.Error(err), tag.ServiceErrorType(err)) diff --git a/client/frontend/metric_client.go b/client/frontend/metric_client.go index fc6f282dfb7..f9c204a83c5 100644 --- a/client/frontend/metric_client.go +++ b/client/frontend/metric_client.go @@ -56,7 +56,8 @@ func (c *metricClient) finishMetricsRecording( *serviceerror.QueryFailed, *serviceerror.NamespaceNotFound, *serviceerror.WorkflowNotReady, - *serviceerror.WorkflowExecutionAlreadyStarted: + *serviceerror.WorkflowExecutionAlreadyStarted, + *serviceerror.ResourceExhausted: // noop - not interest and too many logs default: c.throttledLogger.Info("frontend client encountered error", tag.Error(err), tag.ServiceErrorType(err)) diff --git a/client/matching/metric_client.go b/client/matching/metric_client.go index f2a274ff451..53edd2b34b7 100644 --- a/client/matching/metric_client.go +++ b/client/matching/metric_client.go @@ -232,7 +232,8 @@ func (c *metricClient) finishMetricsRecording( *serviceerror.QueryFailed, *serviceerror.NamespaceNotFound, *serviceerror.NewerBuildExists, - *serviceerror.WorkflowExecutionAlreadyStarted: + *serviceerror.WorkflowExecutionAlreadyStarted, + *serviceerror.ResourceExhausted: // noop - not interest and too many logs default: c.throttledLogger.Info("matching client encountered error", tag.Error(err), tag.ServiceErrorType(err)) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 
e0965889581..88c1568bd87 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -321,6 +321,16 @@ operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20 Setting this to 0 prevents the search attribute from being set when a problem is detected, and unset when the problem is resolved.`, ) + PollWaitForNamespaceRateLimitToken = NewNamespaceBoolSetting( + "system.pollWaitForNamespaceRateLimitToken", + false, + `PollWaitForNamespaceRateLimitToken controls whether poll requests wait for +a namespace RPS rate limit token to become available instead of immediately rejecting +with ResourceExhausted. When enabled, poll requests block until a token is available +or the request context deadline is reached. The concurrent request rate limiter fires +before this limiter and will still reject requests that exceed the concurrent limit.`, + ) + // keys for size limit BlobSizeLimitError = NewNamespaceIntSetting( @@ -697,15 +707,6 @@ exceeded, not when it is only reached.`, instances in the cluster, for a given namespace, per-API method. If this is set to 0 (the default), then it is ignored. The name 'frontend.globalNamespaceCount' is kept for consistency with the per-instance limit name, 'frontend.namespaceCount'.`, - ) - FrontendPollWaitForNamespaceRateLimitToken = NewNamespaceBoolSetting( - "frontend.pollWaitForNamespaceRateLimitToken", - false, - `FrontendPollWaitForNamespaceRateLimitToken controls whether poll requests wait for -a namespace RPS rate limit token to become available instead of immediately rejecting -with ResourceExhausted. When enabled, poll requests block until a token is available -or the request context deadline is reached. 
The concurrent request rate limiter fires -before this limiter and will still reject requests that exceed the concurrent limit.`, ) FrontendMaxNamespaceVisibilityRPSPerInstance = NewNamespaceIntSetting( "frontend.namespaceRPS.visibility", @@ -1116,6 +1117,12 @@ Default is 0, means, namespace will be deleted immediately.`, 1200, `MatchingRPS is request rate per second for each matching host`, ) + MatchingNamespaceRPS = NewNamespaceIntSetting( + "matching.namespaceRPS", + 0, + `MatchingNamespaceRPS is namespace rate limit per second for each matching host. +If value less or equal to 0, will fall back to MatchingRPS`, + ) MatchingPersistenceMaxQPS = NewGlobalIntSetting( "matching.persistenceMaxQPS", 3000, diff --git a/service/frontend/fx.go b/service/frontend/fx.go index dc8ab77d0ce..87989d75a9d 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -565,7 +565,14 @@ func NamespaceRateLimitInterceptorProvider( ) }, ) - return interceptor.NewNamespaceRateLimitInterceptor(namespaceRegistry, namespaceRateLimiter, map[string]int{}, configs.PollTaskAPISet, serviceConfig.PollWaitForNamespaceRateLimitToken, metricsHandler) + return interceptor.NewNamespaceRateLimitInterceptor( + namespaceRegistry, + namespaceRateLimiter, + map[string]int{}, // no token overrides + configs.PollTaskAPISet, + serviceConfig.PollWaitForNamespaceRateLimitToken, + metricsHandler, + ) } func NamespaceCountLimitInterceptorProvider( diff --git a/service/frontend/service.go b/service/frontend/service.go index 215cf5893c4..cac03ccfebc 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -286,7 +286,7 @@ func NewConfig( MaxNamespaceBurstRatioPerInstance: dynamicconfig.FrontendMaxNamespaceBurstRatioPerInstance.Get(dc), MaxConcurrentLongRunningRequestsPerInstance: dynamicconfig.FrontendMaxConcurrentLongRunningRequestsPerInstance.Get(dc), MaxGlobalConcurrentLongRunningRequests: dynamicconfig.FrontendGlobalMaxConcurrentLongRunningRequests.Get(dc), - 
PollWaitForNamespaceRateLimitToken: dynamicconfig.FrontendPollWaitForNamespaceRateLimitToken.Get(dc), + PollWaitForNamespaceRateLimitToken: dynamicconfig.PollWaitForNamespaceRateLimitToken.Get(dc), MaxNamespaceVisibilityRPSPerInstance: dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance.Get(dc), MaxNamespaceVisibilityBurstRatioPerInstance: dynamicconfig.FrontendMaxNamespaceVisibilityBurstRatioPerInstance.Get(dc), MaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance: dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsRPSPerInstance.Get(dc), diff --git a/service/matching/config.go b/service/matching/config.go index e5ef6fba1d1..54fcae8e757 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -26,7 +26,9 @@ type ( PersistenceQPSBurstRatio dynamicconfig.FloatPropertyFn SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueFilter RPS dynamicconfig.IntPropertyFn + NamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter OperatorRPSRatio dynamicconfig.FloatPropertyFn + PollWaitForNamespaceRateLimitToken dynamicconfig.BoolPropertyFnWithNamespaceFilter AlignMembershipChange dynamicconfig.DurationPropertyFn ShutdownDrainDuration dynamicconfig.DurationPropertyFn HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -274,7 +276,9 @@ func NewConfig( MaxTaskQueuesInDeployment: dynamicconfig.MatchingMaxTaskQueuesInDeployment.Get(dc), MaxVersionsInTaskQueue: dynamicconfig.MatchingMaxVersionsInTaskQueue.Get(dc), RPS: dynamicconfig.MatchingRPS.Get(dc), + NamespaceRPS: dynamicconfig.MatchingNamespaceRPS.Get(dc), OperatorRPSRatio: dynamicconfig.OperatorRPSRatio.Get(dc), + PollWaitForNamespaceRateLimitToken: dynamicconfig.PollWaitForNamespaceRateLimitToken.Get(dc), RangeSize: 100000, NewMatcherSub: dynamicconfig.MatchingUseNewMatcher.Subscribe(dc), EnableFairnessSub: dynamicconfig.MatchingEnableFairness.Subscribe(dc), diff --git a/service/matching/configs/quotas.go b/service/matching/configs/quotas.go 
index bb6c12f9fa7..82e0e082890 100644 --- a/service/matching/configs/quotas.go +++ b/service/matching/configs/quotas.go @@ -6,11 +6,6 @@ import ( "go.temporal.io/server/common/quotas" ) -const ( - // OperatorPriority is used to give precedence to calls coming from web UI or tctl - OperatorPriority = 0 -) - var ( APIToPriority = map[string]int{ "/temporal.server.api.matchingservice.v1.MatchingService/AddActivityTask": 1, @@ -56,36 +51,54 @@ var ( } APIPrioritiesOrdered = []int{0, 1, 2} + + PollTaskAPISet = map[string]struct{}{ + "/temporal.server.api.matchingservice.v1.MatchingService/PollActivityTaskQueue": {}, + "/temporal.server.api.matchingservice.v1.MatchingService/PollWorkflowTaskQueue": {}, + "/temporal.server.api.matchingservice.v1.MatchingService/PollNexusTaskQueue": {}, + } ) func NewPriorityRateLimiter( rateFn quotas.RateFn, operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RequestRateLimiter) - for priority := range APIPrioritiesOrdered { - if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio))) - } else { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) - } - } - return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { - if req.CallerType == headers.CallerTypeOperator { - return OperatorPriority - } - if priority, ok := APIToPriority[req.API]; ok { - return priority - } - return APIPrioritiesOrdered[len(APIPrioritiesOrdered)-1] - }, rateLimiters) + return quotas.NewPriorityRateLimiterHelper( + quotas.NewDefaultIncomingRateBurst(rateFn), + operatorRPSRatio, + RequestToPriority, + APIPrioritiesOrdered, + ) } -func operatorRateFn( - rateFn quotas.RateFn, +func NewNamespaceRateLimiter( + namespaceRateFn quotas.NamespaceRateFn, operatorRPSRatio dynamicconfig.FloatPropertyFn, -) quotas.RateFn { - return 
func() float64 { - return operatorRPSRatio() * rateFn() +) quotas.RequestRateLimiter { + return quotas.NewNamespaceRequestRateLimiter( + func(req quotas.Request) quotas.RequestRateLimiter { + return quotas.NewPriorityRateLimiterHelper( + quotas.NewNamespaceRateBurst( + req.Caller, + namespaceRateFn, + // TODO: We can consider adding a separate burst ratio dynamic config + // on namespace level rate limiter if needed. + quotas.DefaultIncomingNamespaceBurstRatioFn, + ), + operatorRPSRatio, + RequestToPriority, + APIPrioritiesOrdered, + ) + }, + ) +} + +func RequestToPriority(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return quotas.OperatorPriority + } + if priority, ok := APIToPriority[req.API]; ok { + return priority } + return APIPrioritiesOrdered[len(APIPrioritiesOrdered)-1] } diff --git a/service/matching/fx.go b/service/matching/fx.go index 665e359004f..109021d334e 100644 --- a/service/matching/fx.go +++ b/service/matching/fx.go @@ -39,6 +39,7 @@ var Module = fx.Options( fx.Provide(RetryableInterceptorProvider), fx.Provide(ErrorHandlerProvider), fx.Provide(TelemetryInterceptorProvider), + fx.Provide(NamespaceRateLimitInterceptorProvider), fx.Provide(RateLimitInterceptorProvider), fx.Provide(VisibilityManagerProvider), fx.Provide(WorkersRegistryProvider), @@ -99,6 +100,33 @@ func ThrottledLoggerRpsFnProvider(serviceConfig *Config) resource.ThrottledLogge return func() float64 { return float64(serviceConfig.ThrottledLogRPS()) } } +func NamespaceRateLimitInterceptorProvider( + serviceConfig *Config, + namespaceRegistry namespace.Registry, + metricsHandler metrics.Handler, +) interceptor.NamespaceRateLimitInterceptor { + + namespaceRateFn := func(namespaceName string) float64 { + if namespaceRPS := serviceConfig.NamespaceRPS(namespaceName); namespaceRPS > 0 { + return float64(namespaceRPS) + } + // Fall back to the host-level RPS limit when NamespaceRPS is not configured (i.e.
<= 0) + return float64(serviceConfig.RPS()) + } + + return interceptor.NewNamespaceRateLimitInterceptor( + namespaceRegistry, + configs.NewNamespaceRateLimiter( + namespaceRateFn, + serviceConfig.OperatorRPSRatio, + ), + map[string]int{}, // no token overrides + configs.PollTaskAPISet, // set of APIs that will wait for token instead of immediate rejection + serviceConfig.PollWaitForNamespaceRateLimitToken, + metricsHandler, + ) +} + func RateLimitInterceptorProvider( serviceConfig *Config, ) *interceptor.RateLimitInterceptor {