diff --git a/service/matching/hooks/task_lifecycle_hooks.go b/service/matching/hooks/task_lifecycle_hooks.go index 7f7d3b7a96..5ce110cd5f 100644 --- a/service/matching/hooks/task_lifecycle_hooks.go +++ b/service/matching/hooks/task_lifecycle_hooks.go @@ -9,6 +9,20 @@ import ( "go.temporal.io/server/common/tqid" ) +// SyncMatchOutcome describes the outcome of a sync match attempt from the hook's perspective. +type SyncMatchOutcome int + +const ( + // Default zero value; should not be used explicitly. + SyncMatchOutcomeUnspecified SyncMatchOutcome = iota + // The task was not sync-matched. Catch-all for reasons not covered by more specific outcomes. + SyncMatchOutcomeNotMatched + // The task was sync-matched successfully. + SyncMatchOutcomeSuccess + // A poller was available but rate limiting blocked the match. + SyncMatchOutcomeRateLimited +) + type ( // TaskQueuePartition is a simplified version of tqid.Partition that removes details // the hooks should not concern themselves with @@ -25,7 +39,8 @@ type ( } TaskAddHookDetails struct { DeploymentVersion *deploymentpb.WorkerDeploymentVersion - IsSyncMatch bool + IsSyncMatch bool // Deprecated: use SyncMatchOutcome instead. + SyncMatchOutcome SyncMatchOutcome } TaskHookFactory interface { diff --git a/service/matching/matcher_data.go b/service/matching/matcher_data.go index 0e2755df48..6267fee6aa 100644 --- a/service/matching/matcher_data.go +++ b/service/matching/matcher_data.go @@ -41,6 +41,8 @@ const ( syncMatchBacklogged // Sync match was attempted but no poller was available. syncMatchNoPoller + // A poller was available but rate limiting blocked the match. + syncMatchRateLimited ) type taskForwarderType int32 @@ -389,12 +391,15 @@ func (d *matcherData) MatchTaskImmediately(task *internalTask) syncMatchOutcome task.initMatch(d) d.tasks.Add(task) - d.findAndWakeMatches() + rateLimited := d.findAndWakeMatches() // don't wait, check if match() picked this one already if task.matchResult != nil { return syncMatchSuccess } d.tasks.Remove(task) + if rateLimited { + return syncMatchRateLimited + } return syncMatchNoPoller } @@ -504,8 +509,8 @@ func (d *matcherData) allowForwarding() (allowForwarding bool) { return delayToForwardingAllowed <= 0 } -// call with lock held -func (d *matcherData) findAndWakeMatches() { +// call with lock held. Returns true if a match was found but blocked by rate limiting. +func (d *matcherData) findAndWakeMatches() (rateLimited bool) { allowForwarding := d.canForward && d.allowForwarding() now := d.timeSource.Now().UnixNano() @@ -517,14 +522,14 @@ func (d *matcherData) findAndWakeMatches() { if task == nil || poller == nil { // no more current matches, stop rate limit timer if was running d.rateLimitTimer.unset() - return + return false } // check ready time delay := d.rateLimitManager.readyTimeForTask(task).delay(now) d.rateLimitTimer.set(d.timeSource, d.rematchAfterTimer, delay) if delay > 0 { - return // not ready yet, timer will call match later + return true // not ready yet, timer will call match later } // ready to signal match diff --git a/service/matching/matcher_data_test.go b/service/matching/matcher_data_test.go index 62186625b4..ad8a5fe48a 100644 --- a/service/matching/matcher_data_test.go +++ b/service/matching/matcher_data_test.go @@ -210,6 +210,23 @@ func (s *MatcherDataSuite) TestMatchTaskImmediately() { s.Equal(t, pres.task) } +func (s *MatcherDataSuite) TestMatchTaskImmediatelyRateLimited() { + // Set rate limit to zero — blocks all matches. + s.md.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API) + s.md.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0) + + // Add a waiting poller. + go func() { + poller := &waitingPoller{startTime: s.now()} + s.md.EnqueuePollerAndWait(nil, poller) + }() + s.waitForPollers(1) + + // Sync match should fail due to rate limiting, not lack of poller. + t := s.newSyncTask(nil) + s.Equal(syncMatchRateLimited, s.md.MatchTaskImmediately(t)) +} + func (s *MatcherDataSuite) TestMatchTaskImmediatelyDisabledBacklog() { // register some backlog with old tasks s.md.EnqueueTaskNoWait(s.newBacklogTask(123, 10*time.Minute, nil)) diff --git a/service/matching/physical_task_queue_manager_test.go b/service/matching/physical_task_queue_manager_test.go index 37e0471b61..15de4b1bad 100644 --- a/service/matching/physical_task_queue_manager_test.go +++ b/service/matching/physical_task_queue_manager_test.go @@ -134,9 +134,9 @@ func TestReaderSignaling(t *testing.T) { task := newInternalTaskForSyncMatch(&persistencespb.TaskInfo{ CreateTime: timestamp.TimePtr(time.Now().UTC()), }, nil) - sync, err := s.tqMgr.TrySyncMatch(context.TODO(), task) + outcome, err := s.tqMgr.TrySyncMatch(context.TODO(), task) require.NoError(t, err) - require.True(t, sync) + require.Equal(t, syncMatchSuccess, outcome) require.Len(t, readerNotifications, 0, "Sync match should not signal taskReader") } diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 6014f430c1..558f833407 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -439,15 +439,15 @@ reredirectTask: return "", false, err } + var outcome syncMatchOutcome if isActive { - var outcome syncMatchOutcome outcome, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask) syncMatched = outcome == syncMatchSuccess if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) { // Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired // on the child partition that originally received the task. if params.forwardInfo == nil { - pm.processTaskAddHooks(ctx, targetVersion, syncMatched) + pm.processTaskAddHooks(ctx, targetVersion, outcome) } // Build ID is not returned for sync match. The returned build ID is used by History to update @@ -476,17 +476,32 @@ reredirectTask: err = spoolQueue.SpoolTask(params.taskInfo) if err == nil { - pm.processTaskAddHooks(ctx, targetVersion, false) + pm.processTaskAddHooks(ctx, targetVersion, outcome) } return assignedBuildId, false, err } -func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, syncMatched bool) { +func syncMatchOutcomeToHook(outcome syncMatchOutcome) hooks.SyncMatchOutcome { + switch outcome { + case syncMatchSuccess: + return hooks.SyncMatchOutcomeSuccess + case syncMatchRateLimited: + return hooks.SyncMatchOutcomeRateLimited + case syncMatchUnspecified: + return hooks.SyncMatchOutcomeUnspecified + default: + return hooks.SyncMatchOutcomeNotMatched + } +} + +func (pm *taskQueuePartitionManagerImpl) processTaskAddHooks(ctx context.Context, targetVersion *deploymentspb.WorkerDeploymentVersion, outcome syncMatchOutcome) { for _, l := range pm.taskHooks { + hookOutcome := syncMatchOutcomeToHook(outcome) l.ProcessTaskAdd(ctx, &hooks.TaskAddHookDetails{ DeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromVersion(targetVersion), - IsSyncMatch: syncMatched, + IsSyncMatch: hookOutcome == hooks.SyncMatchOutcomeSuccess, + SyncMatchOutcome: hookOutcome, }) } } diff --git a/service/matching/task_queue_partition_manager_test.go b/service/matching/task_queue_partition_manager_test.go index ec69580fd1..f7e415fe58 100644 --- a/service/matching/task_queue_partition_manager_test.go +++ b/service/matching/task_queue_partition_manager_test.go @@ -1350,7 +1350,7 @@ type capturingTaskMatchHook struct { type capturedTaskMatchDetails struct { TaskQueueName string TaskQueueType enumspb.TaskQueueType - IsSyncMatch bool + SyncMatchOutcome hooks.SyncMatchOutcome DeploymentVersion *deploymentpb.WorkerDeploymentVersion } @@ -1370,9 +1370,9 @@ func (h *capturingTaskMatchHook) ProcessTaskAdd(ctx context.Context, event *hook h.mu.Lock() defer h.mu.Unlock() details := capturedTaskMatchDetails{ - TaskQueueName: h.taskQueueName, - TaskQueueType: h.taskQueueType, - IsSyncMatch: event.IsSyncMatch, + TaskQueueName: h.taskQueueName, + TaskQueueType: h.taskQueueType, + SyncMatchOutcome: event.SyncMatchOutcome, } if event.DeploymentVersion != nil { details.DeploymentVersion = &deploymentpb.WorkerDeploymentVersion{ @@ -1610,7 +1610,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookSyncMatch() { s.Require().Len(calls, 1) s.Equal(taskQueueName, calls[0].TaskQueueName) s.Equal(enumspb.TASK_QUEUE_TYPE_WORKFLOW, calls[0].TaskQueueType) - s.True(calls[0].IsSyncMatch) + s.Equal(hooks.SyncMatchOutcomeSuccess, calls[0].SyncMatchOutcome) s.Nil(calls[0].DeploymentVersion) } @@ -1634,7 +1634,7 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_AddHookNoSyncMatch() { s.Require().Len(calls, 1) s.Equal(taskQueueName, calls[0].TaskQueueName) s.Equal(enumspb.TASK_QUEUE_TYPE_WORKFLOW, calls[0].TaskQueueType) - s.False(calls[0].IsSyncMatch) + s.Equal(hooks.SyncMatchOutcomeNotMatched, calls[0].SyncMatchOutcome) } func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedSyncMatch_HooksNotInvoked() { @@ -1727,6 +1727,73 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_ForwardedNoSyncMatch_HooksN s.Require().Empty(hook.getCalls()) } +func (s *PartitionManagerTestSuite) TestTaskAddHooks_RateLimited() { + if !s.newMatcher { + s.T().Skip("rate limiting signal from matcher is only available in new matcher") + } + hook := &capturingTaskMatchHook{} + pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook}) + defer cleanup() + + // Set rate limit to zero RPS — this blocks all sync matches due to rate limiting. + pm.rateLimitManager.SetEffectiveRPSAndSourceForTesting(0, enumspb.RATE_LIMIT_SOURCE_API) + pm.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0) + + // Set up a waiting poller so sync match would succeed if not rate-limited. + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + task, _, _ := pm.PollTask(ctx, &pollMetadata{ + workerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: "", + UseVersioning: false, + }, + }) + if task != nil && task.responseC != nil { + close(task.responseC) + } + }() + pq := pm.defaultQueue().(*physicalTaskQueueManagerImpl) + s.Require().Eventually(pq.matcher.HasWaitingPoller, 2*time.Second, time.Millisecond) + + // AddTask should fall through to spool because rate limiting blocked sync match. + _, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{ + taskInfo: &persistencespb.TaskInfo{ + NamespaceId: namespaceID, + RunId: "run", + WorkflowId: "wf", + }, + }) + s.Require().NoError(err) + s.Require().False(syncMatched) + + calls := hook.getCalls() + s.Require().Len(calls, 1) + s.Equal(hooks.SyncMatchOutcomeRateLimited, calls[0].SyncMatchOutcome) +} + +func (s *PartitionManagerTestSuite) TestTaskAddHooks_NotRateLimited() { + hook := &capturingTaskMatchHook{} + pm, cleanup := s.setupPartitionManagerWithTaskHookFactories([]hooks.TaskHookFactory{hook}) + defer cleanup() + + // No rate limiting configured — task should spool normally without rate limit flag. + _, syncMatched, err := pm.AddTask(context.Background(), addTaskParams{ + taskInfo: &persistencespb.TaskInfo{ + NamespaceId: namespaceID, + RunId: "run", + WorkflowId: "wf", + VersionDirective: worker_versioning.MakeBuildIdDirective("buildXYZ"), + }, + }) + s.Require().NoError(err) + s.Require().False(syncMatched) + + calls := hook.getCalls() + s.Require().Len(calls, 1) + s.Equal(hooks.SyncMatchOutcomeNotMatched, calls[0].SyncMatchOutcome) +} + func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { hook1 := &capturingTaskMatchHook{} hook2 := &capturingTaskMatchHook{} @@ -1744,8 +1811,8 @@ func (s *PartitionManagerTestSuite) TestTaskAddHooks_MultipleHooksInvoked() { s.Len(hook1.getCalls(), 1) s.Len(hook2.getCalls(), 1) - s.False(hook1.getCalls()[0].IsSyncMatch) - s.False(hook2.getCalls()[0].IsSyncMatch) + s.Equal(hooks.SyncMatchOutcomeNotMatched, hook1.getCalls()[0].SyncMatchOutcome) + s.Equal(hooks.SyncMatchOutcomeNotMatched, hook2.getCalls()[0].SyncMatchOutcome) } type mockUserDataManager struct {